Logging Monitor Callback
This commit is contained in:
@ -17,7 +17,7 @@ DIRT_INDEX = -1
|
||||
|
||||
@dataclass
|
||||
class DirtProperties:
|
||||
clean_amount = 0.25
|
||||
clean_amount = 10
|
||||
max_spawn_ratio = 0.1
|
||||
gain_amount = 0.1
|
||||
spawn_frequency = 5
|
||||
@ -31,8 +31,9 @@ class SimpleFactory(BaseFactory):
|
||||
def _is_clean_up_action(self, action):
|
||||
return self.action_space.n - 1 == action
|
||||
|
||||
def __init__(self, *args, dirt_properties: DirtProperties, **kwargs):
|
||||
def __init__(self, *args, dirt_properties: DirtProperties, verbose=False, **kwargs):
|
||||
self._dirt_properties = dirt_properties
|
||||
self.verbose = verbose
|
||||
super(SimpleFactory, self).__init__(*args, **kwargs)
|
||||
self.slice_strings.update({self.state.shape[0]-1: 'dirt'})
|
||||
self.renderer = None # expensive - dont use it when not required !
|
||||
@ -81,6 +82,9 @@ class SimpleFactory(BaseFactory):
|
||||
return pos, cleanup_was_sucessfull
|
||||
|
||||
def step(self, actions):
|
||||
if self.state[h.LEVEL_IDX][self.agent_i_position(0)] == h.IS_OCCUPIED_CELL:
|
||||
print(f'fAgent placed on wall!!!!, step is :{self.steps}')
|
||||
raise Exception('Agent placed on wall!!!!')
|
||||
_, _, _, info = super(SimpleFactory, self).step(actions)
|
||||
if not self.next_dirt_spawn:
|
||||
self.spawn_dirt()
|
||||
@ -94,12 +98,6 @@ class SimpleFactory(BaseFactory):
|
||||
if self._is_clean_up_action(action):
|
||||
agent_i_pos = self.agent_i_position(agent_i)
|
||||
_, valid = self.clean_up(agent_i_pos)
|
||||
if valid:
|
||||
print(f'Agent {agent_i} did just clean up some dirt at {agent_i_pos}.')
|
||||
self.monitor.add('dirt_cleaned', self._dirt_properties.clean_amount)
|
||||
else:
|
||||
print(f'Agent {agent_i} just tried to clean up some dirt at {agent_i_pos}, but was unsucsessfull.')
|
||||
self.monitor.add('failed_cleanup_attempt', 1)
|
||||
return agent_i_pos, valid
|
||||
else:
|
||||
raise RuntimeError('This should not happen!!!')
|
||||
@ -120,25 +118,44 @@ class SimpleFactory(BaseFactory):
|
||||
dirty_tiles = len(np.nonzero(self.state[DIRT_INDEX]))
|
||||
|
||||
try:
|
||||
this_step_reward = (dirty_tiles / current_dirt_amount)
|
||||
# penalty = current_dirt_amount
|
||||
penalty = 0
|
||||
except (ZeroDivisionError, RuntimeWarning):
|
||||
this_step_reward = 0
|
||||
|
||||
penalty = 0
|
||||
inforcements = 0
|
||||
for agent_state in agent_states:
|
||||
collisions = agent_state.collisions
|
||||
print(f't = {self.steps}\tAgent {agent_state.i} has collisions with '
|
||||
f'{[self.slice_strings[entity] for entity in collisions if entity != self.string_slices["dirt"]]}')
|
||||
if self._is_clean_up_action(agent_state.action) and agent_state.action_valid:
|
||||
this_step_reward += 1
|
||||
cols = agent_state.collisions
|
||||
self.print(f't = {self.steps}\tAgent {agent_state.i} has collisions with '
|
||||
f'{[self.slice_strings[entity] for entity in cols if entity != self.string_slices["dirt"]]}')
|
||||
if self._is_clean_up_action(agent_state.action):
|
||||
if agent_state.action_valid:
|
||||
inforcements += 10
|
||||
self.print(f'Agent {agent_state.i} did just clean up some dirt at {agent_state.pos}.')
|
||||
self.monitor.add('dirt_cleaned', self._dirt_properties.clean_amount)
|
||||
else:
|
||||
self.print(f'Agent {agent_state.i} just tried to clean up some dirt '
|
||||
f'at {agent_state.pos}, but was unsucsessfull.')
|
||||
self.monitor.add('failed_cleanup_attempt', 1)
|
||||
elif self._is_moving_action(agent_state.action):
|
||||
if not agent_state.action_valid:
|
||||
penalty += 10
|
||||
else:
|
||||
inforcements += 1
|
||||
|
||||
for entity in collisions:
|
||||
for entity in cols:
|
||||
if entity != self.string_slices["dirt"]:
|
||||
self.monitor.add(f'agent_{agent_state.i}_vs_{self.slice_strings[entity]}', 1)
|
||||
|
||||
this_step_reward = max(0, inforcements-penalty)
|
||||
self.monitor.set('dirt_amount', current_dirt_amount)
|
||||
self.monitor.set('dirty_tiles', dirty_tiles)
|
||||
print(f"reward is {this_step_reward}")
|
||||
self.print(f"reward is {this_step_reward}")
|
||||
return this_step_reward, {}
|
||||
|
||||
def print(self, string):
|
||||
if self.verbose:
|
||||
print(string)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
render = True
|
||||
|
Reference in New Issue
Block a user