Description and better naming scheme for the Battery-Module

plus: Agents are not able to move when discharged.
This commit is contained in:
Steffen Illium 2023-10-26 17:40:43 +02:00
parent ce4108380f
commit ac557232a1
7 changed files with 144 additions and 29 deletions

View File

@ -20,6 +20,14 @@ class Agent(Entity):
def var_can_move(self): def var_can_move(self):
return True return True
@property
def var_is_paralyzed(self):
    """
    Tell whether this agent is currently paralyzed.

    :rtype: bool
    :return: True while at least one paralysis reason is registered in `self._paralyzed`, False otherwise.
    """
    # Return a real boolean (like the other `var_*` flags) instead of the number of reasons.
    return bool(self._paralyzed)
@property
def paralyze_reasons(self):
    """
    List the reasons this agent is currently paralyzed for.

    :rtype: list
    :return: A new list holding every entry of `self._paralyzed`; empty if the agent is not paralyzed.
             The backing container is a set, so the order of the entries is not guaranteed.
    """
    return list(self._paralyzed)
@property @property
def var_is_blocking_pos(self): def var_is_blocking_pos(self):
return False return False
@ -57,6 +65,7 @@ class Agent(Entity):
def __init__(self, actions: List[Action], observations: List[str], *args, **kwargs): def __init__(self, actions: List[Action], observations: List[str], *args, **kwargs):
super(Agent, self).__init__(*args, **kwargs) super(Agent, self).__init__(*args, **kwargs)
self._paralyzed = set()
self.step_result = dict() self.step_result = dict()
self._actions = actions self._actions = actions
self._observations = observations self._observations = observations
@ -75,6 +84,17 @@ class Agent(Entity):
def set_state(self, action_result): def set_state(self, action_result):
self._state = action_result self._state = action_result
def paralyze(self, reason):
    """
    Register a reason for which this agent is paralyzed.

    :param reason: Hashable identifier of the cause; it is stored in the `self._paralyzed` set,
                   so registering the same reason twice has no additional effect.
    :return: Always `c.VALID`.
    """
    self._paralyzed.add(reason)
    return c.VALID
def de_paralyze(self, reason):
    """
    Withdraw a previously registered paralysis reason.

    :param reason: Identifier of the cause that should be removed from `self._paralyzed`.
    :return: `c.VALID` if the reason was registered and has been removed, `c.NOT_VALID` if it was unknown.
    """
    # Guard clause: an unknown reason cannot be removed.
    if reason not in self._paralyzed:
        return c.NOT_VALID
    self._paralyzed.remove(reason)
    return c.VALID
def render(self): def render(self):
i = next(idx for idx, x in enumerate(self._collection) if x.name == self.name) i = next(idx for idx, x in enumerate(self._collection) if x.name == self.name)
curr_state = self.state curr_state = self.state

View File

@ -1,4 +1,4 @@
from .actions import BtryCharge from .actions import BtryCharge
from .entitites import Pod, Battery from .entitites import Pod, Battery
from .groups import ChargePods, Batteries from .groups import ChargePods, Batteries
from .rules import BtryDoneAtDischarge, Btry from .rules import BtryDoneAtDischarge, BatteryDecharge

View File

