This commit is contained in:
Steffen Illium
2023-06-21 11:28:36 +02:00
parent 95c85bfedd
commit d11b1a8172
133 changed files with 225 additions and 225 deletions

View File

@@ -0,0 +1,37 @@
from typing import Union
from marl_factory_grid.environment.actions import Action
from marl_factory_grid.utils.results import ActionResult
from marl_factory_grid.modules.items import constants as i, rewards as r
from marl_factory_grid.environment import constants as c
class ItemAction(Action):
def __init__(self):
super().__init__(i.ITEM_ACTION)
def do(self, entity, state) -> Union[None, ActionResult]:
inventory = state[i.INVENTORY].by_entity(entity)
if drop_off := state[i.DROP_OFF].by_pos(entity.pos):
if inventory:
valid = drop_off.place_item(inventory.pop())
else:
valid = c.NOT_VALID
if valid:
state.print(f'{entity.name} just dropped of an item at {drop_off.pos}.')
else:
state.print(f'{entity.name} just tried to drop off at {entity.pos}, but failed.')
reward = r.DROP_OFF_VALID if valid else r.DROP_OFF_FAIL
return ActionResult(entity=entity, identifier=self._identifier, validity=valid, reward=reward)
elif item := state[i.ITEM].by_pos(entity.pos):
item.change_parent_collection(inventory)
item.set_tile_to(state.NO_POS_TILE)
state.print(f'{entity.name} just picked up an item at {entity.pos}')
return ActionResult(entity=entity, identifier=self._identifier, validity=c.VALID, reward=r.PICK_UP_VALID)
else:
state.print(f'{entity.name} just tried to pick up an item at {entity.pos}, but failed.')
return ActionResult(entity=entity, identifier=self._identifier, validity=c.NOT_VALID, reward=r.PICK_UP_FAIL)

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@@ -0,0 +1,11 @@
from typing import NamedTuple
SYMBOL_NO_ITEM = 0
SYMBOL_DROP_OFF = 1
# Item Env
ITEM = 'Items'
INVENTORY = 'Inventories'
DROP_OFF = 'DropOffLocations'
ITEM_ACTION = 'ITEMACTION'

View File

@@ -0,0 +1,64 @@
from collections import deque
from marl_factory_grid.environment.entity.entity import Entity
from marl_factory_grid.environment import constants as c
from marl_factory_grid.utils.render import RenderEntity
from marl_factory_grid.modules.items import constants as i
class Item(Entity):
def render(self):
return RenderEntity(i.ITEM, self.tile.pos) if self.pos != c.VALUE_NO_POS else None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._auto_despawn = -1
@property
def auto_despawn(self):
return self._auto_despawn
@property
def encoding(self):
# Edit this if you want items to be drawn in the ops differently
return 1
def set_auto_despawn(self, auto_despawn):
self._auto_despawn = auto_despawn
def set_tile_to(self, no_pos_tile):
self._tile = no_pos_tile
def summarize_state(self) -> dict:
super_summarization = super(Item, self).summarize_state()
super_summarization.update(dict(auto_despawn=self.auto_despawn))
return super_summarization
class DropOffLocation(Entity):
def render(self):
return RenderEntity(i.DROP_OFF, self.tile.pos)
@property
def encoding(self):
return i.SYMBOL_DROP_OFF
def __init__(self, *args, storage_size_until_full: int = 5, auto_item_despawn_interval: int = 5, **kwargs):
super(DropOffLocation, self).__init__(*args, **kwargs)
self.auto_item_despawn_interval = auto_item_despawn_interval
self.storage = deque(maxlen=storage_size_until_full or None)
def place_item(self, item: Item):
if self.is_full:
raise RuntimeWarning("There is currently no way to clear the storage or make it unfull.")
return bc.NOT_VALID
else:
self.storage.append(item)
item.set_auto_despawn(self.auto_item_despawn_interval)
return c.VALID
@property
def is_full(self):
return False if not self.storage.maxlen else self.storage.maxlen == len(self.storage)

View File

