diff --git a/environments/factory/base/base_factory.py b/environments/factory/base/base_factory.py index 633a02b..7d8448e 100644 --- a/environments/factory/base/base_factory.py +++ b/environments/factory/base/base_factory.py @@ -1,7 +1,8 @@ import abc import time +from enum import Enum from pathlib import Path -from typing import List, Union, Iterable +from typing import List, Union, Iterable, Dict import gym import numpy as np @@ -14,8 +15,8 @@ from environments.factory.base.shadow_casting import Map from environments.factory.renderer import Renderer, RenderEntity from environments.helpers import Constants as c, Constants from environments import helpers as h -from environments.factory.base.objects import Slice, Agent, Tile, Action -from environments.factory.base.registers import StateSlices, Actions, Entities, Agents, Doors, FloorTiles +from environments.factory.base.objects import Agent, Tile, Action +from environments.factory.base.registers import Actions, Entities, Agents, Doors, FloorTiles, WallTiles from environments.utility_classes import MovementProperties REC_TAC = 'rec' @@ -30,9 +31,13 @@ class BaseFactory(gym.Env): @property def observation_space(self): - slices = self._slices.n_observable_slices - level_shape = (self.pomdp_r * 2 + 1, self.pomdp_r * 2 + 1) if self.pomdp_r else self._level_shape - space = spaces.Box(low=0, high=1, shape=(slices, *level_shape), dtype=np.float32) + if r := self.pomdp_r: + z = self._obs_cube.shape[0] + xy = r*2 + 1 + level_shape = (z, xy, xy) + else: + level_shape = self._obs_cube.shape + space = spaces.Box(low=0, high=1, shape=level_shape, dtype=np.float32) return space @property @@ -51,8 +56,8 @@ class BaseFactory(gym.Env): def __init__(self, level_name='simple', n_agents=1, max_steps=int(5e2), pomdp_r: Union[None, int] = 0, movement_properties: MovementProperties = MovementProperties(), parse_doors=False, - combin_agent_slices_in_obs: bool = False, frames_to_stack=0, record_episodes=False, - omit_agent_slice_in_obs=False, done_at_collision=False, cast_shadows=True, + combin_agent_obs: bool = False, frames_to_stack=0, record_episodes=False, + omit_agent_in_obs=False, done_at_collision=False, cast_shadows=True, verbose=False, doors_have_area=True, env_seed=time.time_ns(), **kwargs): assert frames_to_stack != 1 and frames_to_stack >= 0, "'frames_to_stack' cannot be negative or 1." @@ -69,8 +74,8 @@ class BaseFactory(gym.Env): self.max_steps = max_steps self.pomdp_r = pomdp_r - self.combin_agent_slices_in_obs = combin_agent_slices_in_obs - self.omit_agent_slice_in_obs = omit_agent_slice_in_obs + self.combin_agent_obs = combin_agent_obs + self.omit_agent_in_obs = omit_agent_in_obs self.cast_shadows = cast_shadows self.frames_to_stack = frames_to_stack @@ -87,86 +92,74 @@ class BaseFactory(gym.Env): # Reset self.reset() - def _init_state_slices(self) -> StateSlices: - state_slices = StateSlices() - + def _base_init_env(self): # Objects + entities = {} # Level level_filepath = Path(__file__).parent.parent / h.LEVELS_DIR / f'{self.level_name}.txt' parsed_level = h.parse_level(level_filepath) - level = [Slice(c.LEVEL, h.one_hot_level(parsed_level), is_blocking_light=True)] - self._level_shape = level[0].shape + level_array = h.one_hot_level(parsed_level) + self._level_shape = level_array.shape + + # Walls + walls = WallTiles.from_argwhere_coordinates( + np.argwhere(level_array == c.OCCUPIED_CELL.value), + self._level_shape + ) + entities.update({c.WALLS: walls}) + + # Floor + floor = FloorTiles.from_argwhere_coordinates( + np.argwhere(level_array == c.FREE_CELL.value), + self._level_shape + ) + entities.update({c.FLOOR: floor}) + + # NOPOS + self.NO_POS_TILE = Tile(c.NO_POS, c.NO_POS.value) # Doors parsed_doors = h.one_hot_level(parsed_level, c.DOOR) - if parsed_doors.any(): - doors = [Slice(c.DOORS, parsed_doors, is_blocking_light=True)] - else: - doors = [] + if np.any(parsed_doors): + door_tiles = [floor.by_pos(pos) for pos in np.argwhere(parsed_doors == c.OCCUPIED_CELL.value)] + doors = Doors.from_tiles(door_tiles, self._level_shape, context=floor, is_blocking_light=True) + entities.update({c.DOORS: doors}) # Agents - agents = [] - agent_names = [f'{c.AGENT.value}#{i}' for i in range(self.n_agents)] + agents = Agents.from_tiles(floor.empty_tiles[:self.n_agents], self._level_shape) + entities.update({c.AGENT: agents}) - if self.combin_agent_slices_in_obs and self.omit_agent_slice_in_obs: - if self.n_agents == 1: - observables = [False] - else: - observables = [True] + ([False] * (self.n_agents - 1)) - elif self.combin_agent_slices_in_obs and not self.omit_agent_slice_in_obs: - observables = [True] + ([False] * (self.n_agents - 1)) - elif not self.combin_agent_slices_in_obs and self.omit_agent_slice_in_obs: - observables = [False] + ([True] * (self.n_agents - 1)) - elif not self.combin_agent_slices_in_obs and not self.omit_agent_slice_in_obs: - observables = [True] * self.n_agents - else: - raise RuntimeError('This should not happen!') - - for observable, agent_name in zip(observables, agent_names): - agents.append(Slice(agent_name, np.zeros_like(level[0].slice, dtype=np.float32), is_observable=observable)) - state_slices.register_additional_items(level+doors+agents+self.additional_slices) - return state_slices - - def _init_obs_cube(self) -> np.ndarray: - x, y = self._slices.by_enum(c.LEVEL).shape - state = np.zeros((len(self._slices), x, y), dtype=np.float32) - state[0] = self._slices.by_enum(c.LEVEL).slice - if r := self.pomdp_r: - self._padded_obs_cube = np.full((len(self._slices), x + r*2, y + r*2), c.FREE_CELL.value, dtype=np.float32) - self._padded_obs_cube[0] = c.OCCUPIED_CELL.value - self._padded_obs_cube[:, r:r+x, r:r+y] = state - if self.combin_agent_slices_in_obs and self.n_agents > 1: - self._combined_obs_cube = np.zeros(self.observation_space.shape, dtype=np.float32) - return state - - def _init_entities(self): - # Tile Init - self._tiles = FloorTiles.from_argwhere_coordinates(self._slices.by_enum(c.LEVEL).free_tiles) - - # Door Init - if self.parse_doors: - tiles = [self._tiles.by_pos(x) for x in self._slices.by_enum(c.DOORS).occupied_tiles] - self._doors = Doors.from_tiles(tiles, context=self._tiles, has_area=self.doors_have_area) - - # Agent Init on random positions - self._agents = Agents.from_tiles(self._base_rng.choice(self._tiles, self.n_agents)) - entities = Entities() - entities.register_additional_items([self._agents]) - - if self.parse_doors: - entities.register_additional_items([self._doors]) + # All entities + self._entities = Entities() + self._entities.register_additional_items(entities) + # Additional Entitites from SubEnvs if additional_entities := self.additional_entities: - entities.register_additional_items(additional_entities) + self._entities.register_additional_items(additional_entities) - return entities + # Return + return self._entities + + def _init_obs_cube(self): + arrays = self._entities.arrays + + if self.omit_agent_in_obs and self.n_agents == 1: + del arrays[c.AGENT] + obs_cube_z = sum([a.shape[0] if not self._entities[key].is_per_agent else 1 for key, a in arrays.items()]) + self._obs_cube = np.zeros((obs_cube_z, *self._level_shape), dtype=np.float32) + + # Optionally Pad this obs cube for pomdp cases + if r := self.pomdp_r: + x, y = self._level_shape + self._padded_obs_cube = np.full((obs_cube_z, x + r*2, y + r*2), c.SHADOWED_CELL.value, dtype=np.float32) + # self._padded_obs_cube[0] = c.OCCUPIED_CELL.value + self._padded_obs_cube[:, r:r+x, r:r+y] = self._obs_cube def reset(self) -> (np.ndarray, int, bool, dict): - self._slices = self._init_state_slices() - self._obs_cube = self._init_obs_cube() - self._entitites = self._init_entities() + _ = self._base_init_env() + self._init_obs_cube() self.do_additional_reset() - self._flush_state() + self._steps = 0 obs = self._get_observations() @@ -182,7 +175,7 @@ class BaseFactory(gym.Env): self.hook_pre_step() # Move this in a seperate function? - for action, agent in zip(actions, self._agents): + for action, agent in zip(actions, self._entities[c.AGENT]): agent.clear_temp_sate() action_obj = self._actions[action] if self._actions.is_moving_action(action_obj): @@ -200,9 +193,6 @@ class BaseFactory(gym.Env): # In-between step Hook for later use info = self.do_additional_step() - # Write to observation cube - self._flush_state() - tiles_with_collisions = self.get_all_tiles_with_collisions() for tile in tiles_with_collisions: guests = tile.guests_that_can_collide @@ -216,7 +206,7 @@ class BaseFactory(gym.Env): # Step the door close intervall if self.parse_doors: - self._doors.tick_doors() + self._entities[c.DOORS].tick_doors() # Finalize reward, reward_info = self.calculate_reward() @@ -237,9 +227,9 @@ class BaseFactory(gym.Env): def _handle_door_interaction(self, agent): # Check if agent really is standing on a door: if self.doors_have_area: - door = self._doors.get_near_position(agent.pos) + door = self._entities[c.DOORS].get_near_position(agent.pos) else: - door = self._doors.by_pos(agent.pos) + door = self._entities[c.DOORS].by_pos(agent.pos) if door is not None: door.use() return c.VALID.value @@ -247,36 +237,44 @@ class BaseFactory(gym.Env): else: return c.NOT_VALID.value - def _flush_state(self): - self._obs_cube[np.arange(len(self._slices)) != self._slices.get_idx(c.LEVEL)] = c.FREE_CELL.value - if self.parse_doors: - for door in self._doors: - if door.is_open and self._obs_cube[self._slices.get_idx(c.DOORS)][door.pos] != c.OPEN_DOOR.value: - self._obs_cube[self._slices.get_idx(c.DOORS)][door.pos] = c.OPEN_DOOR.value - elif door.is_closed and self._obs_cube[self._slices.get_idx(c.DOORS)][door.pos] != c.CLOSED_DOOR.value: - self._obs_cube[self._slices.get_idx(c.DOORS)][door.pos] = c.CLOSED_DOOR.value - for agent in self._agents: - self._obs_cube[self._slices.get_idx_by_name(agent.name)][agent.pos] = c.OCCUPIED_CELL.value - if agent.last_pos != c.NO_POS: - self._obs_cube[self._slices.get_idx_by_name(agent.name)][agent.last_pos] = c.FREE_CELL.value - def _get_observations(self) -> np.ndarray: if self.n_agents == 1: - obs = self._build_per_agent_obs(self._agents[0]) + obs = self._build_per_agent_obs(self._entities[c.AGENT][0]) elif self.n_agents >= 2: - obs = np.stack([self._build_per_agent_obs(agent) for agent in self._agents]) + obs = np.stack([self._build_per_agent_obs(agent) for agent in self._entities[c.AGENT]]) else: raise ValueError('n_agents cannot be smaller than 1!!') return obs def _build_per_agent_obs(self, agent: Agent) -> np.ndarray: - first_agent_slice = self._slices.AGENTSTARTIDX + plain_arrays = self._entities.arrays + if self.omit_agent_in_obs and self.n_agents == 1: + del plain_arrays[c.AGENT] + + running_idx, shadowing_idxs, can_be_shadowed_idxs = 0, [], [] + + for key, array in plain_arrays.items(): + if self._entities[key].is_per_agent: + per_agent_idx = self._entities[key].get_idx_by_name(agent.name) + z = 1 + self._obs_cube[running_idx: z] = array[per_agent_idx] + else: + z = array.shape[0] + self._obs_cube[running_idx: z] = array + # Define which OBS SLices cast a Shadow + if self._entities[key].is_blocking_light: + for i in range(z): + shadowing_idxs.append(running_idx + i) + # Define which OBS SLices are effected by shadows + if self._entities[key].can_be_shadowed: + for i in range(z): + can_be_shadowed_idxs.append(running_idx + i) + running_idx += z + if r := self.pomdp_r: x, y = self._level_shape self._padded_obs_cube[:, r:r + x, r:r + y] = self._obs_cube - global_x, global_y = agent.pos - global_x += r - global_y += r + global_x, global_y = map(sum, zip(agent.pos, (r, r))) x0, x1 = max(0, global_x - self.pomdp_r), global_x + self.pomdp_r + 1 y0, y1 = max(0, global_y - self.pomdp_r), global_y + self.pomdp_r + 1 obs = self._padded_obs_cube[:, x0:x1, y0:y1] @@ -284,10 +282,9 @@ class BaseFactory(gym.Env): obs = self._obs_cube if self.cast_shadows: - obs_block_light = [obs[idx] != c.OCCUPIED_CELL.value for idx, obs_slice - in enumerate(self._slices) if obs_slice.is_blocking_light] + obs_block_light = [obs[idx] != c.OCCUPIED_CELL.value for idx in shadowing_idxs] door_shadowing = False - if door := self._doors.by_pos(agent.pos): + if door := self._entities[c.DOORS].by_pos(agent.pos): if door.is_closed: for group in door.connectivity_subgroups: if agent.last_pos not in group: @@ -298,8 +295,9 @@ class BaseFactory(gym.Env): xs, ys = zip(*blocking) else: xs, ys = zip(*group) - # noinspection PyTypeChecker - obs_block_light[self._slices.get_idx(c.LEVEL)][xs, ys] = False + + # noinspection PyUnresolvedReferences + obs_block_light[0][xs, ys] = False light_block_map = Map((np.prod(obs_block_light, axis=0) != True).astype(int)) if self.pomdp_r: @@ -310,28 +308,18 @@ class BaseFactory(gym.Env): # noinspection PyUnboundLocalVariable light_block_map[xs, ys] = 0 agent.temp_light_map = light_block_map - for obs_idx in range(obs.shape[0]): - if self._slices[obs_idx].can_be_shadowed: - obs[obs_idx] = (obs[obs_idx] * light_block_map) - ( - (1 - light_block_map) * obs[self._slices.get_idx(c.LEVEL)] - ) + for obs_idx in can_be_shadowed_idxs: + obs[obs_idx] = (obs[obs_idx] * light_block_map) - ( + (1 - light_block_map) * obs[0] + ) - if self.combin_agent_slices_in_obs and self.n_agents > 1: - agent_obs = np.sum(obs[[key for key, l_slice in self._slices.items() if c.AGENT.name in l_slice.name and - (not self.omit_agent_slice_in_obs and l_slice.name != agent.name)]], - axis=0, keepdims=True) - obs = np.concatenate((obs[:first_agent_slice], agent_obs, obs[first_agent_slice+self.n_agents:])) return obs else: - if self.omit_agent_slice_in_obs: - obs_new = obs[[key for key, val in self._slices.items() if val.name != agent.name]] - return obs_new - else: - return obs + return obs def get_all_tiles_with_collisions(self) -> List[Tile]: tiles_with_collisions = list() - for tile in self._tiles: + for tile in self._entities[c.FLOOR]: if tile.is_occupied(): guests = [guest for guest in tile.guests if guest.can_collide] if len(guests) >= 2: @@ -353,7 +341,7 @@ class BaseFactory(gym.Env): x_new = agent.x + x_diff y_new = agent.y + y_diff - new_tile = self._tiles.by_pos((x_new, y_new)) + new_tile = self._entities[c.FLOOR].by_pos((x_new, y_new)) if new_tile: valid = c.VALID else: @@ -362,13 +350,13 @@ class BaseFactory(gym.Env): return tile, valid if self.parse_doors and agent.last_pos != c.NO_POS: - if door := self._doors.by_pos(new_tile.pos): + if door := self._entities[c.DOORS].by_pos(new_tile.pos): if door.can_collide: return agent.tile, c.NOT_VALID else: # door.is_closed: pass - if door := self._doors.by_pos(agent.pos): + if door := self._entities[c.DOORS].by_pos(agent.pos): if door.is_open: pass else: # door.is_closed: @@ -388,7 +376,7 @@ class BaseFactory(gym.Env): info_dict = dict() reward = 0 - for agent in self._agents: + for agent in self._entities[c.AGENT]: if self._actions.is_moving_action(agent.temp_action): if agent.temp_valid: # info_dict.update(movement=1) @@ -427,16 +415,15 @@ class BaseFactory(gym.Env): height, width = self._obs_cube.shape[1:] self._renderer = Renderer(width, height, view_radius=self.pomdp_r, fps=5) - walls = [RenderEntity('wall', pos) - for pos in np.argwhere(self._slices.by_enum(c.LEVEL).slice == c.OCCUPIED_CELL.value)] + walls = [RenderEntity('wall', wall.pos) for wall in self._entities[c.WALLS]] agents = [] - for i, agent in enumerate(self._agents): + for i, agent in enumerate(self._entities[c.AGENT]): name, state = h.asset_str(agent) agents.append(RenderEntity(name, agent.pos, 1, 'none', state, i + 1, agent.temp_light_map)) doors = [] if self.parse_doors: - for i, door in enumerate(self._doors): + for i, door in enumerate(self._entities[c.DOORS]): name, state = 'door_open' if door.is_open else 'door_closed', 'blank' doors.append(RenderEntity(name, door.pos, 1, 'none', state, i + 1)) additional_assets = self.render_additional_assets() @@ -454,7 +441,9 @@ class BaseFactory(gym.Env): def _summarize_state(self): summary = {f'{REC_TAC}_step': self._steps} - for entity in self._entitites: + + self._entities[c.WALLS].summarize_state() + for entity in self._entities: if hasattr(entity, 'summarize_state'): summary.update({f'{REC_TAC}_{entity.name}': entity.summarize_state()}) return summary @@ -475,24 +464,14 @@ class BaseFactory(gym.Env): return [] @property - def additional_entities(self) -> Union[Entities, List[Entities]]: + def additional_entities(self) -> Dict[(Enum, Entities)]: """ When heriting from this Base Class, you musst implement this methode!!! :return: A single Entites collection or a list of such. :rtype: Union[Entities, List[Entities]] """ - return [] - - @property - def additional_slices(self) -> Union[Slice, List[Slice]]: - """ - When heriting from this Base Class, you musst implement this methode!!! - - :return: A list of Slice-objects. - :rtype: List[Slice] - """ - return [] + return {} # Functions which provide additions to functions of the base class # Always call super!!!!!! diff --git a/environments/factory/base/objects.py b/environments/factory/base/objects.py index 757579d..39a4e44 100644 --- a/environments/factory/base/objects.py +++ b/environments/factory/base/objects.py @@ -4,22 +4,23 @@ from environments.helpers import Constants as c import itertools -def sub(p, q): - return p - q - - class Object: def __bool__(self): return True + @property + def is_blocking_light(self): + return self._is_blocking_light + @property def name(self): return self._name - def __init__(self, name, name_is_identifier=False, **kwargs): + def __init__(self, name, name_is_identifier=False, is_blocking_light=False, **kwargs): name = name.name if hasattr(name, 'name') else name self._name = f'{self.__class__.__name__}#{name}' if name_is_identifier else name + self._is_blocking_light = is_blocking_light if kwargs: print(f'Following kwargs were passed, but ignored: {kwargs}') @@ -33,40 +34,6 @@ class Action(Object): super(Action, self).__init__(*args) -class Slice(Object): - - @property - def is_observable(self): - return self._is_observable - - @property - def shape(self): - return self.slice.shape - - @property - def occupied_tiles(self): - return np.argwhere(self.slice == c.OCCUPIED_CELL.value) - - @property - def free_tiles(self): - return np.argwhere(self.slice == c.FREE_CELL.value) - - def __init__(self, identifier, arrayslice, is_blocking_light=False, can_be_shadowed=True, is_observable=True): - super(Slice, self).__init__(identifier) - self.slice = arrayslice - self.is_blocking_light = is_blocking_light - self.can_be_shadowed = can_be_shadowed - self._is_observable = is_observable - - def set_slice(self, new_slice: np.ndarray): - assert self.slice.shape == new_slice.shape - self.slice = new_slice - - -class Wall(Object): - pass - - class Tile(Object): @property @@ -118,6 +85,10 @@ class Tile(Object): return True +class Wall(Tile): + pass + + class Entity(Object): @property @@ -153,41 +124,6 @@ class Entity(Object): return self.__dict__.copy() -class MoveableEntity(Entity): - - @property - def last_tile(self): - return self._last_tile - - @property - def last_pos(self): - if self._last_tile: - return self._last_tile.pos - else: - return c.NO_POS - - @property - def direction_of_view(self): - last_x, last_y = self.last_pos - curr_x, curr_y = self.pos - return last_x-curr_x, last_y-curr_y - - def __init__(self, *args, **kwargs): - super(MoveableEntity, self).__init__(*args, **kwargs) - self._last_tile = None - - def move(self, next_tile): - curr_tile = self.tile - if curr_tile != next_tile: - next_tile.enter(self) - curr_tile.leave(self) - self._tile = next_tile - self._last_tile = curr_tile - return True - else: - return False - - class Door(Entity): @property @@ -268,6 +204,41 @@ class Door(Entity): return False +class MoveableEntity(Entity): + + @property + def last_tile(self): + return self._last_tile + + @property + def last_pos(self): + if self._last_tile: + return self._last_tile.pos + else: + return c.NO_POS + + @property + def direction_of_view(self): + last_x, last_y = self.last_pos + curr_x, curr_y = self.pos + return last_x-curr_x, last_y-curr_y + + def __init__(self, *args, **kwargs): + super(MoveableEntity, self).__init__(*args, **kwargs) + self._last_tile = None + + def move(self, next_tile): + curr_tile = self.tile + if curr_tile != next_tile: + next_tile.enter(self) + curr_tile.leave(self) + self._tile = next_tile + self._last_tile = curr_tile + return True + else: + return False + + class Agent(MoveableEntity): def __init__(self, *args, **kwargs): diff --git a/environments/factory/base/registers.py b/environments/factory/base/registers.py index 18e884d..b60959d 100644 --- a/environments/factory/base/registers.py +++ b/environments/factory/base/registers.py @@ -1,10 +1,11 @@ import random +from abc import ABC from enum import Enum -from typing import List, Union +from typing import List, Union, Dict import numpy as np -from environments.factory.base.objects import Entity, Tile, Agent, Door, Slice, Action +from environments.factory.base.objects import Entity, Tile, Agent, Door, Action, Wall from environments.utility_classes import MovementProperties from environments import helpers as h from environments.helpers import Constants as c @@ -13,10 +14,6 @@ from environments.helpers import Constants as c class Register: _accepted_objects = Entity - @classmethod - def from_argwhere_coordinates(cls, positions: [(int, int)], tiles): - return cls.from_tiles([tiles.by_pos(position) for position in positions]) - @property def name(self): return self.__class__.__name__ @@ -25,7 +22,7 @@ class Register: def n(self): return len(self) - def __init__(self): + def __init__(self, *args, **kwargs): self._register = dict() self._names = dict() @@ -35,17 +32,18 @@ class Register: def __iter__(self): return iter(self.values()) - def __add__(self, other: _accepted_objects): + def register_item(self, other: _accepted_objects): assert isinstance(other, self._accepted_objects), f'All item names have to be of type ' \ f'{self._accepted_objects}, ' \ f'but were {other.__class__}.,' - self._names.update({other.name: len(self._register)}) - self._register.update({len(self._register): other}) + new_idx = len(self._register) + self._names.update({other.name: new_idx}) + self._register.update({new_idx: other}) return self def register_additional_items(self, others: List[_accepted_objects]): for other in others: - self + other + self.register_item(other) return self def keys(self): @@ -60,8 +58,9 @@ class Register: def __getitem__(self, item): try: return self._register[item] - except KeyError: + except KeyError as e: print('NO') + print(e) raise def by_name(self, item): @@ -82,29 +81,66 @@ class Register: def get_idx(self, enum_obj: Enum): return self._names[enum_obj.name] + +class ObjectRegister(Register): + def __init__(self, level_shape: (int, int), *args, individual_slices=False, is_per_agent=False, **kwargs): + super(ObjectRegister, self).__init__(*args, **kwargs) + self.is_per_agent = is_per_agent + self.individual_slices = individual_slices + self._level_shape = level_shape + self._array = None + + def register_item(self, other): + super(ObjectRegister, self).register_item(other) + if self._array is None: + self._array = np.zeros((1, *self._level_shape)) + else: + if self.individual_slices: + self._array = np.concatenate((self._array, np.zeros(1, *self._level_shape))) + + +class EntityObjectRegister(ObjectRegister, ABC): + + def as_array(self): + raise NotImplementedError + @classmethod - def from_tiles(cls, tiles, **kwargs): + def from_tiles(cls, tiles, *args, **kwargs): # objects_name = cls._accepted_objects.__name__ - entities = [cls._accepted_objects(i, tile, name_is_identifier=True, **kwargs) for i, tile in enumerate(tiles)] - registered_obj = cls() - registered_obj.register_additional_items(entities) - return registered_obj + entities = [cls._accepted_objects(i, tile, name_is_identifier=True, **kwargs) + for i, tile in enumerate(tiles)] + register_obj = cls(*args) + register_obj.register_additional_items(entities) + return register_obj - -class EntityRegister(Register): + @classmethod + def from_argwhere_coordinates(cls, positions: [(int, int)], tiles, *args, **kwargs): + return cls.from_tiles([tiles.by_pos(position) for position in positions], *args, **kwargs) @property def positions(self): - return [agent.pos for agent in self] + return list(self._tiles.keys()) - def __init__(self): - super(EntityRegister, self).__init__() + @property + def tiles(self): + return [entity.tile for entity in self] + + def __init__(self, *args, is_blocking_light=False, is_observable=True, can_be_shadowed=True, **kwargs): + super(EntityObjectRegister, self).__init__(*args, **kwargs) + self.can_be_shadowed = can_be_shadowed self._tiles = dict() + self.is_blocking_light = is_blocking_light + self.is_observable = is_observable - def __add__(self, other): - super(EntityRegister, self).__add__(other) + def register_item(self, other): + super(EntityObjectRegister, self).register_item(other) self._tiles[other.pos] = other + def register_additional_items(self, others): + for other in others: + self.register_item(other) + return self + def by_pos(self, pos): if isinstance(pos, np.ndarray): pos = tuple(pos) @@ -114,9 +150,34 @@ class EntityRegister(Register): return None +class MovingEntityObjectRegister(EntityObjectRegister, ABC): + + def __init__(self, *args, **kwargs): + super(MovingEntityObjectRegister, self).__init__(*args, **kwargs) + + def by_pos(self, pos): + if isinstance(pos, np.ndarray): + pos = tuple(pos) + try: + return [x for x in self if x == pos][0] + except IndexError: + return None + + def delete_item(self, item): + self + + class Entities(Register): - _accepted_objects = Register + _accepted_objects = EntityObjectRegister + + @property + def arrays(self): + return {key: val.as_array() for key, val in self.items() if val.is_observable} + + @property + def names(self): + return list(self._register.keys()) def __init__(self): super(Entities, self).__init__() @@ -124,23 +185,64 @@ class Entities(Register): def __iter__(self): return iter([x for sublist in self.values() for x in sublist]) - @classmethod - def from_argwhere_coordinates(cls, positions): - raise AttributeError() + def register_item(self, other: dict): + assert not any([key for key in other.keys() if key in self._names]), \ + "This group of entities has already been registered!" + self._register.update(other) + return self + + def register_additional_items(self, others: Dict): + return self.register_item(others) -class FloorTiles(EntityRegister): - _accepted_objects = Tile +class WallTiles(EntityObjectRegister): + _accepted_objects = Wall + _light_blocking = True + + def as_array(self): + if not np.any(self._array): + x, y = zip(*[x.pos for x in self]) + self._array[0, x, y] = self.encoding + return self._array + + def __init__(self, *args, **kwargs): + super(WallTiles, self).__init__(*args, individual_slices=False, is_blocking_light=self._light_blocking, **kwargs) + + @property + def encoding(self): + return c.OCCUPIED_CELL.value + + @property + def array(self): + return self._array @classmethod - def from_argwhere_coordinates(cls, argwhere_coordinates): - tiles = cls() + def from_argwhere_coordinates(cls, argwhere_coordinates, *args, **kwargs): + tiles = cls(*args, **kwargs) # noinspection PyTypeChecker tiles.register_additional_items( - [cls._accepted_objects(i, pos, name_is_identifier=True) for i, pos in enumerate(argwhere_coordinates)] + [cls._accepted_objects(i, pos, name_is_identifier=True, is_blocking_light=cls._light_blocking) + for i, pos in enumerate(argwhere_coordinates)] ) return tiles + @classmethod + def from_tiles(cls, tiles, *args, **kwargs): + raise RuntimeError() + + +class FloorTiles(WallTiles): + + _accepted_objects = Tile + _light_blocking = False + + def __init__(self, *args, **kwargs): + super(self.__class__, self).__init__(*args, is_observable=False, **kwargs) + + @property + def encoding(self): + return c.FREE_CELL.value + @property def occupied_tiles(self): tiles = [tile for tile in self if tile.is_occupied()] @@ -153,8 +255,22 @@ class FloorTiles(EntityRegister): random.shuffle(tiles) return tiles + @classmethod + def from_tiles(cls, tiles, *args, **kwargs): + raise RuntimeError() -class Agents(EntityRegister): + +class Agents(MovingEntityObjectRegister): + + def as_array(self): + self._array[:] = c.FREE_CELL.value + # noinspection PyTupleAssignmentBalance + z, x, y = range(len(self)), *zip(*[x.pos for x in self]) + self._array[z, x, y] = c.OCCUPIED_CELL.value + if self.individual_slices: + return self._array + else: + return self._array.sum(axis=0, keepdims=True) _accepted_objects = Agent @@ -163,7 +279,17 @@ class Agents(EntityRegister): return [agent.pos for agent in self] -class Doors(EntityRegister): +class Doors(EntityObjectRegister): + + def __init__(self, *args, **kwargs): + super(Doors, self).__init__(*args, is_blocking_light=True, **kwargs) + + def as_array(self): + self._array[:] = 0 + for door in self: + self._array[0, door.x, door.y] = door.encoding + return self._array + _accepted_objects = Door def get_near_position(self, position: (int, int)) -> Union[None, Door]: @@ -221,47 +347,6 @@ class Actions(Register): return action == h.EnvActions.USE_DOOR.name -class StateSlices(Register): - - _accepted_objects = Slice - @property - def n_observable_slices(self): - return len([x for x in self if x.is_observable]) - - - @property - def AGENTSTARTIDX(self): - if self._agent_start_idx: - return self._agent_start_idx - else: - self._agent_start_idx = min([idx for idx, x in self.items() if c.AGENT.value in x.name]) - return self._agent_start_idx - - def __init__(self): - super(StateSlices, self).__init__() - self._agent_start_idx = None - - def _gather_occupation(self, excluded_slices): - exclusion = excluded_slices or [] - assert isinstance(exclusion, (int, list)) - exclusion = exclusion if isinstance(exclusion, list) else [exclusion] - - result = np.sum([x for i, x in self.items() if i not in exclusion], axis=0) - return result - - def free_cells(self, excluded_slices: Union[None, List[int], int] = None) -> np.array: - occupation = self._gather_occupation(excluded_slices) - free_cells = np.argwhere(occupation == c.IS_FREE_CELL) - np.random.shuffle(free_cells) - return free_cells - - def occupied_cells(self, excluded_slices: Union[None, List[int], int] = None) -> np.array: - occupation = self._gather_occupation(excluded_slices) - occupied_cells = np.argwhere(occupation == c.IS_OCCUPIED_CELL.value) - np.random.shuffle(occupied_cells) - return occupied_cells - - class Zones(Register): @property @@ -279,9 +364,9 @@ class Zones(Register): self._accounting_zones = list() self._danger_zones = list() for symbol in np.unique(parsed_level): - if symbol == h.WALL: + if symbol == c.WALL.value: continue - elif symbol == h.DANGER_ZONE: + elif symbol == c.DANGER_ZONE.value: self + symbol slices.append(h.one_hot_level(parsed_level, symbol)) self._danger_zones.append(symbol) diff --git a/environments/factory/double_task_factory.py b/environments/factory/double_task_factory.py index 76e537b..50710a9 100644 --- a/environments/factory/double_task_factory.py +++ b/environments/factory/double_task_factory.py @@ -1,22 +1,21 @@ import time -from collections import deque +from collections import deque, UserList from enum import Enum -from typing import List, Union, NamedTuple +from typing import List, Union, NamedTuple, Dict import numpy as np from environments.factory.simple_factory import SimpleFactory from environments.helpers import Constants as c from environments import helpers as h -from environments.factory.base.objects import Agent, Slice, Entity, Action -from environments.factory.base.registers import Entities, Register, EntityRegister +from environments.factory.base.objects import Agent, Entity, Action, Tile, MoveableEntity +from environments.factory.base.registers import Entities, EntityObjectRegister, ObjectRegister, \ + MovingEntityObjectRegister from environments.factory.renderer import RenderEntity -PICK_UP = 'pick_up' -DROP_OFF = 'drop_off' NO_ITEM = 0 -ITEM_DROP_OFF = -1 +ITEM_DROP_OFF = 1 def inventory_slice_name(agent_i): @@ -26,8 +25,106 @@ def inventory_slice_name(agent_i): return f'{c.INVENTORY.name}_{agent_i}' +class Item(MoveableEntity): + + @property + def can_collide(self): + return False + + def encoding(self): + # Edit this if you want items to be drawn in the ops differntly + return 1 + + +class ItemRegister(MovingEntityObjectRegister): + + def as_array(self): + self._array[:] = c.FREE_CELL.value + for item in self: + if item.pos != c.NO_POS.value: + self._array[0, item.x, item.y] = item.encoding() + return self._array + + _accepted_objects = Item + + def spawn_items(self, tiles: List[Tile]): + items = [Item(idx, tile) for idx, tile in enumerate(tiles)] + self.register_additional_items(items) + + +class Inventory(UserList): + + @property + def is_blocking_light(self): + return False + + @property + def name(self): + return self.agent.name + + def __init__(self, pomdp_r: int, level_shape: (int, int), agent: Agent, capacity: int): + super(Inventory, self).__init__() + self.agent = agent + self.capacity = capacity + self.pomdp_r = pomdp_r + self._level_shape = level_shape + self._array = np.zeros((1, *self._level_shape)) + + def as_array(self): + self._array[:] = c.FREE_CELL.value + max_x = self.pomdp_r * 2 + 1 if self.pomdp_r else self._level_shape[0] + if self.pomdp_r: + x, y = max(self.agent.x - self.pomdp_r, 0), max(self.agent.y - self.pomdp_r, 0) + else: + x, y = (0, 0) + + for item_idx, item in enumerate(self): + x_diff, y_diff = divmod(item_idx, max_x) + self._array[0].slice[int(x + x_diff), int(y + y_diff)] = item.encoding + return self._array + + def __repr__(self): + return f'{self.__class__.__name__}[{self.agent.name}]({self.data})' + + def append(self, item) -> None: + if len(self) < self.capacity: + super(Inventory, self).append(item) + else: + raise RuntimeError('Inventory is full') + + +class Inventories(ObjectRegister): + + _accepted_objects = Inventory + is_blocking_light = False + can_be_shadowed = False + + def __init__(self, *args, **kwargs): + super(Inventories, self).__init__(*args, is_per_agent=True, **kwargs) + self.is_observable = True + + def as_array(self): + # self._array[:] = c.FREE_CELL.value + for inv_idx, inventory in enumerate(self): + self._array[inv_idx] = inventory.as_array() + return self._array + + def spawn_inventories(self, agents, pomdp_r, capacity): + inventories = [self._accepted_objects(pomdp_r, self._level_shape, agent, capacity) + for _, agent in enumerate(agents)] + self.register_additional_items(inventories) + + class DropOffLocation(Entity): + @property + def can_collide(self): + return False + + @property + def encoding(self): + return ITEM_DROP_OFF + def __init__(self, *args, storage_size_until_full: int = 5, **kwargs): super(DropOffLocation, self).__init__(*args, **kwargs) self.storage = deque(maxlen=storage_size_until_full or None) @@ -45,20 +142,28 @@ class DropOffLocation(Entity): return False if not self.storage.maxlen else self.storage.maxlen == len(self.storage) -class DropOffLocations(EntityRegister): +class DropOffLocations(EntityObjectRegister): + _accepted_objects = DropOffLocation + def as_array(self): + self._array[:] = c.FREE_CELL.value + for item in self: + if item.pos != c.NO_POS.value: + self._array[0, item.x, item.y] = item.encoding + return self._array + class ItemProperties(NamedTuple): n_items: int = 5 # How many items are there at the same time spawn_frequency: int = 5 # Spawn Frequency in Steps n_drop_off_locations: int = 5 # How many DropOff locations are there at the same time max_dropoff_storage_size: int = 0 # How many items are needed until the drop off is full - max_agent_storage_size: int = 5 # How many items are needed until the agent inventory is full + max_agent_inventory_capacity: int = 5 # How many items are needed until the agent inventory is full agent_can_interact: bool = True # Whether agents have the possibility to interact with the domain items -# noinspection PyAttributeOutsideInit,PyUnresolvedReferences +# noinspection PyAttributeOutsideInit, PyAbstractClass class DoubleTaskFactory(SimpleFactory): # noinspection PyMissingConstructor def __init__(self, item_properties: ItemProperties, *args, with_dirt=False, env_seed=time.time_ns(), **kwargs): @@ -66,48 +171,34 @@ class DoubleTaskFactory(SimpleFactory): kwargs.update(env_seed=env_seed) self._item_rng = np.random.default_rng(env_seed) assert item_properties.n_items < kwargs.get('pomdp_r', 0) ** 2 or not kwargs.get('pomdp_r', 0) - self._super = self.__class__ if with_dirt else SimpleFactory + self._super = DoubleTaskFactory if with_dirt else SimpleFactory super(self._super, self).__init__(*args, **kwargs) @property def additional_actions(self) -> Union[Action, List[Action]]: + # noinspection PyUnresolvedReferences super_actions = super(self._super, self).additional_actions super_actions.append(Action(h.EnvActions.ITEM_ACTION)) return super_actions @property - def additional_entities(self) -> Union[Entities, List[Entities]]: + def additional_entities(self) -> Dict[(Enum, Entities)]: + # noinspection PyUnresolvedReferences super_entities = super(self._super, self).additional_entities - self._drop_offs = self.spawn_drop_off_location() - return super_entities + [self._drop_offs] - @property - def additional_slices(self) -> Union[Slice, List[Slice]]: - super_slices = super(self._super, self).additional_slices - super_slices.append(Slice(c.ITEM, np.zeros(self._level_shape))) - super_slices.extend([Slice(inventory_slice_name(agent_i), np.zeros(self._level_shape), can_be_shadowed=False) - for agent_i in range(self.n_agents)]) - return super_slices + empty_tiles = self._entities[c.FLOOR].empty_tiles[:self.item_properties.n_drop_off_locations] + drop_offs = DropOffLocations.from_tiles(empty_tiles, self._level_shape, + storage_size_until_full=self.item_properties.max_dropoff_storage_size) + item_register = ItemRegister(self._level_shape) + empty_tiles = self._entities[c.FLOOR].empty_tiles[:self.item_properties.n_items] + item_register.spawn_items(empty_tiles) - def _flush_state(self): - super(self._super, self)._flush_state() + inventories = Inventories(self._level_shape) + inventories.spawn_inventories(self._entities[c.AGENT], self.pomdp_r, + self.item_properties.max_agent_inventory_capacity) - # Flush environmental item state - slice_idx = self._slices.get_idx(c.ITEM) - self._obs_cube[slice_idx] = self._slices[slice_idx].slice - - # Flush per agent inventory state - for agent in self._agents: - agent_slice_idx = self._slices.get_idx_by_name(inventory_slice_name(agent.name)) - # Hard reset the Inventory Stat in OBS cube - self._slices[agent_slice_idx].slice[:] = 0 - if len(agent.inventory) > 0: - max_x = self.pomdp_r * 2 + 1 if self.pomdp_r else self._level_shape[0] - x, y = (0, 0) if not self.pomdp_r else (max(agent.x - self.pomdp_r, 0), max(agent.y - self.pomdp_r, 0)) - for item_idx, item in enumerate(agent.inventory): - x_diff, y_diff = divmod(item_idx, max_x) - self._slices[agent_slice_idx].slice[int(x+x_diff), int(y+y_diff)] = item - self._obs_cube[agent_slice_idx] = self._slices[agent_slice_idx].slice + super_entities.update({c.DROP_OFF: drop_offs, c.ITEM: item_register, c.INVENTORY: inventories}) + return super_entities def _is_item_action(self, action): if isinstance(action, int): @@ -117,29 +208,25 @@ class DoubleTaskFactory(SimpleFactory): return action == h.EnvActions.ITEM_ACTION.name def do_item_action(self, agent: Agent): - item_slice = self._slices.by_enum(c.ITEM).slice - - if item := item_slice[agent.pos]: - if item == ITEM_DROP_OFF: - if agent.inventory: - drop_off = self._drop_offs.by_pos(agent.pos) - valid = drop_off.place_item(agent.inventory.pop(0)) - return valid - else: - return c.NOT_VALID - - elif item != NO_ITEM: - max_sto_size = self.item_properties.max_agent_storage_size or np.prod(self.observation_space.shape[1:]) - if len(agent.inventory) < max_sto_size: - agent.inventory.append(item_slice[agent.pos]) - item_slice[agent.pos] = NO_ITEM - else: - return c.NOT_VALID - return c.VALID + inventory = self._entities[c.INVENTORY].by_name(agent.name) + if drop_off := self._entities[c.DROP_OFF].by_pos(agent.pos): + if inventory: + valid = drop_off.place_item(inventory.pop(0)) + return valid + else: + return c.NOT_VALID + elif item := self._entities[c.ITEM].by_pos(agent.pos): + try: + inventory.append(item) + item.move(self.NO_POS_TILE) + return c.VALID + except RuntimeError: + return c.NOT_VALID else: return c.NOT_VALID def do_additional_actions(self, agent: Agent, action: int) -> Union[None, bool]: + # noinspection PyUnresolvedReferences valid = super(self._super, self).do_additional_actions(agent, action) if valid is None: if self._is_item_action(action): @@ -154,38 +241,35 @@ class DoubleTaskFactory(SimpleFactory): return valid def do_additional_reset(self) -> None: + # noinspection PyUnresolvedReferences super(self._super, self).do_additional_reset() - self.spawn_items(self.item_properties.n_items) self._next_item_spawn = self.item_properties.spawn_frequency - for agent in self._agents: - agent.inventory = list() + self.trigger_item_spawn() + + def trigger_item_spawn(self): + if item_to_spawns := max(0, (self.item_properties.n_items - len(self._entities[c.ITEM]))): + empty_tiles = self._entities[c.FLOOR].empty_tiles[:item_to_spawns] + self._entities[c.ITEM].spawn_items(empty_tiles) + self._next_item_spawn = self.item_properties.spawn_frequency + self.print(f'{item_to_spawns} new items have been spawned; next spawn in {self._next_item_spawn}') + else: + self.print('No Items are spawning, limit is reached.') def do_additional_step(self) -> dict: + # noinspection PyUnresolvedReferences info_dict = super(self._super, self).do_additional_step() if not self._next_item_spawn: - if item_to_spawns := max(0, (self.item_properties.n_items - - (np.sum(self._slices.by_enum(c.ITEM).slice.astype(bool)) - 1))): - self.spawn_items(item_to_spawns) - self._next_item_spawn = self.item_properties.spawn_frequency - else: - self.print('No Items are spawning, limit is reached.') + self.trigger_item_spawn() else: self._next_item_spawn -= 1 return info_dict - def spawn_drop_off_location(self): - empty_tiles = self._tiles.empty_tiles[:self.item_properties.n_drop_off_locations] - drop_offs = DropOffLocations.from_tiles(empty_tiles, - storage_size_until_full=self.item_properties.max_dropoff_storage_size) - xs, ys = zip(*[drop_off.pos for drop_off in drop_offs]) - self._slices.by_enum(c.ITEM).slice[xs, ys] = ITEM_DROP_OFF - return drop_offs - def calculate_additional_reward(self, agent: Agent) -> (int, dict): + # noinspection PyUnresolvedReferences reward, info_dict = super(self._super, self).calculate_additional_reward(agent) if self._is_item_action(agent.temp_action): if agent.temp_valid: - if agent.pos in self._drop_offs.positions: + if self._entities[c.DROP_OFF].by_pos(agent.pos): info_dict.update({f'{agent.name}_item_dropoff': 1}) reward += 1 @@ -198,21 +282,14 @@ class DoubleTaskFactory(SimpleFactory): return reward, info_dict def render_additional_assets(self, mode='human'): + # noinspection PyUnresolvedReferences additional_assets = super(self._super, self).render_additional_assets() - item_slice = self._slices.by_enum(c.ITEM).slice - items = [RenderEntity(DROP_OFF if item_slice[tile.pos] == ITEM_DROP_OFF else c.ITEM.value, tile.pos) - for tile in [tile for tile in self._tiles if item_slice[tile.pos] != NO_ITEM]] + items = [RenderEntity(c.ITEM.value, item.tile.pos) for item in self._entities[c.ITEM]] additional_assets.extend(items) + drop_offs = [RenderEntity(c.DROP_OFF.value, drop_off.tile.pos) for drop_off in self._entities[c.DROP_OFF]] + additional_assets.extend(drop_offs) return additional_assets - def spawn_items(self, n_items): - tiles = self._tiles.empty_tiles[:n_items] - item_slice = self._slices.by_enum(c.ITEM).slice - # when all items should be 1 - xs, ys = zip(*[tile.pos for tile in tiles]) - item_slice[xs, ys] = 1 - pass - if __name__ == '__main__': import random @@ -226,6 +303,7 @@ if __name__ == '__main__': record_episodes=False, verbose=False ) + # noinspection DuplicatedCode n_actions = factory.action_space.n - 1 _ = factory.observation_space diff --git a/environments/factory/simple_factory.py b/environments/factory/simple_factory.py index 78f614d..e5cf2eb 100644 --- a/environments/factory/simple_factory.py +++ b/environments/factory/simple_factory.py @@ -1,6 +1,6 @@ import time from enum import Enum -from typing import List, Union, NamedTuple +from typing import List, Union, NamedTuple, Dict import random import numpy as np @@ -8,8 +8,8 @@ import numpy as np from environments.helpers import Constants as c from environments import helpers as h from environments.factory.base.base_factory import BaseFactory -from environments.factory.base.objects import Agent, Action, Slice -from environments.factory.base.registers import Entities +from environments.factory.base.objects import Agent, Action, Entity +from environments.factory.base.registers import Entities, MovingEntityObjectRegister from environments.factory.renderer import RenderEntity from environments.utility_classes import MovementProperties @@ -36,6 +36,70 @@ class DirtProperties(NamedTuple): on_obs_slice: Enum = ObsSlice.LEVEL +class Dirt(Entity): + + @property + def can_collide(self): + return False + + @property + def amount(self): + return self._amount + + def encoding(self): + # Edit this if you want items to be drawn in the ops differntly + return self._amount + + def __init__(self, *args, amount=None, **kwargs): + super(Dirt, self).__init__(*args, **kwargs) + self._amount = amount + + def set_new_amount(self, amount): + self._amount = amount + + +class DirtRegister(MovingEntityObjectRegister): + + def as_array(self): + if self._array is not None: + self._array[:] = c.FREE_CELL.value + for key, dirt in self.items(): + if dirt.amount == 0: + self.delete_item(key) + self._array[0, dirt.x, dirt.y] = dirt.amount + else: + self._array = np.zeros((1, *self._level_shape)) + return self._array + + _accepted_objects = Dirt + + @property + def amount(self): + return sum([dirt.amount for dirt in self]) + + @property + def dirt_properties(self): + return self._dirt_properties + + def __init__(self, dirt_properties, *args): + super(DirtRegister, self).__init__(*args) + self._dirt_properties: DirtProperties = dirt_properties + + def spawn_dirt(self, then_dirty_tiles) -> None: + if not self.amount > self.dirt_properties.max_global_amount: + # randomly distribute dirt across the grid + for tile in then_dirty_tiles: + dirt = self.by_pos(tile.pos) + if dirt is None: + dirt = Dirt(0, tile, amount=self.dirt_properties.gain_amount) + self.register_item(dirt) + else: + new_value = dirt.amount + self.dirt_properties.gain_amount + dirt.set_new_amount(min(new_value, self.dirt_properties.max_local_amount)) + else: + pass + + def softmax(x): """Compute softmax values for each sets of scores in x.""" e_x = np.exp(x - np.max(x)) @@ -46,7 +110,7 @@ def entropy(x): return -(x * np.log(x + 1e-8)).sum() -# noinspection PyAttributeOutsideInit +# noinspection PyAttributeOutsideInit, PyAbstractClass class SimpleFactory(BaseFactory): @property @@ -57,16 +121,12 @@ class SimpleFactory(BaseFactory): return super_actions @property - def additional_entities(self) -> Union[Entities, List[Entities]]: + def additional_entities(self) -> Dict[(Enum, Entities)]: super_entities = super(SimpleFactory, self).additional_entities + dirt_register = DirtRegister(self.dirt_properties, self._level_shape) + super_entities.update(({c.DIRT: dirt_register})) return super_entities - @property - def additional_slices(self) -> List[Slice]: - super_slices = super(SimpleFactory, self).additional_slices - super_slices.extend([Slice(c.DIRT, np.zeros(self._level_shape))]) - return super_slices - def _is_clean_up_action(self, action: Union[str, Action, int]): if isinstance(action, int): action = self._actions[action] @@ -77,62 +137,48 @@ class SimpleFactory(BaseFactory): def __init__(self, *args, dirt_properties: DirtProperties = DirtProperties(), env_seed=time.time_ns(), **kwargs): self.dirt_properties = dirt_properties self._dirt_rng = np.random.default_rng(env_seed) + self._dirt: DirtRegister kwargs.update(env_seed=env_seed) super(SimpleFactory, self).__init__(*args, **kwargs) - def _flush_state(self): - super(SimpleFactory, self)._flush_state() - dirt_slice_idx = self._slices.get_idx(c.DIRT) - self._obs_cube[dirt_slice_idx] = self._slices[dirt_slice_idx].slice - def render_additional_assets(self, mode='human'): additional_assets = super(SimpleFactory, self).render_additional_assets() - dirt_slice = self._slices.by_enum(c.DIRT).slice - dirt = [RenderEntity('dirt', tile.pos, min(0.15 + dirt_slice[tile.pos], 1.5), 'scale') - for tile in [tile for tile in self._tiles if dirt_slice[tile.pos]]] + dirt = [RenderEntity('dirt', dirt.tile.pos, min(0.15 + dirt.amount, 1.5), 'scale') + for dirt in self._entities[c.DIRT]] additional_assets.extend(dirt) return additional_assets - def spawn_dirt(self) -> None: - dirt_slice = self._slices.by_enum(c.DIRT).slice - # dirty_tiles = [tile for tile in self._tiles if dirt_slice[tile.pos]] - curr_dirt_amount = dirt_slice.sum() - if not curr_dirt_amount > self.dirt_properties.max_global_amount: - free_for_dirt = self._tiles.empty_tiles - - # randomly distribute dirt across the grid - new_spawn = self._dirt_rng.uniform(0, self.dirt_properties.max_spawn_ratio) - n_dirt_tiles = max(0, int(new_spawn * len(free_for_dirt))) - for tile in free_for_dirt[:n_dirt_tiles]: - new_value = dirt_slice[tile.pos] + self.dirt_properties.gain_amount - dirt_slice[tile.pos] = min(new_value, self.dirt_properties.max_local_amount) - else: - pass - def clean_up(self, agent: Agent) -> bool: - dirt_slice = self._slices.by_enum(c.DIRT).slice - if old_dirt_amount := dirt_slice[agent.pos]: - new_dirt_amount = old_dirt_amount - self.dirt_properties.clean_amount - dirt_slice[agent.pos] = max(new_dirt_amount, c.FREE_CELL.value) + if dirt := self._entities[c.DIRT].by_pos(agent.pos): + new_dirt_amount = dirt.amount - self.dirt_properties.clean_amount + dirt.set_new_amount(max(new_dirt_amount, c.FREE_CELL.value)) return True else: return False + def trigger_dirt_spawn(self): + free_for_dirt = self._entities[c.FLOOR].empty_tiles + new_spawn = self._dirt_rng.uniform(0, self.dirt_properties.max_spawn_ratio) + n_dirt_tiles = max(0, int(new_spawn * len(free_for_dirt))) + self._entities[c.DIRT].spawn_dirt(free_for_dirt[:n_dirt_tiles]) + def do_additional_step(self) -> dict: info_dict = super(SimpleFactory, self).do_additional_step() if smear_amount := self.dirt_properties.dirt_smear_amount: - dirt_slice = self._slices.by_enum(c.DIRT).slice - for agent in self._agents: + for agent in self._entities[c.AGENT]: if agent.temp_valid and agent.last_pos != c.NO_POS: - if dirt := dirt_slice[agent.last_pos]: - if smeared_dirt := round(dirt * smear_amount, 2): - dirt_slice[agent.last_pos] = max(0, dirt_slice[agent.last_pos]-smeared_dirt) - dirt_slice[agent.pos] = min((self.dirt_properties.max_local_amount, - dirt_slice[agent.pos] + smeared_dirt) - ) + if old_pos_dirt := self._entities[c.DIRT].by_pos(agent.last_pos): + if smeared_dirt := round(old_pos_dirt.amount * smear_amount, 2): + old_pos_dirt.set_new_amount(max(0, old_pos_dirt.amount-smeared_dirt)) + if new_pos_dirt := self._entities[c.DIRT].by_pos(agent.pos): + new_pos_dirt.set_new_amount(max(0, new_pos_dirt.amount + smeared_dirt)) + else: + self._entities[c.Dirt].spawn_dirt(agent.tile) + new_pos_dirt = self._entities[c.DIRT].by_pos(agent.pos) + new_pos_dirt.set_new_amount(max(0, new_pos_dirt.amount + smeared_dirt)) if not self._next_dirt_spawn: - self.spawn_dirt() + self.trigger_dirt_spawn() self._next_dirt_spawn = self.dirt_properties.spawn_frequency else: self._next_dirt_spawn -= 1 @@ -154,17 +200,16 @@ class SimpleFactory(BaseFactory): def do_additional_reset(self) -> None: super(SimpleFactory, self).do_additional_reset() - self.spawn_dirt() + self.trigger_dirt_spawn() self._next_dirt_spawn = self.dirt_properties.spawn_frequency def calculate_additional_reward(self, agent: Agent) -> (int, dict): reward, info_dict = super(SimpleFactory, self).calculate_additional_reward(agent) - dirt_slice = self._slices.by_enum(c.DIRT).slice - dirty_tiles = [dirt_slice[tile.pos] for tile in self._tiles if dirt_slice[tile.pos]] - current_dirt_amount = sum(dirty_tiles) - dirty_tile_count = len(dirty_tiles) + dirt = [dirt.amount for dirt in self._entities[c.DIRT]] + current_dirt_amount = sum(dirt) + dirty_tile_count = len(dirt) if dirty_tile_count: - dirt_distribution_score = entropy(softmax(dirt_slice)) / dirty_tile_count + dirt_distribution_score = entropy(softmax(np.asarray(dirt)) / dirty_tile_count) else: dirt_distribution_score = 0 @@ -204,6 +249,7 @@ if __name__ == '__main__': record_episodes=False, verbose=False ) + # noinspection DuplicatedCode n_actions = factory.action_space.n - 1 _ = factory.observation_space diff --git a/environments/helpers.py b/environments/helpers.py index 2ec7ee1..f8538b1 100644 --- a/environments/helpers.py +++ b/environments/helpers.py @@ -15,15 +15,18 @@ IGNORED_DF_COLUMNS = ['Episode', 'Run', 'train_step', 'step', 'index', 'dirt_amo # Constants class Constants(Enum): WALL = '#' + WALLS = 'Walls' + FLOOR = 'Floor' DOOR = 'D' DANGER_ZONE = 'x' - LEVEL = 'level' + LEVEL = 'Level' AGENT = 'Agent' FREE_CELL = 0 OCCUPIED_CELL = 1 + SHADOWED_CELL = -1 NO_POS = (-9999, -9999) - DOORS = 'doors' + DOORS = 'Doors' CLOSED_DOOR = 1 OPEN_DOOR = -1 @@ -33,11 +36,12 @@ class Constants(Enum): NOT_VALID = False # Dirt Env - DIRT = 'dirt' + DIRT = 'Dirt' # Item Env - ITEM = 'item' - INVENTORY = 'inventory' + ITEM = 'Item' + INVENTORY = 'Inventory' + DROP_OFF = 'Drop_Off' def __bool__(self): return bool(self.value) diff --git a/main.py b/main.py index 3a7dcc2..d3f5374 100644 --- a/main.py +++ b/main.py @@ -108,13 +108,16 @@ if __name__ == '__main__': for modeL_type in [A2C, PPO, DQN]: # ,RegDQN, QRDQN]: for seed in range(3): - with DoubleTaskFactory(n_agents=1, with_dirt=False, - item_properties=item_props, dirt_properties=None, movement_properties=move_props, - pomdp_radius=2, max_steps=500, parse_doors=True, - level_name='rooms', frames_to_stack=3, - omit_agent_slice_in_obs=True, combin_agent_slices_in_obs=True, record_episodes=False, - cast_shadows=True, doors_have_area=False, seed=seed - ) as env: + with SimpleFactory(n_agents=1, + # with_dirt=True, + # item_properties=item_props, + dirt_properties=dirt_props, + movement_properties=move_props, + pomdp_radius=2, max_steps=500, parse_doors=True, + level_name='rooms', frames_to_stack=3, + omit_agent_slice_in_obs=True, combin_agent_slices_in_obs=True, record_episodes=False, + cast_shadows=True, doors_have_area=False, seed=seed + ) as env: if modeL_type.__name__ in ["PPO", "A2C"]: kwargs = dict(ent_coef=0.01)