mirror of
https://github.com/illiumst/marl-factory-grid.git
synced 2025-07-05 17:11:35 +02:00
added itemagent test
This commit is contained in:
@ -1,27 +1,44 @@
|
|||||||
Agents:
|
Agents:
|
||||||
Wolfgang:
|
Clean test agent:
|
||||||
Actions:
|
Actions:
|
||||||
- Noop
|
- Noop
|
||||||
- Charge
|
- Charge
|
||||||
- Clean
|
- Clean
|
||||||
- DestAction
|
- DoorUse
|
||||||
- DoorUse
|
- Move8
|
||||||
- ItemAction
|
|
||||||
- Move8
|
|
||||||
Observations:
|
Observations:
|
||||||
- Combined:
|
- Combined:
|
||||||
- Other
|
- Other
|
||||||
- Walls
|
- Walls
|
||||||
- GlobalPosition
|
- GlobalPosition
|
||||||
- Battery
|
- Battery
|
||||||
- ChargePods
|
- ChargePods
|
||||||
- DirtPiles
|
- DirtPiles
|
||||||
- Destinations
|
- Destinations
|
||||||
- Doors
|
- Doors
|
||||||
- Items
|
- Maintainers
|
||||||
- Inventory
|
Item test agent:
|
||||||
- DropOffLocations
|
Actions:
|
||||||
- Maintainers
|
- Noop
|
||||||
|
- Charge
|
||||||
|
- DestAction
|
||||||
|
- DoorUse
|
||||||
|
- ItemAction
|
||||||
|
- Move8
|
||||||
|
Observations:
|
||||||
|
- Combined:
|
||||||
|
- Other
|
||||||
|
- Walls
|
||||||
|
- GlobalPosition
|
||||||
|
- Battery
|
||||||
|
- ChargePods
|
||||||
|
- Destinations
|
||||||
|
- Doors
|
||||||
|
- Items
|
||||||
|
- Inventory
|
||||||
|
- DropOffLocations
|
||||||
|
- Maintainers
|
||||||
|
|
||||||
Entities:
|
Entities:
|
||||||
|
|
||||||
Batteries:
|
Batteries:
|
||||||
@ -89,4 +106,5 @@ Rules:
|
|||||||
|
|
||||||
Tests:
|
Tests:
|
||||||
MaintainerTest: {}
|
MaintainerTest: {}
|
||||||
# DirtAgentTest: {}
|
DirtAgentTest: {}
|
||||||
|
ItemAgentTest: {}
|
||||||
|
@ -2,7 +2,9 @@ import unittest
|
|||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
import marl_factory_grid.modules.maintenance.constants as M
|
import marl_factory_grid.modules.maintenance.constants as M
|
||||||
from marl_factory_grid.modules import Door, Machine
|
from marl_factory_grid.algorithms.static.TSP_dirt_agent import TSPDirtAgent
|
||||||
|
from marl_factory_grid.environment.entity.agent import Agent
|
||||||
|
from marl_factory_grid.modules import Door, Machine, DirtPile, Item, DropOffLocation, ItemAction
|
||||||
from marl_factory_grid.utils.results import TickResult, DoneResult, ActionResult
|
from marl_factory_grid.utils.results import TickResult, DoneResult, ActionResult
|
||||||
import marl_factory_grid.environment.constants as c
|
import marl_factory_grid.environment.constants as c
|
||||||
|
|
||||||
@ -56,18 +58,13 @@ class MaintainerTest(Test):
|
|||||||
|
|
||||||
# has valid actionresult
|
# has valid actionresult
|
||||||
self.assertIsInstance(maintainer.state, ActionResult)
|
self.assertIsInstance(maintainer.state, ActionResult)
|
||||||
self.assertEqual(maintainer.state.validity, True)
|
# self.assertEqual(maintainer.state.validity, True)
|
||||||
# print(f"state validity {maintainer.state.validity}")
|
# print(f"state validity {maintainer.state.validity}")
|
||||||
|
|
||||||
# will open doors when standing in front
|
# will open doors when standing in front
|
||||||
if maintainer._closed_door_in_path(state):
|
if maintainer._closed_door_in_path(state):
|
||||||
self.assertEqual(maintainer.get_move_action(state).name, 'use_door')
|
self.assertEqual(maintainer.get_move_action(state).name, 'use_door')
|
||||||
|
|
||||||
# elif maintainer._path and len(maintainer._path) > 1:
|
|
||||||
# can move
|
|
||||||
# print(f"maintainer move: {maintainer.move(maintainer._path[1], state)}")
|
|
||||||
# self.assertTrue(maintainer.move(maintainer._path[1], state))
|
|
||||||
|
|
||||||
# if maintainer._next and not maintainer._path:
|
# if maintainer._next and not maintainer._path:
|
||||||
# finds valid targets when at target location
|
# finds valid targets when at target location
|
||||||
# route = maintainer.calculate_route(maintainer._last[-1], state.floortile_graph)
|
# route = maintainer.calculate_route(maintainer._last[-1], state.floortile_graph)
|
||||||
@ -76,19 +73,17 @@ class MaintainerTest(Test):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
def tick_post_step(self, state) -> List[TickResult]:
|
def tick_post_step(self, state) -> List[TickResult]:
|
||||||
# do maintainers actions have correct effects on environment i.e. doors open, machines heal
|
# do maintainers' actions have correct effects on environment i.e. doors open, machines heal
|
||||||
for maintainer in state.entities[M.MAINTAINERS]:
|
for maintainer in state.entities[M.MAINTAINERS]:
|
||||||
if maintainer._path and self.temp_state_dict != {}:
|
if maintainer._path and self.temp_state_dict != {}:
|
||||||
last_action = self.temp_state_dict[maintainer.identifier]
|
last_action = self.temp_state_dict[maintainer.identifier]
|
||||||
print(last_action.identifier)
|
|
||||||
if last_action.identifier == 'DoorUse':
|
if last_action.identifier == 'DoorUse':
|
||||||
if door := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
|
if door := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
|
||||||
isinstance(entity, Door)), None):
|
isinstance(entity, Door)), None):
|
||||||
self.assertTrue(door.is_open)
|
self.assertTrue(door.is_open)
|
||||||
if last_action.identifier == 'MachineAction':
|
if last_action.identifier == 'MachineAction':
|
||||||
if machine := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
|
if machine := next((entity for entity in state.entities.get_entities_near_pos(maintainer.pos) if
|
||||||
isinstance(entity, Machine)), None):
|
isinstance(entity, Machine)), None):
|
||||||
print(f"machine hp: {machine.health}")
|
|
||||||
self.assertEqual(machine.health, 100)
|
self.assertEqual(machine.health, 100)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
@ -107,6 +102,7 @@ class DirtAgentTest(Test):
|
|||||||
environment.
|
environment.
|
||||||
"""
|
"""
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.temp_state_dict = {}
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_init(self, state, lvl_map):
|
def on_init(self, state, lvl_map):
|
||||||
@ -116,20 +112,101 @@ class DirtAgentTest(Test):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
def tick_step(self, state) -> List[TickResult]:
|
def tick_step(self, state) -> List[TickResult]:
|
||||||
for agent in state.entities[c.AGENT]:
|
for dirtagent in [a for a in state.entities[c.AGENT] if "Clean" in a.identifier]: # isinstance TSPDirtAgent
|
||||||
print(agent)
|
|
||||||
# has valid actionresult
|
# has valid actionresult
|
||||||
self.assertIsInstance(agent.state, ActionResult)
|
self.assertIsInstance(dirtagent.state, ActionResult)
|
||||||
# self.assertEqual(agent.state.validity, True)
|
# self.assertEqual(agent.state.validity, True)
|
||||||
|
# print(f"state validity {maintainer.state.validity}")
|
||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def tick_post_step(self, state) -> List[TickResult]:
|
def tick_post_step(self, state) -> List[TickResult]:
|
||||||
# action success?
|
# do agents' actions have correct effects on environment i.e. doors open, dirt is cleaned
|
||||||
# collisions? if yes, reported?
|
for dirtagent in [a for a in state.entities[c.AGENT] if "Clean" in a.identifier]: # isinstance TSPDirtAgent
|
||||||
|
if self.temp_state_dict != {}: # and
|
||||||
|
last_action = self.temp_state_dict[dirtagent.identifier]
|
||||||
|
if last_action.identifier == 'DoorUse':
|
||||||
|
if door := next((entity for entity in state.entities.get_entities_near_pos(dirtagent.pos) if
|
||||||
|
isinstance(entity, Door)), None):
|
||||||
|
self.assertTrue(door.is_open) # TODO catch if someone closes a door in same moment
|
||||||
|
if last_action.identifier == 'Clean':
|
||||||
|
if dirt := next((entity for entity in state.entities.get_entities_near_pos(dirtagent.pos) if
|
||||||
|
isinstance(entity, DirtPile)), None):
|
||||||
|
# print(f"dirt left on pos: {dirt.amount}")
|
||||||
|
self.assertTrue(
|
||||||
|
dirt.amount < 5) # TODO amount one step before - clean amount?
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def on_check_done(self, state) -> List[DoneResult]:
|
def on_check_done(self, state) -> List[DoneResult]:
|
||||||
|
for dirtagent in [a for a in state.entities[c.AGENT] if "Clean" in a.identifier]: # isinstance TSPDirtAgent
|
||||||
|
temp_state = dirtagent._status
|
||||||
|
self.temp_state_dict[dirtagent.identifier] = temp_state
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# class ItemAgentTest(Test):
|
|
||||||
|
class ItemAgentTest(Test):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Tests whether the dirt agent will perform the correct actions and whether the actions register correctly in the
|
||||||
|
environment.
|
||||||
|
"""
|
||||||
|
super().__init__()
|
||||||
|
self.temp_state_dict = {}
|
||||||
|
pass
|
||||||
|
|
||||||
|
def on_init(self, state, lvl_map):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def on_reset(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def tick_step(self, state) -> List[TickResult]:
|
||||||
|
for itemagent in [a for a in state.entities[c.AGENT] if "Item" in a.identifier]: # isinstance TSPItemAgent
|
||||||
|
# has valid actionresult
|
||||||
|
self.assertIsInstance(itemagent.state, ActionResult)
|
||||||
|
# self.assertEqual(agent.state.validity, True)
|
||||||
|
# print(f"state validity {maintainer.state.validity}")
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def tick_post_step(self, state) -> List[TickResult]:
|
||||||
|
# do agents' actions have correct effects on environment i.e. doors open, items are picked up and dropped off
|
||||||
|
for itemagent in [a for a in state.entities[c.AGENT] if "Item" in a.identifier]: # isinstance TSPItemAgent
|
||||||
|
|
||||||
|
if self.temp_state_dict != {}: # and
|
||||||
|
last_action = self.temp_state_dict[itemagent.identifier]
|
||||||
|
if last_action.identifier == 'DoorUse':
|
||||||
|
if door := next((entity for entity in state.entities.get_entities_near_pos(itemagent.pos) if
|
||||||
|
isinstance(entity, Door)), None):
|
||||||
|
self.assertTrue(door.is_open)
|
||||||
|
if last_action.identifier == 'ItemAction':
|
||||||
|
|
||||||
|
print(last_action.valid_drop_off_reward) # kann man das nehmen für dropoff vs pickup?
|
||||||
|
# valid pickup?
|
||||||
|
|
||||||
|
# If it was a pick-up action
|
||||||
|
nearby_items = [e for e in state.entities.get_entities_near_pos(itemagent.pos) if
|
||||||
|
isinstance(e, Item)]
|
||||||
|
self.assertNotIn(Item, nearby_items)
|
||||||
|
|
||||||
|
# If the agent has the item in its inventory
|
||||||
|
self.assertTrue(itemagent.bound_entity)
|
||||||
|
|
||||||
|
# If it was a drop-off action
|
||||||
|
nearby_drop_offs = [e for e in state.entities.get_entities_near_pos(itemagent.pos) if
|
||||||
|
isinstance(e, DropOffLocation)]
|
||||||
|
if nearby_drop_offs:
|
||||||
|
dol = nearby_drop_offs[0]
|
||||||
|
self.assertTrue(dol.bound_entity) # item in drop-off location?
|
||||||
|
|
||||||
|
# Ensure the item is not in the inventory after dropping off
|
||||||
|
self.assertNotIn(Item, state.entities.get_entities_near_pos(itemagent.pos))
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def on_check_done(self, state) -> List[DoneResult]:
|
||||||
|
for itemagent in [a for a in state.entities[c.AGENT] if "Item" in a.identifier]: # isinstance TSPItemAgent
|
||||||
|
temp_state = itemagent._status
|
||||||
|
self.temp_state_dict[itemagent.identifier] = temp_state
|
||||||
|
return []
|
||||||
|
@ -4,6 +4,7 @@ from random import randint
|
|||||||
from tqdm import trange
|
from tqdm import trange
|
||||||
|
|
||||||
from marl_factory_grid.algorithms.static.TSP_dirt_agent import TSPDirtAgent
|
from marl_factory_grid.algorithms.static.TSP_dirt_agent import TSPDirtAgent
|
||||||
|
from marl_factory_grid.algorithms.static.TSP_item_agent import TSPItemAgent
|
||||||
from marl_factory_grid.environment.factory import Factory
|
from marl_factory_grid.environment.factory import Factory
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
@ -22,7 +23,7 @@ if __name__ == '__main__':
|
|||||||
if render:
|
if render:
|
||||||
factory.render()
|
factory.render()
|
||||||
action_spaces = factory.action_space
|
action_spaces = factory.action_space
|
||||||
agents = [TSPDirtAgent(factory, 0)]
|
agents = [TSPDirtAgent(factory, 0), TSPItemAgent(factory, 1)]
|
||||||
while not done:
|
while not done:
|
||||||
a = [randint(0, x.n - 1) for x in action_spaces]
|
a = [randint(0, x.n - 1) for x in action_spaces]
|
||||||
obs_type, _, _, done, info = factory.step(a)
|
obs_type, _, _, done, info = factory.step(a)
|
||||||
|
Reference in New Issue
Block a user