File size: 19,353 Bytes
6127b48 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 |
import torch
from scipy.stats import betabinom
from torch import nn
from torch.nn import functional as F
from TTS.tts.layers.tacotron.common_layers import Linear
class LocationLayer(nn.Module):
"""Layers for Location Sensitive Attention
Args:
attention_dim (int): number of channels in the input tensor.
attention_n_filters (int, optional): number of filters in convolution. Defaults to 32.
attention_kernel_size (int, optional): kernel size of convolution filter. Defaults to 31.
"""
def __init__(self, attention_dim, attention_n_filters=32, attention_kernel_size=31):
super().__init__()
self.location_conv1d = nn.Conv1d(
in_channels=2,
out_channels=attention_n_filters,
kernel_size=attention_kernel_size,
stride=1,
padding=(attention_kernel_size - 1) // 2,
bias=False,
)
self.location_dense = Linear(attention_n_filters, attention_dim, bias=False, init_gain="tanh")
def forward(self, attention_cat):
"""
Shapes:
attention_cat: [B, 2, C]
"""
processed_attention = self.location_conv1d(attention_cat)
processed_attention = self.location_dense(processed_attention.transpose(1, 2))
return processed_attention
class GravesAttention(nn.Module):
"""Graves Attention as is ref1 with updates from ref2.
ref1: https://arxiv.org/abs/1910.10288
ref2: https://arxiv.org/pdf/1906.01083.pdf
Args:
query_dim (int): number of channels in query tensor.
K (int): number of Gaussian heads to be used for computing attention.
"""
COEF = 0.3989422917366028 # numpy.sqrt(1/(2*numpy.pi))
def __init__(self, query_dim, K):
super().__init__()
self._mask_value = 1e-8
self.K = K
# self.attention_alignment = 0.05
self.eps = 1e-5
self.J = None
self.N_a = nn.Sequential(
nn.Linear(query_dim, query_dim, bias=True), nn.ReLU(), nn.Linear(query_dim, 3 * K, bias=True)
)
self.attention_weights = None
self.mu_prev = None
self.init_layers()
def init_layers(self):
torch.nn.init.constant_(self.N_a[2].bias[(2 * self.K) : (3 * self.K)], 1.0) # bias mean
torch.nn.init.constant_(self.N_a[2].bias[self.K : (2 * self.K)], 10) # bias std
def init_states(self, inputs):
if self.J is None or inputs.shape[1] + 1 > self.J.shape[-1]:
self.J = torch.arange(0, inputs.shape[1] + 2.0).to(inputs.device) + 0.5
self.attention_weights = torch.zeros(inputs.shape[0], inputs.shape[1]).to(inputs.device)
self.mu_prev = torch.zeros(inputs.shape[0], self.K).to(inputs.device)
# pylint: disable=R0201
# pylint: disable=unused-argument
def preprocess_inputs(self, inputs):
return None
def forward(self, query, inputs, processed_inputs, mask):
"""
Shapes:
query: [B, C_attention_rnn]
inputs: [B, T_in, C_encoder]
processed_inputs: place_holder
mask: [B, T_in]
"""
gbk_t = self.N_a(query)
gbk_t = gbk_t.view(gbk_t.size(0), -1, self.K)
# attention model parameters
# each B x K
g_t = gbk_t[:, 0, :]
b_t = gbk_t[:, 1, :]
k_t = gbk_t[:, 2, :]
# dropout to decorrelate attention heads
g_t = torch.nn.functional.dropout(g_t, p=0.5, training=self.training)
# attention GMM parameters
sig_t = torch.nn.functional.softplus(b_t) + self.eps
mu_t = self.mu_prev + torch.nn.functional.softplus(k_t)
g_t = torch.softmax(g_t, dim=-1) + self.eps
j = self.J[: inputs.size(1) + 1]
# attention weights
phi_t = g_t.unsqueeze(-1) * (1 / (1 + torch.sigmoid((mu_t.unsqueeze(-1) - j) / sig_t.unsqueeze(-1))))
# discritize attention weights
alpha_t = torch.sum(phi_t, 1)
alpha_t = alpha_t[:, 1:] - alpha_t[:, :-1]
alpha_t[alpha_t == 0] = 1e-8
# apply masking
if mask is not None:
alpha_t.data.masked_fill_(~mask, self._mask_value)
context = torch.bmm(alpha_t.unsqueeze(1), inputs).squeeze(1)
self.attention_weights = alpha_t
self.mu_prev = mu_t
return context
class OriginalAttention(nn.Module):
"""Bahdanau Attention with various optional modifications.
- Location sensitive attnetion: https://arxiv.org/abs/1712.05884
- Forward Attention: https://arxiv.org/abs/1807.06736 + state masking at inference
- Using sigmoid instead of softmax normalization
- Attention windowing at inference time
Note:
Location Sensitive Attention extends the additive attention mechanism
to use cumulative attention weights from previous decoder time steps with the current time step features.
Forward attention computes most probable monotonic alignment. The modified attention probabilities at each
timestep are computed recursively by the forward algorithm.
Transition agent in the forward attention explicitly gates the attention mechanism whether to move forward or
stay at each decoder timestep.
Attention windowing is a inductive prior that prevents the model from attending to previous and future timesteps
beyond a certain window.
Args:
query_dim (int): number of channels in the query tensor.
embedding_dim (int): number of channels in the vakue tensor. In general, the value tensor is the output of the encoder layer.
attention_dim (int): number of channels of the inner attention layers.
location_attention (bool): enable/disable location sensitive attention.
attention_location_n_filters (int): number of location attention filters.
attention_location_kernel_size (int): filter size of location attention convolution layer.
windowing (int): window size for attention windowing. if it is 5, for computing the attention, it only considers the time steps [(t-5), ..., (t+5)] of the input.
norm (str): normalization method applied to the attention weights. 'softmax' or 'sigmoid'
forward_attn (bool): enable/disable forward attention.
trans_agent (bool): enable/disable transition agent in the forward attention.
forward_attn_mask (int): enable/disable an explicit masking in forward attention. It is useful to set at especially inference time.
"""
# Pylint gets confused by PyTorch conventions here
# pylint: disable=attribute-defined-outside-init
def __init__(
self,
query_dim,
embedding_dim,
attention_dim,
location_attention,
attention_location_n_filters,
attention_location_kernel_size,
windowing,
norm,
forward_attn,
trans_agent,
forward_attn_mask,
):
super().__init__()
self.query_layer = Linear(query_dim, attention_dim, bias=False, init_gain="tanh")
self.inputs_layer = Linear(embedding_dim, attention_dim, bias=False, init_gain="tanh")
self.v = Linear(attention_dim, 1, bias=True)
if trans_agent:
self.ta = nn.Linear(query_dim + embedding_dim, 1, bias=True)
if location_attention:
self.location_layer = LocationLayer(
attention_dim,
attention_location_n_filters,
attention_location_kernel_size,
)
self._mask_value = -float("inf")
self.windowing = windowing
self.win_idx = None
self.norm = norm
self.forward_attn = forward_attn
self.trans_agent = trans_agent
self.forward_attn_mask = forward_attn_mask
self.location_attention = location_attention
def init_win_idx(self):
self.win_idx = -1
self.win_back = 2
self.win_front = 6
def init_forward_attn(self, inputs):
B = inputs.shape[0]
T = inputs.shape[1]
self.alpha = torch.cat([torch.ones([B, 1]), torch.zeros([B, T])[:, :-1] + 1e-7], dim=1).to(inputs.device)
self.u = (0.5 * torch.ones([B, 1])).to(inputs.device)
def init_location_attention(self, inputs):
B = inputs.size(0)
T = inputs.size(1)
self.attention_weights_cum = torch.zeros([B, T], device=inputs.device)
def init_states(self, inputs):
B = inputs.size(0)
T = inputs.size(1)
self.attention_weights = torch.zeros([B, T], device=inputs.device)
if self.location_attention:
self.init_location_attention(inputs)
if self.forward_attn:
self.init_forward_attn(inputs)
if self.windowing:
self.init_win_idx()
def preprocess_inputs(self, inputs):
return self.inputs_layer(inputs)
def update_location_attention(self, alignments):
self.attention_weights_cum += alignments
def get_location_attention(self, query, processed_inputs):
attention_cat = torch.cat((self.attention_weights.unsqueeze(1), self.attention_weights_cum.unsqueeze(1)), dim=1)
processed_query = self.query_layer(query.unsqueeze(1))
processed_attention_weights = self.location_layer(attention_cat)
energies = self.v(torch.tanh(processed_query + processed_attention_weights + processed_inputs))
energies = energies.squeeze(-1)
return energies, processed_query
def get_attention(self, query, processed_inputs):
processed_query = self.query_layer(query.unsqueeze(1))
energies = self.v(torch.tanh(processed_query + processed_inputs))
energies = energies.squeeze(-1)
return energies, processed_query
def apply_windowing(self, attention, inputs):
back_win = self.win_idx - self.win_back
front_win = self.win_idx + self.win_front
if back_win > 0:
attention[:, :back_win] = -float("inf")
if front_win < inputs.shape[1]:
attention[:, front_win:] = -float("inf")
# this is a trick to solve a special problem.
# but it does not hurt.
if self.win_idx == -1:
attention[:, 0] = attention.max()
# Update the window
self.win_idx = torch.argmax(attention, 1).long()[0].item()
return attention
def apply_forward_attention(self, alignment):
# forward attention
fwd_shifted_alpha = F.pad(self.alpha[:, :-1].clone().to(alignment.device), (1, 0, 0, 0))
# compute transition potentials
alpha = ((1 - self.u) * self.alpha + self.u * fwd_shifted_alpha + 1e-8) * alignment
# force incremental alignment
if not self.training and self.forward_attn_mask:
_, n = fwd_shifted_alpha.max(1)
val, _ = alpha.max(1)
for b in range(alignment.shape[0]):
alpha[b, n[b] + 3 :] = 0
alpha[b, : (n[b] - 1)] = 0 # ignore all previous states to prevent repetition.
alpha[b, (n[b] - 2)] = 0.01 * val[b] # smoothing factor for the prev step
# renormalize attention weights
alpha = alpha / alpha.sum(dim=1, keepdim=True)
return alpha
def forward(self, query, inputs, processed_inputs, mask):
"""
shapes:
query: [B, C_attn_rnn]
inputs: [B, T_en, D_en]
processed_inputs: [B, T_en, D_attn]
mask: [B, T_en]
"""
if self.location_attention:
attention, _ = self.get_location_attention(query, processed_inputs)
else:
attention, _ = self.get_attention(query, processed_inputs)
# apply masking
if mask is not None:
attention.data.masked_fill_(~mask, self._mask_value)
# apply windowing - only in eval mode
if not self.training and self.windowing:
attention = self.apply_windowing(attention, inputs)
# normalize attention values
if self.norm == "softmax":
alignment = torch.softmax(attention, dim=-1)
elif self.norm == "sigmoid":
alignment = torch.sigmoid(attention) / torch.sigmoid(attention).sum(dim=1, keepdim=True)
else:
raise ValueError("Unknown value for attention norm type")
if self.location_attention:
self.update_location_attention(alignment)
# apply forward attention if enabled
if self.forward_attn:
alignment = self.apply_forward_attention(alignment)
self.alpha = alignment
context = torch.bmm(alignment.unsqueeze(1), inputs)
context = context.squeeze(1)
self.attention_weights = alignment
# compute transition agent
if self.forward_attn and self.trans_agent:
ta_input = torch.cat([context, query.squeeze(1)], dim=-1)
self.u = torch.sigmoid(self.ta(ta_input))
return context
class MonotonicDynamicConvolutionAttention(nn.Module):
"""Dynamic convolution attention from
https://arxiv.org/pdf/1910.10288.pdf
query -> linear -> tanh -> linear ->|
| mask values
v | |
atten_w(t-1) -|-> conv1d_dynamic -> linear -|-> tanh -> + -> softmax -> * -> * -> context
|-> conv1d_static -> linear -| |
|-> conv1d_prior -> log ----------------|
query: attention rnn output.
Note:
Dynamic convolution attention is an alternation of the location senstive attention with
dynamically computed convolution filters from the previous attention scores and a set of
constraints to keep the attention alignment diagonal.
DCA is sensitive to mixed precision training and might cause instable training.
Args:
query_dim (int): number of channels in the query tensor.
embedding_dim (int): number of channels in the value tensor.
static_filter_dim (int): number of channels in the convolution layer computing the static filters.
static_kernel_size (int): kernel size for the convolution layer computing the static filters.
dynamic_filter_dim (int): number of channels in the convolution layer computing the dynamic filters.
dynamic_kernel_size (int): kernel size for the convolution layer computing the dynamic filters.
prior_filter_len (int, optional): [description]. Defaults to 11 from the paper.
alpha (float, optional): [description]. Defaults to 0.1 from the paper.
beta (float, optional): [description]. Defaults to 0.9 from the paper.
"""
def __init__(
self,
query_dim,
embedding_dim, # pylint: disable=unused-argument
attention_dim,
static_filter_dim,
static_kernel_size,
dynamic_filter_dim,
dynamic_kernel_size,
prior_filter_len=11,
alpha=0.1,
beta=0.9,
):
super().__init__()
self._mask_value = 1e-8
self.dynamic_filter_dim = dynamic_filter_dim
self.dynamic_kernel_size = dynamic_kernel_size
self.prior_filter_len = prior_filter_len
self.attention_weights = None
# setup key and query layers
self.query_layer = nn.Linear(query_dim, attention_dim)
self.key_layer = nn.Linear(attention_dim, dynamic_filter_dim * dynamic_kernel_size, bias=False)
self.static_filter_conv = nn.Conv1d(
1,
static_filter_dim,
static_kernel_size,
padding=(static_kernel_size - 1) // 2,
bias=False,
)
self.static_filter_layer = nn.Linear(static_filter_dim, attention_dim, bias=False)
self.dynamic_filter_layer = nn.Linear(dynamic_filter_dim, attention_dim)
self.v = nn.Linear(attention_dim, 1, bias=False)
prior = betabinom.pmf(range(prior_filter_len), prior_filter_len - 1, alpha, beta)
self.register_buffer("prior", torch.FloatTensor(prior).flip(0))
# pylint: disable=unused-argument
def forward(self, query, inputs, processed_inputs, mask):
"""
query: [B, C_attn_rnn]
inputs: [B, T_en, D_en]
processed_inputs: place holder.
mask: [B, T_en]
"""
# compute prior filters
prior_filter = F.conv1d(
F.pad(self.attention_weights.unsqueeze(1), (self.prior_filter_len - 1, 0)), self.prior.view(1, 1, -1)
)
prior_filter = torch.log(prior_filter.clamp_min_(1e-6)).squeeze(1)
G = self.key_layer(torch.tanh(self.query_layer(query)))
# compute dynamic filters
dynamic_filter = F.conv1d(
self.attention_weights.unsqueeze(0),
G.view(-1, 1, self.dynamic_kernel_size),
padding=(self.dynamic_kernel_size - 1) // 2,
groups=query.size(0),
)
dynamic_filter = dynamic_filter.view(query.size(0), self.dynamic_filter_dim, -1).transpose(1, 2)
# compute static filters
static_filter = self.static_filter_conv(self.attention_weights.unsqueeze(1)).transpose(1, 2)
alignment = (
self.v(
torch.tanh(self.static_filter_layer(static_filter) + self.dynamic_filter_layer(dynamic_filter))
).squeeze(-1)
+ prior_filter
)
# compute attention weights
attention_weights = F.softmax(alignment, dim=-1)
# apply masking
if mask is not None:
attention_weights.data.masked_fill_(~mask, self._mask_value)
self.attention_weights = attention_weights
# compute context
context = torch.bmm(attention_weights.unsqueeze(1), inputs).squeeze(1)
return context
def preprocess_inputs(self, inputs): # pylint: disable=no-self-use
return None
def init_states(self, inputs):
B = inputs.size(0)
T = inputs.size(1)
self.attention_weights = torch.zeros([B, T], device=inputs.device)
self.attention_weights[:, 0] = 1.0
def init_attn(
attn_type,
query_dim,
embedding_dim,
attention_dim,
location_attention,
attention_location_n_filters,
attention_location_kernel_size,
windowing,
norm,
forward_attn,
trans_agent,
forward_attn_mask,
attn_K,
):
if attn_type == "original":
return OriginalAttention(
query_dim,
embedding_dim,
attention_dim,
location_attention,
attention_location_n_filters,
attention_location_kernel_size,
windowing,
norm,
forward_attn,
trans_agent,
forward_attn_mask,
)
if attn_type == "graves":
return GravesAttention(query_dim, attn_K)
if attn_type == "dynamic_convolution":
return MonotonicDynamicConvolutionAttention(
query_dim,
embedding_dim,
attention_dim,
static_filter_dim=8,
static_kernel_size=21,
dynamic_filter_dim=8,
dynamic_kernel_size=21,
prior_filter_len=11,
alpha=0.1,
beta=0.9,
)
raise RuntimeError(f" [!] Given Attention Type '{attn_type}' is not exist.")
|