diff --git a/environments/factory/base_factory.py b/environments/factory/base_factory.py
index ef1e2c0..faf99fd 100644
--- a/environments/factory/base_factory.py
+++ b/environments/factory/base_factory.py
@@ -3,9 +3,33 @@
 from typing import Tuple, List, Union, Iterable
 import numpy as np
 from pathlib import Path
+
+from attr import dataclass
+
 from environments import helpers as h


+@dataclass
+class AgentState:
+    i: int
+    action: int
+
+    pos = None
+    collision_vector = None
+    action_valid = None
+
+    @property
+    def collisions(self):
+        return np.argwhere(self.collision_vector != 0).flatten()
+
+    def update(self, **kwargs):  # is this hacky?? o.0
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                self.__setattr__(key, value)
+            else:
+                raise AttributeError(f'"{key}" cannot be updated, this attr is not a part of {self.__class__.__name__}')
+
+
 class BaseFactory:

     @property
@@ -43,29 +67,29 @@ class BaseFactory:
     def step(self, actions):
         actions = [actions] if isinstance(actions, int) else actions
-        assert isinstance(actions, list), f'"actions has to be in [{int, list}]'
+        assert isinstance(actions, list), f'"actions" has to be in [{int, list}]'
         self.steps += 1
-        # FixMe: Why do we need this?
-        r = 0

-        # Move this in a seperate function?
-        actions = list(enumerate(actions))
-        random.shuffle(actions)
-        for agent_i, action in actions:
+        states = list()
+        for agent_i, action in enumerate(actions):
+            agent_i_state = AgentState(agent_i, action)
             if self._is_moving_action(action):
                 pos, valid = self.move_or_colide(agent_i, action)
             else:
                 pos, valid = self.additional_actions(agent_i, action)
-            actions[agent_i] = (agent_i, action, pos, valid)
+            # Update state accordingly
+            agent_i_state.update(pos=pos, action_valid=valid)
+            states.append(agent_i_state)

-        collision_vecs = self.check_all_collisions(actions, self.state.shape[0])
+        for i, collision_vec in enumerate(self.check_all_collisions(states, self.state.shape[0])):
+            states[i].update(collision_vector=collision_vec)
+
+        reward, info = self.calculate_reward(states)

-        reward, info = self.calculate_reward(collision_vecs, [a[1] for a in actions], r)
-        r += reward
         if self.steps >= self.max_steps:
             self.done = True
-        return self.state, r, self.done, info
+        return self.state, reward, self.done, info

     def _is_moving_action(self, action):
         if action < self.movement_actions:
@@ -73,22 +97,24 @@ class BaseFactory:
         else:
             return False

-    def check_all_collisions(self, agent_action_pos_valid_tuples: (int, int, (int, int), bool), collisions: int) -> np.ndarray:
-        collision_vecs = np.zeros((len(agent_action_pos_valid_tuples), collisions))  # n_agents x n_slices
-        for agent_i, action, pos, valid in agent_action_pos_valid_tuples:
-            if self._is_moving_action(action):
-                collision_vecs[agent_i] = self.check_collisions(agent_i, pos, valid)
+    def check_all_collisions(self, agent_states: List[AgentState], collisions: int) -> np.ndarray:
+        collision_vecs = np.zeros((len(agent_states), collisions))  # n_agents x n_slices
+        for agent_state in agent_states:
+            # Register only collisions of moving agents
+            if self._is_moving_action(agent_state.action):
+                collision_vecs[agent_state.i] = self.check_collisions(agent_state)
         return collision_vecs

-    def check_collisions(self, agent_i: int, pos: (int, int), valid: bool) -> np.ndarray:
-        pos_x, pos_y = pos
+    def check_collisions(self, agent_state: AgentState) -> np.ndarray:
+        pos_x, pos_y = agent_state.pos
         # FixMe: We need to find a way to spare out some dimensions, eg. an info dimension etc... a[?,]
-        collisions_vec = self.state[:, pos_x, pos_y].copy()  # "vertical fiber" at position of agent i
-        collisions_vec[h.AGENT_START_IDX + agent_i] = h.IS_FREE_CELL  # no self-collisions
-        if valid:
+        collisions_vec = self.state[:, pos_x, pos_y].copy()  # "vertical fiber" at position of agent i
+        collisions_vec[h.AGENT_START_IDX + agent_state.i] = h.IS_FREE_CELL  # no self-collisions
+        if agent_state.action_valid:
             # ToDo: Place a function hook here
             pass
         else:
+            # Place a marker to indicate a collision with the level boundaries
             collisions_vec[h.LEVEL_IDX] = h.IS_OCCUPIED_CELL
         return collisions_vec

@@ -102,15 +128,18 @@ class BaseFactory:
                                             dim=agent_i + h.AGENT_START_IDX,
                                             action=action)
         if valid:
-            # Does not collide width level boundrys
+            # Does not collide with level boundaries
             self.do_move(agent_i, old_pos, new_pos)
             return new_pos, valid
         else:
             # Agent seems to be trying to collide in this step
             return old_pos, valid

-    def agent_i_position(self, agent_i):
-        return tuple(np.argwhere(self.state[h.AGENT_START_IDX+agent_i] == h.IS_OCCUPIED_CELL).flatten())
+    def agent_i_position(self, agent_i: int) -> (int, int):
+        positions = np.argwhere(self.state[h.AGENT_START_IDX+agent_i] == h.IS_OCCUPIED_CELL)
+        assert positions.shape[0] == 1
+        pos_x, pos_y = positions[0]  # a.flatten()
+        return pos_x, pos_y

     @property
     def free_cells(self) -> np.ndarray:
@@ -119,7 +148,10 @@ class BaseFactory:
         np.random.shuffle(free_cells)
         return free_cells

-    def calculate_reward(self, collisions_vec: np.ndarray, actions: Iterable[int], r: int) -> (int, dict):
+    def calculate_reward(self, agent_states: List[AgentState]) -> (int, dict):
         # Returns: Reward, Info
         # Set to "raise NotImplementedError"
         return 0, {}
+
+    def render(self):
+        raise NotImplementedError
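A minimal sketch of how the new AgentState container is meant to be filled during step() and read back afterwards; the four-slice state layout (level, two agents, dirt) is only assumed here for illustration:

    import numpy as np

    from environments.factory.base_factory import AgentState

    # Illustrative collision vector over 4 slices: level, agent 0, agent 1, dirt
    agent_state = AgentState(i=0, action=2)
    agent_state.update(pos=(1, 3), action_valid=True)
    agent_state.update(collision_vector=np.array([1, 0, 1, 0]))

    print(agent_state.collisions)  # -> [0 2]: the level slice and agent 1's slice
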
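SimpleFactory still returns a flat reward of 0 and only logs collisions. A hypothetical subclass that actually penalizes collisions under the new calculate_reward contract could look roughly like this (class name and penalty value are made up, not part of this change):

    from environments.factory.simple_factory import SimpleFactory

    class CollisionPenaltyFactory(SimpleFactory):
        def calculate_reward(self, agent_states):
            reward = 0
            for agent_state in agent_states:
                # every non-free entry in the collision vector counts as one collision
                reward -= len(agent_state.collisions)  # illustrative penalty
            return reward, {}
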
diff --git a/environments/factory/simple_factory_getting_dirty.py b/environments/factory/simple_factory_getting_dirty.py
index dddd3ff..d0a542a 100644
--- a/environments/factory/simple_factory_getting_dirty.py
+++ b/environments/factory/simple_factory_getting_dirty.py
@@ -1,12 +1,17 @@
 import numpy as np
+
+from attr import dataclass
+
 from environments.factory.base_factory import BaseFactory
 from collections import namedtuple
 from typing import Iterable
 from environments import helpers as h

 DIRT_INDEX = -1
-DirtProperties = namedtuple('DirtProperties', ['clean_amount', 'max_spawn_ratio', 'gain_amount'],
-                            defaults=[0.25, 0.1, 0.1])
+@dataclass
+class DirtProperties:
+    clean_amount = 0.25
+    max_spawn_ratio = 0.1
+    gain_amount = 0.1


 class GettingDirty(BaseFactory):
@@ -15,7 +20,7 @@ class GettingDirty(BaseFactory):
     def _clean_up_action(self):
         return self.movement_actions + 1 - 1

-    def __init__(self, *args, dirt_properties:DirtProperties, **kwargs):
+    def __init__(self, *args, dirt_properties: DirtProperties, **kwargs):
         self._dirt_properties = dirt_properties
         super(GettingDirty, self).__init__(*args, **kwargs)
         self.slice_strings.update({self.state.shape[0]-1: 'dirt'})
@@ -58,7 +63,7 @@ class GettingDirty(BaseFactory):
         self.state = np.concatenate((self.state, dirt_slice))  # dirt is now the last slice
         self.spawn_dirt()

-    def calculate_reward(self, collisions_vecs: np.ndarray, actions: Iterable[int], r: int) -> (int, dict):
+    def calculate_reward(self, collisions_vecs: np.ndarray, actions: Iterable[int]) -> (int, dict):
         for agent_i, cols in enumerate(collisions_vecs):
             cols = np.argwhere(cols != 0).flatten()
             print(f't = {self.steps}\tAgent {agent_i} has collisions with '
                   f'{[self.slice_strings[entity] for entity in cols]}')
@@ -68,8 +73,8 @@

 if __name__ == '__main__':
     import random
-    dirt_properties = DirtProperties()
-    factory = GettingDirty(n_agents=1, dirt_properties=dirt_properties)
+    dirt_props = DirtProperties()
+    factory = GettingDirty(n_agents=1, dirt_properties=dirt_props)
     random_actions = [random.randint(0, 8) for _ in range(200)]
     for action in random_actions:
         state, r, done, _ = factory.step(action)
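One thing worth double-checking in the attr-based DirtProperties: with attrs' auto_attribs-style dataclass, attributes without type annotations stay plain class attributes rather than becoming constructor fields, so DirtProperties(clean_amount=0.5) would likely not work the way the old namedtuple defaults did (the no-argument DirtProperties() used above is unaffected). If per-instance overrides are wanted, annotating the defaults should restore that behaviour; a sketch under that assumption:

    from attr import dataclass

    @dataclass
    class DirtProperties:
        clean_amount: float = 0.25      # annotated defaults become real attrs fields...
        max_spawn_ratio: float = 0.1    # ...so DirtProperties(max_spawn_ratio=0.05) works again
        gain_amount: float = 0.1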