Shadow casting

This commit is contained in:
Steffen Illium
2021-07-27 13:37:56 +02:00
parent de821ebc0c
commit 69f573d9ba
6 changed files with 125 additions and 27 deletions

View File

@ -8,6 +8,7 @@ from gym import spaces
import yaml
from gym.wrappers import FrameStack
from environments.factory.base.shadow_casting import Map
from environments.helpers import Constants as c, Constants
from environments import helpers as h
from environments.factory.base.objects import Slice, Agent, Tile, Action
@ -26,22 +27,21 @@ class BaseFactory(gym.Env):
@property
def observation_space(self):
if self.combin_agent_slices_in_obs:
n_agent_slices = 1
else: # not self.combin_agent_slices_in_obs:
if self.omit_agent_slice_in_obs:
n_agent_slices = self.n_agents - 1
else: # not self.omit_agent_slice_in_obs:
n_agent_slices = self.n_agents
if self.combin_agent_slices_in_obs and self.omit_agent_slice_in_obs:
if self.n_agents > 1:
slices = self._slices.n - (self._agents.n - 1)
else:
slices = self._slices.n - 1
elif self.combin_agent_slices_in_obs and not self.omit_agent_slice_in_obs:
slices = self._slices.n - (self._agents.n - 1)
elif not self.combin_agent_slices_in_obs and self.omit_agent_slice_in_obs:
slices = self._slices.n - (self._agents.n - 1)
elif not self.combin_agent_slices_in_obs and not self.omit_agent_slice_in_obs:
slices = self._slices.n
if self.pomdp_radius:
shape = (self._slices.n - n_agent_slices, self.pomdp_radius * 2 + 1, self.pomdp_radius * 2 + 1)
space = spaces.Box(low=0, high=1, shape=shape, dtype=np.float32)
return space
else:
shape = [x-n_agent_slices if idx == 0 else x for idx, x in enumerate(self._level_shape)]
space = spaces.Box(low=0, high=1, shape=shape, dtype=np.float32)
return space
level_shape = (self.pomdp_radius * 2 + 1, self.pomdp_radius * 2 + 1) if self.pomdp_radius else self._level_shape
space = spaces.Box(low=0, high=1, shape=(slices, *level_shape), dtype=np.float32)
return space
@property
def pomdp_diameter(self):
@ -90,7 +90,7 @@ class BaseFactory(gym.Env):
def __init__(self, level_name='simple', n_agents=1, max_steps=int(5e2), pomdp_radius: Union[None, int] = 0,
movement_properties: MovementProperties = MovementProperties(), parse_doors=False,
combin_agent_slices_in_obs: bool = False, frames_to_stack=0, record_episodes=False,
omit_agent_slice_in_obs=False, done_at_collision=False, **kwargs):
omit_agent_slice_in_obs=False, done_at_collision=False, cast_shadows=True, **kwargs):
assert frames_to_stack != 1 and frames_to_stack >= 0, "'frames_to_stack' cannot be negative or 1."
# Attribute Assignment
@ -103,6 +103,7 @@ class BaseFactory(gym.Env):
self.pomdp_radius = pomdp_radius
self.combin_agent_slices_in_obs = combin_agent_slices_in_obs
self.omit_agent_slice_in_obs = omit_agent_slice_in_obs
self.cast_shadows = cast_shadows
self.frames_to_stack = frames_to_stack
self.done_at_collision = done_at_collision
@ -123,12 +124,15 @@ class BaseFactory(gym.Env):
# Level
level_filepath = Path(__file__).parent.parent / h.LEVELS_DIR / f'{self.level_name}.txt'
parsed_level = h.parse_level(level_filepath)
level = [Slice(c.LEVEL.name, h.one_hot_level(parsed_level))]
level = [Slice(c.LEVEL.name, h.one_hot_level(parsed_level), is_blocking_light=True)]
self._level_shape = level[0].shape
# Doors
parsed_doors = h.one_hot_level(parsed_level, c.DOOR)
doors = [Slice(c.DOORS.name, parsed_doors)] if parsed_doors.any() and self.parse_doors else []
if parsed_doors.any():
doors = [Slice(c.DOORS.name, parsed_doors, is_blocking_light=True)]
else:
doors = []
# Agents
agents = []
@ -301,9 +305,15 @@ class BaseFactory(gym.Env):
else:
obs = self._obs_cube
if self.cast_shadows:
slices = [l_slice.slice != c.OCCUPIED_CELL.value for l_slice in self._slices if l_slice.is_blocking_light]
light_block_map = Map((np.prod(slices, axis=0) != True).astype(int))
light_block_map = light_block_map.do_fov(*agent.pos, max(self._level_shape))
obs = ((obs * light_block_map)).astype(int)
if self.combin_agent_slices_in_obs and self.n_agents > 1:
agent_obs = np.sum(obs[[key for key, slice in self._slices.items() if c.AGENT.name in slice.name and
(not self.omit_agent_slice_in_obs and slice.name != agent.name)]],
agent_obs = np.sum(obs[[key for key, l_slice in self._slices.items() if c.AGENT.name in l_slice.name and
(not self.omit_agent_slice_in_obs and l_slice.name != agent.name)]],
axis=0, keepdims=True)
obs = np.concatenate((obs[:first_agent_slice], agent_obs, obs[first_agent_slice+self.n_agents:]))
return obs

View File

@ -57,9 +57,10 @@ class Slice(Object):
def free_tiles(self):
return np.argwhere(self.slice == c.FREE_CELL.value)
def __init__(self, identifier, arrayslice):
def __init__(self, identifier, arrayslice, is_blocking_light=False):
super(Slice, self).__init__(identifier)
self.slice = arrayslice
self.is_blocking_light = is_blocking_light
class Wall(Object):

View File

@ -0,0 +1,85 @@
import numpy as np
from environments.helpers import Constants as c
FOV_RADIUS = 10
mult_array = np.asarray([
[1, 0, 0, -1, -1, 0, 0, 1],
[0, 1, -1, 0, 0, -1, 1, 0],
[0, 1, 1, 0, 0, -1, -1, 0],
[1, 0, 0, 1, -1, 0, 0, -1]
])
class Map(object):
# Multipliers for transforming coordinates to other octants:
def __init__(self, map_array: np.ndarray):
self.data = map_array
self.width, self.height = map_array.shape
self.light = np.zeros_like(self.data)
self.flag = 0
def blocked(self, x, y):
return (x < 0 or y < 0
or x >= self.width or y >= self.height
or self.data[x, y] == c.OCCUPIED_CELL)
def lit(self, x, y):
return self.light[x, y] == self.flag
def set_lit(self, x, y):
if 0 <= x < self.width and 0 <= y < self.height:
self.light[x, y] = self.flag
def _cast_light(self, cx, cy, row, start, end, radius, xx, xy, yx, yy, id):
"Recursive lightcasting function"
if start < end:
return
radius_squared = radius*radius
new_start = None
for j in range(row, radius+1):
dx, dy = -j-1, -j
blocked = False
while dx <= 0:
dx += 1
# Translate the dx, dy coordinates into map coordinates:
X, Y = cx + dx * xx + dy * xy, cy + dx * yx + dy * yy
# l_slope and r_slope store the slopes of the left and right
# extremities of the square we're considering:
l_slope, r_slope = (dx-0.5)/(dy+0.5), (dx+0.5)/(dy-0.5)
if start < r_slope:
continue
elif end > l_slope:
break
else:
# Our light beam is touching this square; light it:
if dx*dx + dy*dy < radius_squared:
self.set_lit(X, Y)
if blocked:
# we're scanning a row of blocked squares:
if self.blocked(X, Y):
new_start = r_slope
continue
else:
blocked = False
start = new_start
else:
if self.blocked(X, Y) and j < radius:
# This is a blocking square, start a child scan:
blocked = True
self._cast_light(cx, cy, j+1, start, l_slope,
radius, xx, xy, yx, yy, id+1)
new_start = r_slope
# Row is scanned; do next row unless last square was blocked:
if blocked:
break
def do_fov(self, x, y, radius):
"Calculate lit squares from the given location and radius"
self.flag += 1
for oct in range(8):
self._cast_light(x, y, 1, 1.0, 0.0, radius,
mult_array[0, oct], mult_array[1, oct],
mult_array[2, oct], mult_array[3, oct], 0)
return self.light

View File

@ -229,7 +229,7 @@ if __name__ == '__main__':
allow_no_op=False)
factory = SimpleFactory(movement_properties=move_props, dirt_properties=dirt_props, n_agents=1,
combin_agent_slices_in_obs=False, level_name='rooms', parse_doors=True,
pomdp_radius=3)
pomdp_radius=3, cast_shadows=True)
n_actions = factory.action_space.n - 1
_ = factory.observation_space