No more Monitor,

env hparams pickeling,
pomdp,
now training and learning
This commit is contained in:
steffen-illium
2021-06-01 16:38:55 +02:00
parent 55b409c72f
commit ff9846eb54
6 changed files with 95 additions and 111 deletions

View File

@ -1,3 +1,4 @@
import pickle
from pathlib import Path
from typing import List, Union, Iterable
@ -101,28 +102,21 @@ class BaseFactory(gym.Env):
@property
def observation_space(self):
if self.pomdp_size:
return spaces.Box(low=0, high=1, shape=(self.state.shape[0], self.pomdp_size,
self.pomdp_size), dtype=np.float32)
if self.pomdp_radius:
return spaces.Box(low=0, high=1, shape=(self._state.shape[0], self.pomdp_radius * 2 + 1,
self.pomdp_radius * 2 + 1), dtype=np.float32)
else:
space = spaces.Box(low=0, high=1, shape=self.state.shape, dtype=np.float32)
# space = spaces.MultiBinary(np.prod(self.state.shape))
# space = spaces.Dict({
# 'level': spaces.MultiBinary(np.prod(self.state[0].shape)),
# 'agent_n': spaces.Discrete(np.prod(self.state[1].shape)),
# 'dirt': spaces.Box(low=0, high=1, shape=self.state[2].shape, dtype=np.float32)
# })
space = spaces.Box(low=0, high=1, shape=self._state.shape, dtype=np.float32)
return space
@property
def movement_actions(self):
return self._actions.movement_actions
def __init__(self, level='simple', n_agents=1, max_steps=int(5e2), pomdp_size: Union[None, int] = None, **kwargs):
def __init__(self, level='simple', n_agents=1, max_steps=int(5e2), pomdp_radius: Union[None, int] = None, **kwargs):
self.n_agents = n_agents
self.max_steps = max_steps
assert pomdp_size is None or (pomdp_size is not None and pomdp_size % 2 == 1)
self.pomdp_size = pomdp_size
self.pomdp_radius = pomdp_radius
self.done_at_collision = False
_actions = Actions(allow_square_movement=kwargs.get('allow_square_movement', True),
@ -130,10 +124,10 @@ class BaseFactory(gym.Env):
allow_no_op=kwargs.get('allow_no_op', True))
self._actions = _actions + self.additional_actions
self.level = h.one_hot_level(
self._level = h.one_hot_level(
h.parse_level(Path(__file__).parent / h.LEVELS_DIR / f'{level}.txt')
)
self.state_slices = StateSlice(n_agents)
self._state_slices = StateSlice(n_agents)
self.reset()
@property
@ -150,48 +144,35 @@ class BaseFactory(gym.Env):
def reset(self) -> (np.ndarray, int, bool, dict):
self.steps = 0
self.agent_states = []
self._agent_states = []
# Agent placement ...
agents = np.zeros((self.n_agents, *self.level.shape), dtype=np.int8)
floor_tiles = np.argwhere(self.level == h.IS_FREE_CELL)
agents = np.zeros((self.n_agents, *self._level.shape), dtype=np.int8)
floor_tiles = np.argwhere(self._level == h.IS_FREE_CELL)
# ... on random positions
np.random.shuffle(floor_tiles)
for i, (x, y) in enumerate(floor_tiles[:self.n_agents]):
agents[i, x, y] = h.IS_OCCUPIED_CELL
agent_state = AgentState(i, -1)
agent_state.update(pos=[x, y])
self.agent_states.append(agent_state)
self._agent_states.append(agent_state)
# state.shape = level, agent 1,..., agent n,
self.state = np.concatenate((np.expand_dims(self.level, axis=0), agents), axis=0)
self._state = np.concatenate((np.expand_dims(self._level, axis=0), agents), axis=0)
# Returns State
return self._return_state()
def _return_state(self):
if self.pomdp_size:
pos = self.agent_states[0].pos
if self.pomdp_radius:
pos = self._agent_states[0].pos
# pos = [agent_state.pos for agent_state in self.agent_states]
# obs = [] ... list comprehension... pos per agent
offset = self.pomdp_size // 2
x0, x1 = max(0, pos[0] - offset), pos[0] + offset + 1
y0, y1 = max(0, pos[1] - offset), pos[1] + offset + 1
obs = self.state[:, x0:x1, y0:y1]
if obs.shape[1] != self.pomdp_size or obs.shape[2] != self.pomdp_size:
obs_padded = np.zeros((obs.shape[0], self.pomdp_size, self.pomdp_size))
try:
a_pos = np.argwhere(obs[h.AGENT_START_IDX] == h.IS_OCCUPIED_CELL)[0]
except IndexError:
print('Shiiiiiit')
try:
obs_padded[:, abs(a_pos[0]-offset):abs(a_pos[0]-offset)+obs.shape[1], abs(a_pos[1]-offset):abs(a_pos[1]-offset)+obs.shape[2]] = obs
except ValueError:
print('Shiiiiiit')
assert all(np.argwhere(obs_padded[h.AGENT_START_IDX] == h.IS_OCCUPIED_CELL)[0] == (3,3))
obs = obs_padded
npad = [(0, 0)] + [(self.pomdp_radius, self.pomdp_radius)] * (self._state.ndim - 1)
x_roll = self.pomdp_radius-pos[0]
y_roll = self.pomdp_radius-pos[1]
padded_state = np.pad(self._state, pad_width=npad, mode='constant', constant_values=0)
padded_state = np.roll(np.roll(padded_state, x_roll, axis=1), y_roll, axis=2)
obs = padded_state[:, :self.pomdp_radius * 2 + 1, :self.pomdp_radius * 2 + 1]
else:
obs = self.state
obs = self._state
return obs
def do_additional_actions(self, agent_i: int, action: int) -> ((int, int), bool):
@ -217,12 +198,12 @@ class BaseFactory(gym.Env):
agent_i_state.update(pos=pos, action_valid=valid)
agent_states.append(agent_i_state)
for i, collision_vec in enumerate(self.check_all_collisions(agent_states, self.state.shape[0])):
for i, collision_vec in enumerate(self.check_all_collisions(agent_states, self._state.shape[0])):
agent_states[i].update(collision_vector=collision_vec)
if self.done_at_collision and collision_vec.any():
done = True
self.agent_states = agent_states
self._agent_states = agent_states
reward, info = self.calculate_reward(agent_states)
if self.steps >= self.max_steps:
@ -250,7 +231,7 @@ class BaseFactory(gym.Env):
def check_collisions(self, agent_state: AgentState) -> np.ndarray:
pos_x, pos_y = agent_state.pos
# FixMe: We need to find a way to spare out some dimensions, eg. an info dimension etc... a[?,]
collisions_vec = self.state[:, pos_x, pos_y].copy() # "vertical fiber" at position of agent i
collisions_vec = self._state[:, pos_x, pos_y].copy() # "vertical fiber" at position of agent i
collisions_vec[h.AGENT_START_IDX + agent_state.i] = h.IS_FREE_CELL # no self-collisions
if agent_state.action_valid:
# ToDo: Place a function hook here
@ -262,11 +243,11 @@ class BaseFactory(gym.Env):
def do_move(self, agent_i: int, old_pos: (int, int), new_pos: (int, int)) -> None:
(x, y), (x_new, y_new) = old_pos, new_pos
self.state[agent_i + h.AGENT_START_IDX, x, y] = h.IS_FREE_CELL
self.state[agent_i + h.AGENT_START_IDX, x_new, y_new] = h.IS_OCCUPIED_CELL
self._state[agent_i + h.AGENT_START_IDX, x, y] = h.IS_FREE_CELL
self._state[agent_i + h.AGENT_START_IDX, x_new, y_new] = h.IS_OCCUPIED_CELL
def move_or_colide(self, agent_i: int, action: int) -> ((int, int), bool):
old_pos, new_pos, valid = h.check_agent_move(state=self.state,
old_pos, new_pos, valid = h.check_agent_move(state=self._state,
dim=agent_i + h.AGENT_START_IDX,
action=action)
if valid:
@ -278,7 +259,7 @@ class BaseFactory(gym.Env):
return old_pos, valid
def agent_i_position(self, agent_i: int) -> (int, int):
positions = np.argwhere(self.state[h.AGENT_START_IDX+agent_i] == h.IS_OCCUPIED_CELL)
positions = np.argwhere(self._state[h.AGENT_START_IDX + agent_i] == h.IS_OCCUPIED_CELL)
assert positions.shape[0] == 1
pos_x, pos_y = positions[0] # a.flatten()
return pos_x, pos_y
@ -288,13 +269,13 @@ class BaseFactory(gym.Env):
assert isinstance(excluded_slices, (int, list))
excluded_slices = excluded_slices if isinstance(excluded_slices, list) else [excluded_slices]
state = self.state
state = self._state
if excluded_slices:
# Todo: Is there a cleaner way?
inds = list(range(self.state.shape[0]))
inds = list(range(self._state.shape[0]))
excluded_slices = [inds[x] if x < 0 else x for x in excluded_slices]
state = self.state[[x for x in inds if x not in excluded_slices]]
state = self._state[[x for x in inds if x not in excluded_slices]]
free_cells = np.argwhere(state.sum(0) == h.IS_FREE_CELL)
np.random.shuffle(free_cells)
@ -306,3 +287,9 @@ class BaseFactory(gym.Env):
def render(self):
raise NotImplementedError
def save_params(self, filepath: Path):
d = {key: val for key, val in self.__dict__.items() if not key.startswith('_') or not key.startswith('__')}
filepath.parent.mkdir(parents=True, exist_ok=True)
with filepath.open('wb') as f:
pickle.dump(d, f, protocol=pickle.HIGHEST_PROTOCOL)