Description and better naming scheme for the CleanUp/Dirt-Module

This commit is contained in:
Steffen Illium 2023-10-26 16:10:08 +02:00
parent 5e49b1228f
commit e2e3da641e
4 changed files with 79 additions and 37 deletions

View File

@ -1,4 +1,4 @@
from .actions import CleanUp
from .entitites import DirtPile
from .groups import DirtPiles
from .rules import DirtRespawnRule, DirtSmearOnMove, DirtAllCleanDone
from .rules import SpawnDirt, EntitiesSmearDirtOnMove, DoneOnAllDirtCleaned

View File

@ -32,11 +32,9 @@ class DirtPile(Entity):
# Edit this if you want items to be drawn in the ops differntly
return self._amount
def __init__(self, *args, max_local_amount=5, initial_amount=2, spawn_variation=0.05, **kwargs):
def __init__(self, *args, amount=2, max_local_amount=5, **kwargs):
super(DirtPile, self).__init__(*args, **kwargs)
self._amount = abs(initial_amount + (
random.normal(loc=0, scale=spawn_variation, size=1).item() * initial_amount)
)
self._amount = amount
self.max_local_amount = max_local_amount
def set_new_amount(self, amount):

View File

@ -3,6 +3,7 @@ from marl_factory_grid.environment.groups.mixins import PositionMixin
from marl_factory_grid.modules.clean_up.entitites import DirtPile
from marl_factory_grid.environment import constants as c
from marl_factory_grid.utils.results import Result
class DirtPiles(PositionMixin, EnvObjects):
@ -15,34 +16,32 @@ class DirtPiles(PositionMixin, EnvObjects):
return sum([dirt.amount for dirt in self])
def __init__(self, *args,
initial_amount=2,
initial_dirt_ratio=0.05,
dirt_spawn_r_var=0.1,
max_local_amount=5,
clean_amount=1,
max_global_amount: int = 20, **kwargs):
super(DirtPiles, self).__init__(*args, **kwargs)
self.clean_amount = clean_amount
self.initial_amount = initial_amount
self.initial_dirt_ratio = initial_dirt_ratio
self.dirt_spawn_r_var = dirt_spawn_r_var
self.max_global_amount = max_global_amount
self.max_local_amount = max_local_amount
def spawn(self, then_dirty_positions, amount) -> bool:
for pos in then_dirty_positions:
def spawn(self, then_dirty_positions, amount_s) -> Result:
spawn_counter = 0
for idx, pos in enumerate(then_dirty_positions):
if not self.amount > self.max_global_amount:
amount = amount_s[idx] if isinstance(amount_s, list) else amount_s
if dirt := self.by_pos(pos):
new_value = dirt.amount + amount
dirt.set_new_amount(new_value)
else:
dirt = DirtPile(pos, initial_amount=amount, spawn_variation=self.dirt_spawn_r_var)
dirt = DirtPile(pos, amount=amount)
self.add_item(dirt)
spawn_counter += 1
else:
return c.NOT_VALID
return c.VALID
return Result(identifier=f'{self.name}_spawn', validity=c.NOT_VALID, reward=0,
value=spawn_counter)
return Result(identifier=f'{self.name}_spawn', validity=c.VALID, reward=0, value=spawn_counter)
def trigger_dirt_spawn(self, state, initial_spawn=False) -> bool:
def trigger_dirt_spawn(self, n, amount, state, n_var=0.2, amount_var=0.2) -> Result:
free_for_dirt = [x for x in state.entities.floorlist if len(state.entities.pos_dict[x]) == 1 or (
len(state.entities.pos_dict[x]) == 2 and isinstance(next(y for y in x), DirtPile))]
# free_for_dirt = [x for x in state[c.FLOOR]
@ -51,10 +50,10 @@ class DirtPiles(PositionMixin, EnvObjects):
# isinstance(next(y for y in x.guests), DirtPile))]
state.rng.shuffle(free_for_dirt)
var = self.dirt_spawn_r_var
new_spawn = abs(self.initial_dirt_ratio + (state.rng.uniform(-var, var) if initial_spawn else 0))
n_dirty_positions = max(0, int(new_spawn * len(free_for_dirt)))
return self.spawn(free_for_dirt[:n_dirty_positions], self.initial_amount)
new_spawn = int(abs(n + (state.rng.uniform(-n_var, n_var))))
new_amount_s = [abs(amount + (amount*state.rng.uniform(-amount_var, amount_var))) for _ in range(new_spawn)]
n_dirty_positions = free_for_dirt[:new_spawn]
return self.spawn(n_dirty_positions, new_amount_s)
def __repr__(self):
s = super(DirtPiles, self).__repr__()

View File

