mirror of
https://github.com/illiumst/marl-factory-grid.git
synced 2025-05-23 07:16:44 +02:00
maintainer test
This commit is contained in:
parent
f25f90a78b
commit
26318a7b48
@ -1,17 +1,20 @@
|
||||
import abc
|
||||
from typing import List
|
||||
import unittest
|
||||
|
||||
from marl_factory_grid.utils.results import TickResult, DoneResult
|
||||
from marl_factory_grid.modules import Door, Machine, Maintainer
|
||||
from marl_factory_grid.utils.results import TickResult, DoneResult, ActionResult
|
||||
import marl_factory_grid.modules.maintenance.constants as M
|
||||
from marl_factory_grid.environment import constants as c
|
||||
|
||||
|
||||
class Test(abc.ABC):
|
||||
class Test(unittest.TestCase):
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self.__class__.__name__
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
super().__init__()
|
||||
|
||||
def __repr__(self):
|
||||
return f'{self.name}'
|
||||
@ -35,9 +38,77 @@ class Test(abc.ABC):
|
||||
return []
|
||||
|
||||
|
||||
class FirstTest(Test):
|
||||
class MaintainerTest(Test):
|
||||
|
||||
def __init__(self):
|
||||
print("firstTest")
|
||||
super().__init__()
|
||||
pass
|
||||
|
||||
def tick_step(self, state) -> List[TickResult]:
|
||||
for maintainer in state.entities[M.MAINTAINERS]:
|
||||
|
||||
# has valid actionresult
|
||||
self.assertIsInstance(maintainer.state, ActionResult)
|
||||
self.assertEqual(maintainer.state.validity, True)
|
||||
|
||||
# will open doors when standing in front
|
||||
if maintainer._closed_door_in_path(state):
|
||||
self.assertEqual(maintainer.get_move_action(state).name, 'use_door')
|
||||
|
||||
if maintainer._path:
|
||||
# can move
|
||||
print(maintainer.move(maintainer._path[1], state)) # 0 immer false ausser schritt 1, 1 meistens true nicht immer
|
||||
# self.assertTrue(maintainer.move(maintainer._path[1], state))
|
||||
|
||||
else:
|
||||
# finds valid targets oder hier?
|
||||
route = maintainer.calculate_route(maintainer._last[-1], state.flootile_graph)
|
||||
if entities_at_target_location := [entity for entity in state.entities.by_pos(route[-1])]:
|
||||
self.assertTrue(any(isinstance(e, Machine) for e in entities_at_target_location))
|
||||
|
||||
return []
|
||||
|
||||
def tick_post_step(self, state) -> List[TickResult]:
|
||||
for maintainer in state.entities[M.MAINTAINERS]:
|
||||
if maintainer._path:
|
||||
# if action was door use: was door opened successfully?
|
||||
if maintainer._closed_door_in_path(state):
|
||||
# print(maintainer.get_move_action(state))
|
||||
door = next(
|
||||
(entity for entity in state.entities.by_pos(maintainer._path[0]) if isinstance(entity, Door)),
|
||||
None)
|
||||
self.assertEqual(door.is_open, True)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
class DirtAgentTest(Test):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
pass
|
||||
|
||||
def on_init(self, state, lvl_map):
|
||||
# dirtagent richtig gespawnt?
|
||||
return []
|
||||
|
||||
def on_reset(self):
|
||||
return []
|
||||
|
||||
def tick_step(self, state) -> List[TickResult]:
|
||||
# check observation correct?
|
||||
# can open doors
|
||||
# can find way
|
||||
# can move
|
||||
# clean action success? action result valid
|
||||
return []
|
||||
|
||||
def tick_post_step(self, state) -> List[TickResult]:
|
||||
# action success?
|
||||
# collisions? if yes, reported?
|
||||
return []
|
||||
|
||||
def on_check_done(self, state) -> List[DoneResult]:
|
||||
return []
|
||||
|
||||
# class ItemAgentTest(Test):
|
||||
|
@ -9,7 +9,6 @@ from marl_factory_grid.environment import constants as c
|
||||
|
||||
from marl_factory_grid.modules.destinations import constants as d
|
||||
from marl_factory_grid.modules.destinations.entitites import Destination
|
||||
from marl_factory_grid.utils.states import Gamestate
|
||||
|
||||
|
||||
ANY = 'any'
|
||||
@ -118,7 +117,7 @@ class SpawnDestinationsPerAgent(Rule):
|
||||
per_agent_d = {agent_name: [ast.literal_eval(x) for x in value]}
|
||||
self.per_agent_positions.update(**per_agent_d)
|
||||
|
||||
def on_reset(self, state: Gamestate):
|
||||
def on_reset(self, state):
|
||||
for (agent_name, coords_or_quantity) in self.per_agent_positions.items():
|
||||
agent = h.get_first(state[c.AGENT], lambda x: agent_name in x.name)
|
||||
assert agent
|
||||
|
@ -12,10 +12,11 @@ class MoveMaintainers(Rule):
|
||||
super().__init__()
|
||||
|
||||
def tick_step(self, state) -> List[TickResult]:
|
||||
move_results = []
|
||||
for maintainer in state[M.MAINTAINERS]:
|
||||
maintainer.tick(state)
|
||||
# Todo: Return a Result Object.
|
||||
return []
|
||||
result = maintainer.tick(state)
|
||||
move_results.append(result)
|
||||
return move_results
|
||||
|
||||
|
||||
class DoneAtMaintainerCollision(Rule):
|
||||
|
@ -22,13 +22,6 @@ Agents:
|
||||
- Inventory
|
||||
- DropOffLocations
|
||||
- Maintainers
|
||||
# This is special for agents, as each one is different and can act as an adversary e.g.
|
||||
Positions:
|
||||
- (16, 7)
|
||||
- (16, 6)
|
||||
- (16, 3)
|
||||
- (16, 4)
|
||||
- (16, 5)
|
||||
Entities:
|
||||
Batteries:
|
||||
initial_charge: 0.8
|
||||
@ -86,7 +79,7 @@ Rules:
|
||||
done_at_collisions: false
|
||||
|
||||
# Done Conditions
|
||||
DoneAtDestinationReachAny:
|
||||
DoneAtDestinationReach:
|
||||
DoneOnAllDirtCleaned:
|
||||
DoneAtBatteryDischarge:
|
||||
DoneAtMaintainerCollision:
|
||||
@ -94,4 +87,4 @@ Rules:
|
||||
max_steps: 500
|
||||
|
||||
Tests:
|
||||
FirstTest: {}
|
||||
MaintainerTest: {}
|
Loading…
x
Reference in New Issue
Block a user