2019-09-29 09:37:30 +02:00

360 lines
14 KiB
Python

from typing import Union
from functools import reduce
from statistics import stdev
from sklearn.cluster import Birch, KMeans, DBSCAN
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from dataset import *
from networks.modules import AbstractNeuralNetwork
from matplotlib import pyplot as plt
from matplotlib.patches import Polygon
from matplotlib.collections import LineCollection, PatchCollection
import matplotlib.colors as mcolors
import matplotlib.cm as cmaps
from math import pi
def search_for_weights(func, folder, file_type='latent_space'):
while not os.path.exists(folder):
if len(os.path.split(folder)) >= 50:
raise FileNotFoundError(f'The folder "{folder}" could not be found')
folder = os.path.join(os.pardir, folder)
if any([file_type in x.name for x in os.scandir(folder)]):
return
if any(['.ckpt' in element.name and element.is_dir() for element in os.scandir(folder)]):
_, _, filenames = next(os.walk(os.path.join(folder, 'weights.ckpt')))
filenames.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))
func(os.path.join(folder, 'weights.ckpt', filenames[-1]))
return
for element in os.scandir(folder):
if os.path.exists(element):
if element.is_dir():
search_for_weights(func, element.path, file_type=file_type)
elif element.is_file() and element.name.endswith('.ckpt'):
func(element.path)
else:
continue
class Printer(object):
def __init__(self, model: AbstractNeuralNetwork, ax=None):
self.norm = mcolors.Normalize(vmin=0, vmax=1)
self.colormap = cmaps.gist_rainbow
self.network = model
self.fig = plt.figure(dpi=300)
self.ax = ax if ax else plt.subplot(1, 1, 1)
pass
def colorize(self, x, min_val: Union[float, None] = None, max_val: Union[float, None] = None,
colormap=cmaps.rainbow, **kwargs):
norm = mcolors.Normalize(vmin=min_val, vmax=max_val)
colored = colormap(norm(x))
return colored
@staticmethod
def project_to_2d(data: np.ndarray, method: str = 'tsne') -> np.ndarray:
projector = TSNE() if method.lower() == 'tsne' else PCA()
print('Starting TSNE Transformation')
projected_data = projector.fit_transform(data)
assert projected_data.shape[-1] == 2
print('TSNE Projection Successfull')
return projected_data
@staticmethod
def cluster_data(data: np.ndarray, cluster_center_file: str = None) -> np.ndarray:
print('Start Clustering with Birch')
if cluster_center_file:
with open(cluster_center_file, 'r') as f:
cluster_center_string = f.readlines()[0]
centers = ast.literal_eval(cluster_center_string)
clusterer = Birch(n_clusters=len(centers))
clusterer.init = np.asarray(centers)
else:
# clusterer = Birch(n_clusters=None)
clusterer = Birch()
labels = clusterer.fit_predict(data)
print('Birch Clustering Sucessfull')
return labels
def print_possible_latent_spaces(self, data: Trajectories, n: Union[int, str] = 1000, **kwargs):
predictions, _ = self._gather_predictions(data, n)
if len(predictions) >= 2:
predictions += (torch.cat(predictions, dim=-1), )
labels = self.cluster_data(predictions[-1])
for idx, prediction in enumerate(predictions):
self.print_latent_space(prediction, labels, running_index=idx, **kwargs)
def print_latent_space(self, prediction, labels, running_index=0, save=None):
self.colormap = cmaps.tab20
if isinstance(prediction, torch.Tensor):
prediction = prediction.numpy()
elif isinstance(prediction, np.ndarray):
pass
elif isinstance(prediction, list):
prediction = np.asarray(prediction)
else:
raise RuntimeError
if prediction.shape[-1] > 2:
fig, axs = plt.subplots(ncols=2, nrows=1)
transformers = [TSNE(2), PCA(2)]
print('Starting Dimensional Reduction')
for idx, transformer in enumerate(transformers):
transformed = transformer.fit_transform(prediction)
print(f'{transformer.__class__.__name__} Projection Sucessfull')
colored = self.colormap(labels)
ax = axs[idx]
ax.scatter(x=transformed[:, 0], y=transformed[:, 1], c=colored)
ax.set_title(transformer.__class__.__name__)
ax.set_xlim(np.min(transformed[:, 0])*1.1, np.max(transformed[:, 0]*1.1))
ax.set_ylim(np.min(transformed[:, 1]*1.1), np.max(transformed[:, 1]*1.1))
elif prediction.shape[-1] == 2:
fig, axs = plt.subplots()
# TODO: Build transformation for lat_dim_size >= 3
print('All Predictions sucesfully Gathered and Shaped ')
axs.set_xlim(np.min(prediction[:, 0]), np.max(prediction[:, 0]))
axs.set_ylim(np.min(prediction[:, 1]), np.max(prediction[:, 1]))
# ToDo: Insert Normalization
colored = self.colormap(labels)
plt.scatter(prediction[:, 0], prediction[:, 1], c=colored)
else:
raise NotImplementedError("Latent Dimensions can not be one-dimensional (yet).")
if save:
plt.savefig(f'{save}_{running_index}.png')
def print_latent_density(self): # , data: DataContainer):
raise NotImplementedError("My Future Self has to come up with smth")
# fig, ax = plt.subplots()
# preds = []
# for i in range(data.len - data.width * data.stepsize):
# for i in range(5000):
#
# seq = data.sub_trajectory_by_key(i, stepsize=data.stepsize)
#
# preds.append(self.nn.encoder([seq[None, ...]])[0])
#
# TODO: Build transformation for lat_dim_size >= 3
# pred_array = np.asarray(preds).reshape((-1, nn.latDim))
# k = KernelDensity()
# k.fit(pred_array)
# z = np.exp(k.score_samples(pred_array))
#
# levels = np.linspace(0, z.max(), 25)
# xgrid, ygrid = np.meshgrid(pred_array[::5, 0], pred_array[::5, 1])
# xy = np.vstack([xgrid.ravel(), ygrid.ravel()]).T
# z = np.exp(k.score_samples(xy)).reshape(xgrid.shape)
#
# plt.contourf(xgrid, ygrid, z, levels=levels, cmap=plt.cm.Reds)
# plt.show()
def _gather_predictions(self, data: Trajectories, n: int = 1000,
color_by_movement=False, **kwargs):
"""
Check if any value for n is given and gather some random datapoints from the dataset. In accordance with the
maximal possible trajectory amount that is given by stepsize * width.
Also retunr the keys for all possible predictions.
:param data:
:type data: Dataset
:param n:
:param tsne:
:param kwargs:
:return:
"""
print("Gathering Predictions")
n = n if isinstance(n, int) and n else len(data) - (data.size * data.step)
idxs = np.random.choice(np.arange(len(data) - data.step * data.size), n, replace=False)
complete_data = torch.stack([data.get_both_by_key(idx) for idx in idxs], dim=0)
segment_coords, trajectories = complete_data[:, :, :2], complete_data[:, :, 2:]
if color_by_movement:
motion_analyser = MotionAnalyser()
predictions = (motion_analyser.cluster_motion(segment_coords), )
else:
with torch.no_grad():
predictions = self.network(trajectories)[:-1]
return predictions, segment_coords
@staticmethod
def colorize_as_hsv(self, x, min_val: Union[float, None] = None, max_val: Union[float, None] = None,
colormap=cmaps.rainbow, **kwargs):
norm = mcolors.Normalize(vmin=min_val, vmax=max_val)
colored = colormap(norm(x))
return colored
def _build_trajectory_shapes(self, predictions: np.ndarray, segment_coordinates,
axis=None, transformation=TSNE, **kwargs):
if not isinstance(predictions, np.ndarray):
predictions = tuple((x if torch.is_tensor(x) else torch.from_numpy(x) for x in predictions))
predictions = torch.cat(predictions, dim=-1)
if axis is not None:
predictions = predictions[:, axis][..., None]
if predictions.shape[-1] >= 4:
if True:
predictions = Birch(n_clusters=3).fit_predict(predictions).reshape(-1, 1)
else:
transformer = transformation(n_components=3, random_state=42)
predictions = transformer.fit_transform(predictions)
if predictions.shape[-1] == 1:
colored = self.colorize(predictions.reshape(-1), **kwargs)
elif predictions.shape[-1] == 2:
colored = self.colorize(predictions[:, 0], **kwargs)
if kwargs.get('min_val', None):
lightning = mcolors.Normalize(vmin=kwargs.get('min_val', None), vmax=kwargs.get('max_val', None))
else:
lightning = mcolors.Normalize()
alpha = lightning(predictions[:, 1])
colored[:, -1] = alpha
elif predictions.shape[-1] == 3:
norm = mcolors.Normalize()
colored = [(r, g, b) for r,g,b in norm(predictions)]
else:
raise NotImplementedError('Full Prediction Shape was: {}'.format(predictions.shape))
# TODO Build a isomap or tsne transformation here to get a two dimensional space
segment_coordinates = segment_coordinates.cpu() if torch.is_tensor(segment_coordinates) else segment_coordinates
return LineCollection(segment_coordinates, linewidths=(1, 1, 1, 1),
colors=colored, linestyle='solid')
@staticmethod
def _build_map_shapes(base_map: Map):
# Base Map Plotting
# filled Triangle
patches = [Polygon(base_map[i], True, color='black') for i in range(len(base_map))]
return PatchCollection(patches, color='black')
def print_trajec_on_basemap(self, data, base_map: Map, save=False, color_by_movement=False, **kwargs):
"""
:rtype: object
"""
prediction_segments = self._gather_predictions(data, color_by_movement=color_by_movement, **kwargs)
trajectory_shapes = self._build_trajectory_shapes(*prediction_segments, **kwargs)
map_shapes = self._build_map_shapes(base_map)
self.ax.add_collection(trajectory_shapes)
self.ax.axis('auto')
self.ax.add_collection(map_shapes)
self.ax.set_title('Trajectories on BaseMap')
if save:
if isinstance(save, str):
self.save(save)
else:
self.save(base_map.name)
pass
@staticmethod
def show():
plt.show()
return True
@staticmethod
def save(filename):
plt.savefig(filename)
class MotionAnalyser(object):
def __init__(self):
pass
def _sequential_pairwise_map(self, func, xy_sequence, on_deltas=False):
zipped_list = [x for x in zip(xy_sequence[:-1], xy_sequence[1:])]
if on_deltas:
zipped_list = [self.delta(*movement) for movement in zipped_list]
else:
pass
return [func(*xy) for xy in zipped_list]
@staticmethod
def delta(x1y1, x2y2):
x1, y1 = x1y1
x2, y2 = x2y2
return x2-x1, y2-y1
@staticmethod
def get_r(deltax, deltay):
# https://mathinsight.org/polar_coordinates
r = torch.sqrt(deltax**2 + deltay**2)
return r
@staticmethod
def get_theta(deltax, deltay, rad=False):
# https://mathinsight.org/polar_coordinates
theta = torch.atan2(deltay, deltax)
return theta if rad else theta * 180 / pi
def get_theta_for_sequence(self, xy_sequence):
ts = self._sequential_pairwise_map(self.get_theta, xy_sequence, on_deltas=True)
return ts
def get_r_for_sequence(self, xy_sequence):
rs = self._sequential_pairwise_map(self.get_r, xy_sequence, on_deltas=True)
return rs
def get_unique_seq_identifier(self, xy_sequence):
# Globals
global_delta = self.delta(xy_sequence[0], xy_sequence[-1])
global_theta = self.get_theta(*global_delta)
global_r = self.get_r(*global_delta)
# For Each
theta_seq = self.get_theta_for_sequence(xy_sequence)
mean_theta = sum(theta_seq) / len(theta_seq)
theta_sum = sum([abs(theta) for theta in theta_seq])
std_theta = stdev(map(float, theta_seq))
return torch.stack((global_r, torch.as_tensor(std_theta), mean_theta, global_theta))
def cluster_motion(self, trajectory_samples, cluster_class=KMeans):
cluster_class = cluster_class(3)
std, mean = torch.std_mean(trajectory_samples, dim=0)
trajectory_samples = (trajectory_samples - mean) / std
unique_seq_identifiers = torch.stack([self.get_unique_seq_identifier(trajectory)
for trajectory in trajectory_samples])
clustered_movement = cluster_class.fit_predict(unique_seq_identifiers)
if False:
from sklearn.decomposition import PCA
p = PCA(2)
t = p.fit_transform(unique_seq_identifiers)
f = plt.figure()
plt.scatter(t[:, 0], t[:,1])
plt.show()
return clustered_movement.reshape(-1, 1)
if __name__ == '__main__':
raise PermissionError('This file should not be called.')