From e2e3da641eae3efc80a6fb167120c424141ef4e6 Mon Sep 17 00:00:00 2001 From: Steffen Illium Date: Thu, 26 Oct 2023 16:10:08 +0200 Subject: [PATCH] Description and better naming scheme for the CleanUp/Dirt-Module --- .../modules/clean_up/__init__.py | 2 +- .../modules/clean_up/entitites.py | 6 +- marl_factory_grid/modules/clean_up/groups.py | 33 ++++---- marl_factory_grid/modules/clean_up/rules.py | 75 +++++++++++++++---- 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/marl_factory_grid/modules/clean_up/__init__.py b/marl_factory_grid/modules/clean_up/__init__.py index 82c4668..31cb841 100644 --- a/marl_factory_grid/modules/clean_up/__init__.py +++ b/marl_factory_grid/modules/clean_up/__init__.py @@ -1,4 +1,4 @@ from .actions import CleanUp from .entitites import DirtPile from .groups import DirtPiles -from .rules import DirtRespawnRule, DirtSmearOnMove, DirtAllCleanDone +from .rules import SpawnDirt, EntitiesSmearDirtOnMove, DoneOnAllDirtCleaned diff --git a/marl_factory_grid/modules/clean_up/entitites.py b/marl_factory_grid/modules/clean_up/entitites.py index 9d5e4cf..8ac8a0c 100644 --- a/marl_factory_grid/modules/clean_up/entitites.py +++ b/marl_factory_grid/modules/clean_up/entitites.py @@ -32,11 +32,9 @@ class DirtPile(Entity): # Edit this if you want items to be drawn in the ops differntly return self._amount - def __init__(self, *args, max_local_amount=5, initial_amount=2, spawn_variation=0.05, **kwargs): + def __init__(self, *args, amount=2, max_local_amount=5, **kwargs): super(DirtPile, self).__init__(*args, **kwargs) - self._amount = abs(initial_amount + ( - random.normal(loc=0, scale=spawn_variation, size=1).item() * initial_amount) - ) + self._amount = amount self.max_local_amount = max_local_amount def set_new_amount(self, amount): diff --git a/marl_factory_grid/modules/clean_up/groups.py b/marl_factory_grid/modules/clean_up/groups.py index 0ee6893..c47e972 100644 --- a/marl_factory_grid/modules/clean_up/groups.py +++ 
b/marl_factory_grid/modules/clean_up/groups.py @@ -3,6 +3,7 @@ from marl_factory_grid.environment.groups.mixins import PositionMixin from marl_factory_grid.modules.clean_up.entitites import DirtPile from marl_factory_grid.environment import constants as c +from marl_factory_grid.utils.results import Result class DirtPiles(PositionMixin, EnvObjects): @@ -15,46 +16,44 @@ class DirtPiles(PositionMixin, EnvObjects): return sum([dirt.amount for dirt in self]) def __init__(self, *args, - initial_amount=2, - initial_dirt_ratio=0.05, - dirt_spawn_r_var=0.1, max_local_amount=5, clean_amount=1, max_global_amount: int = 20, **kwargs): super(DirtPiles, self).__init__(*args, **kwargs) self.clean_amount = clean_amount - self.initial_amount = initial_amount - self.initial_dirt_ratio = initial_dirt_ratio - self.dirt_spawn_r_var = dirt_spawn_r_var self.max_global_amount = max_global_amount self.max_local_amount = max_local_amount - def spawn(self, then_dirty_positions, amount) -> bool: - for pos in then_dirty_positions: + def spawn(self, then_dirty_positions, amount_s) -> Result: + spawn_counter = 0 + for idx, pos in enumerate(then_dirty_positions): if not self.amount > self.max_global_amount: + amount = amount_s[idx] if isinstance(amount_s, list) else amount_s if dirt := self.by_pos(pos): new_value = dirt.amount + amount dirt.set_new_amount(new_value) else: - dirt = DirtPile(pos, initial_amount=amount, spawn_variation=self.dirt_spawn_r_var) + dirt = DirtPile(pos, amount=amount) self.add_item(dirt) + spawn_counter += 1 else: - return c.NOT_VALID - return c.VALID + return Result(identifier=f'{self.name}_spawn', validity=c.NOT_VALID, reward=0, + value=spawn_counter) + return Result(identifier=f'{self.name}_spawn', validity=c.VALID, reward=0, value=spawn_counter) - def trigger_dirt_spawn(self, state, initial_spawn=False) -> bool: + def trigger_dirt_spawn(self, n, amount, state, n_var=0.2, amount_var=0.2) -> Result: free_for_dirt = [x for x in state.entities.floorlist if 
len(state.entities.pos_dict[x]) == 1 or ( - len(state.entities.pos_dict[x]) == 2 and isinstance(next(y for y in x), DirtPile))] + len(state.entities.pos_dict[x]) == 2 and isinstance(next(y for y in x), DirtPile))] # free_for_dirt = [x for x in state[c.FLOOR] # if len(x.guests) == 0 or ( # len(x.guests) == 1 and # isinstance(next(y for y in x.guests), DirtPile))] state.rng.shuffle(free_for_dirt) - var = self.dirt_spawn_r_var - new_spawn = abs(self.initial_dirt_ratio + (state.rng.uniform(-var, var) if initial_spawn else 0)) - n_dirty_positions = max(0, int(new_spawn * len(free_for_dirt))) - return self.spawn(free_for_dirt[:n_dirty_positions], self.initial_amount) + new_spawn = int(abs(n + (state.rng.uniform(-n_var, n_var)))) + new_amount_s = [abs(amount + (amount*state.rng.uniform(-amount_var, amount_var))) for _ in range(new_spawn)] + n_dirty_positions = free_for_dirt[:new_spawn] + return self.spawn(n_dirty_positions, new_amount_s) def __repr__(self): s = super(DirtPiles, self).__repr__() diff --git a/marl_factory_grid/modules/clean_up/rules.py b/marl_factory_grid/modules/clean_up/rules.py index b7637c9..7b3bf2d 100644 --- a/marl_factory_grid/modules/clean_up/rules.py +++ b/marl_factory_grid/modules/clean_up/rules.py @@ -7,53 +7,98 @@ from marl_factory_grid.utils.results import TickResult from marl_factory_grid.utils.results import DoneResult -class DirtAllCleanDone(Rule): +class DoneOnAllDirtCleaned(Rule): - def __init__(self): + def __init__(self, reward: float = r.CLEAN_UP_ALL): + """ + Defines a 'Done'-condition which triggers when there is no more 'Dirt' in the environment. + + :type reward: float + :parameter reward: Given reward when condition triggers. 
+ """ super().__init__() + self.reward = reward def on_check_done(self, state) -> [DoneResult]: if len(state[d.DIRT]) == 0 and state.curr_step: - return [DoneResult(validity=c.VALID, identifier=self.name, reward=r.CLEAN_UP_ALL)] + return [DoneResult(validity=c.VALID, identifier=self.name, reward=self.reward)] return [DoneResult(validity=c.NOT_VALID, identifier=self.name, reward=0)] -class DirtRespawnRule(Rule): +class SpawnDirt(Rule): - def __init__(self, spawn_freq=15): + def __init__(self, initial_n: int, initial_amount: float, respawn_n: int, respawn_amount: float, + n_var: float = 0.2, amount_var: float = 0.2, spawn_freq: int = 15): + """ + Defines the spawn pattern of initial and additional 'Dirt'-entities. + First chooses positions, then tries to spawn dirt until 'respawn_n' or the maximal global amount is reached. + If there is already some, it is topped up to min(max_local_amount, amount). + + :type spawn_freq: int + :parameter spawn_freq: In which frequency should this Rule try to spawn new 'Dirt'? + :type respawn_n: int + :parameter respawn_n: How many respawn positions are considered. + :type initial_n: int + :parameter initial_n: How many initial positions are considered. + :type amount_var: float + :parameter amount_var: Variance of amount to spawn. + :type n_var: float + :parameter n_var: Variance of n to spawn. + :type respawn_amount: float + :parameter respawn_amount: Defines how much dirt 'amount' is placed every 'spawn_freq' ticks. + :type initial_amount: float + :parameter initial_amount: Defines how much dirt 'amount' is initially placed. 
+ + """ super().__init__() + self.amount_var = amount_var + self.n_var = n_var + self.respawn_amount = respawn_amount + self.respawn_n = respawn_n + self.initial_amount = initial_amount + self.initial_n = initial_n self.spawn_freq = spawn_freq self._next_dirt_spawn = spawn_freq def on_init(self, state, lvl_map) -> str: - state[d.DIRT].trigger_dirt_spawn(state, initial_spawn=True) - return f'Initial Dirt was spawned on: {[x.pos for x in state[d.DIRT]]}' + result = state[d.DIRT].trigger_dirt_spawn(self.initial_n, self.initial_amount, state, + n_var=self.n_var, amount_var=self.amount_var) + state.print(f'Initial Dirt was spawned on: {[x.pos for x in state[d.DIRT]]}') + return result def tick_step(self, state): if self._next_dirt_spawn < 0: pass # No DirtPile Spawn elif not self._next_dirt_spawn: - validity = state[d.DIRT].trigger_dirt_spawn(state) - - return [TickResult(entity=None, validity=validity, identifier=self.name, reward=0)] + result = [state[d.DIRT].trigger_dirt_spawn(self.respawn_n, self.respawn_amount, state, + n_var=self.n_var, amount_var=self.amount_var)] self._next_dirt_spawn = self.spawn_freq else: self._next_dirt_spawn -= 1 - return [] + result = [] + return result -class DirtSmearOnMove(Rule): +class EntitiesSmearDirtOnMove(Rule): - def __init__(self, smear_amount: float = 0.2): + def __init__(self, smear_ratio: float = 0.2): + """ + Enables 'smearing'. Entities that move through dirt will leave a trail behind them. + They take dirt * smear_ratio of it with them to their next position. + + :type smear_ratio: float + :parameter smear_ratio: Which fraction of the dirt is smeared by entities to their next position. 
+ """ + assert smear_ratio < 1, "'Smear Amount' must be smaller than 1" super().__init__() - self.smear_amount = smear_amount + self.smear_ratio = smear_ratio def tick_post_step(self, state): results = list() for entity in state.moving_entites: if is_move(entity.state.identifier) and entity.state.validity == c.VALID: if old_pos_dirt := state[d.DIRT].by_pos(entity.last_pos): - if smeared_dirt := round(old_pos_dirt.amount * self.smear_amount, 2): + if smeared_dirt := round(old_pos_dirt.amount * self.smear_ratio, 2): if state[d.DIRT].spawn(entity.pos, amount=smeared_dirt): # pos statt tile results.append(TickResult(identifier=self.name, entity=entity, reward=0, validity=c.VALID))