import math

import torch
import torch.nn as nn
import torch.nn.functional as F


class Attention(nn.Module):
    """
    Compute 'Scaled Dot Product Attention'.
    """

    def __init__(self):
        super().__init__()

    def forward(self, query, key, value, mask=None, dropout=None):
        d_k = query.size(-1)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        p_attn = F.softmax(scores, dim=-1)

        if dropout is not None:
            p_attn = dropout(p_attn)

        return torch.matmul(p_attn, value), p_attn


class MultiHeadedAttention(nn.Module):
    """
    Take in model size and number of heads.
    """

    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0

        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h

        self.linear_layers = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(3)])
        self.output_linear = nn.Linear(d_model, d_model)
        self.attention = Attention()

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        # if mask is not None:
        #     # Same mask applied to all h heads.
        #     mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        # 1) Do all the linear projections in batch from d_model => h x d_k
        query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
                             for l, x in zip(self.linear_layers, (query, key, value))]

        # 2) Apply attention on all the projected vectors in batch.
        #    attn has shape (nbatches, h, seq_len, seq_len), e.g. torch.Size([64, 8, 100, 100]).
        x, attn = self.attention(query, key, value, mask=mask, dropout=self.dropout)

        # 3) "Concat" using a view and apply a final linear.
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)

        return self.output_linear(x)
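
# A minimal usage sketch (not part of the original module): it assumes self-attention
# over tensors of shape (batch, seq_len, d_model) and a mask broadcastable to the score
# shape (batch, h, seq_len, seq_len), where 0 marks positions to mask out. The sizes
# below (batch=2, seq_len=10, d_model=512, h=8) are illustrative only.
if __name__ == "__main__":
    batch_size, seq_len, d_model, h = 2, 10, 512, 8

    mha = MultiHeadedAttention(h=h, d_model=d_model)

    # Self-attention: the same tensor serves as query, key, and value.
    x = torch.randn(batch_size, seq_len, d_model)

    # Mask shaped (batch, 1, 1, seq_len) broadcasts across heads and query positions;
    # entries equal to 0 are filled with -1e9 before the softmax.
    mask = torch.ones(batch_size, 1, 1, seq_len)

    out = mha(x, x, x, mask=mask)
    print(out.shape)  # expected: torch.Size([2, 10, 512])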