# Imports from python Internals from abc import ABC from itertools import cycle from collections import defaultdict, namedtuple # Numerical Imports, Metrics and Plotting import numpy as np from sklearn.ensemble import IsolationForest from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_auc_score, roc_curve, auc, f1_score, \ recall_score, average_precision_score from matplotlib import pyplot as plt # Import Deep Learning Framework import torch from torch import nn from torch.optim import Adam from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts from torch.utils.data import DataLoader from torchcontrib.optim import SWA from torchvision.transforms import Compose, RandomApply # Import Functions and Modules from MLLIB from ml_lib.audio_toolset.mel_augmentation import NoiseInjection, LoudnessManipulator, ShiftTime, MaskAug from ml_lib.audio_toolset.audio_io import NormalizeLocal from ml_lib.modules.util import LightningBaseModule from ml_lib.utils.tools import to_one_hot from ml_lib.utils.transforms import ToTensor # Import Project Variables import variables as V class BaseLossMixin: absolute_loss = nn.L1Loss() nll_loss = nn.NLLLoss() bce_loss = nn.BCELoss() ce_loss = nn.CrossEntropyLoss() class BaseOptimizerMixin: def configure_optimizers(self): assert isinstance(self, LightningBaseModule) optimizer_dict = dict( # 'optimizer':optimizer, # The Optimizer # 'lr_scheduler': scheduler, # The LR scheduler frequency=1, # The frequency of the scheduler interval='epoch', # The unit of the scheduler's step size # 'reduce_on_plateau': False, # For ReduceLROnPlateau scheduler # 'monitor': 'mean_val_loss' # Metric to monitor ) optimizer = Adam(params=self.parameters(), lr=self.params.lr, weight_decay=self.params.weight_decay) if self.params.sto_weight_avg: optimizer = SWA(optimizer, swa_start=10, swa_freq=5, swa_lr=0.05) optimizer_dict.update(optimizer=optimizer) if self.params.lr_warmup_steps: scheduler = CosineAnnealingWarmRestarts(optimizer, self.params.lr_warmup_steps) optimizer_dict.update(lr_scheduler=scheduler) return optimizer_dict def on_train_end(self): assert isinstance(self, LightningBaseModule) for opt in self.trainer.optimizers: if isinstance(opt, SWA): opt.swap_swa_sgd() def on_epoch_end(self): assert isinstance(self, LightningBaseModule) if self.params.opt_reset_interval: if self.current_epoch % self.params.opt_reset_interval == 0: for opt in self.trainer.optimizers: opt.state = defaultdict(dict) class BaseTrainMixin: def training_step(self, batch_xy, batch_nb, *args, **kwargs): assert isinstance(self, LightningBaseModule) batch_x, batch_y = batch_xy y = self(batch_x).main_out loss = self.ce_loss(y.squeeze(), batch_y.long()) return dict(loss=loss) def training_epoch_end(self, outputs): assert isinstance(self, LightningBaseModule) keys = list(outputs[0].keys()) summary_dict = {f'mean_{key}': torch.mean(torch.stack([output[key] for output in outputs])) for key in keys if 'loss' in key} for key in summary_dict.keys(): self.log(key, summary_dict[key]) class BaseValMixin: def validation_step(self, batch_xy, batch_idx, *args, **kwargs): assert isinstance(self, LightningBaseModule) batch_x, batch_y = batch_xy y = self(batch_x).main_out val_loss = self.ce_loss(y.squeeze(), batch_y.long()) return dict(val_loss=val_loss, batch_idx=batch_idx, y=y, batch_y=batch_y) def validation_epoch_end(self, outputs, *_, **__): assert isinstance(self, LightningBaseModule) summary_dict = dict() keys = list(outputs[0].keys()) summary_dict.update({f'mean_{key}': torch.mean(torch.stack([output[key] for output in outputs])) for key in keys if 'loss' in key} ) additional_scores = self.additional_scores(outputs) summary_dict.update(**additional_scores) for key in summary_dict.keys(): self.log(key, summary_dict[key]) class BaseTestMixin: def test_step(self, batch_xy, batch_idx, *_, **__): assert isinstance(self, LightningBaseModule) batch_x, batch_y = batch_xy y = self(batch_x).main_out test_loss = self.ce_loss(y.squeeze(), batch_y.long()) return dict(test_loss=test_loss, batch_idx=batch_idx, y=y, batch_y=batch_y) def test_epoch_end(self, outputs, *_, **__): assert isinstance(self, LightningBaseModule) summary_dict = dict() keys = list(outputs[0].keys()) summary_dict.update({f'mean_{key}': torch.mean(torch.stack([output[key] for output in outputs])) for key in keys if 'loss' in key} ) additional_scores = self.additional_scores(outputs) summary_dict.update(**additional_scores) for key in summary_dict.keys(): self.log(key, summary_dict[key]) class DatasetMixin: def build_dataset(self): assert isinstance(self, LightningBaseModule) # Dataset # ============================================================================= # Mel Transforms mel_kwargs = dict(sample_rate=self.params.sr, n_mels=self.params.n_mels, n_fft=self.params.n_fft, hop_length=self.params.hop_length) # Utility normalize = NormalizeLocal() # Data Augmentations mel_augmentations = Compose([ RandomApply([ NoiseInjection(0.2), LoudnessManipulator(0.5), ShiftTime(0.4), MaskAug(0.2), ], p=0.6), normalize]) # Datasets Dataset = namedtuple('Datasets', 'train_dataset val_dataset test_dataset') dataset = Dataset(self.dataset_class(data_root=self.params.root, # TRAIN DATASET setting=V.DATA_OPTION_train, fold=list(range(1,8)), reset=self.params.reset, mel_kwargs=mel_kwargs, mel_augmentations=mel_augmentations), val_dataset=self.dataset_class(data_root=self.params.root, # VALIDATION DATASET setting=V.DATA_OPTION_devel, fold=9, reset=self.params.reset, mel_kwargs=mel_kwargs, mel_augmentations=normalize), test_dataset=self.dataset_class(data_root=self.params.root, # TEST DATASET setting=V.DATA_OPTION_test, fold=10, reset=self.params.reset, mel_kwargs=mel_kwargs, mel_augmentations=normalize), ) if dataset.train_dataset.task_type == V.TASK_OPTION_binary: # noinspection PyAttributeOutsideInit self.additional_scores = BinaryScores(self) elif dataset.train_dataset.task_type == V.TASK_OPTION_multiclass: # noinspection PyAttributeOutsideInit self.additional_scores = MultiClassScores(self) else: raise ValueError return dataset class BaseDataloadersMixin(ABC): # Dataloaders # ================================================================================ # Train Dataloader def train_dataloader(self): assert isinstance(self, LightningBaseModule) # sampler = RandomSampler(self.dataset.train_dataset, True, len(self.dataset.train_dataset)) sampler = None return DataLoader(dataset=self.dataset.train_dataset, shuffle=True if not sampler else None, sampler=sampler, batch_size=self.params.batch_size, pin_memory=True, num_workers=self.params.worker) # Test Dataloader def test_dataloader(self): assert isinstance(self, LightningBaseModule) return DataLoader(dataset=self.dataset.test_dataset, shuffle=False, batch_size=self.params.batch_size, pin_memory=True, num_workers=self.params.worker) # Validation Dataloader def val_dataloader(self): assert isinstance(self, LightningBaseModule) return DataLoader(dataset=self.dataset.val_dataset, shuffle=False, pin_memory=True, batch_size=self.params.batch_size, num_workers=self.params.worker) class BaseScores(ABC): def __init__(self, lightning_model): self.model = lightning_model pass def __call__(self, outputs): # summary_dict = dict() # return summary_dict raise NotImplementedError class MultiClassScores(BaseScores): def __init__(self, *args): super(MultiClassScores, self).__init__(*args) pass def __call__(self, outputs): summary_dict = dict() ####################################################################################### # Additional Score - UAR - ROC - Conf. Matrix - F1 ####################################################################################### # # INIT y_true = torch.cat([output['batch_y'] for output in outputs]).cpu().numpy() y_true_one_hot = to_one_hot(y_true, self.model.n_classes) y_pred = torch.cat([output['y'] for output in outputs]).squeeze().cpu().float().numpy() y_pred_max = np.argmax(y_pred, axis=1) class_names = {val: key for key, val in self.model.dataset.test_dataset.classes.items()} ###################################################################################### # # F1 SCORE micro_f1_score = f1_score(y_true, y_pred_max, labels=None, pos_label=1, average='micro', sample_weight=None, zero_division=True) macro_f1_score = f1_score(y_true, y_pred_max, labels=None, pos_label=1, average='macro', sample_weight=None, zero_division=True) summary_dict.update(dict(micro_f1_score=micro_f1_score, macro_f1_score=macro_f1_score)) ####################################################################################### # # ROC Curve # Compute ROC curve and ROC area for each class fpr = dict() tpr = dict() roc_auc = dict() for i in range(self.model.n_classes): fpr[i], tpr[i], _ = roc_curve(y_true_one_hot[:, i], y_pred[:, i]) roc_auc[i] = auc(fpr[i], tpr[i]) # Compute micro-average ROC curve and ROC area fpr["micro"], tpr["micro"], _ = roc_curve(y_true_one_hot.ravel(), y_pred.ravel()) roc_auc["micro"] = auc(fpr["micro"], tpr["micro"]) # First aggregate all false positive rates all_fpr = np.unique(np.concatenate([fpr[i] for i in range(self.model.n_classes)])) # Then interpolate all ROC curves at this points mean_tpr = np.zeros_like(all_fpr) for i in range(self.model.n_classes): mean_tpr += np.interp(all_fpr, fpr[i], tpr[i]) # Finally average it and compute AUC mean_tpr /= self.model.n_classes fpr["macro"] = all_fpr tpr["macro"] = mean_tpr roc_auc["macro"] = auc(fpr["macro"], tpr["macro"]) # Plot all ROC curves plt.figure() plt.plot(fpr["micro"], tpr["micro"], label=f'micro ROC ({round(roc_auc["micro"], 2)})', color='deeppink', linestyle=':', linewidth=4) plt.plot(fpr["macro"], tpr["macro"], label=f'macro ROC({round(roc_auc["macro"], 2)})', color='navy', linestyle=':', linewidth=4) colors = cycle(['firebrick', 'orangered', 'gold', 'olive', 'limegreen', 'aqua', 'dodgerblue', 'slategrey', 'royalblue', 'indigo', 'fuchsia'], ) for i, color in zip(range(self.model.n_classes), colors): plt.plot(fpr[i], tpr[i], color=color, lw=2, label=f'{class_names[i]} ({round(roc_auc[i], 2)})') plt.plot([0, 1], [0, 1], 'k--', lw=2) plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.05]) plt.xlabel('False Positive Rate') plt.ylabel('True Positive Rate') plt.legend(loc="lower right") self.model.logger.log_image('ROC', image=plt.gcf(), step=self.model.current_epoch) self.model.logger.log_image('ROC', image=plt.gcf(), step=self.model.current_epoch, ext='pdf') plt.clf() ####################################################################################### # # ROC SCORE try: macro_roc_auc_ovr = roc_auc_score(y_true_one_hot, y_pred, multi_class="ovr", average="macro") summary_dict.update(macro_roc_auc_ovr=macro_roc_auc_ovr) except ValueError: micro_roc_auc_ovr = roc_auc_score(y_true_one_hot, y_pred, multi_class="ovr", average="micro") summary_dict.update(micro_roc_auc_ovr=micro_roc_auc_ovr) ####################################################################################### # # Confusion matrix cm = confusion_matrix([class_names[x] for x in y_true], [class_names[x] for x in y_pred_max], labels=[class_names[key] for key in class_names.keys()], normalize='all') disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[class_names[i] for i in range(self.model.n_classes)] ) disp.plot(include_values=True) self.model.logger.log_image('Confusion_Matrix', image=disp.figure_, step=self.model.current_epoch) self.model.logger.log_image('Confusion_Matrix', image=disp.figure_, step=self.model.current_epoch, ext='pdf') plt.close('all') return summary_dict class BinaryScores(BaseScores): def __init__(self, *args): super(BinaryScores, self).__init__(*args) def __call__(self, outputs): summary_dict = dict() # Additional Score like the unweighted Average Recall: ######################### # UnweightedAverageRecall y_true = torch.cat([output['batch_y'] for output in outputs]) .cpu().numpy() y_pred = torch.cat([output['element_wise_recon_error'] for output in outputs]).squeeze().cpu().numpy() # How to apply a threshold manualy # y_pred = (y_pred >= 0.5).astype(np.float32) # How to apply a threshold by IF (Isolation Forest) clf = IsolationForest(random_state=self.model.seed) y_score = clf.fit_predict(y_pred.reshape(-1,1)) y_score = (np.asarray(y_score) == -1).astype(np.float32) uar_score = recall_score(y_true, y_score, labels=[0, 1], average='macro', sample_weight=None, zero_division='warn') summary_dict.update(dict(uar_score=uar_score)) ######################### # Precission precision_score = average_precision_score(y_true, y_score) summary_dict.update(dict(precision_score=precision_score)) ######################### # AUC try: auc_score = roc_auc_score(y_true=y_true, y_score=y_score) summary_dict.update(dict(auc_score=auc_score)) except ValueError: summary_dict.update(dict(auc_score=-1)) ######################### # pAUC try: pauc = roc_auc_score(y_true=y_true, y_score=y_score, max_fpr=0.15) summary_dict.update(dict(pauc_score=pauc)) except ValueError: summary_dict.update(dict(pauc_score=-1)) return summary_dict