import argparse
from collections import defaultdict
from distutils.util import strtobool
import os
import ast

from abc import ABC, abstractmethod

from tqdm import tqdm

import numpy as np
import torch
from torch.utils.data import Dataset, ConcatDataset


# Command line argument parsing
def build_parse_commands():
    # Init the Command Line Arguments Parser
    arg_parser = argparse.ArgumentParser(description='VAE and GSE Autoencoder with Latent Space Clustering Approaches')

    # Specify a pretrained weight file to load.
    arg_parser.add_argument('--model_file', nargs='?', default='',
                            help='Specify a pretrained model file to load.')
    # Set the raw data location.
    arg_parser.add_argument('--files', nargs='?', default='',
                            help='Set the raw data location. Should be filled with maps and trajectories.')
    # Set a fixed prng seed.
    arg_parser.add_argument('--seed', nargs='?', default=-999, type=int,
                            help='Set a fixed prng seed.')

    # DataSet parameters
    arg_parser.add_argument('--size', nargs='?', default=9, type=int,
                            help='Set a trajectory length; the number of isovists.')
    arg_parser.add_argument('--step', nargs='?', default=5, type=int,
                            help='Set a fixed step size between isovist centers.')
    arg_parser.add_argument('--overlapping', nargs='?', default=True, type=strtobool,
                            help='Whether the isovists should overlap.')

    # Visualization switches
    arg_parser.add_argument('-p', '--print_on_map', default=False, type=strtobool,
                            help='Whether trajectories should be colored and displayed on a map.')
    arg_parser.add_argument('-l', '--print_latent', default=False, type=strtobool,
                            help='Whether the latent encoding space should be colored and displayed.')
    arg_parser.add_argument('-d', '--divided_latent_viz', default=False, type=strtobool,
                            help='Whether the latent encoding space should be colored and displayed separately in the SAAE case.')

    return arg_parser.parse_args()


class AbstractDataset(ConcatDataset, ABC):

    # maps = ['hotel', 'tum', 'gallery', 'queens', 'oet']
    @property
    def maps(self):
        return ['test', 'test2']
        # return ['hotel', 'tum', 'gallery', 'queens', 'oet']

    @property
    @abstractmethod
    def raw_filenames(self):
        raise NotImplementedError('Specify the file ending here')

    @property
    def raw_paths(self):
        return [os.path.join(self.path, 'raw', x) for x in self.raw_filenames]

    @property
    def processed_filenames(self):
        return [f'{x}_{self.__class__.__name__}.to' for x in self.maps]

    @property
    def processed_paths(self):
        return [os.path.join(self.path, 'processed', x) for x in self.processed_filenames]

    def __init__(self, path, refresh=False, transforms=None, **kwargs):
        self.path = path
        self.refresh = refresh
        self.transforms = transforms or None
        super(AbstractDataset, self).__init__(datasets=self._load_datasets())

    @abstractmethod
    def process(self, filepath):
        raise NotImplementedError

    def _load_datasets(self):
        if self.refresh:
            # Delete previously processed files so they are rebuilt from the raw data.
            for filepath in self.processed_paths:
                try:
                    os.remove(filepath)
                    print('Processed location "refreshed" (the files were deleted).')
                except FileNotFoundError:
                    print('You meant to refresh the already processed dataset, but there was none...')
                    print('Continuing with processing.')

        datasets = []
        # ToDo: Make this nicer
        for map_idx, _ in tqdm(enumerate(self.maps), total=len(self.maps), unit='files'):
            while True:
                try:
                    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
                    datasets.append(torch.load(self.processed_paths[map_idx], map_location=device))
                    break
                except FileNotFoundError:
                    # Not processed yet: create the target folder, process the raw file, then retry loading.
                    os.makedirs(os.path.join(*os.path.split(self.processed_paths[map_idx])[:-1]), exist_ok=True)
                    processed = self.process(self.raw_paths[map_idx])
                    tqdm.write(f'Dataset "{self.processed_paths[map_idx]}" processed')
                    torch.save(processed, self.processed_paths[map_idx])
                    continue
        return datasets


