transition
This commit is contained in:
@ -12,7 +12,7 @@ from einops import rearrange
|
||||
import sys
|
||||
sys.path.append(str(Path(__file__).parent))
|
||||
|
||||
from .util import AutoPad, Interpolate, ShapeMixin, F_x, Flatten, ResidualBlock, PreNorm
|
||||
from .util import AutoPad, Interpolate, ShapeMixin, F_x, Flatten
|
||||
|
||||
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
|
||||
@ -85,7 +85,6 @@ class ConvModule(ShapeMixin, nn.Module):
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def forward(self, x):
|
||||
tensor = self.norm(x)
|
||||
tensor = self.conv(tensor)
|
||||
@ -100,12 +99,13 @@ class PreInitializedConvModule(ShapeMixin, nn.Module):
|
||||
def __init__(self, in_shape, weight_matrix):
|
||||
super(PreInitializedConvModule, self).__init__()
|
||||
self.in_shape = in_shape
|
||||
self.weight_matrix = weight_matrix
|
||||
raise NotImplementedError
|
||||
# ToDo Get the weight_matrix shape and init a conv_module of similar size,
|
||||
# override the weights then.
|
||||
|
||||
def forward(self, x):
|
||||
|
||||
x = torch.matmul(x, self.weight_matrix) # ToDo: This is an Placeholder
|
||||
return x
|
||||
|
||||
|
||||
@ -214,8 +214,9 @@ class RecurrentModule(ShapeMixin, nn.Module):
|
||||
tensor = self.rnn(x)
|
||||
return tensor
|
||||
|
||||
|
||||
class FeedForward(nn.Module):
|
||||
def __init__(self, dim, hidden_dim, dropout = 0.):
|
||||
def __init__(self, dim, hidden_dim, dropout=0.):
|
||||
super().__init__()
|
||||
self.net = nn.Sequential(
|
||||
nn.Linear(dim, hidden_dim),
|
||||
@ -224,31 +225,35 @@ class FeedForward(nn.Module):
|
||||
nn.Linear(hidden_dim, dim),
|
||||
nn.Dropout(dropout)
|
||||
)
|
||||
|
||||
def forward(self, x):
|
||||
return self.net(x)
|
||||
|
||||
|
||||
class Attention(nn.Module):
|
||||
def __init__(self, dim, heads = 8, dropout = 0.):
|
||||
def __init__(self, dim, heads=8, dropout=0.):
|
||||
super().__init__()
|
||||
self.heads = heads
|
||||
self.scale = dim ** -0.5
|
||||
self.scale = dim / heads ** -0.5
|
||||
|
||||
self.to_qkv = nn.Linear(dim, dim * 3, bias = False)
|
||||
self.to_qkv = nn.Linear(dim, dim * 3, bias=False)
|
||||
self.to_out = nn.Sequential(
|
||||
nn.Linear(dim, dim),
|
||||
nn.Dropout(dropout)
|
||||
)
|
||||
|
||||
def forward(self, x, mask = None):
|
||||
def forward(self, x, mask=None, return_attn_weights=False):
|
||||
# noinspection PyTupleAssignmentBalance
|
||||
b, n, _, h = *x.shape, self.heads
|
||||
qkv = self.to_qkv(x).chunk(3, dim = -1)
|
||||
q, k, v = [rearrange(t, 'b n (h d) -> b h n d', h = h) for t in qkv]
|
||||
|
||||
qkv = self.to_qkv(x).chunk(3, dim=-1)
|
||||
q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), qkv)
|
||||
|
||||
dots = torch.einsum('bhid,bhjd->bhij', q, k) * self.scale
|
||||
mask_value = -torch.finfo(dots.dtype).max
|
||||
|
||||
if mask is not None:
|
||||
mask = F.pad(mask.flatten(1), [1, 0], value = True)
|
||||
mask = F.pad(mask.flatten(1), (1, 0), value=True)
|
||||
assert mask.shape[-1] == dots.shape[-1], 'mask has incorrect dimensions'
|
||||
mask = mask[:, None, :] * mask[:, :, None]
|
||||
dots.masked_fill_(~mask, mask_value)
|
||||
@ -258,39 +263,47 @@ class Attention(nn.Module):
|
||||
|
||||
out = torch.einsum('bhij,bhjd->bhid', attn, v)
|
||||
out = rearrange(out, 'b h n d -> b n (h d)')
|
||||
out = self.to_out(out)
|
||||
return out
|
||||
|
||||
class Transformer(nn.Module):
|
||||
def __init__(self, dim, depth, heads, mlp_dim, dropout):
|
||||
super().__init__()
|
||||
self.layers = nn.ModuleList([])
|
||||
for _ in range(depth):
|
||||
self.layers.append(nn.ModuleList([
|
||||
ResidualBlock(PreNorm(dim, Attention(dim, heads = heads, dropout = dropout))),
|
||||
ResidualBlock(PreNorm(dim, FeedForward(dim, mlp_dim, dropout = dropout)))
|
||||
]))
|
||||
|
||||
def forward(self, x, mask = None, *_, **__):
|
||||
for attn, ff in self.layers:
|
||||
x = attn(x, mask = mask)
|
||||
x = ff(x)
|
||||
return x
|
||||
out = self.to_out(out)
|
||||
if return_attn_weights:
|
||||
return out, attn
|
||||
else:
|
||||
return out
|
||||
|
||||
|
||||
class TransformerModule(ShapeMixin, nn.Module):
|
||||
|
||||
def __init__(self, in_shape, hidden_size, n_heads, num_layers=1, dropout=None, use_norm=False, activation='gelu'):
|
||||
def __init__(self, in_shape, depth, heads, mlp_dim, dropout=None, use_norm=False, activation='gelu'):
|
||||
super(TransformerModule, self).__init__()
|
||||
|
||||
self.in_shape = in_shape
|
||||
|
||||
self.flat = Flatten(self.in_shape) if isinstance(self.in_shape, (tuple, list)) else F_x(in_shape)
|
||||
|
||||
self.transformer = Transformer(dim=self.flat.flat_shape, depth=num_layers, heads=n_heads,
|
||||
mlp_dim=hidden_size, dropout=dropout)
|
||||
self.layers = nn.ModuleList([])
|
||||
self.embedding_dim = self.flat.flat_shape
|
||||
self.norm = nn.LayerNorm(self.embedding_dim)
|
||||
self.attns = nn.ModuleList([Attention(self.embedding_dim, heads=heads, dropout=dropout) for _ in range(depth)])
|
||||
self.mlps = nn.ModuleList([FeedForward(self.embedding_dim, mlp_dim, dropout=dropout) for _ in range(depth)])
|
||||
|
||||
def forward(self, x, mask=None, key_padding_mask=None):
|
||||
def forward(self, x, mask=None, return_attn_weights=False, **_):
|
||||
tensor = self.flat(x)
|
||||
tensor = self.transformer(tensor, mask, key_padding_mask)
|
||||
return tensor
|
||||
attn_weights = list()
|
||||
|
||||
for attn, mlp in zip(self.attns, self.mlps):
|
||||
# Attention
|
||||
skip_connection = tensor.clone()
|
||||
tensor = self.norm(tensor)
|
||||
if return_attn_weights:
|
||||
tensor, attn_weight = attn(tensor, mask=mask, return_attn_weights=return_attn_weights)
|
||||
attn_weights.append(attn_weight)
|
||||
else:
|
||||
tensor = attn(tensor, mask=mask)
|
||||
tensor = tensor + skip_connection
|
||||
|
||||
# MLP
|
||||
skip_connection = tensor.clone()
|
||||
tensor = self.norm(tensor)
|
||||
tensor = mlp(tensor)
|
||||
tensor = tensor + skip_connection
|
||||
|
||||
return (tensor, attn_weights) if return_attn_weights else tensor
|
||||
|
Reference in New Issue
Block a user