100 lines
4.3 KiB
Python
100 lines
4.3 KiB
Python
from argparse import Namespace
|
|
|
|
from torch import nn
|
|
from torch.nn import ModuleDict
|
|
|
|
from torchvision.transforms import Compose, ToTensor
|
|
|
|
from ml_lib.audio_toolset.audio_io import AudioToMel, NormalizeLocal, PowerToDB, MelToImage
|
|
from ml_lib.modules.blocks import ConvModule
|
|
from ml_lib.modules.utils import LightningBaseModule, Flatten, BaseModuleMixin_Dataloaders, HorizontalSplitter, \
|
|
HorizontalMerger
|
|
from models.module_mixins import BaseOptimizerMixin, BaseTrainMixin, BaseValMixin
|
|
|
|
|
|
class BandwiseBinaryClassifier(BaseModuleMixin_Dataloaders,
                               BaseTrainMixin,
                               BaseValMixin,
                               BaseOptimizerMixin,
                               LightningBaseModule
                               ):
    """Binary classifier that processes each band section of the input with its own conv stack.

    The input is split into ``n_band_sections`` sections by ``HorizontalSplitter``; every section
    is passed through three dedicated ``ConvModule`` layers, the results are merged again by
    ``HorizontalMerger``, flattened and fed through three linear layers ending in a sigmoid.

    Training, validation, optimizer and dataloader behaviour come from the mixin base classes,
    which are defined outside this file.
    """

    # Number of stacked ConvModule layers per band section; shared by __init__ and forward.
    _N_CONV_LAYERS = 3

    def __init__(self, hparams):
        """Build the datasets and all network modules.

        Args:
            hparams: Hyperparameter container forwarded to the base class. This constructor reads
                ``root``, ``filters`` (at least three entries), ``module_kwargs``, ``lat_dim``,
                ``bias``, ``dropout`` and ``activation`` from ``self.params``
                (NOTE(review): ``self.params`` is presumably populated from ``hparams`` by
                ``LightningBaseModule`` - confirm).
        """
        super().__init__(hparams)

        # Dataset and Dataloaders
        # =============================================================================
        # Transforms
        transforms = Compose([AudioToMel(n_mels=32), MelToImage(), ToTensor(), NormalizeLocal()])
        # Datasets
        # Imported locally, as in the original code (presumably to avoid an import cycle - confirm).
        from datasets.binar_masks import BinaryMasksDataset
        self.dataset = Namespace(
            train_dataset=BinaryMasksDataset(self.params.root, setting='train', transforms=transforms),
            val_dataset=BinaryMasksDataset(self.params.root, setting='devel', transforms=transforms),
            test_dataset=BinaryMasksDataset(self.params.root, setting='test', transforms=transforms),
        )

        # Model Parameters
        # =============================================================================
        # Additional parameters
        self.in_shape = self.dataset.train_dataset.sample_shape
        self.conv_filters = self.params.filters
        self.criterion = nn.BCELoss()
        self.n_band_sections = 5

        # Modules
        self.split = HorizontalSplitter(self.in_shape, self.n_band_sections)
        self.conv_dict = ModuleDict()

        # Modules are created layer by layer and, within a layer, band by band. This keeps the
        # registration order (and therefore parameter order and seeded initialisation) stable.
        layer_in_shape = self.split.shape
        for layer_idx in range(1, self._N_CONV_LAYERS + 1):
            n_filters = self.conv_filters[layer_idx - 1]
            for band_section in range(self.n_band_sections):
                self.conv_dict[f"conv_{layer_idx}_{band_section}"] = ConvModule(
                    layer_in_shape, n_filters, 3, conv_stride=1, **self.params.module_kwargs
                )
            # All bands of one layer are built with identical arguments, so band 0 is used as the
            # shape reference; unlike a hard-coded band 1 it exists for any number of band sections.
            layer_in_shape = self.conv_dict[f"conv_{layer_idx}_0"].shape

        self.merge = HorizontalMerger(layer_in_shape, self.n_band_sections)

        self.flat = Flatten(self.merge.shape)

        self.full_1 = nn.Linear(self.flat.shape, self.params.lat_dim, self.params.bias)
        self.full_2 = nn.Linear(self.full_1.out_features, self.full_1.out_features // 2, self.params.bias)

        self.full_out = nn.Linear(self.full_2.out_features, 1, self.params.bias)

        # Utility Modules

        # Dropout is only applied to the outputs of the linear layers, so element-wise nn.Dropout
        # is used; nn.Dropout2d is meant for 3D/4D inputs and is deprecated for anything else.
        # nn.Identity replaces a lambda so the module stays picklable when dropout is disabled.
        self.dropout = nn.Dropout(self.params.dropout) if self.params.dropout else nn.Identity()
        self.activation = self.params.activation()
        self.sigmoid = nn.Sigmoid()

    def forward(self, batch, **kwargs):
        """Run a batch through the band-wise conv stacks and the dense classification head.

        Args:
            batch: Input tensor accepted by ``HorizontalSplitter``
                (NOTE(review): presumably shaped like ``self.in_shape`` with a leading batch
                dimension - confirm).
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The sigmoid-activated output of ``full_out``, whose last dimension has size 1.
        """
        band_tensors = self.split(batch)

        # Apply one conv layer at a time to every band (layer-major order). A new list is built
        # per layer instead of overwriting the splitter's output in place.
        for layer_idx in range(1, self._N_CONV_LAYERS + 1):
            band_tensors = [
                self.conv_dict[f"conv_{layer_idx}_{band_section}"](band_tensor)
                for band_section, band_tensor in enumerate(band_tensors)
            ]

        tensor = self.merge(band_tensors)
        tensor = self.flat(tensor)

        # Two hidden dense layers, each followed by activation and dropout.
        for dense in (self.full_1, self.full_2):
            tensor = dense(tensor)
            tensor = self.activation(tensor)
            tensor = self.dropout(tensor)

        tensor = self.full_out(tensor)
        tensor = self.sigmoid(tensor)
        return tensor
|