import random

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import librosa
from sklearn.metrics import roc_auc_score
from tqdm import tqdm
from pathlib import Path

__all__ = ['MIMII']


class MIMII(object):
    """Wrapper around a single machine/id of the MIMII dataset.

    Expects pre-processed mel spectrograms stored as .npy files under
    `<dataset_path>/id_0<machine_id>/{normal,abnormal}/processed/`.
    """

    def __init__(self, dataset_path, machine_id):
        self.machine = dataset_path.name
        self.machine_id = machine_id
        self.root = dataset_path / f'id_0{machine_id}'
        self.min_level_db = -80
        self.sr = 16000

        train = list((self.root / 'normal' / 'processed').glob('*.npy'))
        test = list((self.root / 'abnormal' / 'processed').glob('*.npy'))
        random.shuffle(train)

        # Hold out as many normal recordings as there are abnormal ones so the
        # test set is balanced; the remaining normal files form the train set.
        normal_test = train[:len(test)]
        self.test_labels = [0] * len(normal_test) + [1] * len(test)
        self.train_labels = [0] * (len(train) - len(normal_test))
        self.train_paths = train[len(test):]
        self.test_paths = normal_test + test

        self.device = 'cpu'

    def to(self, device):
        self.device = device
        return self

    def _normalize(self, S):
        return np.clip((S - self.min_level_db) / -self.min_level_db, 0, 1)

    def _denormalize(self, S):
        return (np.clip(S, 0, 1) * -self.min_level_db) + self.min_level_db

    def preprocess(self, **kwargs):
        """Convert every .wav file into a normalized log-mel spectrogram (.npy)."""
        for mode in ['normal', 'abnormal']:
            folder = self.root / mode / 'processed'
            folder.mkdir(parents=False, exist_ok=True)
            wavs = (self.root / mode).glob('*.wav')
            print(f'Processing {folder}')
            for file in tqdm(list(wavs)):
                audio, sr = librosa.load(str(file), sr=self.sr)
                mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr, **kwargs)
                mel_spec_db = librosa.amplitude_to_db(mel_spec, ref=np.max)
                mel_spec_norm = self._normalize(mel_spec_db)
                # Encode the spectrogram shape in the file name so the torch
                # dataset can compute segment offsets without loading the array.
                m, n = mel_spec_norm.shape
                np.save(folder / (file.stem + f'_{m}_{n}.npy'), mel_spec_norm)
        return self

    def train_dataloader(self, segment_len=20, hop_len=5, transform=None, **kwargs):
        # TODO: exclude part of the training data, keep it for evaluation,
        # and return both loaders.
        ds = []
        for p, l in zip(self.train_paths, self.train_labels):
            ds.append(
                MimiiTorchDataset(path=p, label=l, segment_len=segment_len,
                                  hop=hop_len, transform=transform)
            )
        return DataLoader(ConcatDataset(ds), **kwargs)

    def test_datasets(self, segment_len=20, hop_len=5, transform=None):
        datasets = []
        for p, l in zip(self.test_paths, self.test_labels):
            datasets.append(
                MimiiTorchDataset(path=p, label=l, segment_len=segment_len,
                                  hop=hop_len, transform=transform)
            )
        return datasets

    def evaluate_model(self, f, segment_len=20, hop_len=5, transform=None):
        """Compute the file-level ROC AUC of model `f` on the test split."""
        f.eval()
        datasets = self.test_datasets(segment_len, hop_len, transform)
        y_true, y_score = [], []
        with torch.no_grad():
            for dataset in tqdm(datasets):
                loader = DataLoader(dataset, batch_size=300, shuffle=False, num_workers=2)
                file_preds = []
                for batch in loader:
                    data, labels = batch
                    data = data.to(self.device)
                    preds = f.test_loss(data)
                    file_preds += preds.cpu().data.tolist()
                # One score per file: the mean anomaly score over all its segments.
                y_true.append(labels.max().item())
                y_score.append(np.mean(file_preds))
        f.train()
        return roc_auc_score(y_true, y_score)


class MimiiTorchDataset(Dataset):
    """Slices one spectrogram file into overlapping snippets along the time axis."""

    def __init__(self, path, segment_len, hop, label, transform=None):
        self.path = path
        self.segment_len = segment_len
        # Spectrogram dimensions are encoded in the file name as `<stem>_<m>_<n>.npy`.
        m, n = str(path).split('_')[-2:]
        self.m = int(m)
        self.n = int(n.split('.', 1)[0])  # strip the .npy extension
        self.offsets = list(range(0, self.n - segment_len, hop))
        self.label = label
        self.transform = transform

    def __getitem__(self, item):
        start = self.offsets[item]
        mel_spec = np.load(self.path)
        snippet = mel_spec[:, start:start + self.segment_len]
        if self.transform:
            snippet = self.transform(snippet)
        return snippet, self.label

    def __len__(self):
        return len(self.offsets)


class MimiiTorchTestDataset(MimiiTorchDataset):
    """Like MimiiTorchDataset, but also returns the source file path with each item."""

    def __getitem__(self, item):
        x = super().__getitem__(item)
        return x, self.path
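

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module. The dataset root
    # below and the dummy scorer are assumptions for illustration only:
    # `evaluate_model` merely expects an object with `eval()`, `train()` and a
    # `test_loss(batch)` method returning one anomaly score per sample.

    class _DummyScorer(torch.nn.Module):
        def test_loss(self, x):
            # Placeholder anomaly score: mean energy of each snippet.
            return x.reshape(x.shape[0], -1).float().mean(dim=1)

    mimii = MIMII(Path('data/MIMII/fan'), machine_id=0)       # hypothetical path
    mimii.preprocess(n_mels=64, n_fft=1024, hop_length=512)   # only needed once
    loader = mimii.train_dataloader(batch_size=128, shuffle=True)
    auc = mimii.evaluate_model(_DummyScorer())
    print(f'file-level ROC AUC: {auc:.3f}')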