Merge remote-tracking branch 'origin/documentation' into documentation

This commit is contained in:
Chanumask 2024-01-06 21:21:37 +01:00
commit 70bbdd256f
31 changed files with 520 additions and 211 deletions

View File

@ -0,0 +1,88 @@
Creating a New Scenario
=======================
Creating a new scenario in the `marl-factory-grid` environment allows you to customize the environment to fit your specific requirements. This guide provides step-by-step instructions on how to create a new scenario, including defining a configuration file, designing a level, and potentially adding new entities, rules, and assets. See the "modifications.rst" file for more information on how to modify existing entities, levels, rules, groups and assets.
### Step 1: Define Configuration File
1. **Create a Configuration File:** Start by creating a new configuration file (`.yaml`) for your scenario. This file will contain settings such as the number of agents, environment dimensions, and other parameters. You can use existing configuration files as templates.
2. **Specify Custom Parameters:** Modify the configuration file to include any custom parameters specific to your scenario. For example, you can set the respawn rate of entities or define specific rewards.
### Step 2: Design the Level
1. **Create a Level File:** Design the layout of your environment by creating a new level file (`.txt`). Use symbols such as `#` for walls, `-` for walkable floors, and introduce new symbols for custom entities.
2. **Define Entity Locations:** Specify the initial locations of entities, including agents and any new entities introduced in your scenario. These spawn locations are typically provided in the conf file.
### Step 3: Introduce New Entities
1. **Create New Entity Modules:** If your scenario involves introducing new entities, create new entity modules in the `marl_factory_grid/environment/entity` directory. Define their behavior, properties, and any custom actions they can perform. Check out the template module.
2. **Update Configuration:** Update the configuration file to include settings related to your new entities, such as spawn rates, initial quantities, or any specific behaviors.
### Step 4: Implement Custom Rules
1. **Create Rule Modules:** If your scenario requires custom rules, create new rule modules in the `marl_factory_grid/environment/rules` directory. Implement the necessary logic to govern the behavior of entities in your scenario and use the provided environment hooks.
2. **Update Configuration:** If your custom rules have configurable parameters, update the configuration file to include these settings and activate the rule by adding it to the conf file.
### Step 5: Add Custom Assets (Optional)
1. **Include Custom Asset Files:** If your scenario introduces new assets (e.g., images for entities), include the necessary asset files in the appropriate directories, such as `marl_factory_grid/environment/assets`.
### Step 6: Test and Experiment
1. **Run Your Scenario:** Use the provided scripts or write your own script to run the scenario with your customized configuration. Observe the behavior of agents and entities in the environment.
2. **Iterate and Experiment:** Adjust configuration parameters, level design, or introduce new elements based on your observations. Iterate through this process until your scenario meets your desired specifications.
Congratulations! You have successfully created a new scenario in the `marl-factory-grid` environment. Experiment with different configurations, levels, entities, and rules to design unique and engaging environments for your simulations. Below you find an example of how to create a new scenario.
New Example Scenario: Apple Resource Dilemma
--------------------------------------------
To provide you with an example, we'll guide you through creating the "Apple Resource Dilemma" scenario using the steps outlined in the tutorial.
In this example scenario, agents face a dilemma of collecting apples. The apples only spawn if there are already enough in the environment. If agents collect them at the beginning, they won't respawn as quickly as if they wait for more to spawn before collecting.
### Step 1: Define Configuration File
1. **Create a Configuration File:** Start by creating a new configuration file, e.g., `apple_dilemma_config.yaml`. Use the default config file as a good starting point.
2. **Specify Custom Parameters:** Add custom parameters to control the behavior of your scenario. Also delete unused entities, actions and observations from the default config file such as dirt piles.
### Step 2: Design the Level
1. Create a Level File: Design the layout of your environment by creating a new level file, e.g., apple_dilemma_level.txt.
Of course you can also just use or modify an existing level.
2. Define Entity Locations: Specify the initial locations of entities, including doors (D). Since the apples will likely be spawning randomly, it would not make sense to encode their spawn in the level file.
### Step 3: Introduce New Entities
1. Create New Entity Modules: Create a new entity module for the apple in the `marl_factory_grid/environment/entity` directory. Use the module template or existing modules as inspiration. Instead of creating a new agent, the item agent can be used as he is already configured to collect all items and drop them off at designated locations.
2. Update Configuration: Update the configuration file to include settings related to your new entities. Agents need to be able to interact and observe them.
### Step 4: Implement Custom Rules
1. Create Rule Modules: You might want to create new rule modules. For example, apple_respawn_rule.py could be inspired from the dirt respawn rule:
>>> from marl_factory_grid.environment.rules.rule import Rule
class AppleRespawnRule(Rule):
def __init__(self, apple_spawn_rate=0.1):
super().__init__()
self.apple_spawn_rate = apple_spawn_rate
def tick_post_step(self, state):
# Logic to respawn apples based on spawn rate
pass
2. Update Configuration: Update the configuration file to include the new rule.
### Step 5: Add Custom Assets (Optional)
1. Include Custom Asset Files: If your scenario introduces new assets (e.g., images for entities), include the necessary files in the appropriate directories, such as `marl_factory_grid/environment/assets`.
### Step 6: Test and Experiment

