from torch.optim import Adam from networks.auto_encoder import AutoEncoder from torch.nn.functional import mse_loss from networks.modules import * import torch class SeperatingAdversarialAutoEncoder(Module): def __init__(self, latent_dim, features, **kwargs): super(SeperatingAdversarialAutoEncoder, self).__init__() self.latent_dim = latent_dim self.features = features self.spatial_encoder = PoolingEncoder(self.latent_dim) self.temporal_encoder = Encoder(self.latent_dim) self.decoder = Decoder(self.latent_dim, self.features) self.spatial_discriminator = Discriminator(self.latent_dim, self.features) self.temporal_discriminator = Discriminator(self.latent_dim, self.features) def forward(self, batch): # Encoder # outputs, hidden (Batch, Timesteps aka. Size, Features / Latent Dim Size) z_spatial, z_temporal = self.spatial_encoder(batch), self.temporal_encoder(batch) # Decoder # First repeat the data accordingly to the batch size z_concat = torch.cat((z_spatial, z_temporal), dim=-1) z_repeatet = Repeater((batch.shape[0], batch.shape[1], -1))(z_concat) x_hat = self.decoder(z_repeatet) return z_spatial, z_temporal, x_hat class SeparatingAdversarialAELightningOverrides(LightningModuleOverrides): def __init__(self): super(SeparatingAdversarialAELightningOverrides, self).__init__() def training_step(self, batch, _, optimizer_i): spatial_latent_fake, temporal_latent_fake, batch_hat = self.network.forward(batch) if optimizer_i == 0: # --------------------- # Train temporal Discriminator # --------------------- # latent_fake, reconstruction temporal_latent_real = self.normal.sample(temporal_latent_fake.shape) # Evaluate the input temporal_real_prediction = self.network.temporal_discriminator.forward(temporal_latent_real) temporal_fake_prediction = self.network.temporal_discriminator.forward(temporal_latent_fake) # Train the discriminator temporal_loss_real = mse_loss(temporal_real_prediction, torch.zeros(temporal_real_prediction.shape)) temporal_loss_fake = mse_loss(temporal_fake_prediction, torch.ones(temporal_fake_prediction.shape)) # Calculate the mean over bot the real and the fake acc # ToDo: do i need to compute this seperate? d_loss = 0.5 * torch.add(temporal_loss_real, temporal_loss_fake) return {'loss': d_loss} if optimizer_i == 1: # --------------------- # Train spatial Discriminator # --------------------- # latent_fake, reconstruction spatial_latent_real = self.normal.sample(spatial_latent_fake.shape) # Evaluate the input spatial_real_prediction = self.network.spatial_discriminator.forward(spatial_latent_real) spatial_fake_prediction = self.network.spatial_discriminator.forward(spatial_latent_fake) # Train the discriminator spatial_loss_real = mse_loss(spatial_real_prediction, torch.zeros(spatial_real_prediction.shape)) spatial_loss_fake = mse_loss(spatial_fake_prediction, torch.ones(spatial_fake_prediction.shape)) # Calculate the mean over bot the real and the fake acc # ToDo: do i need to compute this seperate? d_loss = 0.5 * torch.add(spatial_loss_real, spatial_loss_fake) return {'loss': d_loss} elif optimizer_i == 2: # --------------------- # Train AutoEncoder # --------------------- loss = mse_loss(batch, batch_hat) return {'loss': loss} else: raise RuntimeError('This should not have happened, catch me if u can.') # This is Fucked up, why do i need to put an additional empty list here? def configure_optimizers(self): return [Adam([*self.network.spatial_discriminator.parameters(), *self.network.spatial_encoder.parameters()] , lr=0.02), Adam([*self.network.temporal_discriminator.parameters(), *self.network.temporal_encoder.parameters()] , lr=0.02), Adam([*self.network.temporal_encoder.parameters(), *self.network.spatial_encoder.parameters(), *self.network.decoder.parameters()] , lr=0.02)], [] if __name__ == '__main__': raise PermissionError('Get out of here - never run this module')