ae_toolbox_torch/networks/seperating_adversarial_auto_encoder.py
2019-08-24 19:05:46 +02:00

110 lines
4.8 KiB
Python

from torch.optim import Adam
from torch.nn.functional import mse_loss
from networks.modules import *
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class SeperatingAdversarialAutoEncoder(Module):
def __init__(self, latent_dim, features):
super(SeperatingAdversarialAutoEncoder, self).__init__()
self.latent_dim = latent_dim
self.features = features
self.spatial_encoder = PoolingEncoder(self.latent_dim)
self.temporal_encoder = Encoder(self.latent_dim)
self.decoder = Decoder(self.latent_dim * 2, self.features)
self.spatial_discriminator = Discriminator(self.latent_dim, self.features)
self.temporal_discriminator = Discriminator(self.latent_dim, self.features)
def forward(self, batch):
# Encoder
# outputs, hidden (Batch, Timesteps aka. Size, Features / Latent Dim Size)
z_spatial, z_temporal = self.spatial_encoder(batch), self.temporal_encoder(batch)
# Decoder
# First repeat the data accordingly to the batch size
z_concat = torch.cat((z_spatial, z_temporal), dim=-1)
z_repeatet = Repeater((batch.shape[0], batch.shape[1], -1))(z_concat)
x_hat = self.decoder(z_repeatet)
return z_spatial, z_temporal, x_hat
class SeparatingAdversarialAELightningOverrides(LightningModuleOverrides):
def __init__(self):
super(SeparatingAdversarialAELightningOverrides, self).__init__()
def training_step(self, batch, _, optimizer_i):
spatial_latent_fake, temporal_latent_fake, batch_hat = self.network.forward(batch)
if optimizer_i == 0:
# ---------------------
# Train temporal Discriminator
# ---------------------
# latent_fake, reconstruction
temporal_latent_real = self.normal.sample(temporal_latent_fake.shape).to(device)
# Evaluate the input
temporal_real_prediction = self.network.temporal_discriminator.forward(temporal_latent_real)
temporal_fake_prediction = self.network.temporal_discriminator.forward(temporal_latent_fake)
# Train the discriminator
temporal_loss_real = mse_loss(temporal_real_prediction,
torch.zeros(temporal_real_prediction.shape, device=device))
temporal_loss_fake = mse_loss(temporal_fake_prediction,
torch.ones(temporal_fake_prediction.shape, device=device))
# Calculate the mean over bot the real and the fake acc
# ToDo: do i need to compute this seperate?
d_loss = 0.5 * torch.add(temporal_loss_real, temporal_loss_fake)
return {'loss': d_loss}
if optimizer_i == 1:
# ---------------------
# Train spatial Discriminator
# ---------------------
# latent_fake, reconstruction
spatial_latent_real = self.normal.sample(spatial_latent_fake.shape).to(device)
# Evaluate the input
spatial_real_prediction = self.network.spatial_discriminator.forward(spatial_latent_real)
spatial_fake_prediction = self.network.spatial_discriminator.forward(spatial_latent_fake)
# Train the discriminator
spatial_loss_real = mse_loss(spatial_real_prediction,
torch.zeros(spatial_real_prediction.shape, device=device))
spatial_loss_fake = mse_loss(spatial_fake_prediction,
torch.ones(spatial_fake_prediction.shape, device=device))
# Calculate the mean over bot the real and the fake acc
# ToDo: do i need to compute this seperate?
d_loss = 0.5 * torch.add(spatial_loss_real, spatial_loss_fake)
return {'loss': d_loss}
elif optimizer_i == 2:
# ---------------------
# Train AutoEncoder
# ---------------------
loss = mse_loss(batch, batch_hat)
return {'loss': loss}
else:
raise RuntimeError('This should not have happened, catch me if u can.')
# This is Fucked up, why do i need to put an additional empty list here?
def configure_optimizers(self):
return [Adam([*self.network.spatial_discriminator.parameters(), *self.network.spatial_encoder.parameters()]
, lr=0.02),
Adam([*self.network.temporal_discriminator.parameters(), *self.network.temporal_encoder.parameters()]
, lr=0.02),
Adam([*self.network.temporal_encoder.parameters(),
*self.network.spatial_encoder.parameters(),
*self.network.decoder.parameters()]
, lr=0.02)], []
if __name__ == '__main__':
raise PermissionError('Get out of here - never run this module')