View File

@ -6,6 +6,7 @@
installation
usage
modifications
creating a new scenario
source
.. note::

View File

@ -17,6 +17,16 @@ future_planning = 7
class TSPBaseAgent(ABC):
def __init__(self, state, agent_i, static_problem: bool = True):
"""
Abstract base class for agents in the environment.
:param state: The environment state
:type state:
:param agent_i: Index of the agent
:type agent_i: int
:param static_problem: Indicates whether the TSP is a static problem. (Default: True)
:type static_problem: bool
"""
self.static_problem = static_problem
self.local_optimization = True
self._env = state
@ -26,9 +36,25 @@ class TSPBaseAgent(ABC):
@abstractmethod
def predict(self, *_, **__) -> int:
"""
Predicts the next action based on the environment state.
:return: Predicted action.
:rtype: int
"""
return 0
def _use_door_or_move(self, door, target):
"""
Helper method to decide whether to use a door or move towards a target.
:param door: Door entity.
:type door: Door
:param target: Target type. For example 'Dirt', 'Dropoff' or 'Destination'
:type target: str
:return: Action to perform (use door or move).
"""
if door.is_closed:
# Translate the action_object to an integer to have the same output as any other model
action = do.ACTION_DOOR_USE
@ -37,6 +63,15 @@ class TSPBaseAgent(ABC):
return action
def calculate_tsp_route(self, target_identifier):
"""
Calculate the TSP route to reach a target.
:param target_identifier: Identifier of the target entity
:type target_identifier: str
:return: TSP route
:rtype: List[int]
"""
positions = [x for x in self._env.state[target_identifier].positions if x != c.VALUE_NO_POS]
if self.local_optimization:
nodes = \
@ -55,6 +90,15 @@ class TSPBaseAgent(ABC):
return route
def _door_is_close(self, state):
"""
Check if a door is close to the agent's position.
:param state: Current environment state.
:type state: Gamestate
:return: Closest door entity or None if no door is close.
:rtype: Door | None
"""
try:
return next(y for x in state.entities.neighboring_positions(self.state.pos)
for y in state.entities.pos_dict[x] if do.DOOR in y.name)
@ -62,9 +106,27 @@ class TSPBaseAgent(ABC):
return None
def _has_targets(self, target_identifier):
"""
Check if there are targets available in the environment.
:param target_identifier: Identifier of the target entity.
:type target_identifier: str
:return: True if there are targets, False otherwise.
:rtype: bool
"""
return bool(len([x for x in self._env.state[target_identifier] if x.pos != c.VALUE_NO_POS]) >= 1)
def _predict_move(self, target_identifier):
"""
Predict the next move based on the given target.
:param target_identifier: Identifier of the target entity.
:type target_identifier: str
:return: Predicted action.
:rtype: int
"""
if self._has_targets(target_identifier):
if self.static_problem:
if not self._static_route:

View File

@ -8,9 +8,18 @@ future_planning = 7
class TSPDirtAgent(TSPBaseAgent):
def __init__(self, *args, **kwargs):
"""
Initializes a TSPDirtAgent that aims to clean dirt in the environment.
"""
super(TSPDirtAgent, self).__init__(*args, **kwargs)
def predict(self, *_, **__):
"""
Predicts the next action based on the presence of dirt in the environment.
:return: Predicted action.
:rtype: int
"""
if self._env.state[di.DIRT].by_pos(self.state.pos) is not None:
# Translate the action_object to an integer to have the same output as any other model
action = di.CLEAN_UP

View File

@ -14,6 +14,12 @@ MODE_BRING = 'Mode_Bring'
class TSPItemAgent(TSPBaseAgent):
def __init__(self, *args, mode=MODE_GET, **kwargs):
"""
Initializes a TSPItemAgent that colects items in the environment, stores them in his inventory and drops them off
at a drop-off location.
:param mode: Mode of the agent, either MODE_GET or MODE_BRING.
"""
super(TSPItemAgent, self).__init__(*args, **kwargs)
self.mode = mode
@ -46,6 +52,12 @@ class TSPItemAgent(TSPBaseAgent):
return action_obj
def _choose(self):
"""
Internal Usage. Chooses the action based on the agent's mode and the environment state.
:return: Chosen action.
:rtype: int
"""
target = i.DROP_OFF if self.mode == MODE_BRING else i.ITEM
if len(self._env.state[i.ITEM]) >= 1:
action = self._predict_move(target)

View File

