1052 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import division, absolute_import
import pickle
from multiprocessing import Pool
import networkx as nx
from PIL import Image, ImageDraw
from math import sqrt, hypot, degrees, atan2
import random
import numpy as np
import time
from pylab import imshow, show, get_cmap, savefig
from collections import UserList, UserDict
import scipy.spatial as sp
from scipy import ndimage
from PCHA import PCHA
# from py_pcha.PCHA import PCHA
from operator import itemgetter
from dtw import dtw
# Default number of worker processes for every pool created in this module.
workercount = 6


class Worker(object):
    """Small convenience wrapper around a ``multiprocessing.Pool``.

    Every public method closes and joins the pool after its batch has been
    submitted, so one ``Worker`` instance serves one batch; create a new
    instance for further work.
    """

    def __init__(self, n=workercount):
        """
        :param n: Number of worker processes to start.
        :type n: int
        """
        self.pool = Pool(processes=n)
        # time.clock() was removed in Python 3.8; perf_counter() replaces it.
        self.timer = time.perf_counter()

    def _drain(self, pending):
        """Close the pool, wait for all jobs and return their results in order."""
        self.pool.close()
        self.pool.join()
        return [job.get() for job in pending]

    def do_work_onClass(self, itemList, taskName, *args):
        """
        Call the method named ``taskName`` on every item, passing ``*args``.

        :param itemList: Objects that own a method called ``taskName``.
        :param taskName: Name of the method to call on each item.
        :type taskName: str
        :return: One result per item, in input order.
        :rtype: list
        """
        pending = [self.pool.apply_async(getattr(item, taskName), args=tuple(args))
                   for item in itemList]
        return self._drain(pending)

    def do_work(self, itemList, task):
        """
        Apply ``task(item)`` to every item.

        :return: One result per item, in input order.
        :rtype: list
        """
        pending = [self.pool.apply_async(task, args=(item,)) for item in itemList]
        return self._drain(pending)

    def do_2_work(self, itemList1, itemList2, task):
        """
        Apply ``task(item1, item2)`` to the items of both lists pairwise.

        :raises ValueError: If the two lists differ in length.
        :return: One result per pair, in input order.
        :rtype: list
        """
        if len(itemList1) != len(itemList2):
            raise ValueError('ItemLists need to be of same shape!')
        # zip() pairs the elements; iterating the bare tuple of both lists
        # would try to unpack each whole list into (item1, item2).
        pending = [self.pool.apply_async(task, args=(item1, item2))
                   for item1, item2 in zip(itemList1, itemList2)]
        return self._drain(pending)

    def init_many(self, classObject, argsList):
        """
        Instantiate ``classObject(*args)`` for every argument tuple.

        :param classObject: Callable (usually a class) to invoke.
        :param argsList: Iterable of argument tuples.
        :return: The created objects, in input order.
        :rtype: list
        """
        results = self.pool.starmap(classObject, argsList)
        self.pool.close()
        self.pool.join()
        return results
