refactoring and init.py

This commit is contained in:
Steffen Illium
2023-06-20 18:21:43 +02:00
parent 1332cee7e1
commit c7d77acbbe
138 changed files with 328 additions and 320 deletions

View File

@@ -0,0 +1,76 @@
from typing import List, Union
from mfg_package.environment import constants as c
from mfg_package.environment.actions import Action
from mfg_package.environment.entity.entity import Entity
from mfg_package.utils.render import RenderEntity
from mfg_package.utils import renderer
from mfg_package.utils.helpers import is_move
from mfg_package.utils.results import ActionResult, Result
class Agent(Entity):
@property
def obs_tag(self):
return self.name
@property
def actions(self):
return self._actions
@property
def observations(self):
return self._observations
@property
def can_collide(self):
return True
def step_result(self):
pass
@property
def collection(self):
return self._collection
@property
def state(self):
return self._state or ActionResult(entity=self, identifier=c.NOOP, validity=c.VALID, reward=0)
def __init__(self, actions: List[Action], observations: List[str], *args, **kwargs):
super(Agent, self).__init__(*args, **kwargs)
self.step_result = dict()
self._actions = actions
self._observations = observations
self._state: Union[Result, None] = None
# noinspection PyAttributeOutsideInit
def clear_temp_state(self):
self._state = None
return self
def summarize_state(self):
state_dict = super().summarize_state()
state_dict.update(valid=bool(self.state.validity), action=str(self.state.identifier))
return state_dict
def set_state(self, action_result):
self._state = action_result
def render(self):
i = next(idx for idx, x in enumerate(self._collection) if x.name == self.name)
curr_state = self.state
if curr_state.identifier == c.COLLISION:
render_state = renderer.STATE_COLLISION
elif curr_state.validity:
if curr_state.identifier == c.NOOP:
render_state = renderer.STATE_IDLE
elif is_move(curr_state.identifier):
render_state = renderer.STATE_MOVE
else:
render_state = renderer.STATE_VALID
else:
render_state = renderer.STATE_INVALID
return RenderEntity(c.AGENT, self.pos, 1, 'none', render_state, i + 1, real_name=self.name)

View File

@@ -0,0 +1,79 @@
import abc
from mfg_package.environment import constants as c
from mfg_package.environment.entity.object import EnvObject
from mfg_package.utils.render import RenderEntity
class Entity(EnvObject, abc.ABC):
"""Full Env Entity that lives on the environment Grid. Doors, Items, DirtPile etc..."""
@property
def has_position(self):
return self.pos != c.VALUE_NO_POS
@property
def x(self):
return self.pos[0]
@property
def y(self):
return self.pos[1]
@property
def pos(self):
return self._tile.pos
@property
def tile(self):
return self._tile
@property
def last_tile(self):
try:
return self._last_tile
except AttributeError:
# noinspection PyAttributeOutsideInit
self._last_tile = None
return self._last_tile
@property
def last_pos(self):
try:
return self.last_tile.pos
except AttributeError:
return c.VALUE_NO_POS
@property
def direction_of_view(self):
last_x, last_y = self.last_pos
curr_x, curr_y = self.pos
return last_x - curr_x, last_y - curr_y
def move(self, next_tile):
curr_tile = self.tile
if not_same_tile := curr_tile != next_tile:
if valid := next_tile.enter(self):
curr_tile.leave(self)
self._tile = next_tile
self._last_tile = curr_tile
for observer in self.observers:
observer.notify_change_pos(self)
return valid
return not_same_tile
def __init__(self, tile, **kwargs):
super().__init__(**kwargs)
self._tile = tile
tile.enter(self)
def summarize_state(self) -> dict:
return dict(name=str(self.name), x=int(self.x), y=int(self.y),
tile=str(self.tile.name), can_collide=bool(self.can_collide))
@abc.abstractmethod
def render(self):
return RenderEntity(self.__class__.__name__.lower(), self.pos)
def __repr__(self):
return super(Entity, self).__repr__() + f'(@{self.pos})'

View File

@@ -0,0 +1,18 @@
# noinspection PyAttributeOutsideInit
class BoundEntityMixin:
@property
def bound_entity(self):
return self._bound_entity
@property
def name(self):
return f'{self.__class__.__name__}({self._bound_entity.name})'
def belongs_to_entity(self, entity):
return entity == self.bound_entity
def bind_to(self, entity):
self._bound_entity = entity

View File

