mirror of
https://github.com/illiumst/marl-factory-grid.git
synced 2025-12-20 05:56:07 +01:00
Some Dokumentation (tools.py)
This commit is contained in:
@@ -2,44 +2,68 @@ import importlib
|
|||||||
import inspect
|
import inspect
|
||||||
from os import PathLike
|
from os import PathLike
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Union
|
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from marl_factory_grid.environment import constants as c
|
from marl_factory_grid.environment import constants as c
|
||||||
from marl_factory_grid.utils.helpers import locate_and_import_class
|
from marl_factory_grid.utils.helpers import locate_and_import_class
|
||||||
|
|
||||||
ACTION = 'Action'
|
ACTION = 'Action'
|
||||||
GENERAL = 'General'
|
GENERAL = 'General'
|
||||||
ENTITIES = 'Objects'
|
ENTITIES = 'Objects'
|
||||||
OBSERVATIONS = 'Observations'
|
OBSERVATIONS = 'Observations'
|
||||||
RULES = 'Rule'
|
RULES = 'Rule'
|
||||||
ASSETS = 'Assets'
|
TESTS = 'Tests'
|
||||||
EXCLUDED = ['identifier', 'args', 'kwargs', 'Move', 'Agent', 'GlobalPositions', 'Walls',
|
EXCLUDED = ['identifier', 'args', 'kwargs', 'Move', 'Agent', 'GlobalPositions', 'Walls',
|
||||||
'TemplateRule', 'Entities', 'EnvObjects', 'Zones', ]
|
'TemplateRule', 'Entities', 'EnvObjects', 'Zones', ]
|
||||||
|
|
||||||
|
|
||||||
class ConfigExplainer:
|
class ConfigExplainer:
|
||||||
|
|
||||||
def __init__(self, custom_path: Union[None, PathLike] = None):
|
def __init__(self, custom_path: None | PathLike = None):
|
||||||
"""
|
"""
|
||||||
TODO
|
This utility serves as a helper for debugging and exploring available modules and classes.
|
||||||
|
Does not do anything unless told.
|
||||||
|
The functions get_xxxxx() retrieves and returns the information and save_xxxxx() dumps them to disk.
|
||||||
|
|
||||||
|
get_all() and save_all() helps geting a general overview.
|
||||||
|
|
||||||
:return:
|
When provided with a custom path, your own modules become available.
|
||||||
|
|
||||||
|
:param custom_path: Path to your custom module folder.
|
||||||
"""
|
"""
|
||||||
self.base_path = Path(__file__).parent.parent.resolve()
|
self.base_path = Path(__file__).parent.parent.resolve()
|
||||||
self.custom_path = custom_path
|
self.custom_path = Path(custom_path) if custom_path is not None else custom_path
|
||||||
self.searchspace = [ACTION, GENERAL, ENTITIES, OBSERVATIONS, RULES, ASSETS]
|
self.searchspace = [ACTION, GENERAL, ENTITIES, OBSERVATIONS, RULES, TESTS]
|
||||||
|
|
||||||
def explain_module(self, class_to_explain):
|
@staticmethod
|
||||||
|
def _explain_module(class_to_explain):
|
||||||
|
"""
|
||||||
|
INTERNAL USE ONLY
|
||||||
|
"""
|
||||||
parameters = inspect.signature(class_to_explain).parameters
|
parameters = inspect.signature(class_to_explain).parameters
|
||||||
explained = {class_to_explain.__name__:
|
explained = {class_to_explain.__name__:
|
||||||
{key: val.default for key, val in parameters.items() if key not in EXCLUDED}
|
{key: val.default for key, val in parameters.items() if key not in EXCLUDED}
|
||||||
}
|
}
|
||||||
return explained
|
return explained
|
||||||
|
|
||||||
|
def _get_by_identifier(self, identifier):
|
||||||
|
"""
|
||||||
|
INTERNAL USE ONLY
|
||||||
|
"""
|
||||||
|
entities_base_cls = locate_and_import_class(identifier, self.base_path)
|
||||||
|
module_paths = [x.resolve() for x in self.base_path.rglob('*.py') if x.is_file() and '__init__' not in x.name]
|
||||||
|
found_entities = self._load_and_compare(entities_base_cls, module_paths)
|
||||||
|
if self.custom_path is not None:
|
||||||
|
module_paths = [x.resolve() for x in self.custom_path.rglob('*.py') if x.is_file()
|
||||||
|
and '__init__' not in x.name]
|
||||||
|
found_entities.update(self._load_and_compare(entities_base_cls, module_paths))
|
||||||
|
return found_entities
|
||||||
|
|
||||||
def _load_and_compare(self, compare_class, paths):
|
def _load_and_compare(self, compare_class, paths):
|
||||||
|
"""
|
||||||
|
INTERNAL USE ONLY
|
||||||
|
"""
|
||||||
conf = {}
|
conf = {}
|
||||||
package_pos = next(idx for idx, x in enumerate(Path(__file__).resolve().parts) if x == 'marl_factory_grid')
|
package_pos = next(idx for idx, x in enumerate(Path(__file__).resolve().parts) if x == 'marl_factory_grid')
|
||||||
for module_path in paths:
|
for module_path in paths:
|
||||||
@@ -50,40 +74,97 @@ class ConfigExplainer:
|
|||||||
mod = mods.__getattribute__(key)
|
mod = mods.__getattribute__(key)
|
||||||
try:
|
try:
|
||||||
if issubclass(mod, compare_class) and mod != compare_class:
|
if issubclass(mod, compare_class) and mod != compare_class:
|
||||||
conf.update(self.explain_module(mod))
|
conf.update(self._explain_module(mod))
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
def save_actions(self, output_conf_file: PathLike = Path('../../quickstart') / 'explained_actions.yml'):
|
@staticmethod
|
||||||
self._save_to_file(self.get_entities(), output_conf_file, ACTION)
|
def _save_to_file(data: dict, filepath: PathLike, tag: str = ''):
|
||||||
|
"""
|
||||||
|
INTERNAL USE ONLY
|
||||||
|
"""
|
||||||
|
filepath = Path(filepath)
|
||||||
|
yaml.Dumper.ignore_aliases = lambda *args: True
|
||||||
|
with filepath.open('w') as f:
|
||||||
|
yaml.dump(data, f, encoding='utf-8')
|
||||||
|
print(f'Example config {"for " + tag + " " if tag else " "}dumped')
|
||||||
|
print(f'See file: {filepath}')
|
||||||
|
|
||||||
def get_actions(self):
|
def get_actions(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Retrieve all actions from module folders.
|
||||||
|
|
||||||
|
:returns: A list of all available actions.
|
||||||
|
"""
|
||||||
actions = self._get_by_identifier(ACTION)
|
actions = self._get_by_identifier(ACTION)
|
||||||
assert all(not x for x in actions.values()), 'Please only provide Names, no Mappings.'
|
assert all(not x for x in actions.values()), 'Please only provide Names, no Mappings.'
|
||||||
actions = list(actions.keys())
|
actions = list(actions.keys())
|
||||||
actions.extend([c.MOVE8, c.MOVE4])
|
actions.extend([c.MOVE8, c.MOVE4])
|
||||||
# TODO: Print to file!
|
|
||||||
return actions
|
return actions
|
||||||
|
|
||||||
def save_entities(self, output_conf_file: PathLike = Path('../../quickstart') / 'explained_entities.yml'):
|
def get_all(self) -> dict[str]:
|
||||||
self._save_to_file(self.get_entities(), output_conf_file, ENTITIES)
|
"""
|
||||||
|
Retrieve all available configurations from module folders.
|
||||||
|
|
||||||
|
:returns: A dictionary of all available configurations.
|
||||||
|
"""
|
||||||
|
|
||||||
|
config_dict = {
|
||||||
|
'General': self.get_general_section(),
|
||||||
|
'Agents': self.get_agent_section(),
|
||||||
|
'Entities': self.get_entities(),
|
||||||
|
'Rules': self.get_rules()
|
||||||
|
}
|
||||||
|
return config_dict
|
||||||
|
|
||||||
def get_entities(self):
|
def get_entities(self):
|
||||||
|
"""
|
||||||
|
Retrieve all entities from module folders.
|
||||||
|
|
||||||
|
:returns: A list of all available entities.
|
||||||
|
"""
|
||||||
entities = self._get_by_identifier(ENTITIES)
|
entities = self._get_by_identifier(ENTITIES)
|
||||||
return entities
|
return entities
|
||||||
|
|
||||||
def save_rules(self, output_conf_file: PathLike = Path('../../quickstart') / 'explained_rules.yml'):
|
@staticmethod
|
||||||
self._save_to_file(self.get_entities(), output_conf_file, RULES)
|
def get_general_section():
|
||||||
|
"""
|
||||||
|
Build the general section.
|
||||||
|
|
||||||
def get_rules(self):
|
:returns: A list of all available entities.
|
||||||
|
"""
|
||||||
|
general = {'level_name': 'rooms', 'env_seed': 69, 'verbose': False,
|
||||||
|
'pomdp_r': 3, 'individual_rewards': True, 'tests': False}
|
||||||
|
return general
|
||||||
|
|
||||||
|
def get_agent_section(self):
|
||||||
|
"""
|
||||||
|
Build the Agent section and retrieve all available actions and observations from module folders.
|
||||||
|
|
||||||
|
:returns: Agent section.
|
||||||
|
"""
|
||||||
|
agents = dict(
|
||||||
|
ExampleAgentName=dict(
|
||||||
|
Actions=self.get_actions(),
|
||||||
|
Observations=self.get_observations())),
|
||||||
|
return agents
|
||||||
|
|
||||||
|
def get_rules(self) -> dict[str]:
|
||||||
|
"""
|
||||||
|
Retrieve all rules from module folders.
|
||||||
|
|
||||||
|
:returns: All available rules.
|
||||||
|
"""
|
||||||
rules = self._get_by_identifier(RULES)
|
rules = self._get_by_identifier(RULES)
|
||||||
return rules
|
return rules
|
||||||
|
|
||||||
def get_assets(self):
|
def get_observations(self) -> list[str]:
|
||||||
pass
|
"""
|
||||||
|
Retrieve all agent observations from module folders.
|
||||||
|
|
||||||
def get_observations(self):
|
:returns: A list of all available observations.
|
||||||
|
"""
|
||||||
names = [c.ALL, c.COMBINED, c.SELF, c.OTHERS, "Agent['ExampleAgentName']"]
|
names = [c.ALL, c.COMBINED, c.SELF, c.OTHERS, "Agent['ExampleAgentName']"]
|
||||||
for key, val in self.get_entities().items():
|
for key, val in self.get_entities().items():
|
||||||
try:
|
try:
|
||||||
@@ -101,45 +182,47 @@ class ConfigExplainer:
|
|||||||
names.extend(e)
|
names.extend(e)
|
||||||
return names
|
return names
|
||||||
|
|
||||||
def _get_by_identifier(self, identifier):
|
def save_actions(self, output_conf_file: PathLike = Path('../../quickstart') / 'actions.yml'):
|
||||||
entities_base_cls = locate_and_import_class(identifier, self.base_path)
|
"""
|
||||||
module_paths = [x.resolve() for x in self.base_path.rglob('*.py') if x.is_file() and '__init__' not in x.name]
|
Write all availale actions to a file.
|
||||||
found_entities = self._load_and_compare(entities_base_cls, module_paths)
|
:param output_conf_file: File to write to. Defaults to ../../quickstart/actions.yml
|
||||||
if self.custom_path is not None:
|
"""
|
||||||
module_paths = [x.resolve() for x in self.custom_path.rglob('*.py') if x.is_file()
|
self._save_to_file(self.get_entities(), output_conf_file, ACTION)
|
||||||
and '__init__' not in x.name]
|
|
||||||
found_entities.update(self._load_and_compare(entities_base_cls, module_paths))
|
|
||||||
return found_entities
|
|
||||||
|
|
||||||
def save_all(self, output_conf_file: PathLike = Path('../../quickstart') / 'explained.yml'):
|
def save_entities(self, output_conf_file: PathLike = Path('../../quickstart') / 'entities.yml'):
|
||||||
|
"""
|
||||||
|
Write all availale entities to a file.
|
||||||
|
:param output_conf_file: File to write to. Defaults to ../../quickstart/entities.yml
|
||||||
|
"""
|
||||||
|
self._save_to_file(self.get_entities(), output_conf_file, ENTITIES)
|
||||||
|
|
||||||
|
def save_observations(self, output_conf_file: PathLike = Path('../../quickstart') / 'observations.yml'):
|
||||||
|
"""
|
||||||
|
Write all availale observations to a file.
|
||||||
|
:param output_conf_file: File to write to. Defaults to ../../quickstart/observations.yml
|
||||||
|
"""
|
||||||
|
self._save_to_file(self.get_entities(), output_conf_file, OBSERVATIONS)
|
||||||
|
|
||||||
|
def save_rules(self, output_conf_file: PathLike = Path('../../quickstart') / 'rules.yml'):
|
||||||
|
"""
|
||||||
|
Write all availale rules to a file.
|
||||||
|
:param output_conf_file: File to write to. Defaults to ../../quickstart/rules.yml
|
||||||
|
"""
|
||||||
|
self._save_to_file(self.get_entities(), output_conf_file, RULES)
|
||||||
|
|
||||||
|
def save_all(self, output_conf_file: PathLike = Path('../../quickstart') / 'all.yml'):
|
||||||
|
"""
|
||||||
|
Write all availale keywords to a file.
|
||||||
|
:param output_conf_file: File to write to. Defaults to ../../quickstart/all.yml
|
||||||
|
"""
|
||||||
self._save_to_file(self.get_all(), output_conf_file, 'ALL')
|
self._save_to_file(self.get_all(), output_conf_file, 'ALL')
|
||||||
|
|
||||||
def get_all(self):
|
|
||||||
config_dict = {GENERAL: {'level_name': 'rooms', 'env_seed': 69, 'verbose': False,
|
|
||||||
'pomdp_r': 3, 'individual_rewards': True},
|
|
||||||
'Agents': dict(
|
|
||||||
ExampleAgentName=dict(
|
|
||||||
Actions=self.get_actions(),
|
|
||||||
Observations=self.get_observations())),
|
|
||||||
'Entities': self.get_entities(),
|
|
||||||
'Rules': self.get_rules(),
|
|
||||||
'Assets': self.get_assets()}
|
|
||||||
return config_dict
|
|
||||||
|
|
||||||
def _save_to_file(self, data: dict, filepath: PathLike, tag: str = ''):
|
|
||||||
filepath = Path(filepath)
|
|
||||||
yaml.Dumper.ignore_aliases = lambda *args: True
|
|
||||||
with filepath.open('w') as f:
|
|
||||||
yaml.dump(data, f, encoding='utf-8')
|
|
||||||
print(f'Example config {"for " + tag + " " if tag else " "}dumped')
|
|
||||||
print(f'See file: {filepath}')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
ce = ConfigExplainer()
|
ce = ConfigExplainer()
|
||||||
ce.get_actions()
|
# ce.get_actions()
|
||||||
ce.get_entities()
|
# ce.get_entities()
|
||||||
ce.get_rules()
|
# ce.get_rules()
|
||||||
ce.get_observations()
|
# ce.get_observations()
|
||||||
ce.get_assets()
|
|
||||||
all_conf = ce.get_all()
|
all_conf = ce.get_all()
|
||||||
|
ce.save_all()
|
||||||
|
|||||||
@@ -19,10 +19,8 @@ class MarlFrameStack(gym.ObservationWrapper):
|
|||||||
@dataclass
|
@dataclass
|
||||||
class RenderEntity:
|
class RenderEntity:
|
||||||
"""
|
"""
|
||||||
TODO
|
This class defines the interface to communicate with the Renderer. Name and pos are used to load an asset file
|
||||||
|
named name.png and place it at the given pos.
|
||||||
|
|
||||||
:return:
|
|
||||||
"""
|
"""
|
||||||
name: str
|
name: str
|
||||||
pos: np.array
|
pos: np.array
|
||||||
@@ -37,10 +35,8 @@ class RenderEntity:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class Floor:
|
class Floor:
|
||||||
"""
|
"""
|
||||||
TODO
|
This class defines Entity like Floor-Objects, which do not come with the overhead.
|
||||||
|
Solely used for field-of-view calculation.
|
||||||
|
|
||||||
:return:
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|||||||
Reference in New Issue
Block a user