project Refactor, CNN Classifier Basics

Steffen Illium
2020-02-19 21:11:42 +01:00
parent 8424251ca0
commit 78f0df8a2a
16 changed files with 622 additions and 560 deletions

.idea/deployment.xml generated

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" autoUpload="On explicit save action" serverName="steffen@aimachine:22">
<component name="PublishConfigData" serverName="steffen@aimachine:22">
<serverData>
<paths name="steffen@aimachine:22">
<serverdata>
@@ -10,6 +10,5 @@
</serverdata>
</paths>
</serverData>
<option name="myAutoUpload" value="ON_EXPLICIT_SAVE" />
</component>
</project>


@@ -2,10 +2,12 @@
<dictionary name="steffen">
<words>
<w>conv</w>
<w>dataloader</w>
<w>homotopic</w>
<w>hparams</w>
<w>hyperparamter</w>
<w>numlayers</w>
<w>reparameterize</w>
<w>traj</w>
</words>
</dictionary>


@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Remote Python 3.7.6 (sftp://steffen@aimachine:22/home/steffen/envs/traj_gen/bin/python)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.7 (traj_gen)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

.idea/misc.xml generated

@@ -3,5 +3,5 @@
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.7.6 (sftp://steffen@aimachine:22/home/steffen/envs/traj_gen/bin/python)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (traj_gen)" project-jdk-type="Python SDK" />
</project>


@@ -11,12 +11,17 @@ from lib.preprocessing.generator import Generator
class TrajPairDataset(Dataset):
@property
def map_shape(self):
return self._dataset.map.as_array.shape
def __init__(self, data):
super(TrajPairDataset, self).__init__()
self.alternatives = data['alternatives']
self.trajectory = data['trajectory']
self.labels = data['labels']
self.mapname = data['map']['name'][4:] if data['map']['name'].startswith('map_') else data['map']['name']
self.map = data['map']['map']
def __len__(self):
return len(self.alternatives)
@@ -26,7 +31,8 @@ class TrajPairDataset(Dataset):
class DatasetMapping(Dataset):
def __init__(self, dataset, mapping):
def __init__(self, dataset: Union[TrajPairDataset, ConcatDataset], mapping):
self._dataset = dataset
self._mapping = mapping
@@ -38,6 +44,15 @@ class DatasetMapping(Dataset):
class TrajPairData(object):
@property
def map_shapes(self):
return [dataset.map_shape for dataset in self._dataset.datasets]
@property
def map_shapes_max(self):
shapes = self.map_shapes
return list(map(max, zip(*shapes)))
@property
def name(self):
return self.__class__.__name__
@@ -52,8 +67,7 @@ class TrajPairData(object):
self.mapname = mapname
self.train_split, self.val_split, self.test_split = train_val_test_split
self.data_root = Path(data_root)
self.maps_root = Path(data_root) if data_root else Path() / 'res' / 'maps'
self._dataset = None
self.maps_root = Path(map_root) if map_root else Path() / 'res' / 'maps'
self._dataset, self._train_map, self._val_map, self._test_map = self._load_dataset()
def _build_data_on_demand(self):
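The map_shapes_max property reduces the per-dataset map shapes to an element-wise maximum, so downstream models can be sized for the largest map. A minimal, standalone sketch of that reduction with made-up shapes (the result is materialized with list() so it can be indexed as an input shape):

shapes = [(1, 96, 128), (1, 64, 160), (1, 80, 80)]   # hypothetical map shapes (C, H, W)
grouped = zip(*shapes)                               # (1, 1, 1), (96, 64, 80), (128, 160, 80)
max_shape = list(map(max, grouped))                  # [1, 96, 160]
print(max_shape)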


@@ -1,527 +0,0 @@
from abc import ABC
from pathlib import Path
from typing import Union
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
# Utility - Modules
###################
from torch.utils.data import DataLoader
from dataset.dataset import TrajDataset
from lib.objects.map import MapStorage
class Flatten(nn.Module):
def __init__(self, to=(-1, )):
super(Flatten, self).__init__()
self.to = to
def forward(self, x):
return x.view(x.size(0), *self.to)
class Interpolate(nn.Module):
def __init__(self, size=None, scale_factor=None, mode='nearest', align_corners=None):
super(Interpolate, self).__init__()
self.interp = nn.functional.interpolate
self.size = size
self.scale_factor = scale_factor
self.align_corners = align_corners
self.mode = mode
def forward(self, x):
x = self.interp(x, size=self.size, scale_factor=self.scale_factor,
mode=self.mode, align_corners=self.align_corners)
return x
class AutoPad(nn.Module):
def __init__(self, interpolations=3, base=2):
super(AutoPad, self).__init__()
self.fct = base ** interpolations
def forward(self, x):
x = F.pad(x,
[0,
(x.shape[-1] // self.fct + 1) * self.fct - x.shape[-1] if x.shape[-1] % self.fct != 0 else 0,
(x.shape[-2] // self.fct + 1) * self.fct - x.shape[-2] if x.shape[-2] % self.fct != 0 else 0,
0])
return x
class LightningBaseModule(pl.LightningModule, ABC):
@classmethod
def name(cls):
raise NotImplementedError('Give your model a name!')
@property
def shape(self):
try:
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
except Exception as e:
print(e)
return -1
def __init__(self, params):
super(LightningBaseModule, self).__init__()
self.hparams = params
# Data loading
# =============================================================================
# Dataset
self.dataset = TrajDataset('data')
self.map_storage = MapStorage(self.hparams.data_param.map_root)
def size(self):
return self.shape
def _move_to_model_device(self, x):
return x.cuda() if next(self.parameters()).is_cuda else x.cpu()
def save_to_disk(self, model_path):
Path(model_path, exist_ok=True).mkdir(parents=True, exist_ok=True)
if not (model_path / 'model_class.obj').exists():
with (model_path / 'model_class.obj').open('wb') as f:
torch.save(self.__class__, f)
return True
@pl.data_loader
def train_dataloader(self):
return DataLoader(dataset=self.dataset.train_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
@pl.data_loader
def test_dataloader(self):
return DataLoader(dataset=self.dataset.test_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
@pl.data_loader
def val_dataloader(self):
return DataLoader(dataset=self.dataset.val_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
def configure_optimizers(self):
raise NotImplementedError
def forward(self, *args, **kwargs):
raise NotImplementedError
def validation_step(self, *args, **kwargs):
raise NotImplementedError
def validation_end(self, outputs):
raise NotImplementedError
def training_step(self, batch_xy, batch_nb, *args, **kwargs):
raise NotImplementedError
def test_step(self, *args, **kwargs):
raise NotImplementedError
def test_end(self, outputs):
from sklearn.metrics import roc_auc_score
y_scores, y_true = [], []
for output in outputs:
y_scores.append(output['y_pred'])
y_true.append(output['y_true'])
y_true = torch.cat(y_true, dim=0)
# FIXME: What did this do? Do I need it?
# y_true = (y_true != V.HOMOTOPIC).long()
y_scores = torch.cat(y_scores, dim=0)
roc_auc_scores = roc_auc_score(y_true.cpu().numpy(), y_scores.cpu().numpy())
print(f'AUC Score: {roc_auc_scores}')
return {'roc_auc_scores': roc_auc_scores}
def init_weights(self):
def _weight_init(m):
if hasattr(m, 'weight'):
if isinstance(m.weight, torch.Tensor):
torch.nn.init.xavier_uniform_(m.weight)
if hasattr(m, 'bias'):
if isinstance(m.bias, torch.Tensor):
m.bias.data.fill_(0.01)
self.apply(_weight_init)
class FilterLayer(nn.Module):
def __init__(self):
super(FilterLayer, self).__init__()
def forward(self, x):
tensor = x[:, -1]
return tensor
class MergingLayer(nn.Module):
def __init__(self):
super(MergingLayer, self).__init__()
def forward(self, x):
# ToDo: Which ones to combine?
return
class FlipTensor(nn.Module):
def __init__(self, dim=-2):
super(FlipTensor, self).__init__()
self.dim = dim
def forward(self, x):
idx = [i for i in range(x.size(self.dim) - 1, -1, -1)]
idx = torch.as_tensor(idx).long()
inverted_tensor = x.index_select(self.dim, idx)
return inverted_tensor
#
# Sub - Modules
###################
class ConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, activation: nn.Module = nn.ELU, pooling_size=None, use_bias=True, use_norm=True,
dropout: Union[int, float] = 0,
conv_filters=64, conv_kernel=5, conv_stride=1, conv_padding=0):
super(ConvModule, self).__init__()
# Module Parameters
self.in_shape = in_shape
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.activation = activation()
# Convolution Parameters
self.padding = conv_padding
self.stride = conv_stride
# Modules
self.dropout = nn.Dropout2d(dropout) if dropout else False
self.pooling = nn.MaxPool2d(pooling_size) if pooling_size else False
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04, affine=False) if use_norm else False
self.conv = nn.Conv2d(in_channels, conv_filters, conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride
)
def forward(self, x):
x = self.norm(x) if self.norm else x
tensor = self.conv(x)
tensor = self.dropout(tensor) if self.dropout else tensor
tensor = self.pooling(tensor) if self.pooling else tensor
tensor = self.activation(tensor)
return tensor
class DeConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, conv_filters=3, conv_kernel=5, conv_stride=1, conv_padding=0,
dropout: Union[int, float] = 0, autopad=False,
activation: Union[None, nn.Module] = nn.ReLU, interpolation_scale=None,
use_bias=True, normalize=False):
super(DeConvModule, self).__init__()
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.padding = conv_padding
self.stride = conv_stride
self.in_shape = in_shape
self.conv_filters = conv_filters
self.autopad = AutoPad() if autopad else False
self.interpolation = Interpolate(scale_factor=interpolation_scale) if interpolation_scale else False
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04, affine=False) if normalize else False
self.dropout = nn.Dropout2d(dropout) if dropout else False
self.de_conv = nn.ConvTranspose2d(in_channels, self.conv_filters, conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride)
self.activation = activation() if activation else None
def forward(self, x):
x = self.norm(x) if self.norm else x
x = self.dropout(x) if self.dropout else x
x = self.autopad(x) if self.autopad else x
x = self.interpolation(x) if self.interpolation else x
tensor = self.de_conv(x)
tensor = self.activation(tensor) if self.activation else tensor
return tensor
def size(self):
return self.shape
class RecurrentModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, hidden_size, num_layers=1, cell_type=nn.GRU, use_bias=True, dropout=0):
super(RecurrentModule, self).__init__()
self.use_bias = use_bias
self.num_layers = num_layers
self.in_shape = in_shape
self.hidden_size = hidden_size
self.dropout = dropout
self.rnn = cell_type(self.in_shape[-1] * self.in_shape[-2], hidden_size,
num_layers=num_layers,
bias=self.use_bias,
batch_first=True,
dropout=self.dropout)
def forward(self, x):
tensor = self.rnn(x)
return tensor
#
# Full Model Parts
###################
class Generator(nn.Module):
@property
def shape(self):
x = torch.randn(self.lat_dim).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, out_channels, re_shape, lat_dim, use_norm=False, use_bias=True, dropout: Union[int, float] = 0,
filters: List[int] = None, activation=nn.ReLU):
super(Generator, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
self.filters = filters
self.activation = activation
self.inner_activation = activation()
self.out_activation = None
self.lat_dim = lat_dim
self.dropout = dropout
self.l1 = nn.Linear(self.lat_dim, reduce(mul, re_shape), bias=use_bias)
# re_shape = (self.lat_dim // reduce(mul, re_shape[1:]), ) + tuple(re_shape[1:])
self.flat = Flatten(to=re_shape)
self.deconv1 = DeConvModule(re_shape, conv_filters=self.filters[0],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv2 = DeConvModule(self.deconv1.shape, conv_filters=self.filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv3 = DeConvModule(self.deconv2.shape, conv_filters=self.filters[2],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv4 = DeConvModule(self.deconv3.shape, conv_filters=out_channels,
conv_kernel=3,
conv_padding=1,
# normalize=use_norm,
activation=self.out_activation
)
def forward(self, z):
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor)
tensor = self.deconv2(tensor)
tensor = self.deconv3(tensor)
tensor = self.deconv4(tensor)
return tensor
def size(self):
return self.shape
class UnitGenerator(Generator):
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitGenerator, self).__init__(*args, **kwargs)
self.norm_f = nn.BatchNorm1d(self.l1.out_features, eps=1e-04, affine=False)
self.norm1 = nn.BatchNorm2d(self.deconv1.conv_filters, eps=1e-04, affine=False)
self.norm2 = nn.BatchNorm2d(self.deconv2.conv_filters, eps=1e-04, affine=False)
self.norm3 = nn.BatchNorm2d(self.deconv3.conv_filters, eps=1e-04, affine=False)
def forward(self, z_c1_c2_c3):
z, c1, c2, c3 = z_c1_c2_c3
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.norm_f(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor) + c3
tensor = self.inner_activation(tensor)
tensor = self.norm1(tensor)
tensor = self.deconv2(tensor) + c2
tensor = self.inner_activation(tensor)
tensor = self.norm2(tensor)
tensor = self.deconv3(tensor) + c1
tensor = self.inner_activation(tensor)
tensor = self.norm3(tensor)
tensor = self.deconv4(tensor)
return tensor
class BaseEncoder(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, in_shape, lat_dim=256, use_bias=True, use_norm=False, dropout: Union[int, float] = 0,
latent_activation: Union[nn.Module, None] = None, activation: nn.Module = nn.ELU,
filters: List[int] = None):
super(BaseEncoder, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
# Optional Padding for odd image-sizes
# Obsolete, already done by the autopad module on incoming tensors
# in_shape = [x+1 if x % 2 != 0 and idx else x for idx, x in enumerate(in_shape)]
# Parameters
self.lat_dim = lat_dim
self.in_shape = in_shape
self.use_bias = use_bias
self.latent_activation = latent_activation() if latent_activation else None
# Modules
self.conv1 = ConvModule(self.in_shape, conv_filters=filters[0],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv2 = ConvModule(self.conv1.shape, conv_filters=filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv3 = ConvModule(self.conv2.shape, conv_filters=filters[2],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.flat = Flatten()
def forward(self, x):
tensor = self.conv1(x)
tensor = self.conv2(tensor)
tensor = self.conv3(tensor)
tensor = self.flat(tensor)
return tensor
class UnitEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitEncoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
c1 = self.conv1(x)
c2 = self.conv2(c1)
c3 = self.conv3(c2)
tensor = self.flat(c3)
l1 = self.l1(tensor)
return c1, c2, c3, l1
class VariationalEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(VariationalEncoder, self).__init__(*args, **kwargs)
self.logvar = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
self.mu = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
@staticmethod
def reparameterize(mu, logvar):
std = torch.exp(0.5*logvar)
eps = torch.randn_like(std)
return mu + eps*std
def forward(self, x):
tensor = super(VariationalEncoder, self).forward(x)
mu = self.mu(tensor)
logvar = self.logvar(tensor)
z = self.reparameterize(mu, logvar)
return mu, logvar, z
class Encoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(Encoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
tensor = super(Encoder, self).forward(x)
tensor = self.l1(tensor)
tensor = self.latent_activation(tensor) if self.latent_activation else tensor
return tensor


@@ -1,4 +1,6 @@
from lib.models.blocks import RecurrentModule, ConvModule, DeConvModule, Generator, LightningBaseModule
from dataset.dataset import TrajPairData
from lib.modules.blocks import ConvModule
from lib.modules.utils import LightningBaseModule
class CNNRouteGeneratorModel(LightningBaseModule):
@@ -23,5 +25,21 @@ class CNNRouteGeneratorModel(LightningBaseModule):
def __init__(self, *params):
super(CNNRouteGeneratorModel, self).__init__(*params)
# Dataset
self.dataset = TrajPairData(self.hparams.data_param.data_root)
# Additional Attributes
self.in_shape = self.dataset.map_shapes_max
# NN Nodes
self.conv1 = ConvModule(self.in_shape, self.hparams.model_param.filters[0])
self.conv2 = ConvModule(self.conv1.shape, self.hparams.model_param.filters[0])
self.conv3 = ConvModule(self.conv2.shape, self.hparams.model_param.filters[0])
def forward(self, x):
pass


@@ -1,10 +1,8 @@
from lib.models.blocks import RecurrentModule, ConvModule, DeConvModule, Generator, LightningBaseModule
from lib.models.losses import BinaryHomotopicLoss
from lib.modules.blocks import LightningBaseModule
from lib.modules.losses import BinaryHomotopicLoss
from lib.objects.map import Map
from lib.objects.trajectory import Trajectory
import torch
import torch.nn.functional as F
import torch.nn as nn
nn.MSELoss


@@ -0,0 +1,32 @@
from lib.modules.utils import LightningBaseModule
from lib.modules.blocks import ConvModule
class ConvHomDetector(LightningBaseModule):
name = 'CNNHomotopyClassifier'
def configure_optimizers(self):
pass
def validation_step(self, *args, **kwargs):
pass
def validation_end(self, outputs):
pass
def training_step(self, batch_xy, batch_nb, *args, **kwargs):
pass
def test_step(self, *args, **kwargs):
pass
def __init__(self, *params):
super(ConvHomDetector, self).__init__(*params)
self.conv1 = ConvModule(self.dataset.map_shape)
def forward(self, x):
pass
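ConvHomDetector is only stubbed out in this commit: a single ConvModule built from dataset.map_shape and empty Lightning hooks. Purely as an illustration of where such a classifier could go, and not code from this commit, the same blocks could be chained into a flattened linear head; the class name, filter counts, and single-logit output below are assumptions:

from functools import reduce
from operator import mul

from torch import nn

from lib.modules.blocks import ConvModule
from lib.modules.utils import Flatten


class SketchHomotopyClassifier(nn.Module):
    # Hypothetical follow-up, not part of this commit: stack ConvModules,
    # flatten, and map onto a single homotopy logit.
    def __init__(self, in_shape, filters=(32, 64, 128)):
        super(SketchHomotopyClassifier, self).__init__()
        self.conv1 = ConvModule(in_shape, conv_filters=filters[0], conv_kernel=3, conv_padding=1, pooling_size=2)
        self.conv2 = ConvModule(self.conv1.shape, conv_filters=filters[1], conv_kernel=3, conv_padding=1, pooling_size=2)
        self.conv3 = ConvModule(self.conv2.shape, conv_filters=filters[2], conv_kernel=3, conv_padding=1, pooling_size=2)
        self.flat = Flatten()
        self.out = nn.Linear(reduce(mul, self.conv3.shape), 1)

    def forward(self, x):
        tensor = self.conv3(self.conv2(self.conv1(x)))
        return self.out(self.flat(tensor))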

lib/modules/__init__.py Normal file

lib/modules/blocks.py Normal file

@@ -0,0 +1,123 @@
from abc import ABC
from pathlib import Path
from typing import Union
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
from lib.modules.utils import AutoPad, Interpolate
#
# Sub - Modules
###################
class ConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, activation: nn.Module = nn.ELU, pooling_size=None, use_bias=True, use_norm=True,
dropout: Union[int, float] = 0,
conv_filters=64, conv_kernel=5, conv_stride=1, conv_padding=0):
super(ConvModule, self).__init__()
# Module Parameters
self.in_shape = in_shape
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.activation = activation()
# Convolution Parameters
self.padding = conv_padding
self.stride = conv_stride
# Modules
self.dropout = nn.Dropout2d(dropout) if dropout else False
self.pooling = nn.MaxPool2d(pooling_size) if pooling_size else False
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04, affine=False) if use_norm else False
self.conv = nn.Conv2d(in_channels, conv_filters, conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride
)
def forward(self, x):
x = self.norm(x) if self.norm else x
tensor = self.conv(x)
tensor = self.dropout(tensor) if self.dropout else tensor
tensor = self.pooling(tensor) if self.pooling else tensor
tensor = self.activation(tensor)
return tensor
class DeConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, conv_filters=3, conv_kernel=5, conv_stride=1, conv_padding=0,
dropout: Union[int, float] = 0, autopad=False,
activation: Union[None, nn.Module] = nn.ReLU, interpolation_scale=None,
use_bias=True, normalize=False):
super(DeConvModule, self).__init__()
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.padding = conv_padding
self.stride = conv_stride
self.in_shape = in_shape
self.conv_filters = conv_filters
self.autopad = AutoPad() if autopad else False
self.interpolation = Interpolate(scale_factor=interpolation_scale) if interpolation_scale else False
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04, affine=False) if normalize else False
self.dropout = nn.Dropout2d(dropout) if dropout else False
self.de_conv = nn.ConvTranspose2d(in_channels, self.conv_filters, conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride)
self.activation = activation() if activation else None
def forward(self, x):
x = self.norm(x) if self.norm else x
x = self.dropout(x) if self.dropout else x
x = self.autopad(x) if self.autopad else x
x = self.interpolation(x) if self.interpolation else x
tensor = self.de_conv(x)
tensor = self.activation(tensor) if self.activation else tensor
return tensor
def size(self):
return self.shape
class RecurrentModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, hidden_size, num_layers=1, cell_type=nn.GRU, use_bias=True, dropout=0):
super(RecurrentModule, self).__init__()
self.use_bias = use_bias
self.num_layers = num_layers
self.in_shape = in_shape
self.hidden_size = hidden_size
self.dropout = dropout
self.rnn = cell_type(self.in_shape[-1] * self.in_shape[-2], hidden_size,
num_layers=num_layers,
bias=self.use_bias,
batch_first=True,
dropout=self.dropout)
def forward(self, x):
tensor = self.rnn(x)
return tensor
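Both ConvModule and DeConvModule expose a shape property that pushes a dummy tensor through the module, so output sizes never have to be derived by hand when blocks are stacked. A minimal usage sketch (channel counts and the 64x64 input are made up):

import torch

from lib.modules.blocks import ConvModule, DeConvModule

conv = ConvModule((1, 64, 64), conv_filters=16, conv_kernel=3, conv_padding=1, pooling_size=2)
print(conv.shape)                                   # torch.Size([16, 32, 32]) after the 2x2 max-pool

# The probed shape feeds straight into the next block.
deconv = DeConvModule(conv.shape, conv_filters=8, conv_kernel=3, conv_padding=1, interpolation_scale=2)
print(deconv(torch.randn(4, *conv.shape)).shape)    # torch.Size([4, 8, 64, 64]), back at the input resolution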


@@ -1,9 +1,7 @@
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
from lib.models.blocks import FlipTensor
from lib.modules.utils import FlipTensor
from lib.objects.map import MapStorage
@@ -16,6 +14,4 @@ class BinaryHomotopicLoss(nn.Module):
def forward(self, x:torch.Tensor, y: torch.Tensor, mapnames: str):
y_flipepd = self.flipper(y)
circle = torch.cat((x, y_flipepd), dim=-1)
masp = self.map_storage[mapname].are
masp = self.map_storage[mapnames].are

lib/modules/model_parts.py Normal file

@@ -0,0 +1,229 @@
#
# Full Model Parts
###################
from functools import reduce
from operator import mul
from typing import List, Union

import torch
from torch import nn

from lib.modules.blocks import ConvModule, DeConvModule
from lib.modules.utils import Flatten
class Generator(nn.Module):
@property
def shape(self):
x = torch.randn(self.lat_dim).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, out_channels, re_shape, lat_dim, use_norm=False, use_bias=True, dropout: Union[int, float] = 0,
filters: List[int] = None, activation=nn.ReLU):
super(Generator, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
self.filters = filters
self.activation = activation
self.inner_activation = activation()
self.out_activation = None
self.lat_dim = lat_dim
self.dropout = dropout
self.l1 = nn.Linear(self.lat_dim, reduce(mul, re_shape), bias=use_bias)
# re_shape = (self.lat_dim // reduce(mul, re_shape[1:]), ) + tuple(re_shape[1:])
self.flat = Flatten(to=re_shape)
self.deconv1 = DeConvModule(re_shape, conv_filters=self.filters[0],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv2 = DeConvModule(self.deconv1.shape, conv_filters=self.filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv3 = DeConvModule(self.deconv2.shape, conv_filters=self.filters[2],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv4 = DeConvModule(self.deconv3.shape, conv_filters=out_channels,
conv_kernel=3,
conv_padding=1,
# normalize=use_norm,
activation=self.out_activation
)
def forward(self, z):
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor)
tensor = self.deconv2(tensor)
tensor = self.deconv3(tensor)
tensor = self.deconv4(tensor)
return tensor
def size(self):
return self.shape
class UnitGenerator(Generator):
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitGenerator, self).__init__(*args, **kwargs)
self.norm_f = nn.BatchNorm1d(self.l1.out_features, eps=1e-04, affine=False)
self.norm1 = nn.BatchNorm2d(self.deconv1.conv_filters, eps=1e-04, affine=False)
self.norm2 = nn.BatchNorm2d(self.deconv2.conv_filters, eps=1e-04, affine=False)
self.norm3 = nn.BatchNorm2d(self.deconv3.conv_filters, eps=1e-04, affine=False)
def forward(self, z_c1_c2_c3):
z, c1, c2, c3 = z_c1_c2_c3
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.norm_f(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor) + c3
tensor = self.inner_activation(tensor)
tensor = self.norm1(tensor)
tensor = self.deconv2(tensor) + c2
tensor = self.inner_activation(tensor)
tensor = self.norm2(tensor)
tensor = self.deconv3(tensor) + c1
tensor = self.inner_activation(tensor)
tensor = self.norm3(tensor)
tensor = self.deconv4(tensor)
return tensor
class BaseEncoder(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, in_shape, lat_dim=256, use_bias=True, use_norm=False, dropout: Union[int, float] = 0,
latent_activation: Union[nn.Module, None] = None, activation: nn.Module = nn.ELU,
filters: List[int] = None):
super(BaseEncoder, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
# Optional Padding for odd image-sizes
# Obsolete, already done by the autopad module on incoming tensors
# in_shape = [x+1 if x % 2 != 0 and idx else x for idx, x in enumerate(in_shape)]
# Parameters
self.lat_dim = lat_dim
self.in_shape = in_shape
self.use_bias = use_bias
self.latent_activation = latent_activation() if latent_activation else None
# Modules
self.conv1 = ConvModule(self.in_shape, conv_filters=filters[0],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv2 = ConvModule(self.conv1.shape, conv_filters=filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv3 = ConvModule(self.conv2.shape, conv_filters=filters[2],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.flat = Flatten()
def forward(self, x):
tensor = self.conv1(x)
tensor = self.conv2(tensor)
tensor = self.conv3(tensor)
tensor = self.flat(tensor)
return tensor
class UnitEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitEncoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
c1 = self.conv1(x)
c2 = self.conv2(c1)
c3 = self.conv3(c2)
tensor = self.flat(c3)
l1 = self.l1(tensor)
return c1, c2, c3, l1
class VariationalEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(VariationalEncoder, self).__init__(*args, **kwargs)
self.logvar = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
self.mu = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
@staticmethod
def reparameterize(mu, logvar):
std = torch.exp(0.5*logvar)
eps = torch.randn_like(std)
return mu + eps*std
def forward(self, x):
tensor = super(VariationalEncoder, self).forward(x)
mu = self.mu(tensor)
logvar = self.logvar(tensor)
z = self.reparameterize(mu, logvar)
return mu, logvar, z
class Encoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(Encoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
tensor = super(Encoder, self).forward(x)
tensor = self.l1(tensor)
tensor = self.latent_activation(tensor) if self.latent_activation else tensor
return tensor
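VariationalEncoder.reparameterize is the standard VAE reparameterization trick: instead of sampling z ~ N(mu, sigma^2) directly, it draws eps ~ N(0, 1) and returns z = mu + eps * exp(0.5 * logvar), which keeps the sample differentiable with respect to mu and logvar. A standalone check with made-up statistics:

import torch

mu = torch.zeros(3, 2)               # made-up batch of means
logvar = torch.full((3, 2), -2.0)    # made-up log-variances
std = torch.exp(0.5 * logvar)        # sigma = exp(logvar / 2) ~ 0.368
eps = torch.randn_like(std)          # noise carries no gradient
z = mu + eps * std                   # z ~ N(mu, sigma^2), differentiable in mu and logvar
print(z.shape, round(std[0, 0].item(), 3))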

lib/modules/utils.py Normal file

@@ -0,0 +1,190 @@
from abc import ABC
from pathlib import Path
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from dataset.dataset import TrajDataset, TrajPairDataset
from lib.objects.map import MapStorage
import pytorch_lightning as pl
# Utility - Modules
###################
class Flatten(nn.Module):
def __init__(self, to=(-1, )):
super(Flatten, self).__init__()
self.to = to
def forward(self, x):
return x.view(x.size(0), *self.to)
class Interpolate(nn.Module):
def __init__(self, size=None, scale_factor=None, mode='nearest', align_corners=None):
super(Interpolate, self).__init__()
self.interp = nn.functional.interpolate
self.size = size
self.scale_factor = scale_factor
self.align_corners = align_corners
self.mode = mode
def forward(self, x):
x = self.interp(x, size=self.size, scale_factor=self.scale_factor,
mode=self.mode, align_corners=self.align_corners)
return x
class AutoPad(nn.Module):
def __init__(self, interpolations=3, base=2):
super(AutoPad, self).__init__()
self.fct = base ** interpolations
def forward(self, x):
# noinspection PyUnresolvedReferences
x = F.pad(x,
[0,
(x.shape[-1] // self.fct + 1) * self.fct - x.shape[-1] if x.shape[-1] % self.fct != 0 else 0,
(x.shape[-2] // self.fct + 1) * self.fct - x.shape[-2] if x.shape[-2] % self.fct != 0 else 0,
0])
return x
class LightningBaseModule(pl.LightningModule, ABC):
@classmethod
def name(cls):
raise NotImplementedError('Give your model a name!')
@property
def shape(self):
try:
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
except Exception as e:
print(e)
return -1
def __init__(self, params):
super(LightningBaseModule, self).__init__()
self.hparams = params
# Data loading
# =============================================================================
# Map Object
self.map_storage = MapStorage(self.hparams.data_param.map_root)
def size(self):
return self.shape
def _move_to_model_device(self, x):
return x.cuda() if next(self.parameters()).is_cuda else x.cpu()
def save_to_disk(self, model_path):
Path(model_path, exist_ok=True).mkdir(parents=True, exist_ok=True)
if not (model_path / 'model_class.obj').exists():
with (model_path / 'model_class.obj').open('wb') as f:
torch.save(self.__class__, f)
return True
@pl.data_loader
def train_dataloader(self):
return DataLoader(dataset=self.dataset.train_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
@pl.data_loader
def test_dataloader(self):
return DataLoader(dataset=self.dataset.test_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
@pl.data_loader
def val_dataloader(self):
return DataLoader(dataset=self.dataset.val_dataset, shuffle=True,
batch_size=self.hparams.data_param.batchsize,
num_workers=self.hparams.data_param.worker)
def configure_optimizers(self):
raise NotImplementedError
def forward(self, *args, **kwargs):
raise NotImplementedError
def validation_step(self, *args, **kwargs):
raise NotImplementedError
def validation_end(self, outputs):
raise NotImplementedError
def training_step(self, batch_xy, batch_nb, *args, **kwargs):
raise NotImplementedError
def test_step(self, *args, **kwargs):
raise NotImplementedError
def test_end(self, outputs):
from sklearn.metrics import roc_auc_score
y_scores, y_true = [], []
for output in outputs:
y_scores.append(output['y_pred'])
y_true.append(output['y_true'])
y_true = torch.cat(y_true, dim=0)
# FIXME: What did this do? Do I need it?
# y_true = (y_true != V.HOMOTOPIC).long()
y_scores = torch.cat(y_scores, dim=0)
roc_auc_scores = roc_auc_score(y_true.cpu().numpy(), y_scores.cpu().numpy())
print(f'AUC Score: {roc_auc_scores}')
return {'roc_auc_scores': roc_auc_scores}
def init_weights(self):
def _weight_init(m):
if hasattr(m, 'weight'):
if isinstance(m.weight, torch.Tensor):
torch.nn.init.xavier_uniform_(m.weight)
if hasattr(m, 'bias'):
if isinstance(m.bias, torch.Tensor):
m.bias.data.fill_(0.01)
self.apply(_weight_init)
class FilterLayer(nn.Module):
def __init__(self):
super(FilterLayer, self).__init__()
def forward(self, x):
tensor = x[:, -1]
return tensor
class MergingLayer(nn.Module):
def __init__(self):
super(MergingLayer, self).__init__()
def forward(self, x):
# ToDo: Which ones to combine?
return
class FlipTensor(nn.Module):
def __init__(self, dim=-2):
super(FlipTensor, self).__init__()
self.dim = dim
def forward(self, x):
idx = [i for i in range(x.size(self.dim) - 1, -1, -1)]
idx = torch.as_tensor(idx).long()
inverted_tensor = x.index_select(self.dim, idx)
return inverted_tensor
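AutoPad pads the last two dimensions up to the next multiple of base ** interpolations (8 with the defaults), so repeated down- and up-sampling by factors of two round-trips to the original size; FlipTensor reverses a tensor along one dimension, matching torch.flip on that dim. A small sketch with made-up sizes:

import torch

from lib.modules.utils import AutoPad, FlipTensor

x = torch.randn(1, 1, 30, 45)
print(AutoPad()(x).shape)        # torch.Size([1, 1, 32, 48]) -- both trailing dims rounded up to multiples of 8

y = torch.arange(6).view(1, 3, 2)
print(FlipTensor(dim=-2)(y))     # rows reversed: [[[4, 5], [2, 3], [0, 1]]], same as torch.flip(y, dims=[-2])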


@@ -14,22 +14,10 @@ class ModelParameters(Namespace):
tanh=nn.Tanh
)
@property
def model_param(self):
return self._model_param
@property
def train_param(self):
return self._train_param
@property
def data_param(self):
return self._data_param
def __init__(self, model_param, train_param, data_param):
self._model_param = model_param
self._train_param = train_param
self._data_param = data_param
self.model_param = model_param
self.train_param = train_param
self.data_param = data_param
kwargs = vars(model_param)
kwargs.update(vars(train_param))
kwargs.update(vars(data_param))
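ModelParameters now keeps the three argparse Namespaces as plain attributes and flattens them into a single kwargs dict via vars() and update(), with later groups winning on duplicate keys. A quick standalone illustration (the parameter names are made up):

from argparse import Namespace

model_param = Namespace(filters=[16, 32, 64])
train_param = Namespace(lr=1e-3, epochs=10)
data_param = Namespace(batchsize=32, worker=4)

kwargs = vars(model_param)                # vars() exposes the Namespace's attribute dict
kwargs.update(vars(train_param))          # merge; later groups override on name clashes
kwargs.update(vars(data_param))
print(kwargs['lr'], kwargs['batchsize'])  # 0.001 32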