134 lines
4.6 KiB
Python
134 lines
4.6 KiB
Python
import random
|
|
import numpy as np
|
|
import torch
|
|
from torch.utils.data import Dataset, DataLoader, ConcatDataset
|
|
import librosa
|
|
from sklearn.metrics import roc_auc_score
|
|
from tqdm import tqdm
|
|
from pathlib import Path
|
|
|
|
# Names exported by `from <module> import *`; only the MIMII wrapper is listed.
__all__ = ['MIMII']
|
|
|
|
|
|
class MIMII(object):
    """File-level train/test split and evaluation helper for one machine id.

    The constructor collects the preprocessed ``*.npy`` spectrograms below
    ``<dataset_path>/id_0<machine_id>/{normal,abnormal}/processed`` and builds
    a split in which every abnormal file, plus an equally sized random subset
    of the normal files, is held out for testing. The remaining normal files
    form the training set. Labels are ``0`` for normal and ``1`` for abnormal.
    """

    def __init__(self, dataset_path, machine_id, seed=42):
        """Build the train/test file split.

        Args:
            dataset_path: ``pathlib.Path`` of the machine-type folder; its
                ``name`` is stored as ``self.machine``.
            machine_id: Identifier interpolated into the ``id_0<machine_id>``
                sub-folder name.
            seed: Seed applied to Python's ``random``, NumPy and torch.
        """
        # `random` performs the shuffle below, so it has to be seeded too;
        # otherwise the split changes on every run despite `seed`.
        random.seed(seed)
        torch.random.manual_seed(seed)
        np.random.seed(seed)

        self.machine = dataset_path.name
        self.machine_id = machine_id
        self.root = dataset_path / f'id_0{machine_id}'
        self.min_level_db = -80
        self.sr = 16000

        # glob() order depends on the filesystem; sorting first makes the
        # seeded shuffle (and therefore the split) reproducible.
        train = sorted((self.root / 'normal' / 'processed').glob('*.npy'))
        test = sorted((self.root / 'abnormal' / 'processed').glob('*.npy'))
        random.shuffle(train)

        # Hold out as many normal files as there are abnormal ones.
        n_held_out = len(test)
        normal_test = train[:n_held_out]

        self.train_paths = train[n_held_out:]
        self.test_paths = normal_test + test
        self.train_labels = [0] * len(self.train_paths)
        self.test_labels = [0] * len(normal_test) + [1] * len(test)

        self.device = 'cpu'

    def to(self, device):
        """Set the device that evaluation batches are moved to; return self."""
        self.device = device
        return self

    def _normalize(self, S):
        """Map dB values from ``[min_level_db, 0]`` to ``[0, 1]`` with clipping."""
        return np.clip((S - self.min_level_db) / -self.min_level_db, 0, 1)

    def _denormalize(self, S):
        """Inverse of ``_normalize``: map ``[0, 1]`` back to ``[min_level_db, 0]``."""
        return (np.clip(S, 0, 1) * -self.min_level_db) + self.min_level_db

    def preprocess(self, **kwargs):
        """Convert every ``*.wav`` file into a normalized mel spectrogram.

        For both the ``normal`` and ``abnormal`` folders, each wav file is
        loaded at ``self.sr``, converted to a mel spectrogram, scaled to dB,
        normalized to ``[0, 1]`` and saved as
        ``processed/<stem>_<m>_<n>.npy``, where ``(m, n)`` is the spectrogram
        shape.

        Args:
            **kwargs: Forwarded to ``librosa.feature.melspectrogram``.

        Returns:
            self, to allow call chaining.
        """
        for mode in ['normal', 'abnormal']:
            folder = self.root / mode / 'processed'
            folder.mkdir(parents=False, exist_ok=True)
            wav_paths = list((self.root / mode).glob('*.wav'))
            print(f'  Processing {folder}')
            for wav_path in tqdm(wav_paths):
                audio, sr = librosa.load(str(wav_path), sr=self.sr)
                # `y` must be passed by keyword: newer librosa releases made
                # the arguments of melspectrogram keyword-only.
                mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr, **kwargs)
                # NOTE(review): per the librosa docs melspectrogram yields a
                # power spectrogram unless `power=1` is passed, for which
                # `power_to_db` would be the matching conversion. Kept as-is
                # so existing processed features stay comparable -- confirm.
                mel_spec_db = librosa.amplitude_to_db(mel_spec, ref=np.max)
                mel_spec_norm = self._normalize(mel_spec_db)
                m, n = mel_spec_norm.shape
                np.save(folder / (wav_path.stem + f'_{m}_{n}.npy'), mel_spec_norm)
        return self

    def train_dataloader(self, segment_len=20, hop_len=5, **kwargs):
        """Return a DataLoader over segments of all training files.

        Args:
            segment_len: Number of spectrogram frames per segment.
            hop_len: Frame step between consecutive segments.
            **kwargs: Forwarded to ``torch.utils.data.DataLoader``.
        """
        # TODO: exclude a part of the training data and keep it for evaluation.
        per_file = [
            MimiiTorchDataset(path=path, label=label,
                              segment_len=segment_len,
                              hop=hop_len)
            for path, label in zip(self.train_paths, self.train_labels)
        ]
        return DataLoader(ConcatDataset(per_file), **kwargs)

    def test_dataloader(self, *args, **kwargs):
        """Not supported; use ``evaluate_model`` instead.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError('test_dataloader is not supported')

    def evaluate_model(self, f, segment_len=20, hop_len=5,
                       batch_size=300, num_workers=2):
        """Compute the file-level ROC AUC of a model on the held-out files.

        Each test file is cut into segments, ``f.test_loss`` is evaluated on
        batches of segments, and the mean of all returned values is used as
        the score of that file.

        Args:
            f: Object exposing ``test_loss(batch)``. It is assumed to return
                a tensor of per-segment scores that are larger for abnormal
                input -- TODO confirm against the model implementations.
            segment_len: Number of spectrogram frames per segment.
            hop_len: Frame step between consecutive segments.
            batch_size: Segments per batch passed to ``f.test_loss``.
            num_workers: Worker processes used by each per-file DataLoader.

        Returns:
            The value of ``sklearn.metrics.roc_auc_score`` over the test
            files (label 1 = abnormal).
        """
        datasets = [
            MimiiTorchDataset(path=path, label=label,
                              segment_len=segment_len,
                              hop=hop_len)
            for path, label in zip(self.test_paths, self.test_labels)
        ]
        y_true, y_score = [], []
        with torch.no_grad():
            for dataset in tqdm(datasets):
                # A file too short for a single segment produces no batches;
                # skip it rather than recording a NaN score or a stale label.
                if len(dataset) == 0:
                    continue
                loader = DataLoader(dataset, batch_size=batch_size,
                                    shuffle=False, num_workers=num_workers)
                file_preds = []
                for data, _ in loader:
                    data = data.to(self.device)
                    preds = f.test_loss(data)
                    # reshape(-1) also accepts a 0-dim (scalar) loss tensor.
                    file_preds += preds.detach().cpu().reshape(-1).tolist()
                # Every segment of a file shares the file's label.
                y_true.append(dataset.label)
                y_score.append(np.mean(file_preds))
        return roc_auc_score(y_true, y_score)
|
|
|
|
|
|
|
|
class MimiiTorchDataset(Dataset):
    """Sliding-window segments over one preprocessed spectrogram file.

    The spectrogram shape is read from the file name, which must end in
    ``_<m>_<n>.npy``; the file itself is only opened in ``__getitem__``.
    Item ``i`` is the column slice ``[offsets[i], offsets[i] + segment_len)``
    paired with the file's label.
    """

    def __init__(self, path, segment_len, hop, label):
        """Index the segments of a single file.

        Args:
            path: Path of the ``.npy`` file, named ``..._<m>_<n>.npy``.
            segment_len: Number of columns per segment.
            hop: Column step between the starts of consecutive segments.
            label: Label returned with every segment of this file.

        Raises:
            ValueError: If the file name does not end in ``_<m>_<n>.npy``
                with integer ``m`` and ``n``.
        """
        self.path = path
        self.segment_len = segment_len
        self.label = label

        try:
            rows, cols = str(path).split('_')[-2:]  # spectrogram dimensions
            cols = cols.split('.', 1)[0]  # strip the ".npy" suffix
            self.m, self.n = int(rows), int(cols)
        except ValueError as err:
            raise ValueError(
                f'cannot read spectrogram shape from file name: {path}'
            ) from err

        # The last valid window starts at n - segment_len, hence the "+ 1";
        # without it a file of exactly segment_len columns yields nothing.
        self.offsets = list(range(0, self.n - segment_len + 1, hop))

    def __getitem__(self, item):
        """Return ``(segment, label)`` for the ``item``-th window."""
        start = self.offsets[item]
        mel_spec = np.load(self.path)
        snippet = mel_spec[:, start: start + self.segment_len]
        return snippet, self.label

    def __len__(self):
        """Return the number of windows available in this file."""
        return len(self.offsets)
|
|
|
|
|
|
class MimiiTorchTestDataset(MimiiTorchDataset):
    """Variant of ``MimiiTorchDataset`` whose items also carry the file path.

    Construction is inherited unchanged from ``MimiiTorchDataset``.
    """

    def __getitem__(self, item):
        """Return ``(parent_item, path)`` for the ``item``-th window.

        ``parent_item`` is whatever ``MimiiTorchDataset.__getitem__`` returns
        for the same index; ``path`` is ``self.path`` exactly as it was given
        to the constructor.
        """
        # The previous `super.__init__(item)` invoked the `super` type itself
        # and always raised TypeError; the parent's item lookup is intended.
        x = super().__getitem__(item)
        return x, self.path
|
|
|
|
|
|
|