Robert Müller 482f45df87 big update
2020-04-06 14:46:26 +02:00

139 lines
4.8 KiB
Python

import random
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import librosa
from sklearn.metrics import roc_auc_score
from tqdm import tqdm
from pathlib import Path
__all__ = ['MIMII']
class MIMII(object):
    """Wrapper around one machine of the MIMII anomalous-sound dataset.

    Expects ``dataset_path/id_0<machine_id>/{normal,abnormal}`` folders of
    16 kHz wav files. ``preprocess`` converts them to normalized
    log-mel-spectrogram ``.npy`` files in a ``processed`` subfolder; the
    train split is all remaining normal files, the test split is an
    equal mix of held-out normal files (label 0) and abnormal files (label 1).
    """

    def __init__(self, dataset_path, machine_id):
        self.machine = dataset_path.name
        self.machine_id = machine_id
        self.root = dataset_path / f'id_0{machine_id}'
        self.min_level_db = -80  # dB floor used by _normalize/_denormalize
        self.sr = 16000  # MIMII recordings are 16 kHz

        train = list((self.root / 'normal' / 'processed').glob('*.npy'))
        test = list((self.root / 'abnormal' / 'processed').glob('*.npy'))
        # NOTE(review): unseeded shuffle — the normal/abnormal test split
        # changes between runs; seed ``random`` externally for reproducibility.
        random.shuffle(train)
        # Balance the test set: hold out as many normal files as there are
        # abnormal ones; the rest of the normal files form the train split.
        normal_test = train[:len(test)]
        self.test_labels = [0] * len(normal_test) + [1] * len(test)
        self.train_labels = [0] * (len(train) - len(normal_test))
        self.train_paths = train[len(test):]
        self.test_paths = normal_test + test
        self.device = 'cpu'

    def to(self, device):
        """Set the device used by ``evaluate_model``. Returns self (fluent)."""
        self.device = device
        return self

    def _normalize(self, S):
        """Map a dB spectrogram into [0, 1] relative to ``min_level_db``."""
        return np.clip((S - self.min_level_db) / -self.min_level_db, 0, 1)

    def _denormalize(self, S):
        """Inverse of ``_normalize``: map [0, 1] back to dB."""
        return (np.clip(S, 0, 1) * -self.min_level_db) + self.min_level_db

    def preprocess(self, **kwargs):
        """Convert every wav file to a normalized mel-spectrogram ``.npy``.

        ``kwargs`` are forwarded to ``librosa.feature.melspectrogram``.
        Output files are named ``<stem>_<m>_<n>.npy`` so the spectrogram
        shape can be recovered from the filename alone (see
        ``MimiiTorchDataset``). Returns self.
        """
        for mode in ['normal', 'abnormal']:
            folder = (self.root / mode / 'processed')
            folder.mkdir(parents=False, exist_ok=True)
            wavs = (self.root / mode).glob('*.wav')
            print(f' Processing {folder}')
            for file in tqdm(list(wavs)):
                audio, sr = librosa.load(str(file), sr=self.sr)
                # ``y=`` keyword: the positional audio argument was removed
                # in librosa >= 0.10; keyword form works on older versions too.
                mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr, **kwargs)
                mel_spec_db = librosa.amplitude_to_db(mel_spec, ref=np.max)
                mel_spec_norm = self._normalize(mel_spec_db)
                m, n = mel_spec_norm.shape
                np.save(folder / (file.stem + f'_{m}_{n}.npy'), mel_spec_norm)
        return self

    def train_dataloader(self, segment_len=20, hop_len=5, transform=None, **kwargs):
        """DataLoader over all training files, split into snippet datasets.

        ``kwargs`` are forwarded to ``DataLoader`` (batch_size, shuffle, ...).
        """
        ds = [
            MimiiTorchDataset(path=p, label=l,
                              segment_len=segment_len,
                              hop=hop_len,
                              transform=transform)
            for p, l in zip(self.train_paths, self.train_labels)
        ]
        return DataLoader(ConcatDataset(ds), **kwargs)

    def test_datasets(self, segment_len=20, hop_len=5, transform=None):
        """One snippet dataset per test file (label 0 normal, 1 abnormal)."""
        return [
            MimiiTorchDataset(path=p, label=l,
                              segment_len=segment_len,
                              hop=hop_len, transform=transform)
            for p, l in zip(self.test_paths, self.test_labels)
        ]

    def evaluate_model(self, f, segment_len=20, hop_len=5, transform=None):
        """Compute file-level ROC-AUC of model ``f`` on the test split.

        ``f`` must expose ``eval()``/``train()`` and a
        ``test_loss(batch) -> per-sample scores`` method; a file's anomaly
        score is the mean score over its snippets. The model is restored to
        train mode even if evaluation raises.
        """
        f.eval()
        datasets = self.test_datasets(segment_len, hop_len, transform)
        y_true, y_score = [], []
        try:
            with torch.no_grad():
                for dataset in tqdm(datasets):
                    loader = DataLoader(dataset, batch_size=300,
                                        shuffle=False, num_workers=2)
                    file_preds = []
                    for data, labels in loader:
                        data = data.to(self.device)
                        preds = f.test_loss(data)
                        file_preds += preds.cpu().data.tolist()
                    # All snippets of one file share a single label.
                    y_true.append(labels.max().item())
                    y_score.append(np.mean(file_preds))
        finally:
            # Bug fix: original skipped f.train() when evaluation raised.
            f.train()
        return roc_auc_score(y_true, y_score)
class MimiiTorchDataset(Dataset):
    """Fixed-length spectrogram snippets cut from one preprocessed .npy file.

    The file name must end in ``_<m>_<n>.npy`` (written by
    ``MIMII.preprocess``), where ``m`` is the number of mel bins and ``n``
    the number of time frames. Items are ``(snippet, label)`` with snippet
    shape ``(m, segment_len)``.
    """

    def __init__(self, path, segment_len, hop, label, transform=None):
        self.path = path
        self.segment_len = segment_len
        # Recover the spectrogram shape from the filename: ..._<m>_<n>.npy.
        # (Original converted n to int twice via an intermediate step.)
        m_str, n_str = str(path).split('_')[-2:]
        self.m = int(m_str)
        self.n = int(n_str.split('.', 1)[0])  # strip the .npy suffix
        # Snippet start frames. NOTE(review): range stops before
        # n - segment_len, so the final valid offset is never produced —
        # kept as-is to preserve existing dataset lengths; confirm intent.
        self.offsets = list(range(0, self.n - segment_len, hop))
        self.label = label
        self.transform = transform

    def __getitem__(self, item):
        start = self.offsets[item]
        # NOTE(review): the full spectrogram is re-loaded on every access;
        # caching it would trade memory for speed if this becomes a bottleneck.
        mel_spec = np.load(self.path)
        snippet = mel_spec[:, start: start + self.segment_len]
        if self.transform:
            snippet = self.transform(snippet)
        return snippet, self.label

    def __len__(self):
        return len(self.offsets)
class MimiiTorchTestDataset(MimiiTorchDataset):
    """MimiiTorchDataset variant that also returns the source file path.

    Items are ``((snippet, label), path)`` so predictions can be grouped
    back to their originating file during evaluation.
    """

    def __init__(self, *args, **kwargs):
        super(MimiiTorchTestDataset, self).__init__(*args, **kwargs)

    def __getitem__(self, item):
        # Bug fix: original did ``x = super.__init__(item)`` — calling
        # ``__init__`` on the bare ``super`` type, which raises TypeError.
        # The parent's __getitem__ result is what we want.
        x = super().__getitem__(item)
        return x, self.path