class DataContainer(AbstractDataset):

    @staticmethod
    def calculate_model_shapes(size, step, **kwargs):
        return

    @property
    def raw_filenames(self):
        return [f'{x}_trajec.csv' for x in self.maps]

    def __init__(self, path, size, step, **kwargs):
        self.size = size
        self.step = step
        super(DataContainer, self).__init__(path, **kwargs)

    def process(self, filepath):
        dataDict = defaultdict(list)
        with open(filepath, 'r') as f:
            total_lines = sum(1 for _ in f)
        with open(filepath, 'r') as f:
            delimiter = ','
            # Separate the header. The csv is expected to hold the isovist measures
            # listed in Trajectories.isovistMeasures plus an 'inDoor' flag column.
            headers = f.readline().rstrip().split(delimiter)
            # Iterate over every line and convert each value to a float / Python value.
            # ToDo: Make this nicer
            for line in tqdm(f, total=total_lines, unit=' lines', mininterval=1, miniters=1000):
                if not line.rstrip():
                    continue
                for attr, x in zip(headers, line.rstrip().split(delimiter)):
                    if attr not in ['inDoor']:
                        dataDict[attr].append(ast.literal_eval(x))
        # Drop the 'inDoor' flag only after parsing, so the header names stay aligned with the csv columns.
        headers.remove('inDoor')
        return Trajectories(self.size, self.step, headers, transforms=self.transforms, **dataDict)


class Trajectories(Dataset):

    # As in "To take hold of isovists and isovist fields" - M. L. Benedikt,
    # read only the measures specified by Benedikt.
    @property
    def isovistMeasures(self):
        return ['X', 'Z', 'realSurfacePerimeter', 'occlusionValue', 'area', 'variance', 'skewness', 'circularity_ben']

    @property
    def features(self):
        return len(self.isovistMeasures)

    def __init__(self, size, step, headers, transforms=None, **kwargs):
        super(Trajectories, self).__init__()
        self.size: int = size
        self.step: int = step
        self.headers: list = headers
        self.transforms: list = transforms or list()
        self.data = self.__init_data_(**kwargs)

    def __init_data_(self, **kwargs):
        dataDict = dict()
        for key, val in kwargs.items():
            if key in self.isovistMeasures:
                dataDict[key] = torch.tensor(val)
        # Check that all measures are of the same length.
        assert len(set(x.size()[0] for x in dataDict.values() if torch.is_tensor(x))) <= 1
        data = torch.stack([dataDict[key] for key in self.isovistMeasures], dim=-1)
        for transformation in self.transforms:
            # Apply transformations to all measures but the x, y coordinates.
            data[:, 2:] = transformation(data[:, 2:])
        return data

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]

    def __getitem__(self, item, coords=False):
        """
        Return a trajectory sample from the dataset by a specific key.

        :param item: The index of the trajectory to return.
        :return: The trajectory's isovist measures; the (x, y) coordinates as well if coords is True.
        """
        subList = self.data[item:item + self.size * self.step or None:self.step]
        xy, tensor = subList[:, :2], subList[:, 2:]
        return (xy, tensor) if coords else tensor

    def __len__(self):
        total_len = self.data.size()[0]
        return total_len - (self.size * self.step - (self.step - 1))


class MapContainer(AbstractDataset):

    @property
    def raw_filenames(self):
        return [f'{x}_map.csv' for x in self.maps]

    def __init__(self, path, **kwargs):
        super(MapContainer, self).__init__(path, **kwargs)

    def process(self, filepath):
        dataDict = defaultdict(list)
        with open(filepath, 'r') as f:
            delimiter = ','
            # Separate the header
            headers = f.readline().rstrip().split(delimiter)
            # Iterate over every line and convert each value to a float / Python value.
            # ToDo: Make this nicer
            for line in tqdm(f):
                if not line.rstrip():
                    continue
                for attr, x in zip(headers, line.rstrip().split(delimiter)):
                    dataDict[attr].append(ast.literal_eval(x))
        return Map(np.asarray([dataDict[head] for head in headers]))


class Map(object):

    def __init__(self, mapData: np.ndarray):
        """
        A container class for triangulated basemaps in csv format.

        :param mapData: The map as np.ndarray, already read from disk.
        """
        self.map: np.ndarray = mapData
        # Bounding box over the x and y coordinates of all triangle vertices.
        self.minx, self.maxx = np.min(self.map[[0, 2, 4]]), np.max(self.map[[0, 2, 4]])
        self.miny, self.maxy = np.min(self.map[[1, 3, 5]]), np.max(self.map[[1, 3, 5]])

        print('BaseMap Initialized')

    def __len__(self):
        return self.map.shape[0]

    def vertices(self):
        vertices = self.map.reshape((-1, 2, 3))
        return vertices

    def __getitem__(self, item):
        return self.map[item].reshape(3, 2)


if __name__ == '__main__':
    args = build_parse_commands()
    if args.seed != -999:
        np.random.seed(args.seed)
        torch.manual_seed(args.seed)

    # d = DataContainer(args.files, args.size, args.step)
    m = MapContainer(args.files, refresh=True)
    print(len(m[1]))