Function for loading colored routes from a file. This way, a user can define his own paths without a manual coordinate input.
1051 lines
43 KiB
Python
1051 lines
43 KiB
Python
#!/usr/bin/python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
from __future__ import division, absolute_import
|
||
import pickle
|
||
from multiprocessing import Pool
|
||
import networkx as nx
|
||
from PIL import Image, ImageDraw
|
||
from math import sqrt, hypot, degrees, atan2
|
||
import random
|
||
import numpy as np
|
||
import time
|
||
from pylab import imshow, show, get_cmap, savefig
|
||
from collections import UserList, UserDict
|
||
import scipy.spatial as sp
|
||
from scipy import ndimage
|
||
from PCHA import PCHA
|
||
# from py_pcha.PCHA import PCHA
|
||
from operator import itemgetter
|
||
from dtw import dtw
|
||
|
||
workercount = 1
|
||
|
||
|
||
class Worker(object):
|
||
def __init__(self, n=workercount):
|
||
self.pool = Pool(processes=n)
|
||
self.timer = time.clock()
|
||
|
||
def do_work_onClass(self, itemList, taskName, *args):
|
||
results = []
|
||
for item in itemList:
|
||
task = getattr(item, taskName)
|
||
results.append(self.pool.apply_async(task, args=(*args,)))
|
||
self.pool.close()
|
||
self.pool.join()
|
||
return [r.get() for r in results]
|
||
|
||
def do_work(self, itemList, task):
|
||
results = []
|
||
for item in itemList:
|
||
results.append((self.pool.apply_async(task, args=(item, ))))
|
||
self.pool.close()
|
||
self.pool.join()
|
||
return [r.get() for r in results]
|
||
|
||
def do_2_work(self, itemList1, itemList2, task):
|
||
if len(itemList1) != len(itemList2):
|
||
raise ValueError('ItemLists need to be of same shape!')
|
||
results = []
|
||
for item1, item2 in itemList1, itemList2:
|
||
results.append((self.pool.apply_async(task, args=(item1, item2, ))))
|
||
self.pool.close()
|
||
self.pool.join()
|
||
return [r.get() for r in results]
|
||
|
||
def init_many(self, classObject, argsList):
|
||
results = self.pool.starmap(classObject, argsList)
|
||
self.pool.close()
|
||
self.pool.join()
|
||
return results
|
||
|
||
|
||
class IsovistCollection(UserDict):
|
||
def __init__(self, walkable, rangeLimit, tileArray, worker=None, single_threaded=False):
|
||
super(IsovistCollection, self).__init__()
|
||
if not isinstance(worker, Worker):
|
||
raise TypeError
|
||
self.data = dict()
|
||
self.walkable = walkable
|
||
self.tileArray = tileArray
|
||
self.rangeLimit = rangeLimit
|
||
self.lfr = None
|
||
if rangeLimit:
|
||
if not single_threaded:
|
||
workerResult = worker.init_many(
|
||
Isovist, [(*npIdx, self.tileArray, self.walkable, self.rangeLimit)
|
||
for npIdx, value in np.ndenumerate(self.tileArray) if value == self.walkable])
|
||
self.data = {isovist.vertex: isovist for isovist in workerResult}
|
||
# The following would be a non multithreaded approach, maybe activate it for smaller blueprints later
|
||
# TODO: Activate this for smaller Blueprints, when multithreading would lead to overhead
|
||
else:
|
||
for ndIndex, value in np.ndenumerate(self.tileArray):
|
||
if value == self.walkable or value > 0:
|
||
self.add_isovist(*ndIndex)
|
||
else:
|
||
pass
|
||
|
||
# TODO Nachträglich mehrere Isovisten durch multiprozesse hinzufügen
|
||
def add_isovist(self, x, y):
|
||
"""
|
||
Generate and add Isovist for specif coordinate.
|
||
|
||
:param x: X-Coordinate
|
||
:type x: int
|
||
:param y: Y-Coordinate
|
||
:type y: int
|
||
:return: Just a boolean as control-function.
|
||
:rtype: bool
|
||
"""
|
||
|
||
self[(x, y)] = (Isovist(x, y, self.tileArray, self.walkable, self.rangeLimit))
|
||
return True
|
||
|
||
@staticmethod
|
||
def get_angle(point, target):
|
||
"""
|
||
Calculate the angle between two points in degrees
|
||
https://stackoverflow.com/questions/9970281/java-calculating-the-angle-between-two-points-in-degrees
|
||
|
||
:param point: The point from where to measure.
|
||
:type point: (int, int)
|
||
:param target: The point to where to measure.
|
||
:type target: (int, int)
|
||
:return: Angle between two points in degree.
|
||
:rtype: int
|
||
"""
|
||
angle = degrees(atan2(target[1] - point[1], target[0] - point[0]))
|
||
if angle < 0:
|
||
angle += 360
|
||
return int(angle)
|
||
|
||
def rotateIsovist(self, isovist, prev, curr):
|
||
|
||
"""
|
||
Rotate an numpy-array or Isovist class object regarding the angle between the two points.
|
||
|
||
How-to-rotate-a-matrix-by-45-degrees:
|
||
https://math.stackexchange.com/questions/732679/how-to-rotate-a-matrix-by-45-degrees
|
||
https://docs.scipy.org/doc/scipy-0.13.0/reference/generated/scipy.ndimage.interpolation.rotate.html
|
||
|
||
:param isovist: Numpy array or Isovist class object.
|
||
:type isovist: np.ndarray or Isovist
|
||
:param prev: T-1 point comming from.
|
||
:type prev: (int, int)
|
||
:param curr: T-0 Point currently standing on.
|
||
:type curr: (int, int)
|
||
:return:
|
||
:rtype: np.ndarray or Isovist
|
||
"""
|
||
if isinstance(isovist, Isovist):
|
||
array = isovist.visArray
|
||
elif isinstance(isovist, np.ndarray):
|
||
array = isovist
|
||
else:
|
||
raise TypeError('Must be either np.ndarray or Isovist class object, but was: ', type(isovist))
|
||
|
||
# Calculate how many times to rotate by 90°
|
||
|
||
cur_angle = self.get_angle(prev, curr)
|
||
if self.lfr is not None:
|
||
if abs(self.lfr - cur_angle) == 45:
|
||
rotation = abs(self.lfr // 90)
|
||
else:
|
||
self.lfr = cur_angle
|
||
rotation = abs(cur_angle // 90)
|
||
else:
|
||
self.lfr = cur_angle
|
||
rotation = abs(cur_angle // 90)
|
||
|
||
if isinstance(isovist, Isovist):
|
||
isovist.visArray = np.rot90(array, rotation)
|
||
return isovist
|
||
else:
|
||
return np.rot90(array, rotation)
|
||
|
||
def get_items_for_track(self, track, dim='flat', in_walk_dir=False):
|
||
"""
|
||
|
||
:param track: Track class Object or list of int holding path coordinates.
|
||
:type track: Track or list of (int, int)
|
||
:param dim: Either 'flat' for a flattened numpy array or 'full' for all dimensions.
|
||
:type dim: str
|
||
:param in_walk_dir: Determine if the Isovist should rotate in walking direktion.
|
||
:type in_walk_dir: bool
|
||
:return: An array of Isovist arrays, one for each coordinate in track.
|
||
:rtype: np.ndarray
|
||
"""
|
||
if dim not in ['flat', 'full']:
|
||
raise ValueError('Dim can either be "flat" or "full".')
|
||
if not isinstance(track, Track):
|
||
raise TypeError('Please provide a Track object')
|
||
if not in_walk_dir:
|
||
if dim == 'flat':
|
||
return np.dstack([self[point].get_1D_array() for point in track])
|
||
else:
|
||
return np.dstack([self[point].visArray for point in track])
|
||
elif in_walk_dir and dim == 'full':
|
||
self.lfr = None
|
||
return np.dstack([self.rotateIsovist(self[currP].visArray, prevP, currP)
|
||
for prevP, currP in zip(track[:-1], track[1:])])
|
||
else:
|
||
raise ValueError('For a rotation in walking direction, please choose "full" output-mode.')
|
||
|
||
# [nb_samples, nb_frames, width, height, channels]
|
||
|
||
def set_rangeLimit(self, n):
|
||
if isinstance(n, int):
|
||
self.rangeLimit = n
|
||
else:
|
||
raise TypeError('n needs to be an integer!')
|
||
|
||
def add_for_trackCollection(self, trackCollection):
|
||
if isinstance(self.tileArray, np.ndarray) and self.rangeLimit:
|
||
for key in trackCollection.keys():
|
||
for i in range(len(trackCollection[key])):
|
||
self.add_isovist(*trackCollection[key][i])
|
||
else:
|
||
raise ValueError('Please provide a valid basemap array and a rangeLimit >= 1)')
|
||
return True
|
||
|
||
|
||
class Isovist(object):
|
||
def __init__(self, x, y, array, walkable, rangeLimit):
|
||
"""
|
||
"Calculate lit squares from the given location and radius through 'Shadow Casting'"
|
||
Source:
|
||
http://www.roguebasin.com/index.php?title=FOV_using_recursive_shadowcasting
|
||
http://www.roguebasin.com/index.php?title=PythonShadowcastingImplementation
|
||
|
||
:param x: y-part of the center coordinate from where the 'light' travels
|
||
:type x: int
|
||
:param y: X-part of the center coordinate from where the 'light' travels
|
||
:type y: int
|
||
:param array: Numpy Array holding the background image
|
||
:type array: np.ndarray
|
||
:param walkable: The value which identifies positions in the array through which light can travel
|
||
:type walkable: int or (int, int, int)
|
||
:param rangeLimit: Determine the radius in which pixels of the shadow needs to be calculated
|
||
:type rangeLimit: int
|
||
"""
|
||
mult = [[1, 0, 0, -1, -1, 0, 0, 1],
|
||
[0, 1, -1, 0, 0, -1, 1, 0],
|
||
[0, 1, 1, 0, 0, -1, -1, 0],
|
||
[1, 0, 0, 1, -1, 0, 0, -1]]
|
||
self.x = x
|
||
self.y = y
|
||
self.vertex = (self.x, self.y)
|
||
if isinstance(array, np.ndarray):
|
||
self.rangeLimit = rangeLimit if rangeLimit else int(sqrt(array.shape[0] * array.shape[1]))
|
||
self.visArray = np.zeros(array.shape, dtype=bool)
|
||
|
||
for octant in range(8):
|
||
self.__cast_light(self.x, self.y, 1, 1.0, 0.0, self.rangeLimit,
|
||
mult[0][octant], mult[1][octant],
|
||
mult[2][octant], mult[3][octant], 0, array, walkable)
|
||
|
||
offset = int(rangeLimit/2)
|
||
# self.visArray = self.visArray[
|
||
# max(int(x-self.rangeLimit), 0):min(self.visArray.shape[0], int(x+self.rangeLimit)),
|
||
# max(int(y-self.rangeLimit), 0):min(self.visArray.shape[1], int(y+self.rangeLimit))]
|
||
self.visArray = np.pad(self.visArray, ((offset, offset), (offset, offset)),
|
||
mode='constant')[self.x:self.x+rangeLimit, self.y:self.y+rangeLimit]
|
||
|
||
self.size = np.sum(self.visArray)
|
||
|
||
centroid = ndimage.measurements.center_of_mass(self.visArray.astype(int))
|
||
# centroid = np.average(self.visArray[:,:2], axis=0, weights=self.visArray[:,2]) # TODO: Baue eine np.Lösung
|
||
# https://stackoverflow.com/questions/29356825/python-calculate-center-of-mass
|
||
self.Xcent = centroid[0]
|
||
self.Ycent = centroid[1]
|
||
|
||
@staticmethod
|
||
def __blocksLight(x, y, array, walkable):
|
||
if x < 0 or y < 0:
|
||
return True
|
||
try:
|
||
return False if array[x, y] == walkable or array[x, y] > 0 else True
|
||
except IndexError:
|
||
return True
|
||
|
||
def __setVisible(self, x, y):
|
||
if x > 0 or y > 0:
|
||
try:
|
||
self.visArray[x, y] = True
|
||
return
|
||
except IndexError:
|
||
return
|
||
|
||
def __isVisible(self, x, y):
|
||
return self.visArray[x, y]
|
||
|
||
def __cast_light(self, cx, cy, row, start, end, radius, xx, xy, yx, yy, idx, array, walkable):
|
||
"""Recursive lightcasting function"""
|
||
if start < end:
|
||
return
|
||
radius_squared = radius * radius
|
||
for j in range(row, radius + 1):
|
||
dx, dy = -j - 1, -j
|
||
blocked = False
|
||
while dx <= 0:
|
||
dx += 1
|
||
# Translate the dx, dy coordinates into map coordinates:
|
||
X, Y = cx + dx * xx + dy * xy, cy + dx * yx + dy * yy
|
||
# l_slope and r_slope store the slopes of the left and right
|
||
# extremities of the square we're considering:
|
||
l_slope, r_slope = (dx - 0.5) / (dy + 0.5), (dx + 0.5) / (dy - 0.5)
|
||
if start < r_slope:
|
||
continue
|
||
elif end > l_slope:
|
||
break
|
||
else:
|
||
# Our light beam is touching this square; light it:
|
||
if dx * dx + dy * dy < radius_squared:
|
||
self.__setVisible(X, Y)
|
||
if blocked:
|
||
# we're scanning a row of blocked squares:
|
||
if self.__blocksLight(X, Y, array, walkable):
|
||
new_start = r_slope
|
||
continue
|
||
else:
|
||
blocked = False
|
||
start = new_start
|
||
else:
|
||
if self.__blocksLight(X, Y, array, walkable) and j < radius:
|
||
# This is a blocking square, start a child scan:
|
||
blocked = True
|
||
self.__cast_light(cx, cy, j + 1, start, l_slope,
|
||
radius, xx, xy, yx, yy, idx + 1, array, walkable)
|
||
new_start = r_slope
|
||
# Row is scanned; do next row unless last square was blocked:
|
||
if blocked:
|
||
break
|
||
|
||
def saveImg(self, filename='Isovist.tif'):
|
||
filename = filename if filename.endswith('.tif') else '%s.tif' % filename
|
||
imshow(self.visArray, interpolation='none', cmap='gray')
|
||
savefig(filename)
|
||
|
||
def get_1D_array(self):
|
||
return self.visArray.ravel()
|
||
|
||
|
||
class TrackCollection(UserDict):
|
||
def __init__(self, indoorToolset, worker=None):
|
||
"""
|
||
:param indoorToolset: An indoorToolset with baseMap holding cell values
|
||
:type indoorToolset: indoor_Toolset
|
||
:rtype: TrackCollection
|
||
"""
|
||
if not isinstance(indoorToolset, IndoorToolset):
|
||
raise TypeError
|
||
super(UserDict, self).__init__()
|
||
self.data = dict()
|
||
self.map = indoorToolset
|
||
self.hDict = dict()
|
||
self.archeArray = None
|
||
self.maxLen = self.__update_maxLen()
|
||
self.worker = worker if worker else Worker(n=workercount)
|
||
|
||
def __update_maxLen(self):
|
||
if self:
|
||
return max([len(self[track]) for track in self.keys()])
|
||
else:
|
||
return 0
|
||
|
||
def homotopic_classification(self):
|
||
print('Homotopic Classification started!')
|
||
baseArray = self.map.imgArray.astype(bool).astype(int)
|
||
|
||
massNames = list(self.keys())
|
||
for track in massNames.copy():
|
||
found = False
|
||
for key in reversed(list(self.hDict.keys())):
|
||
if self[key].isHomotopic(self[track], self.map, baseArray=baseArray):
|
||
self.hDict[key].append(track)
|
||
self[track].group = key
|
||
found = True
|
||
break
|
||
if not found:
|
||
self.hDict[track] = [track]
|
||
self[track].group = track
|
||
|
||
print('All Done\n%i Classes could be observed.' % len(self.hDict))
|
||
return True
|
||
|
||
def add_single_track(self, start, target, penalty=None, qhull=True):
|
||
track = self.map.calculate_path(start, target, penalty=penalty, qhull=qhull)
|
||
key = self.__find_list_middle(track)
|
||
track.vertex = key
|
||
self[key] = track
|
||
return True
|
||
|
||
|
||
def add_n_bunch_tracks(self, n, start, target, nbunch=None, penalty=None):
|
||
|
||
def build_track(segment1, segment2):
|
||
combined = list()
|
||
combined.extend(segment1 + list(reversed(segment2)))
|
||
return Track(combined, self.map.walkableTile)
|
||
|
||
if isinstance(penalty, int) or isinstance(penalty, float):
|
||
for i in range(n):
|
||
track = self.map.calculate_path(start, target, penalty=penalty)
|
||
key = self.__find_list_middle(track)
|
||
track.vertex = key
|
||
self[key] = track
|
||
|
||
else:
|
||
if nbunch and not n:
|
||
if isinstance(nbunch, list):
|
||
if all([isinstance(track, Track) for track in nbunch]):
|
||
for i, track in enumerate(nbunch):
|
||
key = self.__find_list_middle(track)
|
||
track.vertex = key
|
||
self[key] = track
|
||
n = i + 1
|
||
|
||
else:
|
||
singleSourceDij_S = nx.single_source_dijkstra_path(self.map.graph, start, weight='weight')
|
||
print('Start-Tree Created!')
|
||
singleSourceDij_T = nx.single_source_dijkstra_path(self.map.graph, target, weight='weight')
|
||
print('Target-Tree Created!')
|
||
|
||
if isinstance(n, str):
|
||
if n == 'all':
|
||
n = len(self.map.graph)
|
||
|
||
if n > 10:
|
||
allNodes = self.map.graph.nodes()
|
||
[allNodes.remove(p) for p in [start, target]]
|
||
if len(allNodes) > n:
|
||
rand = random.Random()
|
||
while len(allNodes) > n:
|
||
rand.seed(time.clock())
|
||
allNodes.remove(rand.randrange(len(allNodes)))
|
||
allTracks = self.worker.do_2_work(
|
||
[singleSourceDij_S[point] for point in allNodes if
|
||
singleSourceDij_S.get(point, None) and singleSourceDij_T.get(point, None)],
|
||
[singleSourceDij_T[point] for point in allNodes if
|
||
singleSourceDij_S.get(point, None) and singleSourceDij_T.get(point, None)],
|
||
build_track)
|
||
for track in allTracks:
|
||
self[track.vertex] = track
|
||
|
||
else:
|
||
for i in range(n):
|
||
point = self.map.getRandomPos()
|
||
segmentS = singleSourceDij_S[point]
|
||
segmentT = singleSourceDij_T[point]
|
||
self[point] = build_track(segmentS, segmentT)
|
||
self[point].vertex = point
|
||
self.maxLen = self.__update_maxLen()
|
||
print('%i tracks added!!' % n)
|
||
return True
|
||
|
||
@staticmethod
|
||
def __find_list_middle(input_list):
|
||
middle = float(len(input_list)) / 2
|
||
if middle % 2 != 0:
|
||
return input_list[int(middle - .5)]
|
||
else:
|
||
return input_list[int(middle - 1)]
|
||
|
||
def add_n_bunch_random(self, n, penalty=None, safe=True, minLen=0):
|
||
"""
|
||
:type n: int
|
||
:type penalty: float or int or None
|
||
:type safe: bool
|
||
:type minLen: int
|
||
:rtype: bool
|
||
"""
|
||
|
||
if not isinstance(n, int) or not isinstance(minLen, int):
|
||
raise TypeError
|
||
|
||
if n >= 50:
|
||
results = self.worker.do_work([None] * n, getattr(self.map, 'return_random_path'))
|
||
# TODO: Hier geht es weiter --> Pool object in self.map class maybe change the multiprocess call
|
||
for track in results:
|
||
while minLen and len(track) <= minLen:
|
||
print('Was too Small..')
|
||
track = self.map.return_random_path(penalty=penalty, safe=safe)
|
||
mid = self.__find_list_middle(track)
|
||
track.vertex = mid
|
||
self[mid] = track
|
||
|
||
else:
|
||
for i in range(n):
|
||
track = self.map.return_random_path(penalty=penalty, safe=safe)
|
||
|
||
while minLen and len(track) <= minLen:
|
||
track = self.map.return_random_path(penalty=penalty, safe=safe)
|
||
mid = self.__find_list_middle(track)
|
||
track.vertex = mid
|
||
self[mid] = track
|
||
|
||
self.maxLen = self.__update_maxLen()
|
||
print('returning %i Tracks' % n)
|
||
return True
|
||
|
||
def show(self, graphUpdate=False, saveIMG=False, hClass=False, trackList=None, allTracks=False):
|
||
"""
|
||
:param graphUpdate: Determine whether Node Values in Connected Graph are used.
|
||
:type graphUpdate: bool
|
||
:param saveIMG: Additionally save it as ".tif"-File
|
||
:type saveIMG: bool
|
||
:param hClass: Draw the hClasses
|
||
:type hClass: bool
|
||
:param trackList: None or list
|
||
:param allTracks: Show all Tracks grey in Background
|
||
:type allTracks: bool
|
||
:return : Show or Print the Bitmap
|
||
:rtype : None
|
||
"""
|
||
if allTracks:
|
||
allTracks = self.values()
|
||
if hClass:
|
||
self.map.show(self.hDict, hClass=True, graphUpdate=graphUpdate, saveIMG=saveIMG,
|
||
trackList=trackList, allTracks=allTracks)
|
||
else:
|
||
self.map.show(graphUpdate=graphUpdate, saveIMG=saveIMG, trackList=trackList, allTracks=allTracks)
|
||
|
||
def fill_archeArray(self):
|
||
idxLenList = [[idx, trck.tracklen] for idx, trck in zip(self.keys(), self.values())]
|
||
idxLenList.sort(key=itemgetter(1))
|
||
|
||
p = Pool()
|
||
# TODO: WAS mache ich denn eigentlich hier?
|
||
# TODO: Für sowas habe ich doch einen Worker Pool und brauche keinen neuen zu deployen
|
||
poolResults = []
|
||
t = time.clock()
|
||
print('All_Core_MultiProcessing_Pool started\n'
|
||
'Set Up & Loaded with %i tasks \ntime is %f' % ((len(self)), t))
|
||
for track in self.keys():
|
||
poolResults.append(p.apply_async(extract_arch_attributes,
|
||
args=(self[track], self[idxLenList[0][0]], list(self.hDict.keys()),)
|
||
))
|
||
p.close()
|
||
p.join()
|
||
print('Pool closed, merging... time is %F\n%f seconds taken...' % (time.clock(), time.clock() - t))
|
||
|
||
for result in poolResults:
|
||
if self.archeArray is not None:
|
||
self.archeArray = np.vstack([self.archeArray, result.get()])
|
||
if self.archeArray is None:
|
||
self.archeArray = result.get()
|
||
|
||
print(len(self.archeArray) if self.archeArray is not None else 0, ' Attributes Added!\n')
|
||
return True
|
||
|
||
def return_archetypes(self, k):
|
||
if not self.hDict:
|
||
self.homotopic_classification()
|
||
if self.archeArray is None:
|
||
self.fill_archeArray()
|
||
if self.archeArray is not None:
|
||
data = self.archeArray.copy()[:, 1:].T.astype(np.float64)
|
||
# https: // github.com / ulfaslak / py_pcha
|
||
# X = np.random.random((dimensions, examples)) Transpose array with array.T
|
||
# needed to change the data-dtype
|
||
XC, S, C, SSE, varexpl = PCHA(data, noc=k, delta=0.1, verbose=False)
|
||
|
||
print(' Arc #1, Arc #2, Arc #3 Arc #4\n\n', XC,
|
||
'\n\n Variables explained: ', varexpl)
|
||
else:
|
||
print('not yet implemented')
|
||
return False
|
||
|
||
# Ref -- http://stackoverflow.com/questions/1401712/how-can-the-euclidean-distance-be-calculated-with-numpy
|
||
# Ref2 -- http://stackoverflow.com/questions/8049798/understanding-nested-list-comprehension
|
||
# noinspection PyTypeChecker
|
||
idxArray = np.argmin([[np.linalg.norm(canidate - arche)
|
||
for canidate in self.archeArray[:, 1:]] for arche in XC.T], axis=1)
|
||
return [self[key] for key in [self.archeArray[:, 0][idx] for idx in idxArray]]
|
||
|
||
def return_walkableTileList(self):
|
||
return [npIndex for npIndex, value in np.ndenumerate(self.map.imgArray) if value == self.map.walkableTile]
|
||
|
||
def save_to_disc(self, filename):
|
||
filename = filename if filename.endswith('.pik') else '%s%s' % (filename, '.pik')
|
||
self.worker = None
|
||
with open(filename, 'wb') as output:
|
||
pickle.dump(self, output, pickle.HIGHEST_PROTOCOL)
|
||
self.worker = Worker(n=workercount)
|
||
|
||
def recover_from_disc(self, filename):
|
||
filename = filename if filename.endswith('.pik') else '%s%s' % (filename, '.pik')
|
||
with open(filename, 'rb') as file:
|
||
pick = pickle.load(file)
|
||
self.data = pick.data
|
||
self.map = pick.map
|
||
self.hDict = pick.hDict
|
||
self.archeArray = pick.archeArray
|
||
self.maxLen = self.__update_maxLen()
|
||
|
||
def as_n_sample_4D(self, timessteps, moving_window=False, stackSize=0, start=0,
|
||
in_walk_dir=False, keys=False, for_track=None):
|
||
stackList, keyList = list(), list()
|
||
if moving_window:
|
||
for i, key in enumerate(self.keys()):
|
||
|
||
if for_track:
|
||
track = self[for_track]
|
||
else:
|
||
track = self[key]
|
||
|
||
if stackSize and len(stackList) > stackSize:
|
||
stackList = stackList[:stackSize]
|
||
if keys:
|
||
keyList = keyList[:stackSize]
|
||
break
|
||
if i >= start:
|
||
isoArray = self.map.isovists.get_items_for_track(track, dim='full', in_walk_dir=in_walk_dir)
|
||
isoArray = isoArray.swapaxes(0, 2)
|
||
|
||
tempSequence = [isoArray[j:j + timessteps] for j in range(len(isoArray) - (timessteps - 1))]
|
||
|
||
if keys:
|
||
tempKeys = [track[(j + 1) + timessteps // 2] for j in range(len(tempSequence))]
|
||
keyList.extend(tempKeys)
|
||
|
||
stackList.extend(tempSequence)
|
||
if for_track:
|
||
break
|
||
|
||
else:
|
||
for i, key in enumerate(self.keys()):
|
||
if for_track:
|
||
track = self[for_track]
|
||
else:
|
||
track = self[key]
|
||
if stackSize and i > stackSize*(start+1):
|
||
break
|
||
if i >= start*stackSize:
|
||
diaStack = self.map.isovists.get_items_for_track(track, dim='full',
|
||
in_walk_dir=in_walk_dir).swapaxes(0, 2)
|
||
x = diaStack.shape[0] // timessteps
|
||
tempSequence = np.array_split(diaStack[:timessteps*x], x)
|
||
|
||
if keys:
|
||
tempKeys = [track[(j * timessteps) + 1 + (timessteps // 2)] for j in range(len(tempSequence))]
|
||
keyList.extend(tempKeys)
|
||
|
||
stackList.extend(tempSequence)
|
||
if for_track:
|
||
break
|
||
|
||
if keys:
|
||
return keyList, np.stack(stackList).astype(int)[..., None]
|
||
else:
|
||
return np.stack(stackList).astype(int)[..., None]
|
||
|
||
def as_flatTfArray(self, maxLen=0):
|
||
if maxLen == -1: # Ignore the maxLen Parameter
|
||
return np.dstack([self.map.isovists.get_items_for_track(self[key]) for key in self.keys()])
|
||
if not maxLen: # Default maxLen of longest track in collection, not calles when: maxLen >= 1
|
||
maxLen = self.maxLen
|
||
resultArray = None
|
||
for track in self:
|
||
sizedArray = np.dstack([self.map.isovists[self[track][-1]].get_1D_array() for _ in range(maxLen)])
|
||
isoArray = self.map.isovists.get_items_for_track(self[track])
|
||
sizedArray[:isoArray.shape[0], :isoArray.shape[1], :isoArray.shape[2]] = isoArray
|
||
if resultArray is not None:
|
||
resultArray = np.vstack([resultArray, sizedArray.ravel()])
|
||
elif resultArray is None:
|
||
resultArray = sizedArray.ravel()
|
||
return resultArray
|
||
|
||
def draw_track(self, key, saveIMG=''):
|
||
imArray = self.map.imgArray.copy()
|
||
for pixel in self[key]:
|
||
imArray[pixel] = 15
|
||
imshow(self.map.imgArray)
|
||
|
||
if saveIMG:
|
||
if not isinstance(saveIMG, str):
|
||
raise TypeError('Needs to be a String or Basestring as Name')
|
||
saveIMG = saveIMG if saveIMG.endswith('.tif') else '%s.tif' % saveIMG
|
||
savefig(saveIMG)
|
||
|
||
def read_from_basemap(self, startColor, trackColor, endColor):
|
||
|
||
def find_next_candidates(p):
|
||
positions = [(p[0]-1, p[1]-1),
|
||
(p[0], p[1]-1),
|
||
(p[0]+1, p[1]-1),
|
||
(p[0]-1, p[1]),
|
||
(p[0]+1, p[1]),
|
||
(p[0]-1, p[1]+1),
|
||
(p[0], p[1]+1),
|
||
(p[0]+1, p[1]+1)]
|
||
return [point for point in positions if self.map.imgArray[point] in [endColor, trackColor]]
|
||
|
||
startTiles, tracks = list(), list()
|
||
for idx, value in np.ndenumerate(self.map.imgArray):
|
||
if value == startColor:
|
||
startTiles.append(idx)
|
||
|
||
for startTile in startTiles:
|
||
currentTrack = list()
|
||
position = startTile
|
||
while self.map.imgArray[position] != endColor:
|
||
currentTrack.append(position)
|
||
c = find_next_candidates(position)
|
||
if len(c) == 1:
|
||
position = c[0]
|
||
elif len(c) == 2:
|
||
position = c[0] if c[0] not in currentTrack else c[1]
|
||
else:
|
||
raise ValueError('Something went wrong here, maybe no stop position?')
|
||
tracks.append(Track(currentTrack, self.map.walkableTile, qhull=False))
|
||
self.add_n_bunch_tracks(0, 0, 0, tracks)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
print('pass')
|
||
pass
|
||
|
||
|
||
|
||
class Track(UserList):
|
||
def __init__(self, NodeList, walkableTile, qhull=True):
|
||
if not isinstance(NodeList, list):
|
||
raise TypeError
|
||
super(UserList, self).__init__()
|
||
self.walkableTile = walkableTile
|
||
self.data = NodeList.copy()
|
||
self.group = None
|
||
self.vertex = None
|
||
self.hull = sp.ConvexHull(np.array(self)) if qhull else None
|
||
|
||
self.tracklen = self.length()
|
||
|
||
def __setitem__(self, i, item):
|
||
self.data[i] = item
|
||
self.tracklen = self.length()
|
||
|
||
def __delitem__(self, i):
|
||
del self.data[i]
|
||
self.tracklen = self.length()
|
||
|
||
def length(self, *args):
|
||
"""
|
||
:param args: Pass a foreign list if Points list(tuple(x,y)), this function then acts as @static method
|
||
:type args: [(int,int)]
|
||
:return: Sum of distance between every following point.
|
||
:rtype: float
|
||
Reference:
|
||
http://stackoverflow.com/questions/21216841/length-of-a-list-of-points/21217048
|
||
"""
|
||
if len(args) == 0:
|
||
return sum([hypot(p1[0] - p2[0], p1[1] - p2[1]) for p1, p2 in zip(self[:-1], self[1:])])
|
||
else:
|
||
return sum([hypot(p1[0] - p2[0], p1[1] - p2[1]) for p1, p2 in zip(args[0][:-1], args[0][1:])])
|
||
|
||
def areaCHull(self):
|
||
# http://stackoverflow.com/questions/21727199/
|
||
# python-convex-hull-with-scipy-spatial-delaunay-how-to-eleminate-points-inside-t
|
||
if not self.hull:
|
||
self.hull = sp.ConvexHull(np.array(self))
|
||
xList, yList = [self.hull.points[i][0] for i in self.hull.vertices], \
|
||
[self.hull.points[i][1] for i in self.hull.vertices]
|
||
# http: // stackoverflow.com / questions / 19873596 / convex - hull - area - in -python
|
||
return 0.5 * np.abs(np.dot(xList, np.roll(yList, 1)) - np.dot(yList, np.roll(xList, 1)))
|
||
|
||
def getStart(self):
|
||
return self[0]
|
||
|
||
def getTarget(self):
|
||
return self[-1]
|
||
|
||
def isHomotopic(self, track, indoorToolset, baseArray=None):
|
||
if not isinstance(track, Track):
|
||
raise TypeError
|
||
l, l2 = self.data.copy(), track.data.copy()
|
||
l.extend(reversed(l2))
|
||
|
||
img = Image.new('L', (indoorToolset.width, indoorToolset.height), 0)
|
||
ImageDraw.Draw(img).polygon(l, outline=1, fill=1)
|
||
binPoly = np.array(img)
|
||
|
||
if baseArray is None:
|
||
baseArray = indoorToolset.imgArray
|
||
|
||
a = (binPoly * baseArray).sum()
|
||
if a >= 1:
|
||
return False
|
||
else:
|
||
return True
|
||
|
||
def mergeWith(self, track):
|
||
"""
|
||
:param track: A track to merge with
|
||
:type track: Track or list
|
||
:return: Two merged tracks
|
||
:rtype: Track
|
||
"""
|
||
if isinstance(track, Track):
|
||
l2 = track.data
|
||
elif isinstance(track, list):
|
||
l2 = track
|
||
else:
|
||
typ, prefix = str(type(track)), ('a', 'e', 'i', 'o', 'u')
|
||
raise TypeError('The has to be a List or a Track, but was %s: "%s"' %
|
||
('an' if typ.startswith(prefix) else 'a', typ))
|
||
l = self.data
|
||
l.extend(l2)
|
||
return Track(l, self.walkableTile)
|
||
|
||
def return_isovists(self, trackCollection=None, indoorToolset=None):
|
||
if isinstance(trackCollection, TrackCollection):
|
||
return [trackCollection.map.isovists[point] for point in self]
|
||
elif isinstance(indoorToolset, IndoorToolset):
|
||
return [trackCollection.isovists[point] for point in self]
|
||
else:
|
||
print('Please provide a TrackCollection or a Indoortoolset that holds the Isovist reference.')
|
||
return False
|
||
|
||
|
||
class IndoorToolset(object):
|
||
def __init__(self, imageArray, walkableTile, graph=None, worker=None, isoVistSize=0):
|
||
"""
|
||
:param graph: An optional Graph
|
||
:type graph: nx.Graph
|
||
"""
|
||
if not isinstance(imageArray, np.ndarray) or not isinstance(worker, Worker):
|
||
raise TypeError
|
||
|
||
self.walkableTile = walkableTile
|
||
self.imgArray = imageArray
|
||
self.shape = self.imgArray.shape
|
||
self.height = self.shape[0]
|
||
self.width = self.shape[1]
|
||
if graph is not None and isinstance(graph, nx.Graph):
|
||
self.graph = graph.copy()
|
||
else:
|
||
self.graph = self.translate_to_graph()
|
||
self.__rand = random.Random()
|
||
self.isovists = IsovistCollection(self.walkableTile, isoVistSize, self.imgArray, worker=worker)
|
||
|
||
|
||
def refresh_random_clock(self):
|
||
self.__rand.seed(time.clock())
|
||
|
||
def copy(self):
|
||
return IndoorToolset(self.imgArray.copy(), self.walkableTile, graph=self.graph.copy())
|
||
|
||
def __len__(self):
|
||
return self.width * self.height
|
||
|
||
def show(self, *args, graphUpdate=False, saveIMG=False, hClass=False, trackList=None, allTracks=None):
|
||
def print_n_show(img, name, hot=False):
|
||
# http://stackoverflow.com/questions/9406400/how-can-i-use-a-pre-made-color-map-for-my-heat-map-in-matplotlib
|
||
if hot:
|
||
imshow(img, cmap=get_cmap("hot")) # , interpolation='nearest') # Additional Option
|
||
else:
|
||
imshow(img)
|
||
if saveIMG:
|
||
savefig(name)
|
||
show()
|
||
|
||
if graphUpdate:
|
||
maX = max(nx.get_node_attributes(self.graph, 'count').values())
|
||
for node in self.graph.nodes():
|
||
maX = maX if maX >= 1 else 1
|
||
color = int(self.graph.node[node]['count'] / maX * 100)
|
||
self.imgArray[node] = color if color is not 0 else 255
|
||
print_n_show(self.imgArray, "heatMap.tif", hot=True)
|
||
|
||
if hClass:
|
||
if not args[0]:
|
||
print('Not Classified!')
|
||
pass
|
||
else:
|
||
hDict = args[0].copy()
|
||
|
||
for i, hClassKey in enumerate(hDict.keys()):
|
||
color = i + 1 * 255 / len(hDict)
|
||
for track in hDict[hClassKey]:
|
||
self.imgArray[track] = int(color)
|
||
print_n_show(self.imgArray.copy(), 'hClass.tif')
|
||
|
||
if allTracks:
|
||
for track in allTracks:
|
||
for pixel in track:
|
||
self.imgArray[pixel] = 7
|
||
if trackList:
|
||
if isinstance(trackList, list):
|
||
for i, track in enumerate(trackList):
|
||
color = i + 1 * 255 / len(trackList)
|
||
for pixel in track:
|
||
self.imgArray[pixel] = int(color)
|
||
print_n_show(self.imgArray, 'tracks.tif')
|
||
|
||
def getRandomPos(self, verbose=False):
|
||
"""
|
||
:param verbose: Print more infromation
|
||
:type verbose: bool
|
||
|
||
:return: A random walkable position in the graph
|
||
:rtype: (int, int)
|
||
"""
|
||
self.refresh_random_clock()
|
||
rs = self.graph.nodes()[self.__rand.randrange(len(self.graph))]
|
||
if verbose:
|
||
print(rs, ' is random Position -> checking accessibiliy')
|
||
notWalkable = False
|
||
if self.imgArray[rs] != self.walkableTile:
|
||
notWalkable = True
|
||
while notWalkable:
|
||
self.refresh_random_clock()
|
||
rs = self.graph.nodes()[self.__rand.randrange(len(self.graph))]
|
||
if verbose:
|
||
print(rs, 'needed to be computed, upper was not walkable')
|
||
if verbose:
|
||
print('is valid!')
|
||
return rs
|
||
|
||
def translate_to_graph(self):
|
||
graph = nx.Graph()
|
||
for idx, value in np.ndenumerate(self.imgArray):
|
||
if value > 0 or value == self.walkableTile:
|
||
x, y = idx
|
||
graph.add_node((x, y), count=0)
|
||
|
||
# up
|
||
if graph.has_node((x, y - 1)):
|
||
graph.add_edge((x, y), (x, y - 1), weight=1)
|
||
|
||
# upLeft
|
||
if graph.has_node((x - 1, y - 1)):
|
||
graph.add_edge((x, y), (x - 1, y - 1), weight=sqrt(2))
|
||
|
||
# lowerLeft
|
||
if graph.has_node((x - 1, y + 1)):
|
||
graph.add_edge((x, y), (x - 1, y + 1), weight=sqrt(2))
|
||
|
||
# left
|
||
if graph.has_node((x - 1, y)):
|
||
graph.add_edge((x, y), (x - 1, y), weight=1)
|
||
|
||
return graph
|
||
|
||
def calculate_path(self, source, target, alg='dijkstra', path=None, penalty=None, qhull=True):
|
||
"""
|
||
Calculate a path through the graph, based on the Bitmap you imported.
|
||
|
||
:param source: (X,Y) Positions Tuple to route from
|
||
:type source: (int, int)
|
||
:param target: (X,Y) Positions Tuple to route to
|
||
:type target: (int, int)
|
||
:param alg: Define the Routing Algorithm through the graph
|
||
:type alg: basestring
|
||
:param path: Use this for Updating edge weights for an allready given Path
|
||
in form of a (X,Y) Position Tuple List
|
||
:type path: ((int, int))
|
||
:param penalty: Set a Nummer for applying edge weights
|
||
:type penalty: None or float
|
||
:return: Calculates an Path with the given algorithm, default: 'Dijkstra'
|
||
:rtype: Track
|
||
"""
|
||
|
||
dij_s_p = list()
|
||
if not path and not isinstance(path, list):
|
||
if alg == 'dijkstra':
|
||
dij_s_p = nx.dijkstra_path(self.graph, source, target, weight='weight')
|
||
elif alg == 'bi_dijkstra':
|
||
dij_s_p = nx.bidirectional_dijkstra(self.graph, source, target, weight='weight')
|
||
else:
|
||
dij_s_p = path
|
||
|
||
if penalty and (isinstance(penalty, float) or isinstance(penalty, int)):
|
||
for node in dij_s_p.copy():
|
||
oldCount = self.graph.node[node]['count']
|
||
self.graph.add_node(node, count=oldCount + penalty)
|
||
|
||
for currNode, nextNode in zip(dij_s_p[:-1], dij_s_p[1:]):
|
||
oldWeight = self.graph.edge[currNode][nextNode]['weight']
|
||
self.graph.add_edge(currNode, nextNode, weight=oldWeight + penalty)
|
||
|
||
track = Track(dij_s_p, self.walkableTile, qhull=qhull)
|
||
print(len(dij_s_p), ' Len Path generated -> Nodes: ', len(self.graph), ' -> Edges: ', len(self.graph.edges()))
|
||
return track
|
||
|
||
def return_random_path(self, penalty=None, safe=False):
|
||
"""
|
||
Calculate a single random shortest path
|
||
|
||
:param penalty: Set a Nummer for applying edge weights
|
||
:type penalty: None or float
|
||
|
||
:param safe: Apply a connectivity check before returning.
|
||
Affects the performance, only usefull when dealing with multiple non connected components.
|
||
:type safe: bool
|
||
|
||
:return: A Random shortest Path.
|
||
:rtype: Track
|
||
"""
|
||
p, p2 = (0, 0), (0, 0)
|
||
# while angle in degree modulo 45 == 0 or distance <= sqrt(2)
|
||
while degrees(atan2(p[1] - p2[1], p[0] - p2[0])) % 45 == 0 or hypot(p[0] - p2[0], p[1] - p2[1]) <= sqrt(2):
|
||
p, p2 = self.getRandomPos(), self.getRandomPos()
|
||
print('source: ', p, 'target: ', p2, ' - generated')
|
||
if safe:
|
||
while not nx.has_path(self.graph, p, p2):
|
||
p, p2 = self.getRandomPos(), self.getRandomPos()
|
||
print('unconnected -> New Try: S=', p, ' T=', p2)
|
||
return self.calculate_path(p, p2, penalty=penalty)
|
||
|
||
|
||
|
||
|
||
# Extraction Function - had to be static because of multiprocessing
|
||
def extract_arch_attributes(track, shortestT, hClassList):
|
||
attributes = list()
|
||
# ▪ TrackID - do not use as real attribute for analysis input!!!!!!!!!!
|
||
# remove by: dataArray[:, 1:] Subset without first Attribute
|
||
attributes.append(track.vertex)
|
||
# ▪ Convex hull’s area,
|
||
attributes.append(track.areaCHull())
|
||
# ▪ length,
|
||
attributes.append(track.length(track.hull.points))
|
||
#
|
||
# Longest Distance between two vertives of the Convex Hull, this is probably the euclidian distance
|
||
# between start and target - ups
|
||
# cLD = max([hypot(p1[0] - p2[0], p1[1] - p2[1]) for p1, p2 in combinations(track.hull.points, 2)]))
|
||
# attributes.append(cLD))
|
||
#
|
||
# ▪ vertices,
|
||
attributes.append(len(track))
|
||
# ▪ ,centroid - (distance to mipoint between start und target)
|
||
# http://stackoverflow.com/questions/31562534/scipy-centroid-of-convex-hull
|
||
centroid = list((np.mean(track.hull.vertices[0]), np.mean(track.hull.vertices[1])))
|
||
midpoint = list(((track[0][0] + track[-1][0]) / 2, (track[0][1] + track[-1][1]) / 2))
|
||
cMidDist = hypot(centroid[0] - midpoint[0], centroid[1] - midpoint[1])
|
||
attributes.append(cMidDist)
|
||
# ▪ and orientation --http://stackoverflow.com/questions/31735499/calculate-angle-clockwise-between-two-points
|
||
# https://docs.python.org/dev/reference/expressions.html#calls
|
||
ang1 = np.arctan2(*centroid[::-1])
|
||
ang2 = np.arctan2(*midpoint[::-1])
|
||
attributes.append(np.rad2deg((ang1 - ang2) % (2 * np.pi)))
|
||
# ▪ Length
|
||
attributes.append(track.tracklen)
|
||
# ▪ Angular sum(cancelling / positive)
|
||
# Not yet implemented
|
||
# ▪ Relative length Regarding the shortest route
|
||
attributes.append(track.tracklen / shortestT.tracklen)
|
||
# ▪ DTW distance
|
||
dist, cost, acc, path = \
|
||
dtw(np.array(track), np.array(shortestT), dist=lambda x, y: np.linalg.norm(x - y, ord=1))
|
||
attributes.append(dist)
|
||
# ▪ Pixel’s average / min “heat”
|
||
# Not yet implemented
|
||
# ▪ Homotrophic class - remap (x,y)Coord-Tuple-Keys to int representation
|
||
attributes.append(hClassList.index(track.group))
|
||
return np.array(attributes)
|