@@ -0,0 +1,101 @@
from typing import List
from marl_factory_grid.environment.groups.env_objects import EnvObjects
from marl_factory_grid.environment.groups.objects import Objects
from marl_factory_grid.environment.groups.mixins import PositionMixin, IsBoundMixin, HasBoundedMixin
from marl_factory_grid.environment.entity.wall_floor import Floor
from marl_factory_grid.environment.entity.agent import Agent
from marl_factory_grid.modules.items.entitites import Item, DropOffLocation
class Items(PositionMixin, EnvObjects):
_entity = Item
is_blocking_light: bool = False
can_collide: bool = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def spawn_items(self, tiles: List[Floor]):
items = [self._entity(tile) for tile in tiles]
self.add_items(items)
def despawn_items(self, items: List[Item]):
items = [items] if isinstance(items, Item) else items
for item in items:
del self[item]
class Inventory(IsBoundMixin, EnvObjects):
_accepted_objects = Item
@property
def obs_tag(self):
return self.name
def __init__(self, agent: Agent, *args, **kwargs):
super(Inventory, self).__init__(*args, **kwargs)
self._collection = None
self.bind(agent)
def summarize_states(self, **kwargs):
attr_dict = {key: val for key, val in self.__dict__.items() if not key.startswith('_') and key != 'data'}
attr_dict.update(dict(items=[val.summarize_state(**kwargs) for key, val in self.items()]))
attr_dict.update(dict(name=self.name, belongs_to=self._bound_entity.name))
return attr_dict
def pop(self):
item_to_pop = self[0]
self.delete_env_object(item_to_pop)
return item_to_pop
def set_collection(self, collection):
self._collection = collection
class Inventories(HasBoundedMixin, Objects):
_entity = Inventory
can_move = False
@property
def obs_pairs(self):
return [(x.name, x) for x in self]
def __init__(self, size, *args, **kwargs):
super(Inventories, self).__init__(*args, **kwargs)
self.size = size
self._obs = None
self._lazy_eval_transforms = []
def spawn_inventories(self, agents):
inventories = [self._entity(agent, self.size,)
for _, agent in enumerate(agents)]
self.add_items(inventories)
def idx_by_entity(self, entity):
try:
return next((idx for idx, inv in enumerate(self) if inv.belongs_to_entity(entity)))
except StopIteration:
return None
def by_entity(self, entity):
try:
return next((inv for inv in self if inv.belongs_to_entity(entity)))
except StopIteration:
return None
def summarize_states(self, **kwargs):
return [val.summarize_states(**kwargs) for key, val in self.items()]
class DropOffLocations(PositionMixin, EnvObjects):
_entity = DropOffLocation
is_blocking_light: bool = False
can_collide: bool = False
def __init__(self, *args, **kwargs):
super(DropOffLocations, self).__init__(*args, **kwargs)

View File

@@ -0,0 +1,4 @@
DROP_OFF_VALID: float = 0.1
DROP_OFF_FAIL: float = -0.1
PICK_UP_FAIL: float = -0.1
PICK_UP_VALID: float = 0.1

View File

@@ -0,0 +1,79 @@
from typing import List
from marl_factory_grid.environment.rules import Rule
from marl_factory_grid.environment import constants as c
from marl_factory_grid.utils.results import TickResult
from marl_factory_grid.modules.items import constants as i
from marl_factory_grid.modules.items.entitites import DropOffLocation
class ItemRules(Rule):
def __init__(self, n_items: int = 5, spawn_frequency: int = 15,
n_locations: int = 5, max_dropoff_storage_size: int = 0):
super().__init__()
self.spawn_frequency = spawn_frequency
self._next_item_spawn = spawn_frequency
self.n_items = n_items
self.max_dropoff_storage_size = max_dropoff_storage_size
self.n_locations = n_locations
def on_init(self, state):
self.trigger_drop_off_location_spawn(state)
self._next_item_spawn = self.spawn_frequency
self.trigger_inventory_spawn(state)
self.trigger_item_spawn(state)
def tick_step(self, state):
for item in list(state[i.ITEM].values()):
if item.auto_despawn >= 1:
item.set_auto_despawn(item.auto_despawn - 1)
elif not item.auto_despawn:
state[i.ITEM].delete_env_object(item)
else:
pass
if not self._next_item_spawn:
self.trigger_item_spawn(state)
else:
self._next_item_spawn = max(0, self._next_item_spawn - 1)
return []
def trigger_item_spawn(self, state):
if item_to_spawns := max(0, (self.n_items - len(state[i.ITEM]))):
empty_tiles = state[c.FLOOR].empty_tiles[:item_to_spawns]
state[i.ITEM].spawn_items(empty_tiles)
self._next_item_spawn = self.spawn_frequency
state.print(f'{item_to_spawns} new items have been spawned; next spawn in {self._next_item_spawn}')
return len(empty_tiles)
else:
state.print('No Items are spawning, limit is reached.')
return 0
@staticmethod
def trigger_inventory_spawn(state):
state[i.INVENTORY].spawn_inventories(state[c.AGENT])
def tick_post_step(self, state) -> List[TickResult]:
for item in list(state[i.ITEM].values()):
if item.auto_despawn >= 1:
item.set_auto_despawn(item.auto_despawn-1)
elif not item.auto_despawn:
state[i.ITEM].delete_env_object(item)
else:
pass
if not self._next_item_spawn:
if spawned_items := self.trigger_item_spawn(state):
return [TickResult(self.name, validity=c.VALID, value=spawned_items, entity=None)]
else:
return [TickResult(self.name, validity=c.NOT_VALID, value=0, entity=None)]
else:
self._next_item_spawn = max(0, self._next_item_spawn-1)
return []
def trigger_drop_off_location_spawn(self, state):
empty_tiles = state[c.FLOOR].empty_tiles[:self.n_locations]
do_entites = state[i.DROP_OFF]
drop_offs = [DropOffLocation(tile) for tile in empty_tiles]
do_entites.add_items(drop_offs)