initial commit

This commit is contained in:
Si11ium
2020-04-08 14:59:42 +02:00
parent c15ee64688
commit f0262e1895
19 changed files with 1021 additions and 0 deletions

0
modules/__init__.py Normal file
View File

144
modules/blocks.py Normal file
View File

@ -0,0 +1,144 @@
from typing import Union
import torch
from torch import nn
from ml_lib.modules.utils import AutoPad, Interpolate
#
# Sub - Modules
###################
class ConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, conv_filters, conv_kernel, activation: nn.Module = nn.ELU, pooling_size=None,
use_bias=True, use_norm=False, dropout: Union[int, float] = 0,
conv_class=nn.Conv2d, conv_stride=1, conv_padding=0):
super(ConvModule, self).__init__()
# Module Parameters
self.in_shape = in_shape
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.activation = activation()
# Convolution Parameters
self.padding = conv_padding
self.stride = conv_stride
self.conv_filters = conv_filters
self.conv_kernel = conv_kernel
# Modules
self.dropout = nn.Dropout2d(dropout) if dropout else lambda x: x
self.pooling = nn.MaxPool2d(pooling_size) if pooling_size else lambda x: x
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04) if use_norm else lambda x: x
self.conv = conv_class(in_channels, self.conv_filters, self.conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride
)
def forward(self, x):
x = self.norm(x)
tensor = self.conv(x)
tensor = self.dropout(tensor)
tensor = self.pooling(tensor)
tensor = self.activation(tensor)
return tensor
class DeConvModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, conv_filters, conv_kernel, conv_stride=1, conv_padding=0,
dropout: Union[int, float] = 0, autopad=0,
activation: Union[None, nn.Module] = nn.ReLU, interpolation_scale=0,
use_bias=True, use_norm=False):
super(DeConvModule, self).__init__()
in_channels, height, width = in_shape[0], in_shape[1], in_shape[2]
self.padding = conv_padding
self.conv_kernel = conv_kernel
self.stride = conv_stride
self.in_shape = in_shape
self.conv_filters = conv_filters
self.autopad = AutoPad() if autopad else lambda x: x
self.interpolation = Interpolate(scale_factor=interpolation_scale) if interpolation_scale else lambda x: x
self.norm = nn.BatchNorm2d(in_channels, eps=1e-04) if use_norm else lambda x: x
self.dropout = nn.Dropout2d(dropout) if dropout else lambda x: x
self.de_conv = nn.ConvTranspose2d(in_channels, self.conv_filters, self.conv_kernel, bias=use_bias,
padding=self.padding, stride=self.stride)
self.activation = activation() if activation else lambda x: x
def forward(self, x):
x = self.norm(x)
x = self.dropout(x)
x = self.autopad(x)
x = self.interpolation(x)
tensor = self.de_conv(x)
tensor = self.activation(tensor)
return tensor
class ResidualModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, module_class, n, activation=None, **module_parameters):
assert n >= 1
super(ResidualModule, self).__init__()
self.in_shape = in_shape
module_parameters.update(in_shape=in_shape)
self.activation = activation() if activation else lambda x: x
self.residual_block = nn.ModuleList([module_class(**module_parameters) for _ in range(n)])
assert self.in_shape == self.shape, f'The in_shape: {self.in_shape} - must match the out_shape: {self.shape}.'
def forward(self, x):
for module in self.residual_block:
tensor = module(x)
# noinspection PyUnboundLocalVariable
tensor = tensor + x
tensor = self.activation(tensor)
return tensor
class RecurrentModule(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
def __init__(self, in_shape, hidden_size, num_layers=1, cell_type=nn.GRU, use_bias=True, dropout=0):
super(RecurrentModule, self).__init__()
self.use_bias = use_bias
self.num_layers = num_layers
self.in_shape = in_shape
self.hidden_size = hidden_size
self.dropout = dropout
self.rnn = cell_type(self.in_shape[-1] * self.in_shape[-2], hidden_size,
num_layers=num_layers,
bias=self.use_bias,
batch_first=True,
dropout=self.dropout)
def forward(self, x):
tensor = self.rnn(x)
return tensor

23
modules/losses.py Normal file
View File

@ -0,0 +1,23 @@
from typing import List
import torch
from torch import nn
from ml_lib.modules.utils import FlipTensor
from ml_lib.objects.map import MapStorage, Map
from ml_lib.objects.trajectory import Trajectory
class BinaryHomotopicLoss(nn.Module):
def __init__(self, map_storage: MapStorage):
super(BinaryHomotopicLoss, self).__init__()
self.map_storage = map_storage
self.flipper = FlipTensor()
def forward(self, x: torch.Tensor, y: torch.Tensor, mapnames: str):
maps: List[Map] = [self.map_storage[mapname] for mapname in mapnames]
for basemap in maps:
basemap = basemap.as_2d_array

229
modules/model_parts.py Normal file
View File

@ -0,0 +1,229 @@
#
# Full Model Parts
###################
import torch
from torch import nn
class Generator(nn.Module):
@property
def shape(self):
x = torch.randn(self.lat_dim).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, out_channels, re_shape, lat_dim, use_norm=False, use_bias=True, dropout: Union[int, float] = 0,
filters: List[int] = None, activation=nn.ReLU):
super(Generator, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
self.filters = filters
self.activation = activation
self.inner_activation = activation()
self.out_activation = None
self.lat_dim = lat_dim
self.dropout = dropout
self.l1 = nn.Linear(self.lat_dim, reduce(mul, re_shape), bias=use_bias)
# re_shape = (self.feature_mixed_dim // reduce(mul, re_shape[1:]), ) + tuple(re_shape[1:])
self.flat = Flatten(to=re_shape)
self.deconv1 = DeConvModule(re_shape, conv_filters=self.filters[0],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv2 = DeConvModule(self.deconv1.shape, conv_filters=self.filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv3 = DeConvModule(self.deconv2.shape, conv_filters=self.filters[2],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
normalize=use_norm,
activation=self.activation,
interpolation_scale=2,
dropout=self.dropout
)
self.deconv4 = DeConvModule(self.deconv3.shape, conv_filters=out_channels,
conv_kernel=3,
conv_padding=1,
# normalize=use_norm,
activation=self.out_activation
)
def forward(self, z):
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor)
tensor = self.deconv2(tensor)
tensor = self.deconv3(tensor)
tensor = self.deconv4(tensor)
return tensor
def size(self):
return self.shape
class UnitGenerator(Generator):
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitGenerator, self).__init__(*args, **kwargs)
self.norm_f = nn.BatchNorm1d(self.l1.out_features, eps=1e-04, affine=False)
self.norm1 = nn.BatchNorm2d(self.deconv1.conv_filters, eps=1e-04, affine=False)
self.norm2 = nn.BatchNorm2d(self.deconv2.conv_filters, eps=1e-04, affine=False)
self.norm3 = nn.BatchNorm2d(self.deconv3.conv_filters, eps=1e-04, affine=False)
def forward(self, z_c1_c2_c3):
z, c1, c2, c3 = z_c1_c2_c3
tensor = self.l1(z)
tensor = self.inner_activation(tensor)
tensor = self.norm(tensor)
tensor = self.flat(tensor)
tensor = self.deconv1(tensor) + c3
tensor = self.inner_activation(tensor)
tensor = self.norm1(tensor)
tensor = self.deconv2(tensor) + c2
tensor = self.inner_activation(tensor)
tensor = self.norm2(tensor)
tensor = self.deconv3(tensor) + c1
tensor = self.inner_activation(tensor)
tensor = self.norm3(tensor)
tensor = self.deconv4(tensor)
return tensor
class BaseEncoder(nn.Module):
@property
def shape(self):
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
# noinspection PyUnresolvedReferences
def __init__(self, in_shape, lat_dim=256, use_bias=True, use_norm=False, dropout: Union[int, float] = 0,
latent_activation: Union[nn.Module, None] = None, activation: nn.Module = nn.ELU,
filters: List[int] = None):
super(BaseEncoder, self).__init__()
assert filters, '"Filters" has to be a list of int len 3'
# Optional Padding for odd image-sizes
# Obsolet, already Done by autopadding module on incoming tensors
# in_shape = [x+1 if x % 2 != 0 and idx else x for idx, x in enumerate(in_shape)]
# Parameters
self.lat_dim = lat_dim
self.in_shape = in_shape
self.use_bias = use_bias
self.latent_activation = latent_activation() if latent_activation else None
# Modules
self.conv1 = ConvModule(self.in_shape, conv_filters=filters[0],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv2 = ConvModule(self.conv1.shape, conv_filters=filters[1],
conv_kernel=3,
conv_padding=1,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.conv3 = ConvModule(self.conv2.shape, conv_filters=filters[2],
conv_kernel=5,
conv_padding=2,
conv_stride=1,
pooling_size=2,
use_norm=use_norm,
dropout=dropout,
activation=activation
)
self.flat = Flatten()
def forward(self, x):
tensor = self.conv1(x)
tensor = self.conv2(tensor)
tensor = self.conv3(tensor)
tensor = self.flat(tensor)
return tensor
class UnitEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
kwargs.update(use_norm=True)
super(UnitEncoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
c1 = self.conv1(x)
c2 = self.conv2(c1)
c3 = self.conv3(c2)
tensor = self.flat(c3)
l1 = self.l1(tensor)
return c1, c2, c3, l1
class VariationalEncoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(VariationalEncoder, self).__init__(*args, **kwargs)
self.logvar = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
self.mu = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
@staticmethod
def reparameterize(mu, logvar):
std = torch.exp(0.5*logvar)
eps = torch.randn_like(std)
return mu + eps*std
def forward(self, x):
tensor = super(VariationalEncoder, self).forward(x)
mu = self.mu(tensor)
logvar = self.logvar(tensor)
z = self.reparameterize(mu, logvar)
return mu, logvar, z
class Encoder(BaseEncoder):
# noinspection PyUnresolvedReferences
def __init__(self, *args, **kwargs):
super(Encoder, self).__init__(*args, **kwargs)
self.l1 = nn.Linear(reduce(mul, self.conv3.shape), self.lat_dim, bias=self.use_bias)
def forward(self, x):
tensor = super(Encoder, self).forward(x)
tensor = self.l1(tensor)
tensor = self.latent_activation(tensor) if self.latent_activation else tensor
return tensor

201
modules/utils.py Normal file
View File

@ -0,0 +1,201 @@
from abc import ABC
from pathlib import Path
import torch
from torch import nn
from torch import functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
# Utility - Modules
###################
class Flatten(nn.Module):
@property
def shape(self):
try:
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:] if len(output.shape[1:]) > 1 else output.shape[-1]
except Exception as e:
print(e)
return -1
def __init__(self, in_shape, to=-1):
assert isinstance(to, int) or isinstance(to, tuple)
super(Flatten, self).__init__()
self.in_shape = in_shape
self.to = (to,) if isinstance(to, int) else to
def forward(self, x):
return x.view(x.size(0), *self.to)
class Interpolate(nn.Module):
def __init__(self, size=None, scale_factor=None, mode='nearest', align_corners=None):
super(Interpolate, self).__init__()
self.interp = nn.functional.interpolate
self.size = size
self.scale_factor = scale_factor
self.align_corners = align_corners
self.mode = mode
def forward(self, x):
x = self.interp(x, size=self.size, scale_factor=self.scale_factor,
mode=self.mode, align_corners=self.align_corners)
return x
class AutoPad(nn.Module):
def __init__(self, interpolations=3, base=2):
super(AutoPad, self).__init__()
self.fct = base ** interpolations
def forward(self, x):
# noinspection PyUnresolvedReferences
x = F.pad(x,
[0,
(x.shape[-1] // self.fct + 1) * self.fct - x.shape[-1] if x.shape[-1] % self.fct != 0 else 0,
(x.shape[-2] // self.fct + 1) * self.fct - x.shape[-2] if x.shape[-2] % self.fct != 0 else 0,
0])
return x
class WeightInit:
def __init__(self, in_place_init_function):
self.in_place_init_function = in_place_init_function
def __call__(self, m):
if hasattr(m, 'weight'):
if isinstance(m.weight, torch.Tensor):
if m.weight.ndim < 2:
m.weight.data.fill_(0.01)
else:
self.in_place_init_function(m.weight)
if hasattr(m, 'bias'):
if isinstance(m.bias, torch.Tensor):
m.bias.data.fill_(0.01)
class LightningBaseModule(pl.LightningModule, ABC):
@classmethod
def name(cls):
raise NotImplementedError('Give your model a name!')
@property
def shape(self):
try:
x = torch.randn(self.in_shape).unsqueeze(0)
output = self(x)
return output.shape[1:]
except Exception as e:
print(e)
return -1
def __init__(self, hparams):
super(LightningBaseModule, self).__init__()
self.hparams = hparams
# Data loading
# =============================================================================
# Map Object
# self.map_storage = MapStorage(self.hparams.data_param.map_root)
def size(self):
return self.shape
def _move_to_model_device(self, x):
return x.cuda() if next(self.parameters()).is_cuda else x.cpu()
def save_to_disk(self, model_path):
Path(model_path, exist_ok=True).mkdir(parents=True, exist_ok=True)
if not (model_path / 'model_class.obj').exists():
with (model_path / 'model_class.obj').open('wb') as f:
torch.save(self.__class__, f)
return True
@property
def data_len(self):
return len(self.dataset.train_dataset)
@property
def n_train_batches(self):
return len(self.train_dataloader())
def configure_optimizers(self):
raise NotImplementedError
def forward(self, *args, **kwargs):
raise NotImplementedError
def training_step(self, batch_xy, batch_nb, *args, **kwargs):
raise NotImplementedError
def test_step(self, *args, **kwargs):
raise NotImplementedError
def test_epoch_end(self, outputs):
raise NotImplementedError
def init_weights(self, in_place_init_func_=nn.init.xavier_uniform_):
weight_initializer = WeightInit(in_place_init_function=in_place_init_func_)
self.apply(weight_initializer)
# Dataloaders
# ================================================================================
# Train Dataloader
def train_dataloader(self):
return DataLoader(dataset=self.dataset.train_dataset, shuffle=True,
batch_size=self.hparams.train_param.batch_size,
num_workers=self.hparams.data_param.worker)
# Test Dataloader
def test_dataloader(self):
return DataLoader(dataset=self.dataset.test_dataset, shuffle=True,
batch_size=self.hparams.train_param.batch_size,
num_workers=self.hparams.data_param.worker)
# Validation Dataloader
def val_dataloader(self):
return DataLoader(dataset=self.dataset.val_dataset, shuffle=True,
batch_size=self.hparams.train_param.batch_size,
num_workers=self.hparams.data_param.worker)
class FilterLayer(nn.Module):
def __init__(self):
super(FilterLayer, self).__init__()
def forward(self, x):
tensor = x[:, -1]
return tensor
class MergingLayer(nn.Module):
def __init__(self):
super(MergingLayer, self).__init__()
def forward(self, x):
# ToDo: Which ones to combine?
return
class FlipTensor(nn.Module):
def __init__(self, dim=-2):
super(FlipTensor, self).__init__()
self.dim = dim
def forward(self, x):
idx = [i for i in range(x.size(self.dim) - 1, -1, -1)]
idx = torch.as_tensor(idx).long()
inverted_tensor = x.index_select(self.dim, idx)
return inverted_tensor