@ -9,9 +9,20 @@ future_planning = 7
class TSPTargetAgent(TSPBaseAgent):
def __init__(self, *args, **kwargs):
"""
Initializes a TSPTargetAgent that aims to reach destinations.
"""
super(TSPTargetAgent, self).__init__(*args, **kwargs)
def _handle_doors(self, state):
"""
Internal Usage. Handles the doors in the environment.
:param state: The current environment state.
:type state: marl_factory_grid.utils.states.Gamestate
:return: Closest door entity or None if no doors are close.
:rtype: marl_factory_grid.environment.entity.object.Entity or None
"""
try:
return next(y for x in state.entities.neighboring_positions(self.state.pos)

View File

@ -8,8 +8,20 @@ future_planning = 7
class TSPRandomAgent(TSPBaseAgent):
def __init__(self, n_actions, *args, **kwargs):
"""
Initializes a TSPRandomAgent that performs random actions from within his action space.
:param n_actions: Number of possible actions.
:type n_actions: int
"""
super(TSPRandomAgent, self).__init__(*args, **kwargs)
self.n_action = n_actions
def predict(self, *_, **__):
"""
Predicts the next action randomly.
:return: Predicted action.
:rtype: int
"""
return randint(0, self.n_action - 1)

View File

@ -6,25 +6,30 @@ from marl_factory_grid.environment import rewards as r, constants as c
from marl_factory_grid.utils.helpers import MOVEMAP
from marl_factory_grid.utils.results import ActionResult
TYPE_COLLISION = 'collision'
class Action(abc.ABC):
@property
def name(self):
return self._identifier
@abc.abstractmethod
def __init__(self, identifier: str, default_valid_reward: float, default_fail_reward: float,
def __init__(self, identifier: str, default_valid_reward: float, default_fail_reward: float,
valid_reward: float | None = None, fail_reward: float | None = None):
"""
Todo
Abstract base class representing an action that can be performed in the environment.
:param identifier:
:param default_valid_reward:
:param default_fail_reward:
:param valid_reward:
:param fail_reward:
:param identifier: A unique identifier for the action.
:type identifier: str
:param default_valid_reward: Default reward for a valid action.
:type default_valid_reward: float
:param default_fail_reward: Default reward for a failed action.
:type default_fail_reward: float
:param valid_reward: Custom reward for a valid action (optional).
:type valid_reward: Union[float, optional]
:param fail_reward: Custom reward for a failed action (optional).
:type fail_reward: Union[float, optional]
"""
self.fail_reward = fail_reward if fail_reward is not None else default_fail_reward
self.valid_reward = valid_reward if valid_reward is not None else default_valid_reward
@ -46,6 +51,9 @@ class Action(abc.ABC):
return f'Action[{self._identifier}]'
def get_result(self, validity, entity, action_introduced_collision=False):
"""
Generate an ActionResult for the action based on its validity.
"""
reward = self.valid_reward if validity else self.fail_reward
return ActionResult(self.__class__.__name__, validity, reward=reward, entity=entity,
action_introduced_collision=action_introduced_collision)

View File

@ -13,30 +13,28 @@ class Entity(Object, abc.ABC):
@property
def state(self):
"""
TODO
:return:
Get the current status of the entity. Not to be confused with the Gamestate.
:return: status
"""
return self._status or State(entity=self, identifier=c.NOOP, validity=c.VALID)
@property
def var_has_position(self):
"""
TODO
Check if the entity has a position.
:return:
:return: True if the entity has a position, False otherwise.
:rtype: bool
"""
return self.pos != c.VALUE_NO_POS
@property
def var_is_blocking_light(self):
"""
TODO
Check if the entity is blocking light.
:return:
:return: True if the entity is blocking light, False otherwise.
:rtype: bool
"""
try:
return self._collection.var_is_blocking_light or False
@ -46,10 +44,10 @@ class Entity(Object, abc.ABC):
@property
def var_can_move(self):
"""
TODO
Check if the entity can move.
:return:
:return: True if the entity can move, False otherwise.
:rtype: bool
"""
try:
return self._collection.var_can_move or False
@ -59,10 +57,10 @@ class Entity(Object, abc.ABC):
@property
def var_is_blocking_pos(self):
"""
TODO
Check if the entity is blocking a position when standing on it.
:return:
:return: True if the entity is blocking a position, False otherwise.
:rtype: bool
"""
try:
return self._collection.var_is_blocking_pos or False
@ -72,10 +70,10 @@ class Entity(Object, abc.ABC):
@property
def var_can_collide(self):
"""
TODO
Check if the entity can collide.
:return:
:return: True if the entity can collide, False otherwise.
:rtype: bool
"""
try:
return self._collection.var_can_collide or False
@ -85,39 +83,40 @@ class Entity(Object, abc.ABC):
@property
def x(self):
"""
TODO
Get the x-coordinate of the entity's position.
:return:
:return: The x-coordinate of the entity's position.
:rtype: int
"""
return self.pos[0]
@property
def y(self):
"""
TODO
Get the y-coordinate of the entity's position.
:return:
:return: The y-coordinate of the entity's position.
:rtype: int
"""
return self.pos[1]
@property
def pos(self):
"""
TODO
Get the current position of the entity.
:return:
:return: The current position of the entity.
:rtype: tuple
"""
return self._pos
def set_pos(self, pos) -> bool:
"""
TODO
Set the position of the entity.
:return:
:param pos: The new position.
:type pos: tuple
:return: True if setting the position is successful, False otherwise.
"""
assert isinstance(pos, tuple) and len(pos) == 2
self._pos = pos
@ -126,10 +125,10 @@ class Entity(Object, abc.ABC):
@property
def last_pos(self):
"""
TODO
Get the last position of the entity.
:return:
:return: The last position of the entity.
:rtype: tuple
"""
try:
return self._last_pos
@ -141,22 +140,49 @@ class Entity(Object, abc.ABC):
@property
def direction_of_view(self):
"""
TODO
Get the current direction of view of the entity.
:return:
:return: The current direction of view of the entity.
:rtype: int
"""
if self._last_pos != c.VALUE_NO_POS:
return 0, 0
else:
return np.subtract(self._last_pos, self.pos)
def __init__(self, pos, bind_to=None, **kwargs):
"""
Abstract base class representing entities in the environment grid.
:param pos: The initial position of the entity.
:type pos: tuple
:param bind_to: Entity to which this entity is bound (Default: None)
:type bind_to: Entity or None
"""
super().__init__(**kwargs)
self._view_directory = c.VALUE_NO_POS
self._status = None
self._pos = pos
self._last_pos = pos
self._collection = None
if bind_to:
try:
self.bind_to(bind_to)
except AttributeError:
print(f'Objects of class "{self.__class__.__name__}" can not be bound to other entities.')
exit()
def move(self, next_pos, state):
"""
TODO
Move the entity to a new position.
:param next_pos: The next position to move the entity to.
:type next_pos: tuple
:param state: The current state of the environment.
:type state: marl_factory_grid.environment.state.Gamestate
:return:
:return: True if the move is valid, False otherwise.
:rtype: bool
"""
next_pos = next_pos
curr_pos = self._pos
@ -172,43 +198,22 @@ class Entity(Object, abc.ABC):
# Bad naming... Was the same was the same pos, not moving....
return not_same_pos
def __init__(self, pos, bind_to=None, **kwargs):
"""
Full Env Entity that lives on the environment Grid. Doors, Items, DirtPile etc...
TODO
:return:
"""
super().__init__(**kwargs)
self._view_directory = c.VALUE_NO_POS
self._status = None
self._pos = pos
self._last_pos = pos
self._collection = None
if bind_to:
try:
self.bind_to(bind_to)
except AttributeError:
print(f'Objects of class "{self.__class__.__name__}" can not be bound to other entities.')
exit()
def summarize_state(self) -> dict:
"""
TODO
Summarize the current state of the entity.
:return:
:return: A dictionary containing the name, x-coordinate, y-coordinate, and can_collide property of the entity.
:rtype: dict
"""
return dict(name=str(self.name), x=int(self.x), y=int(self.y), can_collide=bool(self.var_can_collide))
@abc.abstractmethod
def render(self):
"""
TODO
Abstract method to render the entity.
:return:
:return: A rendering entity representing the entity's appearance in the environment.
:rtype: marl_factory_grid.utils.utility_classes.RenderEntity
"""
return RenderEntity(self.__class__.__name__.lower(), self.pos)
@ -223,19 +228,22 @@ class Entity(Object, abc.ABC):
@property
def encoding(self):
"""
TODO
Get the encoded representation of the entity.
:return:
:return: The encoded representation.
:rtype: int
"""
return c.VALUE_OCCUPIED_CELL
def change_parent_collection(self, other_collection):
"""
TODO
Change the parent collection of the entity.
:param other_collection: The new parent collection.
:type other_collection: marl_factory_grid.environment.collections.Collection
:return:
:return: True if the change is successful, False otherwise.
:rtype: bool
"""
other_collection.add_item(self)
self._collection.delete_env_object(self)
@ -245,9 +253,9 @@ class Entity(Object, abc.ABC):
@property
def collection(self):
"""
TODO
Get the parent collection of the entity.
:return:
:return: The parent collection.
:rtype: marl_factory_grid.environment.collections.Collection
"""
return self._collection

View File

@ -12,17 +12,15 @@ class Object:
@property
def bound_entity(self):
"""
TODO
Returns the entity to which this object is bound.
:return:
:return: The bound entity.
"""
return self._bound_entity
@property
def var_can_be_bound(self) -> bool:
"""
TODO
Indicates if it is possible to bind this object to another Entity or Object.
:return: Whether this object can be bound.
@ -35,30 +33,27 @@ class Object:
@property
def observers(self) -> set:
"""
TODO
Returns the set of observers for this object.
:return:
:return: Set of observers.
"""
return self._observers
@property
def name(self):
"""
TODO
Returns a string representation of the object's name.
:return:
:return: The name of the object.
"""
return f'{self.__class__.__name__}[{self.identifier}]'
@property
def identifier(self):
"""
TODO
Returns the unique identifier of the object.
:return:
:return: The unique identifier.
"""
if self._str_ident is not None:
return self._str_ident
@ -67,23 +62,19 @@ class Object:
def reset_uid(self):
"""
TODO
Resets the unique identifier counter for this class.
:return:
:return: True if the reset was successful.
"""
self._u_idx = defaultdict(lambda: 0)
return True
def __init__(self, str_ident: Union[str, None] = None, **kwargs):
"""
Generell Objects for Organisation and Maintanance such as Actions etc...
General Objects for Organisation and Maintenance such as Actions, etc.
TODO
:param str_ident:
:return:
:param str_ident: A string identifier for the object.
:return: None
"""
self._status = None
self._bound_entity = None
@ -147,28 +138,28 @@ class Object:
def bind_to(self, entity):
"""
TODO
Binds the object to a specified entity.
:return:
:param entity: The entity to bind to.
:return: The validity of the binding.
"""
self._bound_entity = entity
return c.VALID
def belongs_to_entity(self, entity):
"""
TODO
Checks if the object belongs to a specified entity.
:return:
:param entity: The entity to check against.
:return: True if the object belongs to the entity, False otherwise.
"""
return self._bound_entity == entity
def unbind(self):
"""
TODO
Unbinds the object from its current entity.
:return:
:return: The entity that the object was previously bound to.
"""
previously_bound = self._bound_entity
self._bound_entity = None

View File

@ -4,7 +4,7 @@ from marl_factory_grid.environment.entity.object import Object
##########################################################################
# ####################### Objects and Entitys ########################## #
# ####################### Objects and Entities ########################## #
##########################################################################
@ -12,10 +12,11 @@ class PlaceHolder(Object):
def __init__(self, *args, fill_value=0, **kwargs):
"""
TODO
A placeholder object that can be used as an observation during training. It is designed to be later replaced
with a meaningful observation that wasn't initially present in the training run.
:return:
:param fill_value: The default value to fill the placeholder observation (Default: 0)
:type fill_value: Any
"""
super().__init__(*args, **kwargs)
self._fill_value = fill_value
@ -23,20 +24,20 @@ class PlaceHolder(Object):
@property
def var_can_collide(self):
"""
TODO
Indicates whether this placeholder object can collide with other entities. Always returns False.
:return:
:return: False
:rtype: bool
"""
return False
@property
def encoding(self):
"""
TODO
Get the fill value representing the placeholder observation.
:return:
:return: The fill value
:rtype: Any
"""
return self._fill_value
@ -54,10 +55,10 @@ class GlobalPosition(Object):
@property
def encoding(self):
"""
TODO
Get the encoded representation of the global position based on whether normalization is enabled.
:return:
:return: The encoded representation of the global position
:rtype: tuple[float, float] or tuple[int, int]
"""
if self._normalized:
return tuple(np.divide(self._bound_entity.pos, self._shape))
@ -66,10 +67,14 @@ class GlobalPosition(Object):
def __init__(self, agent, level_shape, *args, normalized: bool = True, **kwargs):
"""
TODO
A utility class representing the global position of an entity in the environment.
:return:
:param agent: The agent entity to which the global position is bound.
:type agent: marl_factory_grid.environment.entity.agent.Agent
:param level_shape: The shape of the environment level.
:type level_shape: tuple[int, int]
:param normalized: Indicates whether the global position should be normalized (Default: True)
:type normalized: bool
"""
super(GlobalPosition, self).__init__(*args, **kwargs)
self.bind_to(agent)

View File

@ -7,10 +7,7 @@ class Wall(Entity):
def __init__(self, *args, **kwargs):
"""
TODO
:return:
A class representing a wall entity in the environment.
"""
super().__init__(*args, **kwargs)

View File

@ -24,47 +24,48 @@ class Factory(gym.Env):
@property
def action_space(self):
"""
TODO
The action space defines the set of all possible actions that an agent can take in the environment.
:return:
:return: Action space
:rtype: gym.Space
"""
return self.state[c.AGENT].action_space
@property
def named_action_space(self):
"""
TODO
Returns the named action space for agents.
:return:
:return: Named action space
:rtype: dict[str, dict[str, list[int]]]
"""
return self.state[c.AGENT].named_action_space
@property
def observation_space(self):
"""
TODO
The observation space represents all the information that an agent can receive from the environment at a given
time step.
:return:
:return: Observation space.
:rtype: gym.Space
"""
return self.obs_builder.observation_space(self.state)
@property
def named_observation_space(self):
"""
TODO
Returns the named observation space for the environment.
:return:
:return: Named observation space.
:rtype: (dict, dict)
"""
return self.obs_builder.named_observation_space(self.state)
@property
def params(self) -> dict:
"""
FIXME LAGEGY
FIXME LEGACY
:return:
@ -80,10 +81,14 @@ class Factory(gym.Env):
def __init__(self, config_file: Union[str, PathLike], custom_modules_path: Union[None, PathLike] = None,
custom_level_path: Union[None, PathLike] = None):
"""
TODO
Initializes the marl-factory-grid as Gym environment.
:return:
:param config_file: Path to the configuration file.
:type config_file: Union[str, PathLike]
:param custom_modules_path: Path to custom modules directory. (Default: None)
:type custom_modules_path: Union[None, PathLike]
:param custom_level_path: Path to custom level file. (Default: None)
:type custom_level_path: Union[None, PathLike]
"""
self._config_file = config_file
self.conf = FactoryConfigParser(self._config_file, custom_modules_path)

View File

@ -16,85 +16,128 @@ class Rule(abc.ABC):
@property
def name(self):
"""
TODO
Get the name of the rule.
:return:
"""
:return: The name of the rule.
:rtype: str
"""
return self.__class__.__name__
def __init__(self):
"""
TODO
Abstract base class representing a rule in the environment.
This class provides a framework for defining rules that govern the behavior of the environment. Rules can be
implemented by inheriting from this class and overriding specific methods.
:return:
"""
pass
def __repr__(self):
def __repr__(self) -> str:
"""
Return a string representation of the rule.
:return: A string representation of the rule.
:rtype: str
"""
return f'{self.name}'
def on_init(self, state, lvl_map):
"""
TODO
Initialize the rule when the environment is created.
This method is called during the initialization of the environment. It allows the rule to perform any setup or
initialization required.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:param lvl_map: The map of the level.
:type lvl_map: marl_factory_grid.environment.level.LevelMap
:return: List of TickResults generated during initialization.
:rtype: List[TickResult]
"""
return []
def on_reset_post_spawn(self, state) -> List[TickResult]:
"""
TODO
Execute actions after entities are spawned during a reset.
This method is called after entities are spawned during a reset. It allows the rule to perform any actions
required at this stage.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of TickResults generated after entity spawning.
:rtype: List[TickResult]
"""
return []
def on_reset(self, state) -> List[TickResult]:
"""
TODO
Execute actions during a reset.
This method is called during a reset. It allows the rule to perform any actions required at this stage.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of TickResults generated during a reset.
:rtype: List[TickResult]
"""
return []
def tick_pre_step(self, state) -> List[TickResult]:
"""
TODO
Execute actions before the main step of the environment.
This method is called before the main step of the environment. It allows the rule to perform any actions
required before the main step.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of TickResults generated before the main step.
:rtype: List[TickResult]
"""
return []
def tick_step(self, state) -> List[TickResult]:
"""
TODO
Execute actions during the main step of the environment.
This method is called during the main step of the environment. It allows the rule to perform any actions
required during the main step.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of TickResults generated during the main step.
:rtype: List[TickResult]
"""
return []
def tick_post_step(self, state) -> List[TickResult]:
"""
TODO
Execute actions after the main step of the environment.
This method is called after the main step of the environment. It allows the rule to perform any actions
required after the main step.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of TickResults generated after the main step.
:rtype: List[TickResult]
"""
return []
def on_check_done(self, state) -> List[DoneResult]:
"""
TODO
Check conditions for the termination of the environment.
This method is called to check conditions for the termination of the environment. It allows the rule to
specify conditions under which the environment should be considered done.
:return:
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of DoneResults indicating whether the environment is done.
:rtype: List[DoneResult]
"""
return []
@ -160,15 +203,23 @@ class DoneAtMaxStepsReached(Rule):
def __init__(self, max_steps: int = 500):
"""
TODO
A rule that terminates the environment when a specified maximum number of steps is reached.
:return:
"""
:param max_steps: The maximum number of steps before the environment is considered done.
:type max_steps: int
"""
super().__init__()
self.max_steps = max_steps
def on_check_done(self, state):
"""
Check if the maximum number of steps is reached, and if so, mark the environment as done.
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: List of DoneResults indicating whether the environment is done.
:rtype: List[DoneResult]
"""
if self.max_steps <= state.curr_step:
return [DoneResult(validity=c.VALID, identifier=self.name)]
return []
@ -178,14 +229,23 @@ class AssignGlobalPositions(Rule):
def __init__(self):
"""
TODO
A rule that assigns global positions to agents when the environment is reset.
:return:
:return: None
"""
super().__init__()
def on_reset(self, state, lvl_map):
"""
Assign global positions to agents when the environment is reset.
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:param lvl_map: The map of the current level.
:type lvl_map: marl_factory_grid.levels.level.LevelMap
:return: An empty list, as no additional results are generated by this rule during the reset.
:rtype: List[TickResult]
"""
from marl_factory_grid.environment.entity.util import GlobalPosition
for agent in state[c.AGENT]:
gp = GlobalPosition(agent, lvl_map.level_shape)
@ -197,10 +257,15 @@ class WatchCollisions(Rule):
def __init__(self, reward=r.COLLISION, done_at_collisions: bool = False, reward_at_done=r.COLLISION_DONE):
"""
TODO
A rule that monitors collisions between entities in the environment.
:return:
:param reward: The reward assigned for each collision.
:type reward: float
:param done_at_collisions: If True, marks the environment as done when collisions occur.
:type done_at_collisions: bool
:param reward_at_done: The reward assigned when the environment is marked as done due to collisions.
:type reward_at_done: float
:return: None
"""
super().__init__()
self.reward_at_done = reward_at_done
@ -209,6 +274,14 @@ class WatchCollisions(Rule):
self.curr_done = False
def tick_post_step(self, state) -> List[TickResult]:
"""
Monitors collisions between entities after each step in the environment.
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: A list of TickResult objects representing collisions and their associated rewards.
:rtype: List[TickResult]
"""
self.curr_done = False
results = list()
for agent in state[c.AGENT]:
@ -234,6 +307,14 @@ class WatchCollisions(Rule):
return results
def on_check_done(self, state) -> List[DoneResult]:
"""
Checks if the environment should be marked as done based on collision conditions.
:param state: The current game state.
:type state: marl_factory_grid.utils.states.GameState
:return: A list of DoneResult objects representing the conditions for marking the environment as done.
:rtype: List[DoneResult]
"""
if self.done_at_collisions:
inter_entity_collision_detected = self.curr_done
collision_in_step = any(h.is_move(x.state.identifier) and x.state.action_introduced_collision

View File

@ -12,7 +12,7 @@ class Charge(Action):
def __init__(self):
"""
Checks if a charge pod is present at the entity's position.
Checks if a charge pod is present at the agent's position.
If found, it attempts to charge the battery using the charge pod.
"""
super().__init__(b.ACTION_CHARGE, b.REWARD_CHARGE_VALID, b.Reward_CHARGE_FAIL)

View File

@ -31,7 +31,7 @@ class Battery(Object):
def __init__(self, initial_charge_level, owner, *args, **kwargs):
"""
Represents a battery entity in the environment that can be bound to an agent and charged at chargepods.
Represents a battery entity in the environment that can be bound to an agent and charged at charge pods.
:param initial_charge_level: The current charge level of the battery, ranging from 0 to 1.
:type initial_charge_level: float
@ -45,7 +45,7 @@ class Battery(Object):
def do_charge_action(self, amount) -> bool:
"""
Updates the Battery's charge level accordingly.
Updates the Battery's charge level according to the passed value.
:param amount: Amount added to the Battery's charge level.
:returns: whether the battery could be charged. if not, it was already fully charged.
@ -59,7 +59,7 @@ class Battery(Object):
def decharge(self, amount) -> bool:
"""
Decreases the charge value of a battery. Currently only riggered by the battery-decharge rule.
Decreases the charge value of a battery. Currently only triggered by the battery-decharge rule.
"""
if self.charge_level != 0:
# noinspection PyTypeChecker
@ -84,11 +84,11 @@ class ChargePod(Entity):
"""
Represents a charging pod for batteries in the environment.
:param charge_rate: The rate at which the charging pod charges batteries. Default is 0.4.
:param charge_rate: The rate at which the charging pod charges batteries. Defaults to 0.4.
:type charge_rate: float
:param multi_charge: Indicates whether the charging pod supports charging multiple batteries simultaneously.
Default is False.
Defaults to False.
:type multi_charge: bool
"""
super(ChargePod, self).__init__(*args, **kwargs)
@ -97,7 +97,8 @@ class ChargePod(Entity):
def charge_battery(self, entity, state) -> bool:
"""
Checks whether the battery can be charged. If so, triggers the charge action.
Triggers the battery charge action if possible. Impossible if battery at full charge level or more than one
agent at charge pods' position.
:returns: whether the action was successful (valid) or not.
"""

View File

@ -19,7 +19,7 @@ class Batteries(Collection):
def __init__(self, size, initial_charge_level=1.0, *args, **kwargs):
"""
A collection of batteries that can spawn batteries.
A collection of batteries that is in charge of spawning batteries. (spawned batteries are bound to agents)
:param size: The maximum allowed size of the collection. Ensures that the collection does not exceed this size.
:type size: int

View File

@ -12,7 +12,7 @@ class Clean(Action):
def __init__(self):
"""
Attempts to reduce dirt amount on entity's position.
Attempts to reduce dirt amount on entity's position. Fails if no dirt is found at the at agents' position.
"""
super().__init__(d.CLEAN_UP, d.REWARD_CLEAN_UP_VALID, d.REWARD_CLEAN_UP_FAIL)

View File

@ -18,7 +18,8 @@ class DirtPile(Entity):
def __init__(self, *args, amount=2, max_local_amount=5, **kwargs):
"""
Represents a pile of dirt at a specific position in the environment.
Represents a pile of dirt at a specific position in the environment that agents can interact with. Agents can
clean the dirt pile or, depending on activated rules, interact with it in different ways.
:param amount: The amount of dirt in the pile.
:type amount: float

View File

@ -10,7 +10,7 @@ class DestAction(Action):
def __init__(self):
"""
Attempts to wait at destination.
The agent performing this action attempts to wait at the destination in order to receive a reward.
"""
super().__init__(d.DESTINATION, d.REWARD_WAIT_VALID, d.REWARD_WAIT_FAIL)

View File

@ -38,7 +38,11 @@ class Destination(Entity):
def has_just_been_reached(self, state):
"""
Checks if the destination has just been reached based on the current state.
Checks if the destination has been reached in the last environment step.
:return: the agent that has just reached the destination or whether any agent in the environment has
performed actions equal to or exceeding the specified limit
:rtype: Union[Agent, bool]
"""
if self.was_reached():
return False

View File

@ -10,7 +10,8 @@ class DoorUse(Action):
def __init__(self, **kwargs):
"""
Attempts to interact with door (open/close it) and returns an action result if successful.
The agent performing this action attempts to interact with door (open/close it), returning an action result if
successful.
"""
super().__init__(d.ACTION_DOOR_USE, d.REWARD_USE_DOOR_VALID, d.REWARD_USE_DOOR_FAIL, **kwargs)

View File

@ -19,7 +19,7 @@ class DoorIndicator(Entity):
def __init__(self, *args, **kwargs):
"""
Is added around a door for agents to see.
Is added as a padding around doors so agents can see doors earlier.
"""
super().__init__(*args, **kwargs)
self.__delattr__('move')

View File

@ -9,7 +9,7 @@ class DoorAutoClose(Rule):
def __init__(self, close_frequency: int = 10):
"""
This rule closes doors, that have been opened automatically, when no entity is blocking the position.
This rule closes doors that have been opened automatically when no entity is blocking the position.
:type close_frequency: int
:param close_frequency: How many ticks after opening, should the door close?

View File

@ -62,7 +62,7 @@ class Inventory(IsBoundMixin, Collection):
def __init__(self, agent, *args, **kwargs):
"""
An inventory that can hold items picked up by the agent this is bound to.
An inventory that can hold items picked up by the agent it is bound to.
:param agent: The agent this inventory is bound to and belongs to.
:type agent: Agent
@ -96,7 +96,7 @@ class Inventory(IsBoundMixin, Collection):
def clear_temp_state(self):
"""
Entites need this, but inventories have no state.
Entities need this, but inventories have no state.
"""
pass
@ -123,7 +123,7 @@ class Inventories(Objects):
def __init__(self, size: int, *args, **kwargs):
"""
TODO
A collection of all inventories used to spawn an inventory per agent.
"""
super(Inventories, self).__init__(*args, **kwargs)
self.size = size

View File

@ -11,7 +11,8 @@ class MachineAction(Action):
def __init__(self):
"""
Attempts to maintain the machine and returns an action result if successful.
When performing this action, the maintainer attempts to maintain the machine at his current position, returning
an action result if successful.
"""
super().__init__(m.MACHINE_ACTION, m.MAINTAIN_VALID, m.MAINTAIN_FAIL)

View File

@ -14,7 +14,8 @@ class Machine(Entity):
def __init__(self, *args, work_interval: int = 10, pause_interval: int = 15, **kwargs):
"""
Represents a machine entity that the maintainer will try to maintain.
Represents a machine entity that the maintainer will try to maintain by performing the maintenance action.
Machines' health depletes over time.
:param work_interval: How long should the machine work before pausing.
:type work_interval: int
@ -31,7 +32,8 @@ class Machine(Entity):
def maintain(self) -> bool:
"""
Attempts to maintain the machine by increasing its health.
Attempts to maintain the machine by increasing its health, which is only possible if the machine is at a maximum
of 98/100 HP.
"""
if self.status == m.STATE_WORK:
return c.NOT_VALID

View File

@ -16,8 +16,9 @@ from ..doors import DoorUse
class Maintainer(Entity):
def __init__(self, objective, action, *args, **kwargs):
"""
Represents the maintainer entity that aims to maintain machines.
self.action_ = """
Represents the maintainer entity that aims to maintain machines. The maintainer calculates its route using nx
shortest path and restores the health of machines it visits to 100.
:param objective: The maintainer's objective, e.g., "Machines".
:type objective: str

View File

@ -27,7 +27,7 @@ class Maintainers(Collection):
def __init__(self, *args, **kwargs):
"""
A collection of maintainers
A collection of maintainers that is used to spawn them.
"""
super().__init__(*args, **kwargs)

View File

@ -23,7 +23,6 @@ class FactoryConfigParser(object):
"""
This class parses the factory env config file.
:param config_path: Path to where the 'config.yml' is.
:param custom_modules_path: Additional search path for custom modules, levels, entities, etc..
"""
@ -45,7 +44,6 @@ class FactoryConfigParser(object):
self._n_abbr_dict = defaultdict(lambda: 'th', {1: 'st', 2: 'nd', 3: 'rd'})
return self._n_abbr_dict[n]
@property
def agent_actions(self):
return self._get_sub_list('Agents', "Actions")
@ -176,7 +174,7 @@ class FactoryConfigParser(object):
['Actions', 'Observations', 'Positions', 'Clones']}
parsed_agents_conf[name] = dict(
actions=parsed_actions, observations=observations, positions=positions, other=other_kwargs
)
)
clones = self.agents[name].get('Clones', 0)
if clones:

View File

@ -13,10 +13,12 @@ from marl_factory_grid.utils.results import Result, DoneResult
class StepRules:
def __init__(self, *args):
"""
TODO
Manages a collection of rules to be applied at each step of the environment.
The StepRules class allows you to organize and apply custom rules during the simulation, ensuring that the
corresponding hooks for all rules are called at the appropriate times.
:return:
:param args: Optional Rule objects to initialize the StepRules with.
"""
if args:
self.rules = list(args)
@ -90,10 +92,18 @@ class Gamestate(object):
def __init__(self, entities, agents_conf, rules: List[Rule], lvl_shape, env_seed=69, verbose=False):
"""
TODO
The `Gamestate` class represents the state of the game environment.
:return:
:param lvl_shape: The shape of the game level.
:type lvl_shape: tuple
:param entities: The entities present in the environment.
:type entities: Entities
:param agents_conf: Agent configurations for the environment.
:type agents_conf: Any
:param verbose: Controls verbosity in the environment.
:type verbose: bool
:param rules: Organizes and applies custom rules during the simulation.
:type rules: StepRules
"""
self.lvl_shape = lvl_shape
self.entities = entities