@@ -0,0 +1,126 @@
from collections import defaultdict
from typing import Union
from mfg_package.environment import constants as c
class Object:
"""Generell Objects for Organisation and Maintanance such as Actions etc..."""
_u_idx = defaultdict(lambda: 0)
def __bool__(self):
return True
@property
def observers(self):
return self._observers
@property
def name(self):
if self._str_ident is not None:
return f'{self.__class__.__name__}[{self._str_ident}]'
return f'{self.__class__.__name__}#{self.identifier_int}'
@property
def identifier(self):
if self._str_ident is not None:
return self._str_ident
else:
return self.name
def __init__(self, str_ident: Union[str, None] = None, **kwargs):
self._observers = []
self._str_ident = str_ident
self.identifier_int = self._identify_and_count_up()
self._collection = None
if kwargs:
print(f'Following kwargs were passed, but ignored: {kwargs}')
def __repr__(self):
return f'{self.name}'
def __eq__(self, other) -> bool:
return other == self.identifier
def __hash__(self):
return hash(self.identifier)
def _identify_and_count_up(self):
idx = Object._u_idx[self.__class__.__name__]
Object._u_idx[self.__class__.__name__] += 1
return idx
def set_collection(self, collection):
self._collection = collection
def add_observer(self, observer):
self.observers.append(observer)
observer.notify_change_pos(self)
def del_observer(self, observer):
self.observers.remove(observer)
class EnvObject(Object):
"""Objects that hold Information that are observable, but have no position on the environment grid. Inventories etc..."""
_u_idx = defaultdict(lambda: 0)
@property
def obs_tag(self):
try:
return self._collection.name or self.name
except AttributeError:
return self.name
@property
def is_blocking_light(self):
try:
return self._collection.is_blocking_light or False
except AttributeError:
return False
@property
def can_move(self):
try:
return self._collection.can_move or False
except AttributeError:
return False
@property
def is_blocking_pos(self):
try:
return self._collection.is_blocking_pos or False
except AttributeError:
return False
@property
def has_position(self):
try:
return self._collection.has_position or False
except AttributeError:
return False
@property
def can_collide(self):
try:
return self._collection.can_collide or False
except AttributeError:
return False
@property
def encoding(self):
return c.VALUE_OCCUPIED_CELL
def __init__(self, **kwargs):
super(EnvObject, self).__init__(**kwargs)
def change_parent_collection(self, other_collection):
other_collection.add_item(self)
self._collection.delete_env_object(self)
self._collection = other_collection
return self._collection == other_collection

View File

@@ -0,0 +1,45 @@
import math
import numpy as np
from mfg_package.environment.entity.mixin import BoundEntityMixin
from mfg_package.environment.entity.object import Object, EnvObject
##########################################################################
# ####################### Objects and Entitys ########################## #
##########################################################################
class PlaceHolder(Object):
def __init__(self, *args, fill_value=0, **kwargs):
super().__init__(*args, **kwargs)
self._fill_value = fill_value
@property
def can_collide(self):
return False
@property
def encoding(self):
return self._fill_value
@property
def name(self):
return "PlaceHolder"
class GlobalPosition(BoundEntityMixin, EnvObject):
@property
def encoding(self):
if self._normalized:
return tuple(np.divide(self._bound_entity.pos, self._level_shape))
else:
return self.bound_entity.pos
def __init__(self, *args, normalized: bool = True, **kwargs):
super(GlobalPosition, self).__init__(*args, **kwargs)
self._level_shape = math.sqrt(self.size)
self._normalized = normalized

View File

@@ -0,0 +1,131 @@
from typing import List
import numpy as np
from mfg_package.environment import constants as c
from mfg_package.environment.entity.object import EnvObject
from mfg_package.utils.render import RenderEntity
from mfg_package.utils import helpers as h
class Floor(EnvObject):
@property
def has_position(self):
return True
@property
def can_collide(self):
return False
@property
def can_move(self):
return False
@property
def is_blocking_pos(self):
return False
@property
def is_blocking_light(self):
return False
@property
def neighboring_floor_pos(self):
return [x.pos for x in self.neighboring_floor]
@property
def neighboring_floor(self):
if self._neighboring_floor:
pass
else:
self._neighboring_floor = [x for x in [self._collection.by_pos(np.add(self.pos, pos))
for pos in h.POS_MASK.reshape(-1, 2)
if not np.all(pos == [0, 0])]
if x]
return self._neighboring_floor
@property
def encoding(self):
return c.VALUE_OCCUPIED_CELL
@property
def guests_that_can_collide(self):
return [x for x in self.guests if x.can_collide]
@property
def guests(self):
return self._guests.values()
@property
def x(self):
return self.pos[0]
@property
def y(self):
return self.pos[1]
@property
def is_blocked(self):
return any([x.is_blocking_pos for x in self.guests])
def __init__(self, pos, **kwargs):
super(Floor, self).__init__(**kwargs)
self._guests = dict()
self.pos = tuple(pos)
self._neighboring_floor: List[Floor] = list()
self._blocked_by = None
def __len__(self):
return len(self._guests)
def is_empty(self):
return not len(self._guests)
def is_occupied(self):
return bool(len(self._guests))
def enter(self, guest):
if (guest.name not in self._guests and not self.is_blocked) and not (guest.is_blocking_pos and self.is_occupied()):
self._guests.update({guest.name: guest})
return c.VALID
else:
return c.NOT_VALID
def leave(self, guest):
try:
del self._guests[guest.name]
except (ValueError, KeyError):
return c.NOT_VALID
return c.VALID
def __repr__(self):
return f'{self.name}(@{self.pos})'
def summarize_state(self, **_):
return dict(name=self.name, x=int(self.x), y=int(self.y))
def render(self):
return None
class Wall(Floor):
@property
def can_collide(self):
return True
@property
def encoding(self):
return c.VALUE_OCCUPIED_CELL
def render(self):
return RenderEntity(c.WALL, self.pos)
@property
def is_blocking_pos(self):
return True
@property
def is_blocking_light(self):
return True