@ -1,16 +1,17 @@
from typing import Union from typing import Union
import marl_factory_grid.modules.batteries.constants
from marl_factory_grid.environment.actions import Action from marl_factory_grid.environment.actions import Action
from marl_factory_grid.utils.results import ActionResult from marl_factory_grid.utils.results import ActionResult
from marl_factory_grid.modules.batteries import constants as b, rewards as r from marl_factory_grid.modules.batteries import constants as b
from marl_factory_grid.environment import constants as c from marl_factory_grid.environment import constants as c
class BtryCharge(Action): class BtryCharge(Action):
def __init__(self): def __init__(self):
super().__init__(b.CHARGE) super().__init__(b.ACTION_CHARGE)
def do(self, entity, state) -> Union[None, ActionResult]: def do(self, entity, state) -> Union[None, ActionResult]:
if charge_pod := state[b.CHARGE_PODS].by_pos(entity.pos): if charge_pod := state[b.CHARGE_PODS].by_pos(entity.pos):
@ -23,4 +24,4 @@ class BtryCharge(Action):
valid = c.NOT_VALID valid = c.NOT_VALID
state.print(f'{entity.name} failed to charged batteries at {entity.pos}.') state.print(f'{entity.name} failed to charged batteries at {entity.pos}.')
return ActionResult(entity=entity, identifier=self._identifier, validity=valid, return ActionResult(entity=entity, identifier=self._identifier, validity=valid,
reward=r.CHARGE_VALID if valid else r.CHARGE_FAIL) reward=marl_factory_grid.modules.batteries.constants.REWARD_CHARGE_VALID if valid else marl_factory_grid.modules.batteries.constants.Reward_CHARGE_FAIL)

View File

@ -6,5 +6,14 @@ BATTERIES = 'Batteries'
BATTERY_DISCHARGED = 'DISCHARGED' BATTERY_DISCHARGED = 'DISCHARGED'
CHARGE_POD_SYMBOL = 1 CHARGE_POD_SYMBOL = 1
ACTION_CHARGE = 'do_charge_action'
CHARGE = 'do_charge_action' REWARD_CHARGE_VALID: float = 0.1
REWARD_CHARGE_FAIL: float = -0.1
# Backward-compatible alias: the constant was introduced under this inconsistently cased name.
Reward_CHARGE_FAIL: float = REWARD_CHARGE_FAIL
REWARD_BATTERY_DISCHARGED: float = -1.0
REWARD_DISCHARGE_DONE: float = -1.0
# Modes of the discharge done-condition; each value now matches its name (they were swapped before).
GROUPED = "grouped"
SINGLE = "single"
MODES = [GROUPED, SINGLE]

View File

@ -1,3 +0,0 @@
CHARGE_VALID: float = 0.1
CHARGE_FAIL: float = -0.1
BATTERY_DISCHARGED: float = -1.0

View File

@ -1,15 +1,51 @@
from typing import List, Union from typing import List, Union
import marl_factory_grid.modules.batteries.constants
from marl_factory_grid.environment.rules import Rule from marl_factory_grid.environment.rules import Rule
from marl_factory_grid.utils.results import TickResult, DoneResult from marl_factory_grid.utils.results import TickResult, DoneResult
from marl_factory_grid.environment import constants as c from marl_factory_grid.environment import constants as c
from marl_factory_grid.modules.batteries import constants as b, rewards as r from marl_factory_grid.modules.batteries import constants as b
class Btry(Rule): class BatteryDecharge(Rule):
def __init__(self, initial_charge: float = 0.8, per_action_costs: Union[dict, float] = 0.02,
             battery_charge_reward: float = b.REWARD_CHARGE_VALID,
             battery_failed_reward: float = b.Reward_CHARGE_FAIL,
             battery_discharge_reward: float = b.REWARD_BATTERY_DISCHARGED,
             paralyze_agents_on_discharge: bool = False):
    """
    Enables the Battery Charge/Discharge functionality.

    :type paralyze_agents_on_discharge: bool
    :param paralyze_agents_on_discharge: Whether agents get paralyzed (are no longer able to perform actions)
                                         once their battery is discharged.
    :type per_action_costs: Union[dict, float] = 0.02
    :param per_action_costs: 1. dict: with an action name as key, provide a value for each
                                (maybe walking is less tedious than opening a door? Just saying...).
                             2. float: each action "costs" the same.
    ----
    !!! Does not introduce any Env.-Done condition.
    !!! Batteries can only be charged if the agent possesses the "Charge" action.
    !!! Batteries can only be charged if there are "ChargePods" and they are spawned!
    ----
    :type initial_charge: float
    :param initial_charge: How much juice they have.
    :type battery_discharge_reward: float
    :param battery_discharge_reward: Negative reward, when agents let their batteries discharge.
                                     Default: b.REWARD_BATTERY_DISCHARGED
    :type battery_failed_reward: float
    :param battery_failed_reward: Negative reward, when agents try to charge but cannot
                                  (overcharge, not on station).
                                  Default: b.Reward_CHARGE_FAIL
    :type battery_charge_reward: float
    :param battery_charge_reward: Positive reward, when agents actually charge their battery.
                                  Default: b.REWARD_CHARGE_VALID
    """
    # NOTE: this must be a plain string literal; an f-string is never stored as `__doc__`.
    super().__init__()
    self.paralyze_agents_on_discharge = paralyze_agents_on_discharge
    self.battery_discharge_reward = battery_discharge_reward
    self.battery_failed_reward = battery_failed_reward
    self.battery_charge_reward = battery_charge_reward
    self.per_action_costs = per_action_costs
    self.initial_charge = initial_charge
