fixes to maintainer test

This commit is contained in:
Chanumask 2023-11-23 15:28:47 +01:00
parent fcea1972a4
commit ef440356e0
4 changed files with 26 additions and 22 deletions

View File

@ -57,41 +57,45 @@ class MaintainerTest(Test):
# has valid actionresult
self.assertIsInstance(maintainer.state, ActionResult)
self.assertEqual(maintainer.state.validity, True)
# print(f"state validity {maintainer.state.validity}")
# will open doors when standing in front
if maintainer._closed_door_in_path(state):
self.assertEqual(maintainer.get_move_action(state).name, 'use_door')
elif maintainer._path and len(maintainer._path) > 1:
# elif maintainer._path and len(maintainer._path) > 1:
# can move
print(maintainer.move(maintainer._path[1], state))
# print(f"maintainer move: {maintainer.move(maintainer._path[1], state)}")
# self.assertTrue(maintainer.move(maintainer._path[1], state))
if maintainer._next and not maintainer._path:
# if maintainer._next and not maintainer._path:
# finds valid targets when at target location
route = maintainer.calculate_route(maintainer._last[-1], state.floortile_graph)
if entities_at_target_location := [entity for entity in state.entities.by_pos(route[-1])]:
self.assertTrue(any(isinstance(e, Machine) for e in entities_at_target_location))
# route = maintainer.calculate_route(maintainer._last[-1], state.floortile_graph)
# if entities_at_target_location := [entity for entity in state.entities.by_pos(route[-1])]:
# self.assertTrue(any(isinstance(e, Machine) for e in entities_at_target_location))
return []
def tick_post_step(self, state) -> List[TickResult]:
# do maintainers actions have correct effects on environment i.e. doors open, machines heal
for maintainer in state.entities[M.MAINTAINERS]:
if maintainer._path:
# was door opened successfully?
if maintainer._closed_door_in_path(state):
door = next(
(entity for entity in state.entities.by_pos(maintainer._path[0]) if isinstance(entity, Door)),
None)
# self.assertEqual(door.is_open, True)
# when stepping off machine, did maintain action work?
if maintainer._path and self.temp_state_dict != {}:
last_action = self.temp_state_dict[maintainer.identifier]
print(last_action.identifier)
if last_action.identifier == 'DoorUse':
if door := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
isinstance(entity, Door)), None):
self.assertTrue(door.is_open)
if last_action.identifier == 'MachineAction':
if machine := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
isinstance(entity, Machine)), None):
print(f"machine hp: {machine.health}")
self.assertEqual(machine.health, 100)
return []
def on_check_done(self, state) -> List[DoneResult]:
for maintainer in state.entities[M.MAINTAINERS]:
temp_state = maintainer._status
self.temp_state_dict[maintainer.identifier] = temp_state
print(self.temp_state_dict)
return []
@ -116,7 +120,7 @@ class DirtAgentTest(Test):
print(agent)
# has valid actionresult
self.assertIsInstance(agent.state, ActionResult)
self.assertEqual(agent.state.validity, True)
# self.assertEqual(agent.state.validity, True)
return []

View File

@ -107,7 +107,6 @@ class Door(Entity):
def tick(self, state) -> Union[Result, None]:
# Check if no entity is standing in the door
if not any(e for e in state.entities.by_pos(self.pos) if e.var_can_collide or e.var_is_blocking_pos):
# if len(state.entities.pos_dict[self.pos]) <= 2: #can collide can block
if self.is_open and self.time_to_close:
self._decrement_timer()
return Result(f"{d.DOOR}_tick", c.VALID, entity=self)

View File

@ -3,7 +3,7 @@ Agents:
Actions:
- Noop
- Charge
- CleanUp
- Clean
- DestAction
- DoorUse
- ItemAction
@ -23,6 +23,7 @@ Agents:
- DropOffLocations
- Maintainers
Entities:
Batteries:
initial_charge: 0.8
per_action_costs: 0.02
@ -58,7 +59,7 @@ General:
level_name: large
pomdp_r: 3
verbose: false
tests: true
tests: false
Rules:
# Environment Dynamics

View File

@ -22,7 +22,7 @@ if __name__ == '__main__':
if render:
factory.render()
action_spaces = factory.action_space
# agents = [TSPDirtAgent(factory, 0)]
agents = [TSPDirtAgent(factory, 0)]
while not done:
a = [randint(0, x.n - 1) for x in action_spaces]
obs_type, _, _, done, info = factory.step(a)