|
|
|
|
|
from typing import Dict, List, Tuple, Union |
|
|
|
import torch |
|
from torch import nn |
|
from torch.cuda.amp.autocast_mode import autocast |
|
from trainer.trainer_utils import get_optimizer, get_scheduler |
|
|
|
from TTS.tts.layers.tacotron.capacitron_layers import CapacitronVAE |
|
from TTS.tts.layers.tacotron.gst_layers import GST |
|
from TTS.tts.layers.tacotron.tacotron import Decoder, Encoder, PostCBHG |
|
from TTS.tts.models.base_tacotron import BaseTacotron |
|
from TTS.tts.utils.measures import alignment_diagonal_score |
|
from TTS.tts.utils.speakers import SpeakerManager |
|
from TTS.tts.utils.text.tokenizer import TTSTokenizer |
|
from TTS.tts.utils.visual import plot_alignment, plot_spectrogram |
|
from TTS.utils.capacitron_optimizer import CapacitronOptimizer |
|
|
|
|
|
class Tacotron(BaseTacotron):
    """Tacotron as in https://arxiv.org/abs/1703.10135

    It's an autoregressive encoder-attention-decoder-postnet architecture.
    Check `TacotronConfig` for the arguments.

    Args:
        config (TacotronConfig): Configuration for the Tacotron model. Every field of the config is also copied
            onto the model instance as an attribute of the same name.
        ap (AudioProcessor): Audio processor forwarded to the base class. Defaults to None.
        tokenizer (TTSTokenizer): Tokenizer forwarded to the base class. Defaults to None.
        speaker_manager (SpeakerManager): Speaker manager to handle multi-speaker settings. Only use if the model is
            a multi-speaker model. Defaults to None.
    """

    def __init__(
        self,
        config: "TacotronConfig",
        ap: "AudioProcessor" = None,
        tokenizer: "TTSTokenizer" = None,
        speaker_manager: SpeakerManager = None,
    ):
        super().__init__(config, ap, tokenizer, speaker_manager)

        # Mirror every config field onto `self` so the rest of the class can read settings as plain
        # attributes (`self.use_gst`, `self.r`, ...). This may overwrite attributes set by the base class.
        for key in config:
            setattr(self, key, config[key])

        # Every conditioning vector that is enabled widens the decoder input, so the final
        # `decoder_in_features` must be known before the decoder is built below.
        if self.use_speaker_embedding or self.use_d_vector_file:
            self.init_multispeaker(config)
            self.decoder_in_features += self.embedded_speaker_dim

        if self.use_gst:
            self.decoder_in_features += self.gst.gst_embedding_dim

        if self.use_capacitron_vae:
            self.decoder_in_features += self.capacitron_vae.capacitron_VAE_embedding_dim

        # Character embedding table. NOTE: `normal_` re-initializes every row in place, including the
        # row at `padding_idx`.
        self.embedding = nn.Embedding(self.num_chars, 256, padding_idx=0)
        self.embedding.weight.data.normal_(0, 0.3)

        # Core encoder / decoder / postnet stack.
        self.encoder = Encoder(self.encoder_in_features)
        self.decoder = self._make_decoder(self.r)
        self.postnet = PostCBHG(self.decoder_output_dim)
        self.last_linear = nn.Linear(self.postnet.cbhg.gru_features * 2, self.out_channels)

        # Propagate the config switch to the decoder prenet.
        self.decoder.prenet.dropout_at_inference = self.prenet_dropout_at_inference

        # Optional global style token layer.
        if self.gst and self.use_gst:
            self.gst_layer = GST(
                num_mel=self.decoder_output_dim,
                num_heads=self.gst.gst_num_heads,
                num_style_tokens=self.gst.gst_num_style_tokens,
                gst_embedding_dim=self.gst.gst_embedding_dim,
            )

        # Optional Capacitron VAE layer. The optional embedding sizes are passed as None when the
        # corresponding feature is switched off in the Capacitron config.
        if self.capacitron_vae and self.use_capacitron_vae:
            vae_config = self.capacitron_vae
            speaker_embedding_dim = (
                self.embedded_speaker_dim
                if self.use_speaker_embedding and vae_config.capacitron_use_speaker_embedding
                else None
            )
            text_summary_embedding_dim = (
                vae_config.capacitron_text_summary_embedding_dim
                if vae_config.capacitron_use_text_summary_embeddings
                else None
            )
            self.capacitron_vae_layer = CapacitronVAE(
                num_mel=self.decoder_output_dim,
                encoder_output_dim=self.encoder_in_features,
                capacitron_VAE_embedding_dim=vae_config.capacitron_VAE_embedding_dim,
                speaker_embedding_dim=speaker_embedding_dim,
                text_summary_embedding_dim=text_summary_embedding_dim,
            )

        # Optional auxiliary decoders used only to produce the `*_backward` training outputs.
        if self.bidirectional_decoder:
            self._init_backward_decoder()

        if self.double_decoder_consistency:
            # Same decoder configuration as the main one, but with the DDC reduction factor.
            self.coarse_decoder = self._make_decoder(self.ddc_r)

    def _make_decoder(self, r):
        """Build a `Decoder` from the model settings using reduction factor `r`.

        The main decoder and the coarse (double decoder consistency) decoder share every setting
        except the reduction factor, so both are created here to keep them in sync.

        Args:
            r: Reduction factor passed as the third positional argument of `Decoder`.

        Returns:
            Decoder: A newly constructed decoder module.
        """
        return Decoder(
            self.decoder_in_features,
            self.decoder_output_dim,
            r,
            self.memory_size,
            self.attention_type,
            self.windowing,
            self.attention_norm,
            self.prenet_type,
            self.prenet_dropout,
            self.use_forward_attn,
            self.transition_agent,
            self.forward_attn_mask,
            self.location_attn,
            self.attention_heads,
            self.separate_stopnet,
            self.max_decoder_steps,
        )

    def forward(self, text, text_lengths, mel_specs=None, mel_lengths=None, aux_input=None):
        """Run the teacher-forced forward pass.

        Shapes:
            text: [B, T_in]
            text_lengths: [B]
            mel_specs: [B, T_out, C]
            mel_lengths: [B]
            aux_input: 'speaker_ids': [B, 1] and 'd_vectors':[B, C]

        Args:
            aux_input (dict, optional): Auxiliary inputs. When omitted (or None), a fresh
                ``{"speaker_ids": None, "d_vectors": None}`` dict is used.

        Returns:
            dict: Holds the keys ``model_outputs``, ``decoder_outputs``, ``alignments``, ``stop_tokens``,
            ``capacitron_vae_outputs``, ``alignments_backward`` and ``decoder_outputs_backward``. The two
            ``*_backward`` entries stay None unless the bidirectional decoder or double decoder
            consistency is enabled.
        """
        # Build the default per call instead of sharing one mutable dict across all calls.
        if aux_input is None:
            aux_input = {"speaker_ids": None, "d_vectors": None}
        aux_input = self._format_aux_input(aux_input)

        outputs = {"alignments_backward": None, "decoder_outputs_backward": None}
        inputs = self.embedding(text)
        input_mask, output_mask = self.compute_masks(text_lengths, mel_lengths)

        encoder_outputs = self.encoder(inputs)
        # Zero out encoder states at padded input positions.
        encoder_outputs = encoder_outputs * input_mask.unsqueeze(2).expand_as(encoder_outputs)

        # Global style tokens, conditioned on the target spectrogram.
        if self.gst and self.use_gst:
            encoder_outputs = self.compute_gst(encoder_outputs, mel_specs)

        # Stays None when no speaker conditioning is enabled. Without this initialisation the
        # Capacitron branch below raised UnboundLocalError when `capacitron_use_speaker_embedding`
        # was set on a model without speaker conditioning.
        embedded_speakers = None
        if self.use_speaker_embedding or self.use_d_vector_file:
            if not self.use_d_vector_file:
                # Look the speaker up in the learned table and add a length-1 axis at dim 1.
                embedded_speakers = self.speaker_embedding(aux_input["speaker_ids"])[:, None]
            else:
                # Use the externally computed d-vectors, with a length-1 axis added at dim 1.
                embedded_speakers = torch.unsqueeze(aux_input["d_vectors"], 1)
            encoder_outputs = self._concat_speaker_embedding(encoder_outputs, embedded_speakers)

        # Capacitron VAE: the first returned element replaces the encoder outputs and the remaining
        # elements are kept for the loss.
        if self.capacitron_vae and self.use_capacitron_vae:
            encoder_outputs, *capacitron_vae_outputs = self.compute_capacitron_VAE_embedding(
                encoder_outputs,
                reference_mel_info=[mel_specs, mel_lengths],
                text_info=[inputs, text_lengths]
                if self.capacitron_vae.capacitron_use_text_summary_embeddings
                else None,
                speaker_embedding=embedded_speakers if self.capacitron_vae.capacitron_use_speaker_embedding else None,
            )
        else:
            capacitron_vae_outputs = None

        decoder_outputs, alignments, stop_tokens = self.decoder(encoder_outputs, mel_specs, input_mask)

        # The mask is expanded over dim 1 here and over dim 2 after the postnet, so the decoder output
        # is presumably channel-first and the postnet output time-first -- TODO confirm against the layers.
        if output_mask is not None:
            decoder_outputs = decoder_outputs * output_mask.unsqueeze(1).expand_as(decoder_outputs)

        postnet_outputs = self.postnet(decoder_outputs)

        if output_mask is not None:
            postnet_outputs = postnet_outputs * output_mask.unsqueeze(2).expand_as(postnet_outputs)

        postnet_outputs = self.last_linear(postnet_outputs)

        # Swap dims 1 and 2 of the decoder output before returning it.
        decoder_outputs = decoder_outputs.transpose(1, 2).contiguous()

        if self.bidirectional_decoder:
            decoder_outputs_backward, alignments_backward = self._backward_pass(mel_specs, encoder_outputs, input_mask)
            outputs["alignments_backward"] = alignments_backward
            outputs["decoder_outputs_backward"] = decoder_outputs_backward

        # NOTE: writes the same keys as the bidirectional branch, so if both options are enabled the
        # coarse-decoder results are the ones returned.
        if self.double_decoder_consistency:
            decoder_outputs_backward, alignments_backward = self._coarse_decoder_pass(
                mel_specs, encoder_outputs, alignments, input_mask
            )
            outputs["alignments_backward"] = alignments_backward
            outputs["decoder_outputs_backward"] = decoder_outputs_backward

        outputs.update(
            {
                "model_outputs": postnet_outputs,
                "decoder_outputs": decoder_outputs,
                "alignments": alignments,
                "stop_tokens": stop_tokens,
                "capacitron_vae_outputs": capacitron_vae_outputs,
            }
        )
        return outputs

    @torch.no_grad()
    def inference(self, text_input, aux_input=None):
        """Synthesize without teacher forcing; gradients are disabled.

        Args:
            text_input: Token ids fed to the character embedding layer.
            aux_input (dict, optional): Auxiliary inputs, normalized by `_format_aux_input`. Depending on
                the enabled features the keys ``style_mel``, ``style_text``, ``d_vectors`` and
                ``speaker_ids`` are read.

        Returns:
            dict: Holds the keys ``model_outputs``, ``decoder_outputs``, ``alignments`` and ``stop_tokens``.
        """
        aux_input = self._format_aux_input(aux_input)
        inputs = self.embedding(text_input)
        encoder_outputs = self.encoder(inputs)

        if self.gst and self.use_gst:
            # Style conditioning from the reference given in `style_mel`.
            encoder_outputs = self.compute_gst(encoder_outputs, aux_input["style_mel"], aux_input["d_vectors"])

        if self.capacitron_vae and self.use_capacitron_vae:
            style_text = aux_input["style_text"]
            style_mel = aux_input["style_mel"]

            if style_text is not None:
                style_text_embedding = self.embedding(style_text)
                # Single-element length tensor, i.e. one reference text.
                style_text_length = torch.tensor([style_text_embedding.size(1)], dtype=torch.int64).to(
                    encoder_outputs.device
                )

            reference_mel_length = (
                torch.tensor([style_mel.size(1)], dtype=torch.int64).to(encoder_outputs.device)
                if style_mel is not None
                else None
            )

            # Only the first returned element (the updated encoder outputs) is needed at inference.
            encoder_outputs, *_ = self.compute_capacitron_VAE_embedding(
                encoder_outputs,
                reference_mel_info=[style_mel, reference_mel_length] if style_mel is not None else None,
                text_info=[style_text_embedding, style_text_length] if style_text is not None else None,
                speaker_embedding=aux_input["d_vectors"]
                if self.capacitron_vae.capacitron_use_speaker_embedding
                else None,
            )

        if self.num_speakers > 1:
            if not self.use_d_vector_file:
                embedded_speakers = self.speaker_embedding(aux_input["speaker_ids"])
                # Pad leading axes so the embedding ends up 3-dimensional before concatenation.
                if embedded_speakers.ndim == 1:
                    embedded_speakers = embedded_speakers[None, None, :]
                elif embedded_speakers.ndim == 2:
                    embedded_speakers = embedded_speakers[None, :]
            else:
                embedded_speakers = torch.unsqueeze(aux_input["d_vectors"], 1)
            encoder_outputs = self._concat_speaker_embedding(encoder_outputs, embedded_speakers)

        decoder_outputs, alignments, stop_tokens = self.decoder.inference(encoder_outputs)
        postnet_outputs = self.postnet(decoder_outputs)
        postnet_outputs = self.last_linear(postnet_outputs)
        decoder_outputs = decoder_outputs.transpose(1, 2)

        outputs = {
            "model_outputs": postnet_outputs,
            "decoder_outputs": decoder_outputs,
            "alignments": alignments,
            "stop_tokens": stop_tokens,
        }
        return outputs

    def before_backward_pass(self, loss_dict, optimizer) -> None:
        """Run the Capacitron-specific step that precedes the main backward pass.

        When the Capacitron VAE is enabled, the ``capacitron_vae_beta_loss`` entry is back-propagated
        and `optimizer.first_step()` is called. Otherwise this is a no-op.

        Args:
            loss_dict (dict): Losses of the current step; must hold ``capacitron_vae_beta_loss`` when the
                Capacitron VAE is enabled.
            optimizer: Optimizer exposing `first_step()` (presumably the `CapacitronOptimizer` returned by
                `get_optimizer`).
        """
        if self.use_capacitron_vae:
            loss_dict["capacitron_vae_beta_loss"].backward()
            optimizer.first_step()

    def train_step(self, batch: Dict, criterion: torch.nn.Module) -> Tuple[Dict, Dict]:
        """Perform a single training step by fetching the right set of samples from the batch.

        Args:
            batch ([Dict]): A dictionary of input tensors.
            criterion ([torch.nn.Module]): Callable criterion to compute model loss.

        Returns:
            Tuple[Dict, Dict]: The model outputs and the loss dictionary, the latter extended with an
            ``align_error`` entry.
        """
        text_input = batch["text_input"]
        text_lengths = batch["text_lengths"]
        mel_input = batch["mel_input"]
        mel_lengths = batch["mel_lengths"]
        linear_input = batch["linear_input"]
        stop_targets = batch["stop_targets"]
        stop_target_lengths = batch["stop_target_lengths"]
        speaker_ids = batch["speaker_ids"]
        d_vectors = batch["d_vectors"]

        aux_input = {"speaker_ids": speaker_ids, "d_vectors": d_vectors}
        outputs = self.forward(text_input, text_lengths, mel_input, mel_lengths, aux_input)

        # Express the target lengths in decoder steps of `r` frames each. When the longest target is
        # not a multiple of `r`, every length is first shifted by the amount needed to round that
        # longest target up to the next multiple.
        r = self.decoder.r
        remainder = mel_lengths.max() % r
        if remainder != 0:
            alignment_lengths = (mel_lengths + (r - remainder)) // r
        else:
            alignment_lengths = mel_lengths // r

        # Compute the loss in full precision regardless of any surrounding autocast context.
        with autocast(enabled=False):
            loss_dict = criterion(
                outputs["model_outputs"].float(),
                outputs["decoder_outputs"].float(),
                mel_input.float(),
                linear_input.float(),
                outputs["stop_tokens"].float(),
                stop_targets.float(),
                stop_target_lengths,
                outputs["capacitron_vae_outputs"] if self.capacitron_vae else None,
                mel_lengths,
                None if outputs["decoder_outputs_backward"] is None else outputs["decoder_outputs_backward"].float(),
                outputs["alignments"].float(),
                alignment_lengths,
                None if outputs["alignments_backward"] is None else outputs["alignments_backward"].float(),
                text_lengths,
            )

        # Logged for monitoring: one minus the diagonal score of the attention alignments.
        align_error = 1 - alignment_diagonal_score(outputs["alignments"])
        loss_dict["align_error"] = align_error
        return outputs, loss_dict

    def get_optimizer(self) -> object:
        """Create the optimizer for this model.

        Returns:
            object: A `CapacitronOptimizer` when the Capacitron VAE is enabled, otherwise whatever the
            trainer's `get_optimizer` builds from the config.
        """
        if self.use_capacitron_vae:
            return CapacitronOptimizer(self.config, self.named_parameters())
        return get_optimizer(self.config.optimizer, self.config.optimizer_params, self.config.lr, self)

    def get_scheduler(self, optimizer: object):
        """Create the learning-rate scheduler configured in the model config.

        Args:
            optimizer (object): Optimizer returned by `get_optimizer`. With the Capacitron VAE enabled,
                the scheduler is attached to its `primary_optimizer`.
        """
        opt = optimizer.primary_optimizer if self.use_capacitron_vae else optimizer
        return get_scheduler(self.config.lr_scheduler, self.config.lr_scheduler_params, opt)

    def before_gradient_clipping(self):
        """Clip gradients for Capacitron models, leaving the `capacitron_vae_layer.beta` parameter out.

        Does nothing when the Capacitron VAE is disabled.
        """
        if self.use_capacitron_vae:
            # Every trainable parameter except the Capacitron beta is clipped here.
            model_params_to_clip = [
                param
                for name, param in self.named_parameters()
                if param.requires_grad and name != "capacitron_vae_layer.beta"
            ]
            torch.nn.utils.clip_grad_norm_(model_params_to_clip, self.capacitron_vae.capacitron_grad_clip)

    def _create_logs(self, batch, outputs, ap):
        """Build the figures and the audio sample logged for the first item of a batch.

        Args:
            batch (dict): Batch holding ``mel_input`` and ``linear_input``.
            outputs (dict): Model outputs as returned by `forward`.
            ap: Audio processor used for plotting and for inverting the predicted linear spectrogram.

        Returns:
            Tuple[dict, dict]: A dict of figures and a dict ``{"audio": ...}`` holding the audio
            reconstructed from the predicted linear spectrogram.
        """
        postnet_outputs = outputs["model_outputs"]
        decoder_outputs = outputs["decoder_outputs"]
        alignments = outputs["alignments"]
        alignments_backward = outputs["alignments_backward"]
        mel_input = batch["mel_input"]
        linear_input = batch["linear_input"]

        # Only the first sample of the batch is visualized.
        pred_linear_spec = postnet_outputs[0].data.cpu().numpy()
        pred_mel_spec = decoder_outputs[0].data.cpu().numpy()
        gt_linear_spec = linear_input[0].data.cpu().numpy()
        gt_mel_spec = mel_input[0].data.cpu().numpy()
        align_img = alignments[0].data.cpu().numpy()

        figures = {
            "pred_linear_spec": plot_spectrogram(pred_linear_spec, ap, output_fig=False),
            "real_linear_spec": plot_spectrogram(gt_linear_spec, ap, output_fig=False),
            "pred_mel_spec": plot_spectrogram(pred_mel_spec, ap, output_fig=False),
            "real_mel_spec": plot_spectrogram(gt_mel_spec, ap, output_fig=False),
            "alignment": plot_alignment(align_img, output_fig=False),
        }

        if self.bidirectional_decoder or self.double_decoder_consistency:
            figures["alignment_backward"] = plot_alignment(alignments_backward[0].data.cpu().numpy(), output_fig=False)

        # Audio sample reconstructed from the predicted linear spectrogram.
        audio = ap.inv_spectrogram(pred_linear_spec.T)
        return figures, {"audio": audio}

    def train_log(
        self, batch: dict, outputs: dict, logger: "Logger", assets: dict, steps: int
    ) -> None:
        """Send training figures and audio for the given step to `logger`. `assets` is not used."""
        figures, audios = self._create_logs(batch, outputs, self.ap)
        logger.train_figures(steps, figures)
        logger.train_audios(steps, audios, self.ap.sample_rate)

    def eval_step(self, batch: dict, criterion: nn.Module):
        """Run an evaluation step; it is identical to `train_step`."""
        return self.train_step(batch, criterion)

    def eval_log(self, batch: dict, outputs: dict, logger: "Logger", assets: dict, steps: int) -> None:
        """Send evaluation figures and audio for the given step to `logger`. `assets` is not used."""
        figures, audios = self._create_logs(batch, outputs, self.ap)
        logger.eval_figures(steps, figures)
        logger.eval_audios(steps, audios, self.ap.sample_rate)

    @staticmethod
    def init_from_config(config: "TacotronConfig", samples: Union[List[List], List[Dict]] = None):
        """Initiate model from config

        Args:
            config (TacotronConfig): Model config.
            samples (Union[List[List], List[Dict]]): Training samples to parse speaker ids for training.
                Defaults to None.

        Returns:
            Tacotron: Model built with the audio processor, tokenizer and speaker manager created from
            `config`. The model receives the config returned by the tokenizer initialization.
        """
        # Imported inside the function rather than at module level (presumably to avoid an import
        # cycle -- TODO confirm).
        from TTS.utils.audio import AudioProcessor

        ap = AudioProcessor.init_from_config(config)
        tokenizer, new_config = TTSTokenizer.init_from_config(config)
        speaker_manager = SpeakerManager.init_from_config(config, samples)
        return Tacotron(new_config, ap, tokenizer, speaker_manager)
|
|