@ -17,9 +53,6 @@ class Btry(Rule):
assert len(state[c.AGENT]), "There are no agents, did you already spawn them?" assert len(state[c.AGENT]), "There are no agents, did you already spawn them?"
state[b.BATTERIES].spawn(state[c.AGENT], self.initial_charge) state[b.BATTERIES].spawn(state[c.AGENT], self.initial_charge)
def tick_pre_step(self, state) -> List[TickResult]:
pass
def tick_step(self, state) -> List[TickResult]: def tick_step(self, state) -> List[TickResult]:
# Decharge # Decharge
batteries = state[b.BATTERIES] batteries = state[b.BATTERIES]
@ -43,27 +76,78 @@ class Btry(Rule):
if btry.is_discharged: if btry.is_discharged:
state.print(f'Battery of {btry.bound_entity.name} is discharged!') state.print(f'Battery of {btry.bound_entity.name} is discharged!')
results.append( results.append(
TickResult(self.name, entity=btry.bound_entity, reward=r.BATTERY_DISCHARGED, validity=c.VALID)) TickResult(self.name, entity=btry.bound_entity, reward=self.battery_discharge_reward,
else: validity=c.VALID)
pass )
if self.paralyze_agents_on_discharge:
btry.bound_entity.paralyze(self.name)
results.append(
TickResult("Paralyzed", entity=btry.bound_entity, reward=0, validity=c.VALID)
)
state.print(f'{btry.bound_entity.name} has just been paralyzed!')
if btry.bound_entity.var_is_paralyzed and not btry.is_discharged:
btry.bound_entity.de_paralyze(self.name)
results.append(
TickResult("De-Paralyzed", entity=btry.bound_entity, reward=0, validity=c.VALID)
)
state.print(f'{btry.bound_entity.name} has just been de-paralyzed!')
return results return results
class BtryDoneAtDischarge(BatteryDecharge):

    def __init__(self, reward_discharge_done=b.REWARD_DISCHARGE_DONE, mode: str = b.SINGLE, **kwargs):
        """
        Enables the Battery Charge/Discharge functionality. Additionally reports a valid done-result
        once batteries are discharged (see `mode`).

        :type mode: str
        :param mode: Does this Done rule trigger when any battery is discharged (b.SINGLE)
                     or only when all batteries are discharged (b.GROUPED)?
        :type per_action_costs: Union[dict, float] = 0.02
        :param per_action_costs: 1. dict: with an action name as key, provide a value for each
                                    (maybe walking is less tedious than opening a door? Just saying...).
                                 2. float: each action "costs" the same.
        :type initial_charge: float
        :param initial_charge: How much juice they have.
        :type reward_discharge_done: float
        :param reward_discharge_done: Global negative reward, when agents let their batteries discharge.
                                      Default: b.REWARD_DISCHARGE_DONE
        :type battery_discharge_reward: float
        :param battery_discharge_reward: Negative reward, when agents let their batteries discharge.
                                         Default: b.REWARD_BATTERY_DISCHARGED
        :type battery_failed_reward: float
        :param battery_failed_reward: Negative reward, when agents try to charge but cannot
                                      (overcharge, not on station).
                                      Default: b.Reward_CHARGE_FAIL
        :type battery_charge_reward: float
        :param battery_charge_reward: Positive reward, when agents actually charge their battery.
                                      Default: b.REWARD_CHARGE_VALID
        :raises ValueError: If `mode` is not one of b.MODES.

        All remaining keyword arguments are forwarded to `BatteryDecharge.__init__`.
        """
        # NOTE: this must be a plain string literal; an f-string is never stored as `__doc__`.
        super().__init__(**kwargs)
        # An unknown mode would silently disable the done-condition, so fail early instead.
        if mode not in b.MODES:
            raise ValueError(f'Unknown mode "{mode}", expected one of {b.MODES}.')
        self.mode = mode
        self.reward_discharge_done = reward_discharge_done

    def on_check_done(self, state) -> List[DoneResult]:
        """
        Check whether the discharge done-condition is met.

        :param state: Game state; `state[b.BATTERIES]` is iterated and each battery's `is_discharged` is read.
        :return: A single-element list holding a valid DoneResult carrying `self.reward_discharge_done`
                 if the condition of the configured mode is met, else a not-valid DoneResult with reward 0.
        """
        discharged = [battery.is_discharged for battery in state[b.BATTERIES]]
        if self.mode == b.SINGLE:
            is_done = any(discharged)
        elif self.mode == b.GROUPED:
            # `all([])` is True; without batteries there is nothing that could be discharged.
            is_done = bool(discharged) and all(discharged)
        else:
            is_done = False
        if is_done:
            return [DoneResult(self.name, validity=c.VALID, reward=self.reward_discharge_done)]
        else:
            return [DoneResult(self.name, validity=c.NOT_VALID, reward=0)]
