Restructuring

This commit is contained in:
steffen-illium
2021-06-17 16:21:13 +02:00
parent d9d8784338
commit c8883a9c0d
6 changed files with 94 additions and 73 deletions

View File

@ -36,6 +36,10 @@ class BaseFactory(gym.Env):
def movement_actions(self):
return self._actions.movement_actions
@property
def has_doors(self):
return hasattr(self, '_doors')
def __enter__(self):
return self if self.frames_to_stack == 0 else FrameStack(self, self.frames_to_stack)
@ -43,7 +47,7 @@ class BaseFactory(gym.Env):
self.close()
def __init__(self, level_name='simple', n_agents=1, max_steps=int(5e2), pomdp_radius: Union[None, int] = 0,
movement_properties: MovementProperties = MovementProperties(),
movement_properties: MovementProperties = MovementProperties(), parse_doors=False,
combin_agent_slices_in_obs: bool = False, frames_to_stack=0,
omit_agent_slice_in_obs=False, **kwargs):
assert (combin_agent_slices_in_obs != omit_agent_slice_in_obs) or \
@ -64,25 +68,31 @@ class BaseFactory(gym.Env):
self.done_at_collision = False
self._state_slices = StateSlices()
# Level
level_filepath = Path(__file__).parent / h.LEVELS_DIR / f'{self.level_name}.txt'
parsed_level = h.parse_level(level_filepath)
self._level = h.one_hot_level(parsed_level)
parsed_doors = h.one_hot_level(parsed_level, h.DOOR)
if parsed_doors.any():
self._doors = parsed_doors
level_slices = ['level', 'doors']
can_use_doors = True
else:
level_slices = ['level']
can_use_doors = False
level_slices = [h.LEVEL]
# Doors
if parse_doors:
parsed_doors = h.one_hot_level(parsed_level, h.DOOR)
if parsed_doors.any():
self._doors = parsed_doors
level_slices.append(h.DOORS)
# Agents
offset = len(level_slices)
self._state_slices.register_additional_items([*level_slices,
*[f'agent#{i}' for i in range(offset, n_agents + offset)]])
# Additional Slices from SubDomains
if 'additional_slices' in kwargs:
self._state_slices.register_additional_items(kwargs.get('additional_slices'))
self._zones = Zones(parsed_level)
self._actions = Actions(self.movement_properties, can_use_doors=can_use_doors)
self._actions = Actions(self.movement_properties, can_use_doors=self.has_doors)
self._actions.register_additional_items(self.additional_actions)
self.reset()
@ -99,30 +109,29 @@ class BaseFactory(gym.Env):
raise NotImplementedError('Please register additional actions ')
def reset(self) -> (np.ndarray, int, bool, dict):
slices = [np.expand_dims(self._level, 0)]
self._steps = 0
self._agent_states = list()
# Door Init
if self.has_doors:
self._door_states = [DoorState(i, tuple(pos)) for i, pos
in enumerate(np.argwhere(self._doors == h.IS_OCCUPIED_CELL))]
slices.append(np.expand_dims(self._doors, 0))
# Agent placement ...
agents = np.zeros((self.n_agents, *self._level.shape), dtype=np.int8)
floor_tiles = np.argwhere(self._level == h.IS_FREE_CELL)
# ... on random positions
np.random.shuffle(floor_tiles)
agents = np.zeros((self.n_agents, *self._level.shape), dtype=np.int8)
for i, (x, y) in enumerate(floor_tiles[:self.n_agents]):
agents[i, x, y] = h.IS_OCCUPIED_CELL
agent_state = AgentState(i, -1)
agent_state.update(pos=(x, y))
agent_state = AgentState(i, -1, pos=(x, y))
self._agent_states.append(agent_state)
# state.shape = level, agent 1,..., agent n,
if 'doors' in self._state_slices.values():
self._door_states = [DoorState(i, tuple(pos)) for i, pos
in enumerate(np.argwhere(self._doors == h.IS_OCCUPIED_CELL))]
self._state = np.concatenate((np.expand_dims(self._level, axis=0),
np.expand_dims(self._doors, axis=0),
agents), axis=0)
slices.append(agents)
else:
self._state = np.concatenate((np.expand_dims(self._level, axis=0), agents), axis=0)
# Returns State
# GLOBAL STATE
self._state = np.concatenate(slices, axis=0)
return None
def _get_observations(self) -> np.ndarray:
@ -138,21 +147,22 @@ class BaseFactory(gym.Env):
first_agent_slice = self._state_slices.AGENTSTARTIDX
# Todo: make this more efficient!
if self.pomdp_radius:
global_pos = self._agent_states[agent_i].pos
x0, x1 = max(0, global_pos[0] - self.pomdp_radius), global_pos[0] + self.pomdp_radius + 1
y0, y1 = max(0, global_pos[1] - self.pomdp_radius), global_pos[1] + self.pomdp_radius + 1
pomdp_diameter = self.pomdp_radius * 2 + 1
global_x, global_y = self._agent_states[agent_i].pos
x0, x1 = max(0, global_x - self.pomdp_radius), global_x + self.pomdp_radius + 1
y0, y1 = max(0, global_y - self.pomdp_radius), global_y + self.pomdp_radius + 1
obs = self._state[:, x0:x1, y0:y1]
if obs.shape[1] != self.pomdp_radius * 2 + 1 or obs.shape[2] != self.pomdp_radius * 2 + 1:
obs_padded = np.full((obs.shape[0], self.pomdp_radius * 2 + 1, self.pomdp_radius * 2 + 1), 1)
a_pos = np.argwhere(obs[first_agent_slice + agent_i] == h.IS_OCCUPIED_CELL)[0]
if obs.shape[1] != pomdp_diameter or obs.shape[2] != pomdp_diameter:
obs_padded = np.full((obs.shape[0], pomdp_diameter, pomdp_diameter), h.IS_OCCUPIED_CELL)
local_x, local_y = np.argwhere(obs[first_agent_slice + agent_i] == h.IS_OCCUPIED_CELL)[0]
obs_padded[:,
abs(a_pos[0]-self.pomdp_radius):abs(a_pos[0]-self.pomdp_radius)+obs.shape[1],
abs(a_pos[1]-self.pomdp_radius):abs(a_pos[1]-self.pomdp_radius)+obs.shape[2]] = obs
abs(local_x-self.pomdp_radius):abs(local_x-self.pomdp_radius)+obs.shape[1],
abs(local_y-self.pomdp_radius):abs(local_y-self.pomdp_radius)+obs.shape[2]] = obs
obs = obs_padded
else:
obs = self._state
if self.omit_agent_slice_in_obs:
obs_new = obs[[key for key, val in self._state_slices.items() if 'agent' not in val]]
obs_new = obs[[key for key, val in self._state_slices.items() if h.AGENT not in val]]
return obs_new
else:
if self.combin_agent_slices_in_obs:
@ -174,16 +184,19 @@ class BaseFactory(gym.Env):
# Move this in a seperate function?
for agent_i, action in enumerate(actions):
agent = self._agent_states[agent_i]
if self._actions.is_moving_action(action):
pos, valid = self.move_or_colide(agent_i, action)
elif self._actions.is_no_op(action):
pos, valid = self._agent_states[agent_i].pos, h.VALID
pos, valid = agent.pos, h.VALID
elif self._actions.is_door_usage(action):
try:
# Check if agent raly stands on a door:
if self._state[self._state_slices.by_name(h.DOORS)][agent.pos] in [h.IS_OCCUPIED_CELL, ]:
door = [door for door in self._door_states if door.pos == self._agent_states[agent_i].pos][0]
door.use()
pos, valid = self._agent_states[agent_i].pos, h.VALID
except IndexError:
# When he doesn't...
else:
pos, valid = self._agent_states[agent_i].pos, h.NOT_VALID
else:
pos, valid = self.do_additional_actions(agent_i, action)
@ -202,6 +215,7 @@ class BaseFactory(gym.Env):
door.time_to_close -= 1
elif door.is_open and not door.time_to_close and door.pos not in agents_pos:
door.use()
self._state[self._state_slices.by_name(h.DOORS)] = 1 if door.is_closed else -1
reward, info = self.calculate_reward(self._agent_states)
@ -230,11 +244,12 @@ class BaseFactory(gym.Env):
collisions_vec[self._state_slices.by_name('doors')] = h.IS_FREE_CELL # no door-collisions
if agent_state.action_valid:
# ToDo: Place a function hook here
# All well, no collision.
# Place a function hook here if needed.
pass
else:
# Place a marker to indicate a collision with the level boundrys
collisions_vec[h.LEVEL_IDX] = h.IS_OCCUPIED_CELL
collisions_vec[self._state_slices.by_name(h.LEVEL)] = h.IS_OCCUPIED_CELL
return collisions_vec
def do_move(self, agent_i: int, old_pos: (int, int), new_pos: (int, int)) -> None:
@ -265,7 +280,7 @@ class BaseFactory(gym.Env):
x_new = x + x_diff
y_new = y + y_diff
if h.DOORS in self._state_slices.values() and self._agent_states[agent_i]._last_pos != (-1, -1):
if self.has_doors and self._agent_states[agent_i]._last_pos != (-1, -1):
door = [door for door in self._door_states if door.pos == (x, y)]
if door:
door = door[0]
@ -298,7 +313,7 @@ class BaseFactory(gym.Env):
else:
pass
valid = h.check_position(self._state[h.LEVEL_IDX], (x_new, y_new))
valid = h.check_position(self._state[self._state_slices.by_name(h.LEVEL)], (x_new, y_new))
return (x, y), (x_new, y_new), valid