@ -7,53 +7,98 @@ from marl_factory_grid.utils.results import TickResult
from marl_factory_grid.utils.results import DoneResult
class DirtAllCleanDone(Rule):
class DoneOnAllDirtCleaned(Rule):
def __init__(self):
def __init__(self, reward: float = r.CLEAN_UP_ALL):
"""
Defines a 'Done'-condition which tirggers, when there is no more 'Dirt' in the environment.
:type reward: float
:parameter reward: Given reward when condition triggers.
"""
super().__init__()
self.reward = reward
def on_check_done(self, state) -> [DoneResult]:
if len(state[d.DIRT]) == 0 and state.curr_step:
return [DoneResult(validity=c.VALID, identifier=self.name, reward=r.CLEAN_UP_ALL)]
return [DoneResult(validity=c.VALID, identifier=self.name, reward=self.reward)]
return [DoneResult(validity=c.NOT_VALID, identifier=self.name, reward=0)]
class DirtRespawnRule(Rule):
class SpawnDirt(Rule):
def __init__(self, spawn_freq=15):
def __init__(self, initial_n: int, initial_amount: float, respawn_n: int, respawn_amount: float,
n_var: float = 0.2, amount_var: float = 0.2, spawn_freq: int = 15):
"""
Defines the spawn pattern of intial and additional 'Dirt'-entitites.
First chooses positions, then trys to spawn dirt until 'respawn_n' or the maximal global amount is reached.
If there is allready some, it is topped up to min(max_local_amount, amount).
:type spawn_freq: int
:parameter spawn_freq: In which frequency should this Rule try to spawn new 'Dirt'?
:type respawn_n: int
:parameter respawn_n: How many respawn positions are considered.
:type initial_n: int
:parameter initial_n: How much initial positions are considered.
:type amount_var: float
:parameter amount_var: Variance of amount to spawn.
:type n_var: float
:parameter n_var: Variance of n to spawn.
:type respawn_amount: float
:parameter respawn_amount: Defines how much dirt 'amount' is placed every 'spawn_freq' ticks.
:type initial_amount: float
:parameter initial_amount: Defines how much dirt 'amount' is initially placed.
"""
super().__init__()
self.amount_var = amount_var
self.n_var = n_var
self.respawn_amount = respawn_amount
self.respawn_n = respawn_n
self.initial_amount = initial_amount
self.initial_n = initial_n
self.spawn_freq = spawn_freq
self._next_dirt_spawn = spawn_freq
def on_init(self, state, lvl_map) -> str:
state[d.DIRT].trigger_dirt_spawn(state, initial_spawn=True)
return f'Initial Dirt was spawned on: {[x.pos for x in state[d.DIRT]]}'
result = state[d.DIRT].trigger_dirt_spawn(self.initial_n, self.initial_amount, state,
n_var=self.n_var, amount_var=self.amount_var)
state.print(f'Initial Dirt was spawned on: {[x.pos for x in state[d.DIRT]]}')
return result
def tick_step(self, state):
if self._next_dirt_spawn < 0:
pass # No DirtPile Spawn
elif not self._next_dirt_spawn:
validity = state[d.DIRT].trigger_dirt_spawn(state)
return [TickResult(entity=None, validity=validity, identifier=self.name, reward=0)]
result = [state[d.DIRT].trigger_dirt_spawn(self.respawn_n, self.respawn_amount, state,
n_var=self.n_var, amount_var=self.amount_var)]
self._next_dirt_spawn = self.spawn_freq
else:
self._next_dirt_spawn -= 1
return []
result = []
return result
class DirtSmearOnMove(Rule):
class EntitiesSmearDirtOnMove(Rule):
def __init__(self, smear_amount: float = 0.2):
def __init__(self, smear_ratio: float = 0.2):
"""
Enables 'smearing'. Entities that move through dirt, will leave a trail behind them.
They take dirt * smear_ratio of it with them to their next position.
:type smear_ratio: float
:parameter smear_ratio: How much percent dirt is smeared by entities to their next position.
"""
assert smear_ratio < 1, "'Smear Amount' must be smaller than 1"
super().__init__()
self.smear_amount = smear_amount
self.smear_ratio = smear_ratio
def tick_post_step(self, state):
results = list()
for entity in state.moving_entites:
if is_move(entity.state.identifier) and entity.state.validity == c.VALID:
if old_pos_dirt := state[d.DIRT].by_pos(entity.last_pos):
if smeared_dirt := round(old_pos_dirt.amount * self.smear_amount, 2):
if smeared_dirt := round(old_pos_dirt.amount * self.smear_ratio, 2):
if state[d.DIRT].spawn(entity.pos, amount=smeared_dirt): # pos statt tile
results.append(TickResult(identifier=self.name, entity=entity,
reward=0, validity=c.VALID))