From ebd8f46a16cc29b94a6da8bf7e6fe76fd91ac707 Mon Sep 17 00:00:00 2001 From: steffen-illium Date: Tue, 11 May 2021 16:20:34 +0200 Subject: [PATCH] Refactoring of movement logic --- environments/factory/base_factory.py | 61 +++++++++++++------------- environments/factory/simple_factory.py | 1 - environments/helpers.py | 22 ++++++++-- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/environments/factory/base_factory.py b/environments/factory/base_factory.py index 8adfbc3..4fe7291 100644 --- a/environments/factory/base_factory.py +++ b/environments/factory/base_factory.py @@ -1,20 +1,17 @@ +import random + import numpy as np from pathlib import Path from environments import helpers as h class BaseFactory: - LEVELS_DIR = 'levels' - _level_idx = 0 - _agent_start_idx = 1 - _is_free_cell = 0 - _is_occupied_cell = 1 def __init__(self, level='simple', n_agents=1, max_steps=1e3): self.n_agents = n_agents self.max_steps = max_steps self.level = h.one_hot_level( - h.parse_level(Path(__file__).parent / self.LEVELS_DIR / f'{level}.txt') + h.parse_level(Path(__file__).parent / h.LEVELS_DIR / f'{level}.txt') ) self.slice_strings = {0: 'level', **{i: f'agent#{i}' for i in range(1, self.n_agents+1)}} self.reset() @@ -24,53 +21,57 @@ class BaseFactory: self.steps = 0 # Agent placement ... agents = np.zeros((self.n_agents, *self.level.shape), dtype=np.int8) - floor_tiles = np.argwhere(self.level == self._is_free_cell) + floor_tiles = np.argwhere(self.level == h.IS_FREE_CELL) # ... on random positions np.random.shuffle(floor_tiles) for i, (x, y) in enumerate(floor_tiles[:self.n_agents]): - agents[i, x, y] = self._is_occupied_cell + agents[i, x, y] = h.IS_OCCUPIED_CELL # state.shape = level, agent 1,..., agent n, self.state = np.concatenate((np.expand_dims(self.level, axis=0), agents), axis=0) # Returns State, Reward, Done, Info return self.state, 0, self.done, {} def step(self, actions): - assert type(actions) in [int, list] - if type(actions) == int: - actions = [actions] + actions = [actions] if isinstance(actions, int) else actions + assert isinstance(actions, list), f'"actions has to be in [{int, list}]' self.steps += 1 r = 0 - collision_vecs = np.zeros((self.n_agents, self.state.shape[0])) # n_agents x n_slices - for i, a in enumerate(actions): - old_pos, new_pos, valid = h.check_agent_move(state=self.state, dim=i+self._agent_start_idx, action=a) - if valid: # Does not collide width level boundrys - self.make_move(i, old_pos, new_pos) - else: # Trying to leave the level - collision_vecs[i, self._level_idx] = self._is_occupied_cell # Collides with level boundrys - # For each agent check for abitrary collions: - for i in range(self.n_agents): # Note: might as well save the positions (redundant): return value of make_move - agent_slice = self.state[i+self._agent_start_idx] - x, y = np.argwhere(agent_slice == self._is_occupied_cell)[0] # current position of agent i - collisions_vec = self.state[:, x, y].copy() # "vertical fiber" at position of agent i - collisions_vec[i+self._agent_start_idx] = self._is_free_cell # no self-collisions - collision_vecs[i] += collisions_vec + collision_vecs = np.zeros((self.n_agents, self.state.shape[0])) # n_agents x n_slices + actions = list(enumerate(actions)) + random.shuffle(actions) + + for agent_i, action in actions: + new_pos, collision_vec, did_collide = self.move_or_colide(agent_i, action) + collision_vecs[agent_i] = collision_vec + reward, info = self.step_core(collision_vecs, actions, r) r += reward if self.steps >= self.max_steps: self.done = True return self.state, r, self.done, info - def make_move(self, agent_i, old_pos, new_pos): + def move(self, agent_i, old_pos, new_pos): (x, y), (x_new, y_new) = old_pos, new_pos - self.state[agent_i+self._agent_start_idx, x, y] = self._is_free_cell - self.state[agent_i+self._agent_start_idx, x_new, y_new] = self._is_occupied_cell - return new_pos + self.state[agent_i + h.AGENT_START_IDX, x, y] = h.IS_FREE_CELL + self.state[agent_i + h.AGENT_START_IDX, x_new, y_new] = h.IS_OCCUPIED_CELL + + def move_or_colide(self, agent_i, action) -> ((int, int), bool): + old_pos, new_pos, collision_vec, did_collide = h.check_agent_move(state=self.state, + dim=agent_i + h.AGENT_START_IDX, + action=action) + if not did_collide: + # Does not collide width level boundrys + self.move(agent_i, old_pos, new_pos) + return new_pos, collision_vec, did_collide + else: + # Agent seems to be trying to collide in this step + return old_pos, collision_vec, did_collide @property def free_cells(self) -> np.ndarray: free_cells = self.state.sum(0) - free_cells = np.argwhere(free_cells == self._is_free_cell) + free_cells = np.argwhere(free_cells == h.IS_FREE_CELL) np.random.shuffle(free_cells) return free_cells diff --git a/environments/factory/simple_factory.py b/environments/factory/simple_factory.py index 9e4fbb6..3af863d 100644 --- a/environments/factory/simple_factory.py +++ b/environments/factory/simple_factory.py @@ -8,7 +8,6 @@ class SimpleFactory(BaseFactory): super(SimpleFactory, self).__init__(*args, **kwargs) self.slice_strings.update({self.state.shape[0]-1: 'dirt'}) - def spawn_dirt(self): free_for_dirt = self.free_cells for x, y in free_for_dirt[:self.max_dirt]: # randomly distribute dirt across the grid diff --git a/environments/helpers.py b/environments/helpers.py index cd3154f..45870b0 100644 --- a/environments/helpers.py +++ b/environments/helpers.py @@ -3,6 +3,11 @@ from pathlib import Path # Constants WALL = '#' +LEVELS_DIR = 'levels' +LEVEL_IDX = 0 +AGENT_START_IDX = 1 +IS_FREE_CELL = 0 +IS_OCCUPIED_CELL = 1 # Utility functions @@ -51,13 +56,24 @@ def check_agent_move(state, dim, action): y_new -= 1 else: pass - # Check validity + + # Check if agent colides with grid boundrys valid = not ( x_new < 0 or y_new < 0 or x_new >= agent_slice.shape[0] or y_new >= agent_slice.shape[0] - ) # if agent tried to leave the grid - return (x, y), (x_new, y_new), valid + ) + + if valid: + collisions_vec = state[:, x_new, y_new].copy() # "vertical fiber" at position of agent i + collisions_vec[dim] = IS_FREE_CELL # no self-collisions + pass + else: + collisions_vec = state[:, x, y].copy() # "vertical fiber" at position of agent i + collisions_vec[dim] = IS_FREE_CELL # no self-collisions + collisions_vec[LEVEL_IDX] = IS_OCCUPIED_CELL + did_collide = collisions_vec.sum(0) != IS_FREE_CELL + return (x, y), (x_new, y_new), collisions_vec, did_collide if __name__ == '__main__':