mirror of
https://github.com/illiumst/marl-factory-grid.git
synced 2025-07-07 10:01:36 +02:00
WIP: removing tiles
This commit is contained in:
@ -27,55 +27,49 @@ class Entity(EnvObject, abc.ABC):
|
||||
|
||||
@property
|
||||
def pos(self):
|
||||
return self._tile.pos
|
||||
return self._pos
|
||||
|
||||
@property
|
||||
def tile(self):
|
||||
return self._tile
|
||||
return self._tile # wall_n_floors funktionalität
|
||||
|
||||
@property
|
||||
def last_tile(self):
|
||||
try:
|
||||
return self._last_tile
|
||||
except AttributeError:
|
||||
# noinspection PyAttributeOutsideInit
|
||||
self._last_tile = None
|
||||
return self._last_tile
|
||||
|
||||
@property
|
||||
def last_pos(self):
|
||||
try:
|
||||
return self.last_tile.pos
|
||||
except AttributeError:
|
||||
return c.VALUE_NO_POS
|
||||
# @property
|
||||
# def last_tile(self):
|
||||
# try:
|
||||
# return self._last_tile
|
||||
# except AttributeError:
|
||||
# # noinspection PyAttributeOutsideInit
|
||||
# self._last_tile = None
|
||||
# return self._last_tile
|
||||
|
||||
@property
|
||||
def direction_of_view(self):
|
||||
last_x, last_y = self.last_pos
|
||||
last_x, last_y = self._last_pos
|
||||
curr_x, curr_y = self.pos
|
||||
return last_x - curr_x, last_y - curr_y
|
||||
|
||||
def move(self, next_tile):
|
||||
curr_tile = self.tile
|
||||
if not_same_tile := curr_tile != next_tile:
|
||||
if valid := next_tile.enter(self):
|
||||
curr_tile.leave(self)
|
||||
self._tile = next_tile
|
||||
self._last_tile = curr_tile
|
||||
def move(self, next_pos):
|
||||
next_pos = next_pos
|
||||
curr_pos = self._pos
|
||||
if not_same_pos := curr_pos != next_pos:
|
||||
if valid := next_tile.enter(self): # muss abgefragt werden über observer? alle obs? wie sonst posdict
|
||||
# curr_tile.leave(self) kann raus wegen notify change pos
|
||||
self._pos = next_pos
|
||||
self._last_pos = curr_pos
|
||||
for observer in self.observers:
|
||||
observer.notify_change_pos(self)
|
||||
return valid
|
||||
return not_same_tile
|
||||
return not_same_pos
|
||||
|
||||
def __init__(self, tile, **kwargs):
|
||||
def __init__(self, position, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._status = None
|
||||
self._tile = tile
|
||||
tile.enter(self)
|
||||
self._pos = position
|
||||
self._last_pos = c.VALUE_NO_POS
|
||||
# tile.enter(self)
|
||||
|
||||
def summarize_state(self) -> dict:
|
||||
return dict(name=str(self.name), x=int(self.x), y=int(self.y),
|
||||
tile=str(self.tile.name), can_collide=bool(self.var_can_collide))
|
||||
def summarize_state(self) -> dict: # tile=str(self.tile.name)
|
||||
return dict(name=str(self.name), x=int(self.x), y=int(self.y), can_collide=bool(self.var_can_collide))
|
||||
|
||||
@abc.abstractmethod
|
||||
def render(self):
|
||||
|
@ -81,16 +81,15 @@ class Factory(gym.Env):
|
||||
self.state = None
|
||||
|
||||
# Init entity:
|
||||
entities = self.map.do_init()
|
||||
entities = self.map.do_init() # done
|
||||
|
||||
# Grab all )rules:
|
||||
rules = self.conf.load_rules()
|
||||
|
||||
# Agents
|
||||
# noinspection PyAttributeOutsideInit
|
||||
self.state = Gamestate(entities, rules, self.conf.env_seed)
|
||||
|
||||
agents = self.conf.load_agents(self.map.size, self[c.FLOOR].empty_tiles)
|
||||
self.state = Gamestate(entities, rules, self.conf.env_seed) # get_all_tiles_with_collisions
|
||||
agents = self.conf.load_agents(self.map.size, self[c.FLOOR].empty_tiles) # empty_tiles -> entity(tile)
|
||||
self.state.entities.add_item({c.AGENT: agents})
|
||||
|
||||
# All is set up, trigger additional init (after agent entity spawn etc)
|
||||
@ -166,7 +165,7 @@ class Factory(gym.Env):
|
||||
if not self._renderer: # lazy init
|
||||
from marl_factory_grid.utils.renderer import Renderer
|
||||
global Renderer
|
||||
self._renderer = Renderer(self.map.level_shape, view_radius=self.conf.pomdp_r, fps=10)
|
||||
self._renderer = Renderer(self.map.level_shape, view_radius=self.conf.pomdp_r, fps=10)
|
||||
|
||||
render_entities = self.state.entities.render()
|
||||
if self.conf.pomdp_r:
|
||||
|
@ -1,4 +1,4 @@
|
||||
from typing import List
|
||||
from typing import List, Tuple
|
||||
|
||||
from marl_factory_grid.environment import constants as c
|
||||
from marl_factory_grid.environment.entity.entity import Entity
|
||||
@ -6,14 +6,13 @@ from marl_factory_grid.environment.entity.wall_floor import Floor
|
||||
|
||||
|
||||
class PositionMixin:
|
||||
|
||||
_entity = Entity
|
||||
var_is_blocking_light: bool = True
|
||||
var_can_collide: bool = True
|
||||
var_has_position: bool = True
|
||||
|
||||
def spawn(self, tiles: List[Floor]):
|
||||
self.add_items([self._entity(tile) for tile in tiles])
|
||||
def spawn(self, coords: List[Tuple[(int, int)]]): # runde klammern?
|
||||
self.add_items([self._entity(pos) for pos in coords])
|
||||
|
||||
def render(self):
|
||||
return [y for y in [x.render() for x in self] if y is not None]
|
||||
@ -33,10 +32,6 @@ class PositionMixin:
|
||||
entity_kwargs=entity_kwargs,
|
||||
**kwargs)
|
||||
|
||||
@property
|
||||
def tiles(self):
|
||||
return [entity.tile for entity in self]
|
||||
|
||||
def __delitem__(self, name):
|
||||
idx, obj = next((i, obj) for i, obj in enumerate(self) if obj.name == name)
|
||||
obj.tile.leave(obj)
|
||||
|
@ -126,12 +126,17 @@ class Objects:
|
||||
del self[item]
|
||||
|
||||
def notify_change_pos(self, entity: object):
|
||||
# print("notifychange")
|
||||
try:
|
||||
# print("lastpos")
|
||||
# print(self.pos_dict[entity.last_pos])
|
||||
self.pos_dict[entity.last_pos].remove(entity)
|
||||
except (ValueError, AttributeError):
|
||||
pass
|
||||
if entity.var_has_position:
|
||||
try:
|
||||
# print("pos")
|
||||
# print(self.pos_dict[entity.pos])
|
||||
self.pos_dict[entity.pos].append(entity)
|
||||
except (ValueError, AttributeError):
|
||||
pass
|
||||
|
@ -51,10 +51,9 @@ class Floors(Walls):
|
||||
|
||||
@property
|
||||
def empty_tiles(self) -> List[Floor]:
|
||||
# def empty_tiles(self) -> List[Tuple[int, int]]:
|
||||
tiles = [tile for tile in self if tile.is_empty()]
|
||||
# positions = [tile.pos for tile in self if tile.is_empty()]
|
||||
random.shuffle(tiles)
|
||||
return tiles
|
||||
|
||||
@classmethod
|
||||
def from_tiles(cls, tiles, *args, **kwargs):
|
||||
raise RuntimeError()
|
||||
|
@ -71,7 +71,7 @@ class PodRules(Rule):
|
||||
def on_init(self, state, lvl_map):
|
||||
pod_collection = state[b.CHARGE_PODS]
|
||||
empty_tiles = state[c.FLOOR].empty_tiles[:self.n_pods]
|
||||
pods = pod_collection.from_tiles(empty_tiles, entity_kwargs=dict(
|
||||
pods = pod_collection.from_coordinates(empty_tiles, entity_kwargs=dict(
|
||||
multi_charge=self.multi_charge, charge_rate=self.charge_rate)
|
||||
)
|
||||
pod_collection.add_items(pods)
|
||||
|
@ -48,4 +48,4 @@ class DirtPile(Entity):
|
||||
return state_dict
|
||||
|
||||
def render(self):
|
||||
return RenderEntity(d.DIRT, self.tile.pos, min(0.15 + self.amount, 1.5), 'scale')
|
||||
return RenderEntity(d.DIRT, self.pos, min(0.15 + self.amount, 1.5), 'scale')
|
||||
|
@ -29,7 +29,7 @@ class ItemAction(Action):
|
||||
elif items := state[i.ITEM].by_pos(entity.pos):
|
||||
item = items[0]
|
||||
item.change_parent_collection(inventory)
|
||||
item.set_tile_to(state.NO_POS_TILE)
|
||||
item.set_pos_to(c.VALUE_NO_POS)
|
||||
state.print(f'{entity.name} just picked up an item at {entity.pos}')
|
||||
return ActionResult(entity=entity, identifier=self._identifier, validity=c.VALID, reward=r.PICK_UP_VALID)
|
||||
|
||||
|
@ -11,7 +11,7 @@ class Item(Entity):
|
||||
var_can_collide = False
|
||||
|
||||
def render(self):
|
||||
return RenderEntity(i.ITEM, self.tile.pos) if self.pos != c.VALUE_NO_POS else None
|
||||
return RenderEntity(i.ITEM, self.pos) if self.pos != c.VALUE_NO_POS else None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
@ -29,8 +29,8 @@ class Item(Entity):
|
||||
def set_auto_despawn(self, auto_despawn):
|
||||
self._auto_despawn = auto_despawn
|
||||
|
||||
def set_tile_to(self, no_pos_tile):
|
||||
self._tile = no_pos_tile
|
||||
def set_pos_to(self, no_pos):
|
||||
self._pos = no_pos
|
||||
|
||||
def summarize_state(self) -> dict:
|
||||
super_summarization = super(Item, self).summarize_state()
|
||||
@ -57,7 +57,7 @@ class DropOffLocation(Entity):
|
||||
return True
|
||||
|
||||
def render(self):
|
||||
return RenderEntity(i.DROP_OFF, self.tile.pos)
|
||||
return RenderEntity(i.DROP_OFF, self.pos)
|
||||
|
||||
@property
|
||||
def encoding(self):
|
||||
|
@ -1,5 +1,5 @@
|
||||
import random
|
||||
from typing import List
|
||||
from typing import List, Tuple
|
||||
|
||||
from marl_factory_grid.environment.entity.entity import Entity
|
||||
from marl_factory_grid.environment.entity.object import Object
|
||||
@ -14,12 +14,12 @@ class Zone(Object):
|
||||
|
||||
@property
|
||||
def positions(self):
|
||||
return [x.pos for x in self.tiles]
|
||||
return self.coords
|
||||
|
||||
def __init__(self, tiles: List[Floor], *args, **kwargs):
|
||||
def __init__(self, coords: List[Tuple[(int, int)]], *args, **kwargs):
|
||||
super(Zone, self).__init__(*args, **kwargs)
|
||||
self.tiles = tiles
|
||||
self.coords = coords
|
||||
|
||||
@property
|
||||
def random_tile(self):
|
||||
return random.choice(self.tiles)
|
||||
return random.choice(self.coords)
|
||||
|
@ -35,11 +35,13 @@ class LevelParser(object):
|
||||
entities = Entities()
|
||||
# Walls
|
||||
walls = Walls.from_coordinates(self.get_coordinates_for_symbol(c.SYMBOL_WALL), self.size)
|
||||
# walls = self.get_coordinates_for_symbol(c.SYMBOL_WALL)
|
||||
entities.add_items({c.WALL: walls})
|
||||
|
||||
# Floor
|
||||
floor = Floors.from_coordinates(self.get_coordinates_for_symbol(c.SYMBOL_WALL, negate=True), self.size)
|
||||
entities.add_items({c.FLOOR: floor})
|
||||
# entities.add_items({c.WALL: self.get_coordinates_for_symbol(c.SYMBOL_WALL, negative=True)})
|
||||
|
||||
# All other
|
||||
for es_name in self.e_p_dict:
|
||||
@ -55,6 +57,8 @@ class LevelParser(object):
|
||||
e = e_class.from_coordinates(np.argwhere(level_array == c.VALUE_OCCUPIED_CELL).tolist(),
|
||||
entities[c.FLOOR], self.size, entity_kwargs=e_kwargs
|
||||
)
|
||||
# e_coords = (np.argwhere(level_array == c.VALUE_OCCUPIED_CELL).tolist()) # braucht e_class?
|
||||
# entities.add_items({e.name: e_coords})
|
||||
else:
|
||||
raise ValueError(f'No {e_class} (Symbol: {e_class.symbol}) could be found!\n'
|
||||
f'Check your level file!')
|
||||
|
@ -59,11 +59,10 @@ class Gamestate(object):
|
||||
|
||||
@property
|
||||
def moving_entites(self):
|
||||
return [y for x in self.entities for y in x if x.var_can_move]
|
||||
return [y for x in self.entities for y in x if x.var_can_move] # wird das aus dem String gelesen?
|
||||
|
||||
def __init__(self, entitites, rules: Dict[str, dict], env_seed=69, verbose=False):
|
||||
self.entities = entitites
|
||||
self.NO_POS_TILE = Floor(c.VALUE_NO_POS)
|
||||
def __init__(self, entities, rules: Dict[str, dict], env_seed=69, verbose=False):
|
||||
self.entities = entities
|
||||
self.curr_step = 0
|
||||
self.curr_actions = None
|
||||
self.verbose = verbose
|
||||
@ -109,7 +108,7 @@ class Gamestate(object):
|
||||
results.extend(on_check_done_result)
|
||||
return results
|
||||
|
||||
def get_all_tiles_with_collisions(self) -> List[Floor]:
|
||||
def get_all_tiles_with_collisions(self) -> List[Floor]: # -> List[Tuple(Int,Int)]
|
||||
tiles = [self[c.FLOOR].by_pos(pos) for pos, e in self.entities.pos_dict.items()
|
||||
if sum([x.var_can_collide for x in e]) > 1]
|
||||
# tiles = [x for x in self[c.FLOOR] if len(x.guests_that_can_collide) > 1]
|
||||
|
Reference in New Issue
Block a user