class PodRules(Rule): class SpawnChargePods(Rule):
def __init__(self, n_pods: int, charge_rate: float = 0.4, multi_charge: bool = False): def __init__(self, n_pods: int, charge_rate: float = 0.4, multi_charge: bool = False):
"""
Spawn Chargepods in accordance to the given parameters.
:type n_pods: int
:param n_pods: How many charge pods are there?
:type charge_rate: float
:param charge_rate: How much juice does each use of the charge action top up?
:type multi_charge: bool
:param multi_charge: Whether multiple agents are able to charge at the same time.
"""
super().__init__() super().__init__()
self.multi_charge = multi_charge self.multi_charge = multi_charge
self.charge_rate = charge_rate self.charge_rate = charge_rate
@ -74,5 +158,5 @@ class PodRules(Rule):
empty_positions = state.entities.empty_positions() empty_positions = state.entities.empty_positions()
pods = pod_collection.from_coordinates(empty_positions, entity_kwargs=dict( pods = pod_collection.from_coordinates(empty_positions, entity_kwargs=dict(
multi_charge=self.multi_charge, charge_rate=self.charge_rate) multi_charge=self.multi_charge, charge_rate=self.charge_rate)
) )
pod_collection.add_items(pods) pod_collection.add_items(pods)

View File

@ -88,11 +88,15 @@ class Gamestate(object):
results.extend(self.rules.tick_pre_step_all(self)) results.extend(self.rules.tick_pre_step_all(self))
for idx, action_int in enumerate(actions): for idx, action_int in enumerate(actions):
agent = self[c.AGENT][idx].clear_temp_state() if not agent.var_is_paralyzed:
action = agent.actions[action_int] agent = self[c.AGENT][idx].clear_temp_state()
action_result = action.do(agent, self) action = agent.actions[action_int]
results.append(action_result) action_result = action.do(agent, self)
agent.set_state(action_result) results.append(action_result)
agent.set_state(action_result)
else:
self.print(f"{agent.name} is paralied because of: {agent.paralyze_reasons}")
continue
results.extend(self.rules.tick_step_all(self)) results.extend(self.rules.tick_step_all(self))
results.extend(self.rules.tick_post_step_all(self)) results.extend(self.rules.tick_post_step_all(self))