class IsovistCollection(UserDict):
    """Dictionary that maps an ``(x, y)`` cell to the ``Isovist`` built for it."""

    def __init__(self, walkables, rangeLimit, tileArray, worker=None, single_threaded=False):
        """
        :param walkables: Container of cell values that count as walkable.
        :param rangeLimit: Edge length passed on to every ``Isovist``; when it
                           is falsy no isovists are computed up front.
        :type rangeLimit: int
        :param tileArray: The base map.
        :type tileArray: np.ndarray
        :param worker: Pool wrapper used for the parallel build.
        :type worker: Worker
        :param single_threaded: Build the isovists in this process instead.
        :type single_threaded: bool
        :raises TypeError: If ``worker`` is not a ``Worker`` instance.
        """
        super(IsovistCollection, self).__init__()
        if not isinstance(worker, Worker):
            raise TypeError
        self.walkables = walkables
        self.tileArray = tileArray
        self.rangeLimit = rangeLimit
        # "Last fixed rotation": the heading (degrees) that rotateIsovist last settled on.
        self.lfr = None
        if not rangeLimit:
            # TODO: add several isovists after construction via multiprocessing
            return
        if single_threaded:
            for ndIndex, value in np.ndenumerate(self.tileArray):
                if value in self.walkables:
                    self.add_isovist(*ndIndex)
        else:
            # TODO: fall back to the serial path for small blueprints, where
            # multiprocessing only adds overhead.
            # Membership test (not equality) so this branch selects the same
            # cells as the serial branch above.
            jobs = [(*npIdx, self.tileArray, self.walkables, self.rangeLimit)
                    for npIdx, value in np.ndenumerate(self.tileArray)
                    if value in self.walkables]
            workerResult = worker.init_many(Isovist, jobs)
            self.data = {isovist.vertex: isovist for isovist in workerResult}

    def add_isovist(self, x, y):
        """
        Generate and add the Isovist for a specific coordinate.

        :param x: X-Coordinate
        :type x: int
        :param y: Y-Coordinate
        :type y: int
        :return: Always ``True`` (control value).
        :rtype: bool
        """
        self[(x, y)] = Isovist(x, y, self.tileArray, self.walkables, self.rangeLimit)
        return True

    @staticmethod
    def get_angle(point, target):
        """
        Calculate the angle between two points in degrees.
        https://stackoverflow.com/questions/9970281/java-calculating-the-angle-between-two-points-in-degrees

        :param point: The point from where to measure.
        :type point: (int, int)
        :param target: The point to where to measure.
        :type target: (int, int)
        :return: Angle in degrees, truncated to an int in ``[0, 360)``.
        :rtype: int
        """
        angle = degrees(atan2(target[1] - point[1], target[0] - point[0]))
        if angle < 0:
            angle += 360
        return int(angle)

    def rotateIsovist(self, isovist, prev, curr):
        """
        Rotate a numpy array or Isovist in 90 degree steps according to the
        heading from ``prev`` to ``curr``.

        A heading that differs by exactly 45 degrees from the last stored
        heading (``self.lfr``) re-uses the stored heading; any other heading
        replaces it.

        https://math.stackexchange.com/questions/732679/how-to-rotate-a-matrix-by-45-degrees
        https://docs.scipy.org/doc/scipy-0.13.0/reference/generated/scipy.ndimage.interpolation.rotate.html

        :param isovist: Numpy array or Isovist class object.
        :type isovist: np.ndarray or Isovist
        :param prev: T-1 point coming from.
        :type prev: (int, int)
        :param curr: T-0 point currently standing on.
        :type curr: (int, int)
        :return: The rotated array, or the same Isovist with a rotated ``visArray``.
        :rtype: np.ndarray or Isovist
        :raises TypeError: For any other input type.
        """
        if isinstance(isovist, np.ndarray):
            owner, array = None, isovist
        elif isinstance(isovist, Isovist):
            owner, array = isovist, isovist.visArray
        else:
            raise TypeError('Must be either np.ndarray or Isovist class object, but was: ', type(isovist))
        cur_angle = self.get_angle(prev, curr)
        if self.lfr is None or abs(self.lfr - cur_angle) != 45:
            self.lfr = cur_angle
        # Number of counter-clockwise quarter turns for np.rot90.
        rotation = abs(self.lfr // 90)
        rotated = np.rot90(array, rotation)
        if owner is None:
            return rotated
        owner.visArray = rotated
        return owner

    def get_items_for_track(self, track, dim='flat', in_walk_dir=False):
        """
        :param track: Track class object holding the path coordinates.
        :type track: Track
        :param dim: Either 'flat' for flattened arrays or 'full' for all dimensions.
        :type dim: str
        :param in_walk_dir: Rotate every Isovist into walking direction
                            (only available with ``dim='full'``).
        :type in_walk_dir: bool
        :return: Isovist arrays stacked along the third axis (``np.dstack``);
                 with ``in_walk_dir`` there is one entry per step, i.e. one
                 less than the number of track points.
        :rtype: np.ndarray
        :raises ValueError: For an unknown ``dim`` or ``in_walk_dir`` with 'flat'.
        :raises TypeError: If ``track`` is not a Track.
        """
        if dim not in ['flat', 'full']:
            raise ValueError('Dim can either be "flat" or "full".')
        if not isinstance(track, Track):
            raise TypeError('Please provide a Track object')
        if not in_walk_dir:
            if dim == 'flat':
                return np.dstack([self[point].get_1D_array() for point in track])
            return np.dstack([self[point].visArray for point in track])
        if dim != 'full':
            raise ValueError('For a rotation in walking direction, please choose "full" output-mode.')
        self.lfr = None
        # Slice the underlying list: slicing a UserList subclass re-instantiates
        # the subclass on Python >= 3.8, which Track's constructor does not allow.
        points = track.data
        return np.dstack([self.rotateIsovist(self[currP].visArray, prevP, currP)
                          for prevP, currP in zip(points[:-1], points[1:])])

    def set_rangeLimit(self, n):
        """Set the range limit used for isovists added from now on.

        :raises TypeError: If ``n`` is not an int.
        """
        if not isinstance(n, int):
            raise TypeError('n needs to be an integer!')
        self.rangeLimit = n

    def add_for_trackCollection(self, trackCollection):
        """
        Compute and store an Isovist for every point of every track.

        :param trackCollection: Mapping whose values are point sequences.
        :return: Always ``True``.
        :rtype: bool
        :raises ValueError: Without a numpy base map or a truthy range limit.
        """
        if not (isinstance(self.tileArray, np.ndarray) and self.rangeLimit):
            raise ValueError('Please provide a valid basemap array and a rangeLimit >= 1)')
        for key in trackCollection.keys():
            for point in trackCollection[key]:
                self.add_isovist(*point)
        return True
class Isovist(object):
    """Boolean visibility mask around one cell, computed by recursive shadow casting."""

    # Coordinate multipliers (rows: xx, xy, yx, yy) that map the scan of one
    # octant onto each of the eight octants.
    _OCTANTS = ((1, 0, 0, -1, -1, 0, 0, 1),
                (0, 1, -1, 0, 0, -1, 1, 0),
                (0, 1, 1, 0, 0, -1, -1, 0),
                (1, 0, 0, 1, -1, 0, 0, -1))

    def __init__(self, x, y, array, walkables, rangeLimit):
        """
        "Calculate lit squares from the given location and radius through 'Shadow Casting'"
        Source:
        http://www.roguebasin.com/index.php?title=FOV_using_recursive_shadowcasting
        http://www.roguebasin.com/index.php?title=PythonShadowcastingImplementation

        The result ``visArray`` is a ``rangeLimit`` x ``rangeLimit`` window of
        the visibility mask whose top-left corner lies ``rangeLimit // 2``
        cells before ``(x, y)`` on both axes (zero padded at the map border).
        The origin cell itself is not marked by the scan.

        :param x: First index of the centre cell from where the 'light' travels
        :type x: int
        :param y: Second index of the centre cell from where the 'light' travels
        :type y: int
        :param array: Numpy array holding the background image; for any other
                      type only the coordinates are stored.
        :type array: np.ndarray
        :param walkables: Container of values through which light can travel
        :param rangeLimit: Edge length of the window; a falsy value falls back
                           to ``int(sqrt(rows * cols))``
        :type rangeLimit: int
        """
        self.x = x
        self.y = y
        self.vertex = (self.x, self.y)
        self.walkables = walkables
        if not isinstance(array, np.ndarray):
            return
        self.rangeLimit = rangeLimit if rangeLimit else int(sqrt(array.shape[0] * array.shape[1]))
        # Everything below derives from the effective limit, so the fallback
        # above also drives radius, padding and the crop window.
        window = self.rangeLimit
        self.radius = window // 2 + 1
        self.visArray = np.zeros(array.shape, dtype=bool)
        xx_row, xy_row, yx_row, yy_row = self._OCTANTS
        for octant in range(8):
            self.__cast_light(self.x, self.y, 1, 1.0, 0.0,
                              xx_row[octant], xy_row[octant],
                              yx_row[octant], yy_row[octant], 0, array)
        offset = window // 2
        # Pad first so the crop around cells near the border keeps its size.
        padded = np.pad(self.visArray, ((offset, offset), (offset, offset)), mode='constant')
        self.visArray = padded[self.x:self.x + window, self.y:self.y + window]
        self.size = np.sum(self.visArray)
        # TODO: build a pure-numpy solution
        # https://stackoverflow.com/questions/29356825/python-calculate-center-of-mass
        centroid = ndimage.center_of_mass(self.visArray.astype(int))
        self.Xcent = centroid[0]
        self.Ycent = centroid[1]

    def __blocksLight(self, x, y, array):
        """Return True when the cell is off the map or holds a non-walkable value."""
        if x < 0 or y < 0:
            return True
        try:
            return array[x, y] not in self.walkables
        except IndexError:
            return True

    def __setVisible(self, x, y):
        """Mark the cell as visible; coordinates outside the map are ignored."""
        # Both indices must be non-negative, otherwise numpy would wrap the
        # negative one around to the opposite border.
        if x < 0 or y < 0:
            return
        try:
            self.visArray[x, y] = True
        except IndexError:
            return

    def __isVisible(self, x, y):
        return self.visArray[x, y]

    def __cast_light(self, cx, cy, row, start, end, xx, xy, yx, yy, idx, array):
        """Recursive lightcasting function from roguebasin.com"""
        if start < end:
            return
        radius_squared = self.radius * self.radius
        for j in range(row, self.radius + 1):
            dx, dy = -j - 1, -j
            blocked = False
            while dx <= 0:
                dx += 1
                # Translate the dx, dy coordinates into map coordinates:
                X, Y = cx + dx * xx + dy * xy, cy + dx * yx + dy * yy
                # l_slope and r_slope store the slopes of the left and right
                # extremities of the square we're considering:
                l_slope, r_slope = (dx - 0.5) / (dy + 0.5), (dx + 0.5) / (dy - 0.5)
                if start < r_slope:
                    continue
                elif end > l_slope:
                    break
                else:
                    # Our light beam is touching this square; light it:
                    if dx * dx + dy * dy < radius_squared:
                        self.__setVisible(X, Y)
                    if blocked:
                        # we're scanning a row of blocked squares:
                        if self.__blocksLight(X, Y, array):
                            new_start = r_slope
                            continue
                        else:
                            blocked = False
                            start = new_start
                    else:
                        if self.__blocksLight(X, Y, array) and j < self.radius:
                            # This is a blocking square, start a child scan:
                            blocked = True
                            self.__cast_light(cx, cy, j + 1, start, l_slope, xx, xy, yx, yy, idx + 1, array)
                            new_start = r_slope
            # Row is scanned; do next row unless last square was blocked:
            if blocked:
                break

    def saveImg(self, filename='Isovist.tif'):
        """Plot the visibility mask in grayscale and save it as ``.tif``."""
        filename = filename if filename.endswith('.tif') else '%s.tif' % filename
        imshow(self.visArray, interpolation='none', cmap='gray')
        savefig(filename)

    def get_1D_array(self):
        """Return the visibility mask flattened to one dimension."""
        return self.visArray.ravel()
class TrackCollection(UserDict):
    """Dictionary of ``Track`` objects that share one ``IndoorToolset`` base map."""

    def __init__(self, indoorToolset, worker=None):
        """
        :param indoorToolset: An indoorToolset with baseMap holding cell values
        :type indoorToolset: IndoorToolset
        :param worker: Pool wrapper; a new one is created when omitted.
        :type worker: Worker
        :raises TypeError: If ``indoorToolset`` is not an IndoorToolset.
        """
        if not isinstance(indoorToolset, IndoorToolset):
            raise TypeError
        super(TrackCollection, self).__init__()
        self.map = indoorToolset
        # Homotopy classes: representative track key -> list of member track keys.
        self.hDict = dict()
        self.archeArray = None
        self.maxLen = self.__update_maxLen()
        self.worker = worker if worker else Worker(n=workercount)

    def __update_maxLen(self):
        """Return the number of points of the longest track (0 when empty)."""
        if self:
            return max(len(self[track]) for track in self.keys())
        return 0

    def homotopic_classification(self):
        """
        Sort every track into ``self.hDict`` and set its ``group``.

        A track joins the first existing class (newest first) whose
        representative reports it as homotopic; otherwise it opens a new class.

        :return: Always ``True``.
        :rtype: bool
        """
        print('Homotopic Classification started!')
        baseArray = self.map.imgArray.astype(bool).astype(int)
        for track in list(self.keys()):
            for key in reversed(list(self.hDict.keys())):
                if self[key].isHomotopic(self[track], self.map, baseArray=baseArray):
                    self.hDict[key].append(track)
                    self[track].group = key
                    break
            else:
                self.hDict[track] = [track]
                self[track].group = track
        print('All Done\n%i Classes could be observed.' % len(self.hDict))
        return True

    def __store_by_middle(self, track):
        """Store ``track`` under its middle point and record that point as its vertex."""
        key = self.__find_list_middle(track)
        track.vertex = key
        self[key] = track

    def add_single_track(self, start, target, penalty=None, qhull=True):
        """Route from ``start`` to ``target`` and store the track under its middle point."""
        self.__store_by_middle(self.map.calculate_path(start, target, penalty=penalty, qhull=qhull))
        return True

    def add_n_bunch_tracks(self, n, start, target, nbunch=None, penalty=None):
        """
        Add several tracks between ``start`` and ``target``.

        * numeric ``penalty``: ``n`` penalised shortest paths.
        * ``nbunch`` given and ``n`` falsy: store the Track objects in ``nbunch``.
        * otherwise: tracks that detour over ``n`` via points (``n='all'`` uses
          the node count of the graph).

        :return: Always ``True``.
        :rtype: bool
        """
        def build_track(segment1, segment2):
            # segment1 runs start -> via point, segment2 runs target -> via point.
            combined = segment1 + list(reversed(segment2))
            track = Track(combined, self.map.walkableTiles)
            # Key the track by its via point so that tracks do not all end up
            # under the default vertex ``None``.
            track.vertex = segment1[-1]
            return track

        if isinstance(penalty, (int, float)):
            for _ in range(n):
                self.__store_by_middle(self.map.calculate_path(start, target, penalty=penalty))
        elif nbunch and not n:
            if isinstance(nbunch, list) and all(isinstance(track, Track) for track in nbunch):
                for track in nbunch:
                    self.__store_by_middle(track)
                n = len(nbunch)
        else:
            singleSourceDij_S = nx.single_source_dijkstra_path(self.map.graph, start, weight='weight')
            print('Start-Tree Created!')
            singleSourceDij_T = nx.single_source_dijkstra_path(self.map.graph, target, weight='weight')
            print('Target-Tree Created!')
            if isinstance(n, str) and n == 'all':
                n = len(self.map.graph)
            if n > 10:
                # list(): a node view has no remove()/index access.
                allNodes = list(self.map.graph.nodes())
                for endpoint in (start, target):
                    allNodes.remove(endpoint)
                if len(allNodes) > n:
                    # Keep n random nodes in their original order.
                    keep = sorted(random.Random().sample(range(len(allNodes)), n))
                    allNodes = [allNodes[i] for i in keep]
                reachable = [point for point in allNodes
                             if singleSourceDij_S.get(point, None) and singleSourceDij_T.get(point, None)]
                # Built in-process: the closure above cannot be pickled for a
                # process pool.
                for point in reachable:
                    track = build_track(singleSourceDij_S[point], singleSourceDij_T[point])
                    self[track.vertex] = track
            else:
                for _ in range(n):
                    point = self.map.getRandomPos()
                    self[point] = build_track(singleSourceDij_S[point], singleSourceDij_T[point])
                    self[point].vertex = point
        self.maxLen = self.__update_maxLen()
        print('%i tracks added!!' % n)
        return True

    @staticmethod
    def __find_list_middle(input_list):
        """Return the middle element (the lower one for even lengths)."""
        return input_list[(len(input_list) - 1) // 2]

    def add_n_bunch_random(self, n, penalty=None, safe=True, minLen=0):
        """
        Add ``n`` random shortest paths; paths with ``len <= minLen`` are redrawn.

        :type n: int
        :type penalty: float or int or None
        :type safe: bool
        :type minLen: int
        :rtype: bool
        :raises TypeError: If ``n`` or ``minLen`` is not an int.
        """
        if not isinstance(n, int) or not isinstance(minLen, int):
            raise TypeError
        if n >= 50:
            # Each item becomes the first positional argument (``penalty``) of
            # return_random_path.
            # TODO: continue here --> Pool object in self.map class, maybe change the multiprocess call
            candidates = self.worker.do_work([penalty] * n, getattr(self.map, 'return_random_path'))
        else:
            candidates = (self.map.return_random_path(penalty=penalty, safe=safe) for _ in range(n))
        announce_retry = n >= 50
        for track in candidates:
            while minLen and len(track) <= minLen:
                if announce_retry:
                    print('Was too Small..')
                track = self.map.return_random_path(penalty=penalty, safe=safe)
            self.__store_by_middle(track)
        self.maxLen = self.__update_maxLen()
        print('returning %i Tracks' % n)
        return True

    def show(self, graphUpdate=False, saveIMG=False, hClass=False, trackList=None, allTracks=False):
        """
        :param graphUpdate: Determine whether Node Values in Connected Graph are used.
        :type graphUpdate: bool
        :param saveIMG: Additionally save it as ".tif"-File
        :type saveIMG: bool
        :param hClass: Draw the hClasses
        :type hClass: bool
        :param trackList: None or list
        :param allTracks: Show all Tracks grey in Background
        :type allTracks: bool
        :return : Show or Print the Bitmap
        :rtype : None
        """
        if allTracks:
            allTracks = self.values()
        if hClass:
            self.map.show(self.hDict, hClass=True, graphUpdate=graphUpdate, saveIMG=saveIMG,
                          trackList=trackList, allTracks=allTracks)
        else:
            self.map.show(graphUpdate=graphUpdate, saveIMG=saveIMG, trackList=trackList, allTracks=allTracks)

    def fill_archeArray(self):
        """
        Extract the attribute vector of every track into ``self.archeArray``
        (one row per track), appending to rows that are already present.

        :return: Always ``True``.
        :rtype: bool
        """
        # The shortest track is the reference for the relative attributes.
        shortest = self[min(self.keys(), key=lambda k: self[k].tracklen)] if self else None
        hClassKeys = list(self.hDict.keys())
        # TODO: reuse the shared Worker pool instead of deploying a new one.
        p = Pool()
        t = time.perf_counter()
        print('All_Core_MultiProcessing_Pool started\n'
              'Set Up & Loaded with %i tasks \ntime is %f' % ((len(self)), t))
        poolResults = [p.apply_async(extract_arch_attributes, args=(self[track], shortest, hClassKeys,))
                       for track in self.keys()]
        p.close()
        p.join()
        print('Pool closed, merging... time is %F\n%f seconds taken...'
              % (time.perf_counter(), time.perf_counter() - t))
        rows = [result.get() for result in poolResults]
        if rows:
            # vstack keeps the array 2-D even for a single track.
            stacked = np.vstack(rows)
            if self.archeArray is None:
                self.archeArray = stacked
            else:
                self.archeArray = np.vstack([self.archeArray, stacked])
        print(len(self.archeArray) if self.archeArray is not None else 0, ' Attributes Added!\n')
        return True

    def return_archetypes(self, k):
        """
        Run PCHA with ``k`` archetypes on the attribute array and return, per
        archetype, the track whose attribute vector is closest to it.

        :param k: Number of archetypes.
        :type k: int
        :return: List of tracks, or ``False`` when no attributes are available.
        """
        if not self.hDict:
            self.homotopic_classification()
        if self.archeArray is None:
            self.fill_archeArray()
        if self.archeArray is None:
            print('not yet implemented')
            return False
        # Column 0 is the track id; PCHA expects (dimensions, examples) floats.
        # https://github.com/ulfaslak/py_pcha
        data = self.archeArray.copy()[:, 1:].T.astype(np.float64)
        XC, S, C, SSE, varexpl = PCHA(data, noc=k, delta=0.1, verbose=False)
        print('      Arc #1,        Arc #2,         Arc #3         Arc #4\n\n', XC,
              '\n\n Variables explained: ', varexpl)
        # Ref -- http://stackoverflow.com/questions/1401712/how-can-the-euclidean-distance-be-calculated-with-numpy
        # Ref2 -- http://stackoverflow.com/questions/8049798/understanding-nested-list-comprehension
        # noinspection PyTypeChecker
        idxArray = np.argmin([[np.linalg.norm(canidate - arche)
                               for canidate in data.T] for arche in XC.T], axis=1)
        return [self[key] for key in [self.archeArray[:, 0][idx] for idx in idxArray]]

    def return_walkableTileList(self):
        """Return the indices of all base-map cells that hold a walkable value."""
        return [npIndex for npIndex, value in np.ndenumerate(self.map.imgArray) if value in self.map.walkableTiles]

    def save_to_disc(self, filename):
        """Pickle the collection to ``filename`` ('.pik' is appended if missing)."""
        filename = filename if filename.endswith('.pik') else '%s%s' % (filename, '.pik')
        # The worker is dropped for the dump; pool objects cannot be pickled.
        self.worker = None
        try:
            with open(filename, 'wb') as output:
                pickle.dump(self, output, pickle.HIGHEST_PROTOCOL)
        finally:
            # Re-create a worker even when the dump failed.
            self.worker = Worker(n=workercount)

    def recover_from_disc(self, filename):
        """Load tracks, map, classes and attributes from a pickle written by save_to_disc."""
        filename = filename if filename.endswith('.pik') else '%s%s' % (filename, '.pik')
        # NOTE: pickle.load executes arbitrary code - only open trusted files.
        with open(filename, 'rb') as file:
            pick = pickle.load(file)
        self.data = pick.data
        self.map = pick.map
        self.hDict = pick.hDict
        self.archeArray = pick.archeArray
        self.maxLen = self.__update_maxLen()

    def as_n_sample_4D(self, timessteps, moving_window=False, stackSize=0, start=0,
                       in_walk_dir=False, keys=False, for_track=None):
        """
        Cut the isovist sequence of the tracks into samples of ``timessteps`` frames.

        :param timessteps: Frames per sample.
        :type timessteps: int
        :param moving_window: Overlapping windows (stride 1) instead of consecutive chunks.
        :type moving_window: bool
        :param stackSize: Limits the amount of samples (moving window) or tracks (chunks).
        :type stackSize: int
        :param start: Tracks (moving window) or blocks of ``stackSize`` tracks (chunks) to skip.
        :type start: int
        :param in_walk_dir: Rotate isovists into walking direction.
        :type in_walk_dir: bool
        :param keys: Additionally return one track point per sample.
        :type keys: bool
        :param for_track: Key of the only track to process.
        :return: ``np.stack`` of all samples as int with a trailing axis of
                 length 1; preceded by the key list when ``keys`` is set.
        """
        stackList, keyList = list(), list()
        isovists = self.map.isovists
        for i, key in enumerate(self.keys()):
            track = self[for_track] if for_track else self[key]
            if moving_window:
                if stackSize and len(stackList) > stackSize:
                    stackList = stackList[:stackSize]
                    if keys:
                        keyList = keyList[:stackSize]
                    break
                if i < start:
                    continue
                frames = isovists.get_items_for_track(track, dim='full', in_walk_dir=in_walk_dir)
                frames = frames.swapaxes(0, 2)
                tempSequence = [frames[j:j + timessteps] for j in range(len(frames) - (timessteps - 1))]
                if keys:
                    keyList.extend(track[(j + 1) + timessteps // 2] for j in range(len(tempSequence)))
            else:
                if stackSize and i > stackSize * (start + 1):
                    break
                if i < start * stackSize:
                    continue
                frames = isovists.get_items_for_track(track, dim='full',
                                                      in_walk_dir=in_walk_dir).swapaxes(0, 2)
                chunks = frames.shape[0] // timessteps
                tempSequence = np.array_split(frames[:timessteps * chunks], chunks)
                if keys:
                    keyList.extend(track[(j * timessteps) + 1 + (timessteps // 2)]
                                   for j in range(len(tempSequence)))
            stackList.extend(tempSequence)
            if for_track:
                # A single requested track is processed exactly once.
                break
        samples = np.stack(stackList).astype(int)[..., None]
        if keys:
            return keyList, samples
        return samples

    def as_flatTfArray(self, maxLen=0):
        """
        Return the flattened isovists of every track.

        :param maxLen: ``-1`` stacks the raw per-track arrays; ``0`` pads to the
                       longest track of the collection; any other value pads to
                       that length. Padding repeats the isovist of the last
                       track point.
        :type maxLen: int
        :return: Stacked array; ``None`` for an empty collection unless ``maxLen == -1``.
        """
        isovists = self.map.isovists
        if maxLen == -1:  # Ignore the maxLen Parameter
            return np.dstack([isovists.get_items_for_track(self[key]) for key in self.keys()])
        if not maxLen:  # Default: length of the longest track in the collection
            maxLen = self.maxLen
        resultArray = None
        for track in self:
            filler = isovists[self[track][-1]].get_1D_array()
            sizedArray = np.dstack([filler for _ in range(maxLen)])
            isoArray = isovists.get_items_for_track(self[track])
            sizedArray[:isoArray.shape[0], :isoArray.shape[1], :isoArray.shape[2]] = isoArray
            if resultArray is None:
                resultArray = sizedArray.ravel()
            else:
                resultArray = np.vstack([resultArray, sizedArray.ravel()])
        return resultArray

    def draw_track(self, key, saveIMG=''):
        """
        Plot the base map with the track ``key`` painted in (value 15).

        :param saveIMG: Optional file name; '.tif' is appended if missing.
        :type saveIMG: str
        :raises TypeError: If ``saveIMG`` is truthy but not a string.
        """
        imArray = self.map.imgArray.copy()
        for pixel in self[key]:
            imArray[pixel] = 15
        # Show the painted copy, not the untouched base map.
        imshow(imArray)
        if saveIMG:
            if not isinstance(saveIMG, str):
                raise TypeError('Needs to be a String or Basestring as Name')
            saveIMG = saveIMG if saveIMG.endswith('.tif') else '%s.tif' % saveIMG
            savefig(saveIMG)

    def read_from_basemap(self, startColor, trackColor, endColor):
        """
        Trace tracks that are painted into the base map and add them.

        Tracing begins at every ``startColor`` cell and follows 8-connected
        ``trackColor`` cells until an ``endColor`` cell is reached; the end
        cell itself is not part of the track.

        :raises ValueError: If a position has no or more than two candidates.
        """
        image = self.map.imgArray
        rows, cols = image.shape[0], image.shape[1]
        followColors = [endColor, trackColor]

        def find_next_candidates(p):
            neighbours = [(p[0] + dx, p[1] + dy)
                          for dy in (-1, 0, 1) for dx in (-1, 0, 1) if (dx, dy) != (0, 0)]
            # Off-map neighbours are skipped; negative indices would wrap around.
            return [point for point in neighbours
                    if 0 <= point[0] < rows and 0 <= point[1] < cols and image[point] in followColors]

        startTiles = [idx for idx, value in np.ndenumerate(image) if value == startColor]
        tracks = list()
        for startTile in startTiles:
            currentTrack = list()
            position = startTile
            while image[position] != endColor:
                currentTrack.append(position)
                c = find_next_candidates(position)
                if len(c) == 1:
                    position = c[0]
                elif len(c) == 2:
                    position = c[0] if c[0] not in currentTrack else c[1]
                else:
                    raise ValueError('Something went wrong here, maybe no stop position?')
            tracks.append(Track(currentTrack, self.map.walkableTiles, qhull=False))
        if tracks:
            # Without any traced track the call would fall through to routing.
            self.add_n_bunch_tracks(0, 0, 0, tracks)
        print('pass')
class Track(UserList):
    """Ordered list of ``(x, y)`` points with cached length and optional convex hull."""

    def __init__(self, NodeList, walkableTiles, qhull=True):
        """
        :param NodeList: Points of the path; the list is copied.
        :type NodeList: list of (int, int)
        :param walkableTiles: Values that mark walkable cells (stored only).
        :param qhull: Compute the convex hull right away.
        :type qhull: bool
        :raises TypeError: If ``NodeList`` is not a list.
        """
        if not isinstance(NodeList, list):
            raise TypeError
        super(Track, self).__init__()
        self.walkableTiles = walkableTiles
        self.data = NodeList.copy()
        self.group = None
        self.vertex = None
        self.hull = sp.ConvexHull(np.array(self.data)) if qhull else None
        self.tracklen = self.length()

    def __getitem__(self, i):
        # Return plain list slices: UserList (Python >= 3.8) would call
        # Track(<slice>) for a slice, which fails because the constructor
        # needs ``walkableTiles``.
        return self.data[i]

    def __setitem__(self, i, item):
        self.data[i] = item
        self.tracklen = self.length()

    def __delitem__(self, i):
        del self.data[i]
        self.tracklen = self.length()

    def length(self, *args):
        """
        :param args: Pass a foreign list of points list(tuple(x,y)), this function then acts as @static method
        :type args: [(int,int)]
        :return: Sum of distance between every following point.
        :rtype: float
        Reference:
        http://stackoverflow.com/questions/21216841/length-of-a-list-of-points/21217048
        """
        points = args[0] if args else self.data
        return sum([hypot(p1[0] - p2[0], p1[1] - p2[1]) for p1, p2 in zip(points[:-1], points[1:])])

    def areaCHull(self):
        """Return the area of the convex hull (shoelace formula), building the hull on demand."""
        # http://stackoverflow.com/questions/21727199/
        # python-convex-hull-with-scipy-spatial-delaunay-how-to-eleminate-points-inside-t
        if self.hull is None:
            self.hull = sp.ConvexHull(np.array(self.data))
        corners = self.hull.points[self.hull.vertices]
        xList, yList = corners[:, 0], corners[:, 1]
        # http://stackoverflow.com/questions/19873596/convex-hull-area-in-python
        return 0.5 * np.abs(np.dot(xList, np.roll(yList, 1)) - np.dot(yList, np.roll(xList, 1)))

    def getStart(self):
        """Return the first point."""
        return self.data[0]

    def getTarget(self):
        """Return the last point."""
        return self.data[-1]

    def isHomotopic(self, track, indoorToolset, baseArray=None):
        """
        Test whether the polygon enclosed by this track and ``track`` is free
        of non-zero cells of ``baseArray``.

        :param track: The track to compare with.
        :type track: Track
        :param indoorToolset: Provides ``width``, ``height`` and the default ``imgArray``.
        :param baseArray: Array whose non-zero cells count as obstacles.
        :type baseArray: np.ndarray or None
        :return: ``True`` if no obstacle cell lies inside the polygon.
        :rtype: bool
        :raises TypeError: If ``track`` is not a Track.
        """
        if not isinstance(track, Track):
            raise TypeError
        outline = self.data + list(reversed(track.data))
        img = Image.new('L', (indoorToolset.width, indoorToolset.height), 0)
        # NOTE(review): PIL reads each point as (column, row) while the points
        # are used as (row, column) array indices elsewhere - confirm that the
        # polygon is not transposed relative to baseArray.
        ImageDraw.Draw(img).polygon(outline, outline=1, fill=1)
        binPoly = np.array(img)
        if baseArray is None:
            baseArray = indoorToolset.imgArray
        return bool((binPoly * baseArray).sum() < 1)

    def mergeWith(self, track):
        """
        :param track: A track to merge with
        :type track: Track or list
        :return: A new track: the points of this track followed by the other's;
                 this track itself is left unchanged.
        :rtype: Track
        :raises TypeError: If ``track`` is neither a Track nor a list.
        """
        if isinstance(track, Track):
            tail = track.data
        elif isinstance(track, list):
            tail = track
        else:
            typ = type(track).__name__
            article = 'an' if typ.lower().startswith(('a', 'e', 'i', 'o', 'u')) else 'a'
            raise TypeError('The track has to be a List or a Track, but was %s: "%s"' % (article, typ))
        # Concatenate into a new list; extending self.data in place would leave
        # self.tracklen out of date.
        return Track(self.data + list(tail), self.walkableTiles)

    def return_isovists(self, trackCollection=None, indoorToolset=None):
        """
        Return the isovist of every point of this track.

        :param trackCollection: Collection whose map holds the isovists.
        :type trackCollection: TrackCollection
        :param indoorToolset: Toolset that holds the isovists directly.
        :type indoorToolset: IndoorToolset
        :return: List of isovists, or ``False`` if neither source was given.
        """
        if isinstance(trackCollection, TrackCollection):
            return [trackCollection.map.isovists[point] for point in self.data]
        elif isinstance(indoorToolset, IndoorToolset):
            return [indoorToolset.isovists[point] for point in self.data]
        else:
            print('Please provide a TrackCollection or a Indoortoolset that holds the Isovist reference.')
            return False
class IndoorToolset(object):
    """Base map, routing graph and isovists of one floor plan."""

    def __init__(self, imageArray, walkableTiles, graph=None, worker=None, isoVistSize=0):
        """
        :param imageArray: The base map.
        :type imageArray: np.ndarray
        :param walkableTiles: Container of cell values that count as walkable.
        :param graph: An optional Graph; built from the map when omitted.
        :type graph: nx.Graph
        :param worker: Pool wrapper handed to the isovist collection.
        :type worker: Worker
        :param isoVistSize: Range limit of the isovists (0: none are precomputed).
        :type isoVistSize: int
        :raises TypeError: If ``imageArray`` is no ndarray or ``worker`` no Worker.
        """
        if not isinstance(imageArray, np.ndarray) or not isinstance(worker, Worker):
            raise TypeError
        self.walkableTiles = walkableTiles
        self.imgArray = imageArray
        self.shape = self.imgArray.shape
        self.height = self.shape[0]
        self.width = self.shape[1]
        if graph is not None and isinstance(graph, nx.Graph):
            self.graph = graph.copy()
        else:
            self.graph = self.translate_to_graph()
        self.__rand = random.Random()
        self.isovists = IsovistCollection(self.walkableTiles, isoVistSize, self.imgArray,
                                          worker=worker, single_threaded=True)

    def refresh_random_clock(self):
        """Re-seed the private random generator from the performance counter."""
        # time.clock() was removed in Python 3.8.
        self.__rand.seed(time.perf_counter())

    def copy(self):
        """
        Return a toolset with copied map and graph.

        The instance is assembled without ``__init__`` because the constructor
        insists on a ``Worker`` that this object does not keep.
        """
        clone = IndoorToolset.__new__(IndoorToolset)
        clone.walkableTiles = self.walkableTiles
        clone.imgArray = self.imgArray.copy()
        clone.shape = clone.imgArray.shape
        clone.height = clone.shape[0]
        clone.width = clone.shape[1]
        clone.graph = self.graph.copy()
        clone.__rand = random.Random()
        # NOTE(review): the isovist collection is shared, not rebuilt - confirm
        # that callers do not need an independent one.
        clone.isovists = self.isovists
        return clone

    def __len__(self):
        return self.width * self.height

    def show(self, *args, graphUpdate=False, saveIMG=False, hClass=False, trackList=None, allTracks=None):
        """
        Plot the map with optional overlays.

        All overlays are painted onto a copy, the base map stays untouched.

        :param args: ``args[0]`` is the homotopy-class dict when ``hClass`` is set.
        :param graphUpdate: Paint the node 'count' values as heat map.
        :type graphUpdate: bool
        :param saveIMG: Additionally save every plot as '.tif'.
        :type saveIMG: bool
        :param hClass: Paint the entries of the class dict, one colour per class.
        :type hClass: bool
        :param trackList: Tracks to paint, one colour per track.
        :type trackList: list or None
        :param allTracks: Tracks to paint in one background colour (7).
        """
        def print_n_show(img, name, hot=False):
            # http://stackoverflow.com/questions/9406400/how-can-i-use-a-pre-made-color-map-for-my-heat-map-in-matplotlib
            if hot:
                imshow(img, cmap=get_cmap("hot"))  # , interpolation='nearest')  # Additional Option
            else:
                imshow(img)
            if saveIMG:
                savefig(name)
            show()

        canvas = self.imgArray.copy()
        if graphUpdate:
            counts = nx.get_node_attributes(self.graph, 'count')
            maX = max(max(counts.values(), default=0), 1)
            for node in self.graph.nodes():
                color = int(counts.get(node, 0) / maX * 100)
                # Unvisited nodes are painted with 255.
                canvas[node] = color if color != 0 else 255
            print_n_show(canvas, "heatMap.tif", hot=True)
        if hClass:
            hDict = args[0] if args else None
            if not hDict:
                print('Not Classified!')
            else:
                for i, hClassKey in enumerate(hDict.keys()):
                    # Spread the classes evenly over the value range.
                    color = (i + 1) * 255 / len(hDict)
                    for track in hDict[hClassKey]:
                        canvas[track] = int(color)
                print_n_show(canvas.copy(), 'hClass.tif')
        if allTracks:
            for track in allTracks:
                for pixel in track:
                    canvas[pixel] = 7
        if trackList:
            if isinstance(trackList, list):
                for i, track in enumerate(trackList):
                    color = (i + 1) * 255 / len(trackList)
                    for pixel in track:
                        canvas[pixel] = int(color)
                print_n_show(canvas, 'tracks.tif')

    def getRandomPos(self, verbose=False):
        """
        :param verbose: Print more information
        :type verbose: bool
        :return: A random walkable position in the graph
        :rtype: (int, int)
        :raises ValueError: If the graph has no nodes.
        """
        nodes = list(self.graph.nodes())
        if not nodes:
            raise ValueError('The graph holds no nodes to choose from.')
        hint = ' is random Position -> checking accessibiliy'
        while True:
            self.refresh_random_clock()
            rs = nodes[self.__rand.randrange(len(nodes))]
            if verbose:
                print(rs, hint)
            # Draw again until the cell is walkable.
            if self.imgArray[rs] in self.walkableTiles:
                break
            hint = 'needed to be computed, upper was not walkable'
        if verbose:
            print('is valid!')
        return rs

    def translate_to_graph(self):
        """
        Build an 8-connected graph of all walkable cells.

        Straight edges weigh 1, diagonal edges sqrt(2); every node starts with
        ``count=0``.

        :rtype: nx.Graph
        """
        # Offsets of the neighbours that were visited earlier in the scan.
        earlier = (((0, -1), 1),          # up
                   ((-1, -1), sqrt(2)),   # upLeft
                   ((-1, 1), sqrt(2)),    # lowerLeft
                   ((-1, 0), 1))          # left
        graph = nx.Graph()
        for idx, value in np.ndenumerate(self.imgArray):
            if value not in self.walkableTiles:
                continue
            x, y = idx
            graph.add_node((x, y), count=0)
            for (dx, dy), weight in earlier:
                neighbour = (x + dx, y + dy)
                if graph.has_node(neighbour):
                    graph.add_edge((x, y), neighbour, weight=weight)
        return graph

    def calculate_path(self, source, target, alg='dijkstra', path=None, penalty=None, qhull=True):
        """
        Calculate a path through the graph, based on the Bitmap you imported.

        :param source: (X,Y) Positions Tuple to route from
        :type source: (int, int)
        :param target: (X,Y) Positions Tuple to route to
        :type target: (int, int)
        :param alg: Routing algorithm, 'dijkstra' or 'bi_dijkstra'
        :type alg: str
        :param path: An already given path as list of (X,Y) tuples; it is
                     used instead of routing (e.g. to apply the penalty to it)
        :type path: [(int, int)]
        :param penalty: Added to the 'count' of every node and the weight of
                        every edge on the path
        :type penalty: None or float
        :param qhull: Passed on to the Track constructor.
        :type qhull: bool
        :return: The path found by the given algorithm, default: 'Dijkstra'
        :rtype: Track
        :raises ValueError: For an unknown ``alg``.
        """
        if not path and not isinstance(path, list):
            if alg == 'dijkstra':
                dij_s_p = nx.dijkstra_path(self.graph, source, target, weight='weight')
            elif alg == 'bi_dijkstra':
                # bidirectional_dijkstra returns (length, path).
                dij_s_p = nx.bidirectional_dijkstra(self.graph, source, target, weight='weight')[1]
            else:
                raise ValueError('Unknown routing algorithm: %r' % (alg,))
        else:
            dij_s_p = path
        if penalty and isinstance(penalty, (float, int)):
            # ``Graph.node`` exists in older networkx releases only.
            attrs = self.graph.node if hasattr(self.graph, 'node') else self.graph.nodes
            for node in dij_s_p:
                self.graph.add_node(node, count=attrs[node]['count'] + penalty)
            for currNode, nextNode in zip(dij_s_p[:-1], dij_s_p[1:]):
                oldWeight = self.graph[currNode][nextNode]['weight']
                self.graph.add_edge(currNode, nextNode, weight=oldWeight + penalty)
        track = Track(dij_s_p, self.walkableTiles, qhull=qhull)
        print(len(dij_s_p), ' Len Path generated -> Nodes: ', len(self.graph), ' -> Edges: ', len(self.graph.edges()))
        return track

    def return_random_path(self, penalty=None, safe=False):
        """
        Calculate a single random shortest path

        :param penalty: Set a number for applying edge weights
        :type penalty: None or float
        :param safe: Apply a connectivity check before returning.
                    Affects the performance, only useful when dealing with multiple non connected components.
        :type safe: bool
        :return: A Random shortest Path.
        :rtype: Track
        """
        p, p2 = (0, 0), (0, 0)
        # Redraw while the heading is a multiple of 45 degrees or the points
        # are at most one diagonal step apart.
        while degrees(atan2(p[1] - p2[1], p[0] - p2[0])) % 45 == 0 or hypot(p[0] - p2[0], p[1] - p2[1]) <= sqrt(2):
            p, p2 = self.getRandomPos(), self.getRandomPos()
        print('source: ', p, 'target: ', p2, ' - generated')
        if safe:
            while not nx.has_path(self.graph, p, p2):
                p, p2 = self.getRandomPos(), self.getRandomPos()
                print('unconnected -> New Try: S=', p, ' T=', p2)
        return self.calculate_path(p, p2, penalty=penalty)
# Extraction Function - had to be static because of multiprocessing
def extract_arch_attributes(track, shortestT, hClassList):
    """
    Collect the attribute vector of one track for the archetype analysis.

    Layout of the result: track id, hull area, length over the hull's input
    points, point count, distance between hull centroid and the start/target
    midpoint, orientation in degrees, track length, length relative to the
    shortest track, DTW distance to the shortest track, index of the
    homotopy class.

    :param track: The track to describe; its ``group`` must be in ``hClassList``.
    :type track: Track
    :param shortestT: Reference track for the relative length and DTW distance.
    :type shortestT: Track
    :param hClassList: Keys of the homotopy classes.
    :type hClassList: list
    :return: 1-D object array; element 0 is the track id and no real attribute.
    :rtype: np.ndarray
    """
    attributes = list()
    # TrackID - do not use as real attribute for analysis input!
    # remove by: dataArray[:, 1:]  Subset without first Attribute
    attributes.append(track.vertex)
    # Convex hull area (also builds the hull if the track has none yet)
    attributes.append(track.areaCHull())
    # Length over the points stored in the hull
    attributes.append(track.length(track.hull.points))
    # Number of points
    attributes.append(len(track))
    # Centroid of the hull corners - distance to the midpoint between start and target
    # http://stackoverflow.com/questions/31562534/scipy-centroid-of-convex-hull
    # hull.vertices holds indices into hull.points, so look the coordinates up first.
    corners = track.hull.points[track.hull.vertices]
    centroid = [np.mean(corners[:, 0]), np.mean(corners[:, 1])]
    midpoint = [(track[0][0] + track[-1][0]) / 2, (track[0][1] + track[-1][1]) / 2]
    attributes.append(hypot(centroid[0] - midpoint[0], centroid[1] - midpoint[1]))
    # Orientation -- http://stackoverflow.com/questions/31735499/calculate-angle-clockwise-between-two-points
    ang1 = np.arctan2(*centroid[::-1])
    ang2 = np.arctan2(*midpoint[::-1])
    attributes.append(np.rad2deg((ang1 - ang2) % (2 * np.pi)))
    # Length
    attributes.append(track.tracklen)
    # Angular sum (cancelling / positive): not yet implemented
    # Relative length regarding the shortest route
    attributes.append(track.tracklen / shortestT.tracklen)
    # DTW distance
    dist, cost, acc, path = \
        dtw(np.array(track.data), np.array(shortestT.data), dist=lambda x, y: np.linalg.norm(x - y, ord=1))
    attributes.append(dist)
    # Pixels average / min "heat": not yet implemented
    # Homotopy class - remap (x,y) coordinate tuple keys to an int representation
    attributes.append(hClassList.index(track.group))
    # Fill an object array element by element: the id is a tuple, and numpy
    # refuses to build an array from such a ragged list on its own.
    result = np.empty(len(attributes), dtype=object)
    for position, value in enumerate(attributes):
        result[position] = value
    return result