ae_toolbox_torch/data/dataset.py

253 lines
8.9 KiB
Python

import argparse
from collections import defaultdict
from distutils.util import strtobool
import os
import ast
from abc import ABC, abstractmethod
from tqdm import tqdm
import numpy as np
import torch
from torch.utils.data import Dataset, ConcatDataset
# Command line argument parsing
def build_parse_commands():
# Init the Command Line Arguments Parser
arg_parser = argparse.ArgumentParser(description='VAE and GSE Autoencoder with latent Space Clustering Approaches')
# Specify a pretrained weight file to load.
arg_parser.add_argument('--model_file', nargs='?', default='',
help='Specify a pretrained model file to load.')
# Specify a pretrained weight file to load.
arg_parser.add_argument('--files', nargs='?', default='',
help='Set the raw data location. Should be filled with maps and trajectories')
# Set a fixed prng seed.
arg_parser.add_argument('--seed', nargs='?', default=-999, help='Set a fixed prng seed.')
# DataSet parameters
arg_parser.add_argument('--size', nargs='?', default=9, help='Set a trajectory length; the number of isovists.')
arg_parser.add_argument('--step', nargs='?', default=5, help='Set a fixed stepsize between isovist centers.')
arg_parser.add_argument('--overlapping', nargs='?', default=True, help='Whether the Isovists should overlap.')
# Specify the Map to use in Training and visualization
arg_parser.add_argument('-p', '--print_on_map', default=False, type=strtobool,
help='Whether trajcetories should be colored and displayed on a map.')
arg_parser.add_argument('-l', '--print_latent', default=False, type=strtobool,
help='Whether latent encoding space should be colored and displayed.')
arg_parser.add_argument('-d', '--divided_latent_viz', default=False, type=strtobool,
help='Whether latent encoding space should be colored and displayed seperatein saae case.')
return arg_parser.parse_args()
class AbstractDataset(ConcatDataset, ABC):
# maps = ['hotel', 'tum','gallery', 'queens', 'oet']
@property
def maps(self):
return ['hotel', 'tum','gallery', 'queens', 'oet']
@property
@abstractmethod
def raw_filenames(self):
raise NotImplementedError('Specify the file ending here')
@property
def raw_paths(self):
return [os.path.join(self.path, 'raw', x) for x in self.raw_filenames]
@property
def processed_filenames(self):
return [f'{x}_{self.__class__.__name__}.to' for x in self.maps]
@property
def processed_paths(self):
return [os.path.join(self.path, 'processed', x) for x in self.processed_filenames]
def __init__(self, path, refresh=False, **kwargs):
self.path = path
self.refresh = refresh
super(AbstractDataset, self).__init__(datasets=self._load_datasets())
@abstractmethod
def process(self, filepath):
raise NotImplementedError
def _load_datasets(self):
if self.refresh:
for filepath in self.processed_paths:
try:
os.remove(filepath)
print('Processed Location "Refreshed" (We deleted the Files)')
except FileNotFoundError:
print('You meant to refresh the allready processed dataset, but there were none...')
print('continue processing')
pass
datasets = []
# ToDo: Make this nicer
for map_idx, _ in tqdm(enumerate(self.maps),
total=len(self.maps), unit="files"
):
while True:
try:
datasets.append(torch.load(self.processed_paths[map_idx]))
print(f'Dataset "{self.processed_paths[map_idx]}" loaded')
break
except FileNotFoundError:
os.makedirs(os.path.join(*os.path.split(self.processed_paths[map_idx])[:-1]), exist_ok=True)
processed = self.process(self.raw_paths[map_idx])
torch.save(processed, self.processed_paths[map_idx])
continue
return datasets
class DataContainer(AbstractDataset):
@staticmethod
def calculate_model_shapes(size, step, **kwargs):
return
@property
def raw_filenames(self):
return [f'{x}_trajec.csv' for x in self.maps]
def __init__(self, path, size, step, **kwargs):
self.size = size
self.step = step
super(DataContainer, self).__init__(path, **kwargs)
pass
def process(self, filepath):
dataDict = defaultdict(list)
with open(filepath, 'r') as f:
delimiter = ','
# Separate the header
headers = f.readline().rstrip().split(delimiter)
headers.remove('inDoor')
# Iterate over every line and convert it to float / value
# ToDo: Make this nicer
for line in tqdm(f, total=len(self.maps), unit="lines"):
if line == '':
continue
else:
for attr, x in zip(headers, line.rstrip().split(delimiter)[None:None]):
if attr not in ['inDoor']:
dataDict[attr].append(ast.literal_eval(x))
return Trajectories(self.size, self.step, headers, **dataDict)
class Trajectories(Dataset):
# As in "To take hold of isovists and isovist fields" - M. L. Benedikt, read only measures specified by Benedikt
@property
def isovistMeasures(self):
return ['X', 'Z', 'realSurfacePerimeter', 'occlusionValue', 'area', 'variance', 'skewness', 'circularity_ben']
@property
def features(self):
return len(self.isovistMeasures)
def __init__(self, size, step, headers, **kwargs):
super(Trajectories, self).__init__()
self.size: int = size
self.step: int = step
self.headers: list = headers
dataDict = dict()
for key, val in kwargs.items():
if key in self.isovistMeasures:
dataDict[key] = torch.tensor(val)
# Check if all keys are of same length
assert len(set(x.size()[0] for x in dataDict.values() if torch.is_tensor(x))) <= 1
self.data = torch.stack([dataDict[key] for key in self.isovistMeasures], dim=-1)
pass
def __iter_tenors__(self):
return
def __iter__(self):
for i in range(len(self)):
yield self[i]
def __getitem__(self, item, coords=False):
"""
Return a trajectory sample from the dataset by a specific key.
:param item: The index number of the trajectory to return.
:return:
"""
subList = self.data[item:item + self.size * self.step or None:self.step]
xy, tensor = subList[:, 2], subList[:, 2:]
return (xy, tensor) if coords else tensor
def __len__(self):
total_len = self.data.size()[0]
return total_len - (self.size * self.step - (self.step - 1))
class MapContainer(AbstractDataset):
@property
def raw_filenames(self):
return [f'{x}_map.csv' for x in self.maps]
def __init__(self, path, **kwargs):
super(MapContainer, self).__init__(path, **kwargs)
pass
def process(self, filepath):
dataDict = defaultdict(list)
with open(filepath, 'r') as f:
delimiter = ','
# Separate the header
headers = f.readline().rstrip().split(delimiter)
# Iterate over every line and convert it to float / value
# ToDo: Make this nicer
for line in tqdm(f):
if line == '':
continue
else:
for attr, x in zip(headers, line.rstrip().split(delimiter)[None:None]):
dataDict[attr].append(ast.literal_eval(x))
return Map(np.asarray([dataDict[head] for head in headers]))
class Map(object):
def __init__(self, mapData: np.ndarray):
"""
This is a Container Class for triangulated basemaps in csv format.
:param mapData: The map as np.ndarray, already read from disk.
"""
self.map: np.ndarray = mapData
self.minx, self.maxx = np.min(self.map[[0, 2, 4]]), np.max(self.map[[0, 2, 4]])
self.miny, self.maxy = np.min(self.map[[1, 3, 5]]), np.max(self.map[[1, 3, 5]])
print('BaseMap Initialized')
def __len__(self):
return self.map.shape[0]
def vertices(self):
vertices = self.map.reshape((-1, 2, 3))
return vertices
def __getitem__(self, item):
return self.map[item].reshape(3, 2)
if __name__ == '__main__':
args = build_parse_commands()
if args.seed != -999:
np.random.seed(args.seed)
torch.manual_seed(args.seed)
# d = DataContainer(args.files, args.size, args.step)
m = MapContainer(args.files, refresh=True)
print(len(m[1]))