# coding=utf-8 | |
# Copyright 2023 The Suno AI Authors and The HuggingFace Inc. team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
""" PyTorch BARK model.""" | |
import math | |
from typing import Dict, Optional, Tuple, Union | |
import numpy as np | |
import torch | |
from torch import nn | |
from torch.nn import functional as F | |
from ...generation.logits_process import AlternatingCodebooksLogitsProcessor, SuppressTokensLogitsProcessor | |
from ...modeling_outputs import CausalLMOutputWithPast, MaskedLMOutput | |
from ...modeling_utils import PreTrainedModel, get_parameter_device | |
from ...utils import ( | |
add_start_docstrings, | |
add_start_docstrings_to_model_forward, | |
is_accelerate_available, | |
logging, | |
) | |
from ..auto import AutoModel | |
from .configuration_bark import ( | |
BarkCoarseConfig, | |
BarkConfig, | |
BarkFineConfig, | |
BarkSemanticConfig, | |
BarkSubModelConfig, | |
) | |
from .generation_configuration_bark import ( | |
BarkCoarseGenerationConfig, | |
BarkFineGenerationConfig, | |
BarkSemanticGenerationConfig, | |
) | |
logger = logging.get_logger(__name__) | |
_CHECKPOINT_FOR_DOC = "suno/bark-small" | |
_CONFIG_FOR_DOC = "BarkConfig" | |
BARK_PRETRAINED_MODEL_ARCHIVE_LIST = [ | |
"suno/bark-small", | |
"suno/bark", | |
# See all Bark models at https://huggingface.co/models?filter=bark | |
] | |
class BarkSelfAttention(nn.Module): | |
# adapted from GPTNeoSelfAttention and Bark code | |
    # BarkSelfAttention can have two attention types, i.e. full attention or causal attention
def __init__(self, config, is_causal=False): | |
super().__init__() | |
# regularization | |
self.dropout = config.dropout | |
self.attn_dropout = nn.Dropout(config.dropout) | |
self.resid_dropout = nn.Dropout(config.dropout) | |
self.embed_dim = config.hidden_size | |
self.num_heads = config.num_heads | |
self.head_dim = self.embed_dim // self.num_heads | |
if config.hidden_size % config.num_heads != 0: | |
raise ValueError( | |
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" | |
f" {self.num_heads})." | |
) | |
# key, query, value projections for all heads, but in a batch | |
self.att_proj = nn.Linear(config.hidden_size, 3 * config.hidden_size, bias=config.bias) | |
# output projection | |
self.out_proj = nn.Linear(config.hidden_size, config.hidden_size, bias=config.bias) | |
self.is_causal = is_causal | |
if is_causal: | |
block_size = config.block_size | |
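            # lower-triangular causal mask of shape (1, 1, block_size, block_size), registered as a buffer so that it moves with the module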
bias = torch.tril(torch.ones((block_size, block_size), dtype=bool)).view(1, 1, block_size, block_size) | |
self.register_buffer("bias", bias) | |
# Copied from transformers.models.gpt_neo.modeling_gpt_neo.GPTNeoSelfAttention._split_heads | |
def _split_heads(self, tensor, num_heads, attn_head_size): | |
""" | |
Splits hidden_size dim into attn_head_size and num_heads | |
""" | |
new_shape = tensor.size()[:-1] + (num_heads, attn_head_size) | |
tensor = tensor.view(new_shape) | |
return tensor.permute(0, 2, 1, 3) # (batch, head, seq_length, head_features) | |
def _merge_heads(self, tensor, num_heads, attn_head_size): | |
""" | |
Merges attn_head_size dim and num_attn_heads dim into hidden_size | |
""" | |
# re-assemble all head outputs side by side | |
# (batch, num_heads, seq_len, attn_head_size) -> (batch, seq_len, num_heads*attn_head_size) | |
tensor = tensor.transpose(1, 2).contiguous() | |
tensor = tensor.view(tensor.size()[:-2] + (num_heads * attn_head_size,)) | |
return tensor | |
def _attn(self, query, key, value, attention_mask=None, head_mask=None): | |
# unlike GPTNeo's SelfAttention, divide by the square root of the dimension of the query and the key | |
attn_weights = torch.matmul(query, key.transpose(-1, -2)) * (1.0 / math.sqrt(self.head_dim)) | |
if self.is_causal: | |
query_length, key_length = query.size(-2), key.size(-2) | |
            # mask out future positions (the upper triangular part of the attention weights) with the dtype's minimum value
attn_weights = attn_weights.masked_fill( | |
self.bias[:, :, key_length - query_length : key_length, :key_length] == 0, | |
torch.finfo(attn_weights.dtype).min, | |
) | |
if attention_mask is not None: | |
# Apply the attention mask | |
attn_weights = attn_weights + attention_mask | |
attn_weights = nn.functional.softmax(attn_weights, dim=-1) | |
attn_weights = attn_weights.to(value.dtype) | |
attn_weights = self.attn_dropout(attn_weights) | |
# Mask heads if we want to | |
if head_mask is not None: | |
attn_weights = attn_weights * head_mask | |
# (batch, num_heads, seq_len, seq_len) x (batch, num_heads, seq_len, attn_head_size) | |
# -> (batch, num_heads, seq_len, attn_head_size) | |
attn_output = torch.matmul(attn_weights, value) | |
return attn_output, attn_weights | |
def forward( | |
self, | |
hidden_states, | |
attention_mask=None, | |
past_key_values=None, | |
head_mask=None, | |
use_cache=False, | |
output_attentions=False, | |
): | |
# calculate query, key, values for all heads in batch and move head forward to be the batch dim | |
query, key, value = self.att_proj(hidden_states).split(self.embed_dim, dim=2) | |
query = self._split_heads(query, self.num_heads, self.head_dim) | |
key = self._split_heads(key, self.num_heads, self.head_dim) | |
value = self._split_heads(value, self.num_heads, self.head_dim) | |
if past_key_values is not None: | |
past_key = past_key_values[0] | |
past_value = past_key_values[1] | |
key = torch.cat((past_key, key), dim=-2) | |
value = torch.cat((past_value, value), dim=-2) | |
if use_cache is True: | |
present = (key, value) | |
else: | |
present = None | |
attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask) | |
attn_output = self._merge_heads(attn_output, self.num_heads, self.head_dim) | |
attn_output = self.out_proj(attn_output) | |
attn_output = self.resid_dropout(attn_output) | |
outputs = (attn_output, present) | |
if output_attentions: | |
outputs += (attn_weights,) | |
return outputs | |
class BarkLayerNorm(nn.Module): | |
"""LayerNorm but with an optional bias. PyTorch doesn't support simply bias=False.""" | |
def __init__(self, hidden_size, bias=True): | |
super().__init__() | |
self.weight = nn.Parameter(torch.ones(hidden_size)) | |
self.bias = nn.Parameter(torch.zeros(hidden_size)) if bias else None | |
def forward(self, input): | |
return F.layer_norm(input, self.weight.shape, self.weight, self.bias, eps=1e-5) | |
class BarkMLP(nn.Module): | |
def __init__(self, config): | |
super().__init__() | |
self.in_proj = nn.Linear(config.hidden_size, 4 * config.hidden_size, bias=config.bias) | |
self.out_proj = nn.Linear(4 * config.hidden_size, config.hidden_size, bias=config.bias) | |
self.dropout = nn.Dropout(config.dropout) | |
self.gelu = nn.GELU() | |
def forward(self, hidden_states): | |
hidden_states = self.in_proj(hidden_states) | |
hidden_states = self.gelu(hidden_states) | |
hidden_states = self.out_proj(hidden_states) | |
hidden_states = self.dropout(hidden_states) | |
return hidden_states | |
class BarkBlock(nn.Module): | |
def __init__(self, config, is_causal=False): | |
super().__init__() | |
if is_causal: | |
            # if causal, use the handmade BarkLayerNorm so that the LayerNorm bias is optional,
            # sticking with the original Bark choice of an optional bias in the autoregressive
            # models (corresponding to the "Text" and the "Coarse" modules)
self.layernorm_1 = BarkLayerNorm(config.hidden_size, bias=config.bias) | |
self.layernorm_2 = BarkLayerNorm(config.hidden_size, bias=config.bias) | |
else: | |
self.layernorm_1 = nn.LayerNorm(config.hidden_size) | |
self.layernorm_2 = nn.LayerNorm(config.hidden_size) | |
self.attn = BarkSelfAttention(config, is_causal=is_causal) | |
self.mlp = BarkMLP(config) | |
def forward( | |
self, | |
hidden_states, | |
past_key_values=None, | |
attention_mask=None, | |
head_mask=None, | |
use_cache=False, | |
output_attentions=False, | |
): | |
intermediary_hidden_states = self.layernorm_1(hidden_states) | |
attn_outputs = self.attn( | |
intermediary_hidden_states, | |
past_key_values=past_key_values, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
use_cache=use_cache, | |
output_attentions=output_attentions, | |
) | |
attn_output = attn_outputs[0] # output_attn: output, present_key_values, (attn_weights) | |
outputs = attn_outputs[1:] | |
intermediary_hidden_states = hidden_states + attn_output | |
intermediary_hidden_states = intermediary_hidden_states + self.mlp( | |
self.layernorm_2(intermediary_hidden_states) | |
) | |
if use_cache: | |
outputs = (intermediary_hidden_states,) + outputs | |
else: | |
outputs = (intermediary_hidden_states,) + outputs[1:] | |
return outputs # hidden_states, ((present), attentions) | |
class BarkPreTrainedModel(PreTrainedModel): | |
""" | |
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained | |
models. | |
""" | |
config_class = BarkConfig | |
supports_gradient_checkpointing = False | |
def _init_weights(self, module): | |
"""Initialize the weights.""" | |
if isinstance(module, (nn.Linear,)): | |
# Slightly different from the TF version which uses truncated_normal for initialization | |
# cf https://github.com/pytorch/pytorch/pull/5617 | |
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) | |
if module.bias is not None: | |
module.bias.data.zero_() | |
elif isinstance(module, nn.Embedding): | |
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) | |
if module.padding_idx is not None: | |
module.weight.data[module.padding_idx].zero_() | |
elif isinstance(module, nn.LayerNorm): | |
module.bias.data.zero_() | |
module.weight.data.fill_(1.0) | |
def __init__(self, *inputs, **kwargs): | |
super().__init__(*inputs, **kwargs) | |
    @property
    def device(self) -> torch.device:
""" | |
`torch.device`: The device on which the module is (assuming that all the module parameters are on the same | |
device). | |
""" | |
        # if the model has an `_hf_hook`, it has been offloaded, so the device has to be found in the hook
if not hasattr(self, "_hf_hook"): | |
return get_parameter_device(self) | |
for module in self.modules(): | |
if ( | |
hasattr(module, "_hf_hook") | |
and hasattr(module._hf_hook, "execution_device") | |
and module._hf_hook.execution_device is not None | |
): | |
return torch.device(module._hf_hook.execution_device) | |
return get_parameter_device(self) | |
def _set_gradient_checkpointing(self, module, value=False): | |
if isinstance(module, BarkCausalModel) or isinstance(module, BarkFineModel) or isinstance(module, BarkModel): | |
module.gradient_checkpointing = value | |
BARK_MODEL_START_DOCSTRING = """ | |
    This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
    library implements for all its models (such as downloading or saving, resizing the input embeddings, pruning
    heads, etc.)
    This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
    Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matters related to general
    usage and behavior.
Parameters: | |
config ([`{config}`]): | |
Model configuration class with all the parameters of the model. Initializing with a config file does not | |
load the weights associated with the model, only the configuration. Check out the | |
[`~PreTrainedModel.from_pretrained`] method to load the model weights. | |
""" | |
BARK_START_DOCSTRING = r""" | |
    This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
    library implements for all its models (such as downloading or saving, resizing the input embeddings, pruning
    heads, etc.)
    This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
    Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matters related to general
    usage and behavior.
Parameters: | |
config ([`BarkConfig`]): | |
Model configuration class with all the parameters of the model. Initializing with a config file does not | |
load the weights associated with the model, only the configuration. Check out the | |
[`~PreTrainedModel.from_pretrained`] method to load the model weights. | |
""" | |
BARK_FINE_INPUTS_DOCSTRING = r""" | |
Args: | |
codebook_idx (`int`): | |
Index of the codebook that will be predicted. | |
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length, number_of_codebooks)`): | |
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide | |
it. Initially, indices of the first two codebooks are obtained from the `coarse` sub-model. The rest is | |
predicted recursively by attending the previously predicted channels. The model predicts on windows of | |
length 1024. | |
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*): | |
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: | |
- 1 for tokens that are **not masked**, | |
- 0 for tokens that are **masked**. | |
[What are attention masks?](../glossary#attention-mask) | |
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): | |
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0, | |
config.max_position_embeddings - 1]`. | |
[What are position IDs?](../glossary#position-ids) | |
head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*): | |
Mask to nullify selected heads of the attention modules in the encoder. Mask values selected in `[0, 1]`: | |
- 1 indicates the head is **not masked**, | |
- 0 indicates the head is **masked**. | |
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): NOT IMPLEMENTED YET. | |
input_embeds (`torch.FloatTensor` of shape `(batch_size, input_sequence_length, hidden_size)`, *optional*): | |
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. If | |
`past_key_values` is used, optionally only the last `input_embeds` have to be input (see | |
`past_key_values`). This is useful if you want more control over how to convert `input_ids` indices into | |
associated vectors than the model's internal embedding lookup matrix. | |
output_attentions (`bool`, *optional*): | |
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned | |
tensors for more detail. | |
output_hidden_states (`bool`, *optional*): | |
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for | |
more detail. | |
return_dict (`bool`, *optional*): | |
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. | |
""" | |
BARK_CAUSAL_MODEL_INPUTS_DOCSTRING = r""" | |
Args: | |
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`): | |
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide | |
it. Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and | |
[`PreTrainedTokenizer.__call__`] for details. [What are input IDs?](../glossary#input-ids) | |
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache` is passed or when `config.use_cache=True`): | |
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape | |
`(batch_size, num_heads, sequence_length, embed_size_per_head)`. | |
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see | |
`past_key_values` input) to speed up sequential decoding. | |
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that | |
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all | |
`input_ids` of shape `(batch_size, sequence_length)`. | |
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*): | |
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: | |
- 1 for tokens that are **not masked**, | |
- 0 for tokens that are **masked**. | |
[What are attention masks?](../glossary#attention-mask) | |
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): | |
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0, | |
config.max_position_embeddings - 1]`. | |
[What are position IDs?](../glossary#position-ids) | |
head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*): | |
Mask to nullify selected heads of the attention modules in the encoder. Mask values selected in `[0, 1]`: | |
- 1 indicates the head is **not masked**, | |
- 0 indicates the head is **masked**. | |
input_embeds (`torch.FloatTensor` of shape `(batch_size, input_sequence_length, hidden_size)`, *optional*): | |
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. | |
Here, due to `Bark` particularities, if `past_key_values` is used, `input_embeds` will be ignored and you | |
have to use `input_ids`. If `past_key_values` is not used and `use_cache` is set to `True`, `input_embeds` | |
is used in priority instead of `input_ids`. | |
use_cache (`bool`, *optional*): | |
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see | |
`past_key_values`). | |
output_attentions (`bool`, *optional*): | |
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned | |
tensors for more detail. | |
output_hidden_states (`bool`, *optional*): | |
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for | |
more detail. | |
return_dict (`bool`, *optional*): | |
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. | |
""" | |
# GPT2-like autoregressive model | |
class BarkCausalModel(BarkPreTrainedModel): | |
config_class = BarkSubModelConfig | |
def __init__(self, config): | |
super().__init__(config) | |
self.config = config | |
# initialize as an autoregressive GPT-like model | |
self.input_embeds_layer = nn.Embedding(config.input_vocab_size, config.hidden_size) | |
self.position_embeds_layer = nn.Embedding(config.block_size, config.hidden_size) | |
self.drop = nn.Dropout(config.dropout) | |
self.layers = nn.ModuleList([BarkBlock(config, is_causal=True) for _ in range(config.num_layers)]) | |
self.layernorm_final = BarkLayerNorm(config.hidden_size, bias=config.bias) | |
self.lm_head = nn.Linear(config.hidden_size, config.output_vocab_size, bias=False) | |
self.gradient_checkpointing = False | |
# Initialize weights and apply final processing | |
self.post_init() | |
def get_input_embeddings(self): | |
return self.input_embeds_layer | |
def set_input_embeddings(self, new_embeddings): | |
self.input_embeds_layer = new_embeddings | |
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs): | |
input_embeds = kwargs.get("input_embeds", None) | |
attention_mask = kwargs.get("attention_mask", None) | |
position_ids = kwargs.get("position_ids", None) | |
if past_key_values is not None: | |
            # only keep the last token of input_ids if past_key_values is defined in kwargs
seq_len = input_ids.shape[1] | |
input_ids = input_ids[:, [-1]] | |
            # input_embeds has already been used and is not required anymore
input_embeds = None | |
else: | |
if input_embeds is not None and kwargs.get("use_cache"): | |
seq_len = input_embeds.shape[1] | |
else: | |
seq_len = input_ids.shape[1] | |
# ensure that attention_mask and position_ids shapes are aligned with the weird Bark hack of reducing | |
# sequence length on the first forward pass | |
if attention_mask is not None: | |
attention_mask = attention_mask[:, :seq_len] | |
if position_ids is not None: | |
position_ids = position_ids[:, :seq_len] | |
if attention_mask is not None and position_ids is None: | |
# create position_ids on the fly for batch generation | |
position_ids = attention_mask.long().cumsum(-1) - 1 | |
position_ids.masked_fill_(attention_mask == 0, 1) | |
if past_key_values: | |
position_ids = position_ids[:, -1].unsqueeze(-1) | |
else: | |
position_ids = None | |
if input_embeds is not None and kwargs.get("use_cache"): | |
return { | |
"input_ids": None, | |
"input_embeds": input_embeds, | |
"past_key_values": past_key_values, | |
"use_cache": kwargs.get("use_cache"), | |
"position_ids": position_ids, | |
"attention_mask": attention_mask, | |
} | |
return { | |
"input_ids": input_ids, | |
"past_key_values": past_key_values, | |
"use_cache": kwargs.get("use_cache"), | |
"position_ids": position_ids, | |
"attention_mask": attention_mask, | |
} | |
    @add_start_docstrings_to_model_forward(BARK_CAUSAL_MODEL_INPUTS_DOCSTRING)
    def forward(
self, | |
input_ids: Optional[torch.Tensor] = None, | |
past_key_values: Optional[Tuple[torch.FloatTensor]] = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
position_ids: Optional[torch.Tensor] = None, | |
head_mask: Optional[torch.Tensor] = None, | |
labels: Optional[torch.LongTensor] = None, | |
input_embeds: Optional[torch.Tensor] = None, | |
use_cache: Optional[bool] = None, | |
output_attentions: Optional[bool] = None, | |
output_hidden_states: Optional[bool] = None, | |
return_dict: Optional[bool] = None, | |
) -> Union[Tuple[torch.Tensor], CausalLMOutputWithPast]: | |
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
output_hidden_states = ( | |
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
) | |
use_cache = use_cache if use_cache is not None else self.config.use_cache | |
return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
# Verify if input_embeds already exists | |
# then compute embeddings. | |
if input_ids is not None and input_embeds is not None: | |
raise ValueError("You cannot specify both input_ids and input_embeds at the same time") | |
elif input_embeds is not None and past_key_values is None: | |
            # we want to return input_embeds in priority so that it is in line with a weird hack
            # of Bark which concatenates two parts of input_embeds on the first forward pass of the semantic model
pass | |
elif input_ids is not None: | |
input_embeds = self.input_embeds_layer(input_ids) # token embeddings of shape (b, t, n_embd) | |
elif input_embeds is not None: | |
pass | |
else: | |
raise ValueError("You have to specify either input_ids or input_embeds") | |
input_shape = input_embeds.size()[:-1] | |
batch_size = input_embeds.shape[0] | |
seq_length = input_shape[-1] | |
device = input_ids.device if input_ids is not None else input_embeds.device | |
if past_key_values is None: | |
past_length = 0 | |
past_key_values = tuple([None] * len(self.layers)) | |
else: | |
past_length = past_key_values[0][0].size(-2) | |
if position_ids is None: | |
position_ids = torch.arange(past_length, seq_length + past_length, dtype=torch.long, device=device) | |
position_ids = position_ids.unsqueeze(0) # shape (1, seq_length) | |
position_embeds = self.position_embeds_layer(position_ids) # position embeddings of shape (1, t, n_embd) | |
# Attention mask. | |
if attention_mask is not None: | |
if batch_size <= 0: | |
raise ValueError("batch_size has to be defined and > 0") | |
attention_mask = attention_mask.view(batch_size, -1) | |
# We create a 3D attention mask from a 2D tensor mask. | |
# Sizes are [batch_size, 1, 1, to_seq_length] | |
# So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length] | |
            # this attention mask is simpler than the triangular masking of causal attention
            # used in OpenAI GPT; we just need to prepare the broadcast dimension here.
attention_mask = attention_mask[:, None, None, :] | |
# Since attention_mask is 1.0 for positions we want to attend and 0.0 for | |
# masked positions, this operation will create a tensor which is 0.0 for | |
# positions we want to attend and the dtype's smallest value for masked positions. | |
# Since we are adding it to the raw scores before the softmax, this is | |
# effectively the same as removing these entirely. | |
attention_mask = attention_mask.to(dtype=self.dtype) # fp16 compatibility | |
attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min | |
# Prepare head mask if needed | |
# 1.0 in head_mask indicate we keep the head | |
# attention_probs has shape bsz x num_heads x N x N | |
# head_mask has shape num_layers x batch x num_heads x N x N | |
head_mask = self.get_head_mask(head_mask, self.config.num_layers) | |
hidden_states = self.drop(input_embeds + position_embeds) | |
output_shape = input_shape + (hidden_states.size(-1),) | |
if self.gradient_checkpointing and self.training: | |
if use_cache: | |
logger.warning_once( | |
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." | |
) | |
use_cache = False | |
present_key_values = () if use_cache else None | |
all_self_attentions = () if output_attentions else None | |
all_hidden_states = () if output_hidden_states else None | |
for i, (block, past_layer_key_values) in enumerate(zip(self.layers, past_key_values)): | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states,) | |
if self.gradient_checkpointing and self.training: | |
def create_custom_forward(module): | |
def custom_forward(*inputs): | |
# None for past_key_value | |
return module(*inputs, use_cache, output_attentions) | |
return custom_forward | |
outputs = torch.utils.checkpoint.checkpoint( | |
create_custom_forward(block), | |
hidden_states, | |
None, | |
attention_mask, | |
head_mask[i], | |
) | |
else: | |
outputs = block( | |
hidden_states, | |
past_key_values=past_layer_key_values, | |
attention_mask=attention_mask, | |
head_mask=head_mask[i], | |
use_cache=use_cache, | |
output_attentions=output_attentions, | |
) | |
hidden_states = outputs[0] | |
if use_cache: | |
present_key_values = present_key_values + (outputs[1],) | |
if output_attentions: | |
all_self_attentions = all_self_attentions + (outputs[2 if use_cache else 1],) | |
hidden_states = self.layernorm_final(hidden_states) | |
hidden_states = hidden_states.view(output_shape) | |
# Add last hidden state | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states,) | |
logits = self.lm_head(hidden_states) | |
loss = None | |
if labels is not None: | |
raise NotImplementedError( | |
"Training is not implemented yet for Bark - ensure you do not pass `labels` to the model." | |
) | |
if not return_dict: | |
return tuple( | |
v for v in [None, logits, present_key_values, all_hidden_states, all_self_attentions] if v is not None | |
) | |
return CausalLMOutputWithPast( | |
loss=loss, | |
logits=logits, | |
past_key_values=present_key_values, | |
hidden_states=all_hidden_states, | |
attentions=all_self_attentions, | |
) | |
    @staticmethod
    def _reorder_cache(
past_key_values: Tuple[Tuple[torch.Tensor]], beam_idx: torch.Tensor | |
) -> Tuple[Tuple[torch.Tensor]]: | |
""" | |
This function is used to re-order the `past_key_values` cache if [`~PreTrainedModel.beam_search`] or | |
[`~PreTrainedModel.beam_sample`] is called. This is required to match `past_key_values` with the correct | |
beam_idx at every generation step. | |
""" | |
# Necessary for beam_search | |
return tuple( | |
tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past) | |
for layer_past in past_key_values | |
) | |
class BarkSemanticModel(BarkCausalModel): | |
base_model_prefix = "semantic" | |
config_class = BarkSemanticConfig | |
def generate( | |
self, | |
input_ids: torch.Tensor, | |
semantic_generation_config: BarkSemanticGenerationConfig = None, | |
history_prompt: Optional[Dict[str, torch.Tensor]] = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
**kwargs, | |
) -> torch.LongTensor: | |
""" | |
Generates text semantic tokens from an input prompt and an additional optional `Bark` speaker prompt. | |
Args: | |
input_ids (`Optional[torch.Tensor]` of shape (batch_size, seq_len), *optional*): | |
Input ids, i.e tokenized input sentences. Will be truncated up to | |
semantic_generation_config.max_input_semantic_length tokens. Note that the output audios will be as | |
long as the longest generation among the batch. | |
semantic_generation_config (`BarkSemanticGenerationConfig`): | |
Generation config indicating how to generate the semantic tokens. | |
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*): | |
Optional `Bark` speaker prompt. | |
attention_mask (`Optional[torch.Tensor]`, *optional*): | |
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: | |
- 1 for tokens that are **not masked**, | |
- 0 for tokens that are **masked**. | |
[What are attention masks?](../glossary#attention-mask) | |
Returns: | |
torch.LongTensor: Output semantic tokens. | |
""" | |
if semantic_generation_config is None: | |
raise ValueError("`semantic_generation_config` has to be provided") | |
batch_size = input_ids.shape[0] | |
max_input_semantic_length = semantic_generation_config.max_input_semantic_length | |
input_ids = input_ids + semantic_generation_config.text_encoding_offset | |
if attention_mask is not None: | |
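            # replace the padded positions (where attention_mask == 0) with the dedicated text pad token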
input_ids = input_ids.masked_fill((1 - attention_mask).bool(), semantic_generation_config.text_pad_token) | |
if history_prompt is not None: | |
semantic_history = history_prompt["semantic_prompt"][-max_input_semantic_length:] | |
semantic_history = nn.functional.pad( | |
semantic_history, | |
(0, max_input_semantic_length - len(semantic_history)), | |
value=semantic_generation_config.semantic_pad_token, | |
mode="constant", | |
) | |
else: | |
semantic_history = torch.tensor( | |
[semantic_generation_config.semantic_pad_token] * max_input_semantic_length, dtype=torch.int | |
).to(self.device) | |
semantic_history = torch.repeat_interleave(semantic_history[None], batch_size, dim=0) | |
infer_array = torch.tensor( | |
[[semantic_generation_config.semantic_infer_token]] * batch_size, dtype=torch.int | |
).to(self.device) | |
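        # sum the text token embeddings with the (padded) semantic history embeddings, then append the embedding of the infer token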
input_embeds = torch.cat( | |
[ | |
self.input_embeds_layer(input_ids[:, :max_input_semantic_length]) | |
+ self.input_embeds_layer(semantic_history[:, : max_input_semantic_length + 1]), | |
self.input_embeds_layer(infer_array), | |
], | |
dim=1, | |
) | |
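        # suppress every output id above the semantic vocabulary except the semantic pad token (also used as EOS)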
tokens_to_suppress = list( | |
range(semantic_generation_config.semantic_vocab_size, semantic_generation_config.semantic_pad_token) | |
) | |
tokens_to_suppress.extend( | |
list(range(semantic_generation_config.semantic_pad_token + 1, self.config.output_vocab_size)) | |
) | |
suppress_tokens_logits_processor = SuppressTokensLogitsProcessor(tokens_to_suppress) | |
# pass input_ids in order to stay consistent with the transformers generate method even though it is not used | |
# (except to get the input seq_len - that's why we keep the first 257 tokens) | |
semantic_output = super().generate( | |
torch.ones((batch_size, max_input_semantic_length + 1), dtype=torch.int).to(self.device), | |
input_embeds=input_embeds, | |
logits_processor=[suppress_tokens_logits_processor], | |
generation_config=semantic_generation_config, | |
**kwargs, | |
) # size: 10048 | |
# take the generated semantic tokens | |
semantic_output = semantic_output[:, max_input_semantic_length + 1 :] | |
return semantic_output | |
class BarkCoarseModel(BarkCausalModel): | |
base_model_prefix = "coarse_acoustics" | |
config_class = BarkCoarseConfig | |
def preprocess_histories( | |
self, | |
max_coarse_history: int, | |
semantic_to_coarse_ratio: int, | |
batch_size: int, | |
        semantic_generation_config: BarkSemanticGenerationConfig,
codebook_size: int, | |
history_prompt: Optional[Dict[str, torch.Tensor]] = None, | |
): | |
""" | |
Preprocess the optional `Bark` speaker prompts before `self.generate`. | |
Args: | |
max_coarse_history (`int`): | |
Maximum size of coarse tokens used. | |
semantic_to_coarse_ratio (`int`): | |
Ratio of semantic to coarse frequency | |
batch_size (`int`): | |
Batch size, i.e the number of samples. | |
semantic_generation_config (`BarkSemanticGenerationConfig`): | |
Generation config indicating how to generate the semantic tokens. | |
codebook_size (`int`): | |
Codebook channel size, i.e. the size of the output vocabulary per codebook channel. | |
history_prompt (`Optional[Dict[str,torch.Tensor]]`): | |
Optional `Bark` speaker prompt. | |
        Returns:
            `tuple(torch.FloatTensor)`:
                - **x_semantic_history** (`torch.FloatTensor`) -- Processed semantic speaker prompt.
                - **x_coarse_history** (`torch.FloatTensor`) -- Processed coarse speaker prompt.
""" | |
if history_prompt is not None: | |
x_semantic_history = torch.repeat_interleave(history_prompt["semantic_prompt"][None], batch_size, dim=0) | |
# clone to avoid modifying history_prompt.coarse_prompt | |
x_coarse_history = history_prompt["coarse_prompt"].clone() | |
# offset x_coarse_history | |
if codebook_size is not None: | |
for n in range(1, x_coarse_history.shape[0]): | |
# offset | |
x_coarse_history[n, :] += codebook_size * n | |
# flatten x_coarse_history | |
x_coarse_history = torch.transpose(x_coarse_history, 0, 1).view(-1) | |
x_coarse_history = x_coarse_history + semantic_generation_config.semantic_vocab_size | |
x_coarse_history = torch.repeat_interleave(x_coarse_history[None], batch_size, dim=0) | |
            # e.g.: after SEMANTIC_VOCAB_SIZE (10000), 1024 tokens are dedicated to the first codebook and the
            # next 1024 tokens to the second codebook.
max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio)) | |
# trim histories correctly | |
n_semantic_hist_provided = min( | |
[ | |
max_semantic_history, | |
x_semantic_history.shape[1] - x_semantic_history.shape[1] % 2, | |
int(np.floor(x_coarse_history.shape[1] / semantic_to_coarse_ratio)), | |
] | |
) | |
n_coarse_hist_provided = int(round(n_semantic_hist_provided * semantic_to_coarse_ratio)) | |
x_semantic_history = x_semantic_history[:, -n_semantic_hist_provided:].int() | |
x_coarse_history = x_coarse_history[:, -n_coarse_hist_provided:].int() | |
# bit of a hack for time alignment (sounds better) - from Bark original implementation | |
x_coarse_history = x_coarse_history[:, :-2] | |
else: | |
# shape: (batch_size, 0) | |
x_semantic_history = torch.tensor([[]] * batch_size, dtype=torch.int).to(self.device) | |
x_coarse_history = torch.tensor([[]] * batch_size, dtype=torch.int).to(self.device) | |
return x_semantic_history, x_coarse_history | |
def generate( | |
self, | |
semantic_output: torch.Tensor, | |
semantic_generation_config: BarkSemanticGenerationConfig = None, | |
coarse_generation_config: BarkCoarseGenerationConfig = None, | |
codebook_size: int = 1024, | |
history_prompt: Optional[Dict[str, torch.Tensor]] = None, | |
**kwargs, | |
) -> torch.LongTensor: | |
""" | |
Generates coarse acoustics tokens from input text semantic tokens and an additional optional `Bark` speaker | |
prompt. | |
Args: | |
semantic_output (`torch.Tensor` of shape (batch_size, seq_len), *optional*): | |
Input text semantic ids, i.e the output of `BarkSemanticModel.generate`. | |
semantic_generation_config (`BarkSemanticGenerationConfig`): | |
Generation config indicating how to generate the semantic tokens. | |
coarse_generation_config (`BarkCoarseGenerationConfig`): | |
Generation config indicating how to generate the coarse tokens. | |
codebook_size (`int`, *optional*, defaults to 1024): | |
Codebook channel size, i.e. the size of the output vocabulary per codebook channel. | |
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*): | |
Optional `Bark` speaker prompt. | |
Returns: | |
torch.LongTensor: Output coarse acoustics tokens. | |
""" | |
if semantic_generation_config is None: | |
raise ValueError("`semantic_generation_config` has to be provided") | |
if coarse_generation_config is None: | |
raise ValueError("`coarse_generation_config` has to be provided") | |
max_coarse_input_length = coarse_generation_config.max_coarse_input_length | |
max_coarse_history = coarse_generation_config.max_coarse_history | |
sliding_window_len = coarse_generation_config.sliding_window_len | |
# replace semantic_pad_token (eos_tok and pad_tok here) with coarse_semantic_pad_token i.e the pad_token | |
# used in the next model | |
semantic_output.masked_fill_( | |
semantic_output == semantic_generation_config.semantic_pad_token, | |
coarse_generation_config.coarse_semantic_pad_token, | |
) | |
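        # number of flattened coarse tokens generated per semantic token (the coarse model runs at coarse_rate_hz over n_coarse_codebooks codebooks)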
semantic_to_coarse_ratio = ( | |
coarse_generation_config.coarse_rate_hz | |
/ semantic_generation_config.semantic_rate_hz | |
* coarse_generation_config.n_coarse_codebooks | |
) | |
max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio)) | |
        # beware: this depends on the seq_len of the longest sequence of the batch.
        # Also, the seq_len might be one token too long because of an added
        # pad_token as compared to the original Bark implementation.
max_generated_len = np.floor( | |
semantic_output.shape[1] * semantic_to_coarse_ratio / coarse_generation_config.n_coarse_codebooks | |
) | |
max_generated_len = int(round(max_generated_len * coarse_generation_config.n_coarse_codebooks)) | |
batch_size = semantic_output.shape[0] | |
x_semantic_history, x_coarse = self.preprocess_histories( | |
history_prompt=history_prompt, | |
max_coarse_history=max_coarse_history, | |
semantic_to_coarse_ratio=semantic_to_coarse_ratio, | |
batch_size=batch_size, | |
semantic_generation_config=semantic_generation_config, | |
codebook_size=codebook_size, | |
) | |
base_semantic_idx = x_semantic_history.shape[1] | |
semantic_output = torch.hstack([x_semantic_history, semantic_output]) | |
n_window_steps = int(np.ceil(max_generated_len / sliding_window_len)) | |
total_generated_len = 0 | |
len_coarse_history = x_coarse.shape[1] | |
for _ in range(n_window_steps): | |
semantic_idx = base_semantic_idx + int(round(total_generated_len / semantic_to_coarse_ratio)) | |
# pad from right side | |
input_coarse = semantic_output[:, np.max([0, semantic_idx - max_semantic_history]) :] | |
input_coarse = input_coarse[:, :max_coarse_input_length] | |
input_coarse = F.pad( | |
input_coarse, | |
(0, max_coarse_input_length - input_coarse.shape[-1]), | |
"constant", | |
coarse_generation_config.coarse_semantic_pad_token, | |
) | |
input_coarse = torch.hstack( | |
[ | |
input_coarse, | |
torch.tensor([[coarse_generation_config.coarse_infer_token]] * batch_size).to(self.device), | |
x_coarse[:, -max_coarse_history:], | |
] | |
) | |
alternatingLogitsProcessor = AlternatingCodebooksLogitsProcessor( | |
input_coarse.shape[1], | |
semantic_generation_config.semantic_vocab_size, | |
codebook_size, | |
) | |
output_coarse = super().generate( | |
input_coarse, | |
logits_processor=[alternatingLogitsProcessor], | |
max_new_tokens=min(sliding_window_len, max_generated_len - total_generated_len), | |
generation_config=coarse_generation_config, | |
**kwargs, | |
) | |
input_coarse_len = input_coarse.shape[1] | |
x_coarse = torch.hstack([x_coarse, output_coarse[:, input_coarse_len:]]) | |
total_generated_len = x_coarse.shape[1] - len_coarse_history | |
del output_coarse | |
coarse_output = x_coarse[:, len_coarse_history:] | |
return coarse_output | |
class BarkFineModel(BarkPreTrainedModel): | |
base_model_prefix = "fine_acoustics" | |
config_class = BarkFineConfig | |
main_input_name = "codebook_idx" | |
def __init__(self, config): | |
# non-causal gpt-like model with one embedding layer and one lm_head for each codebook of Encodec | |
super().__init__(config) | |
self.config = config | |
        # initialize a modified non-causal GPT-like model
        # note that there is one embedding layer and one lm_head for each codebook of Encodec
self.input_embeds_layers = nn.ModuleList( | |
[nn.Embedding(config.input_vocab_size, config.hidden_size) for _ in range(config.n_codes_total)] | |
) | |
self.position_embeds_layer = nn.Embedding(config.block_size, config.hidden_size) | |
self.drop = nn.Dropout(config.dropout) | |
self.layers = nn.ModuleList([BarkBlock(config, is_causal=False) for _ in range(config.num_layers)]) | |
self.layernorm_final = nn.LayerNorm(config.hidden_size) | |
self.lm_heads = nn.ModuleList( | |
[ | |
nn.Linear(config.hidden_size, config.output_vocab_size, bias=False) | |
for _ in range(config.n_codes_given, config.n_codes_total) | |
] | |
) | |
self.gradient_checkpointing = False | |
self.n_codes_total = config.n_codes_total | |
# Initialize weights and apply final processing | |
self.post_init() | |
def get_input_embeddings(self): | |
        # one embedding layer for each codebook
return self.input_embeds_layers | |
def set_input_embeddings(self, new_embeddings): | |
        # one embedding layer for each codebook
self.input_embeds_layers = new_embeddings | |
def get_output_embeddings(self): | |
# one lm_head for each codebook | |
return self.lm_heads | |
def set_output_embeddings(self, new_output_embeddings): | |
# one lm_head for each codebook | |
self.lm_heads = new_output_embeddings | |
def _resize_token_embeddings(self, new_num_tokens, pad_to_multiple_of=None): | |
old_embeddings_list = self.get_input_embeddings() | |
new_embeddings_list = nn.ModuleList( | |
[ | |
self._get_resized_embeddings(old_embeddings, new_num_tokens, pad_to_multiple_of) | |
for old_embeddings in old_embeddings_list | |
] | |
) | |
self.set_input_embeddings(new_embeddings_list) | |
new_num_tokens = new_embeddings_list[0].weight.shape[0] | |
# if word embeddings are not tied, make sure that lm head is resized as well | |
if self.get_output_embeddings() is not None and not self.config.tie_word_embeddings: | |
old_lm_head_list = self.get_output_embeddings() | |
new_lm_head_list = nn.ModuleList( | |
[self._get_resized_lm_head(old_lm_head, new_num_tokens) for old_lm_head in old_lm_head_list] | |
) | |
self.set_output_embeddings(new_lm_head_list) | |
return self.get_input_embeddings() | |
def resize_token_embeddings( | |
self, new_num_tokens: Optional[int] = None, pad_to_multiple_of: Optional[int] = None | |
) -> nn.Embedding: | |
""" | |
Resizes input token embeddings matrix of the model if `new_num_tokens != config.vocab_size`. | |
Takes care of tying weights embeddings afterwards if the model class has a `tie_weights()` method. | |
Arguments: | |
new_num_tokens (`int`, *optional*): | |
The number of new tokens in the embedding matrix. Increasing the size will add newly initialized | |
vectors at the end. Reducing the size will remove vectors from the end. If not provided or `None`, just | |
returns a pointer to the input tokens `torch.nn.Embedding` module of the model without doing anything. | |
pad_to_multiple_of (`int`, *optional*): | |
If set will pad the embedding matrix to a multiple of the provided value. | |
This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability | |
`>= 7.5` (Volta), or on TPUs which benefit from having sequence lengths be a multiple of 128. For more | |
details about this, or help on choosing the correct value for resizing, refer to this guide: | |
https://docs.nvidia.com/deeplearning/performance/dl-performance-matrix-multiplication/index.html#requirements-tc | |
Return: | |
`torch.nn.Embedding`: Pointer to the input tokens Embeddings Module of the model. | |
""" | |
model_embeds = self._resize_token_embeddings(new_num_tokens, pad_to_multiple_of) | |
if new_num_tokens is None and pad_to_multiple_of is None: | |
return model_embeds | |
# Update base model and current model config | |
self.config.output_vocab_size = model_embeds[0].weight.shape[0] | |
self.config.vocab_size = model_embeds[0].weight.shape[0] | |
self.output_vocab_size = model_embeds[0].weight.shape[0] | |
self.vocab_size = model_embeds[0].weight.shape[0] | |
# Tie weights again if needed | |
self.tie_weights() | |
return model_embeds | |
def tie_weights(self): | |
""" | |
Tie the weights between the input embeddings list and the output embeddings list. | |
        If the `torchscript` flag is set in the configuration, TorchScript can't handle parameter sharing, so we
        clone the weights instead.
""" | |
if getattr(self.config, "tie_word_embeddings", True): | |
self._tied_weights_keys = [] | |
output_embeddings = self.get_output_embeddings() | |
input_embeddings = self.get_input_embeddings() | |
for i in range(self.config.n_codes_total - self.config.n_codes_given): | |
# self.input_embeds_layers[i + 1].weight = self.lm_heads[i].weight | |
self._tie_or_clone_weights(output_embeddings[i], input_embeddings[i + 1]) | |
self._tied_weights_keys.append(f"lm_heads.{i}.weight") | |
for module in self.modules(): | |
if hasattr(module, "_tie_weights"): | |
module._tie_weights() | |
    @add_start_docstrings_to_model_forward(BARK_FINE_INPUTS_DOCSTRING)
    def forward(
self, | |
        codebook_idx: int,  # an additional idx corresponding to the id of the codebook that will be predicted
input_ids: Optional[torch.Tensor] = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
position_ids: Optional[torch.Tensor] = None, | |
head_mask: Optional[torch.Tensor] = None, | |
labels: Optional[torch.LongTensor] = None, | |
input_embeds: Optional[torch.Tensor] = None, | |
output_attentions: Optional[bool] = None, | |
output_hidden_states: Optional[bool] = None, | |
return_dict: Optional[bool] = None, | |
) -> Union[Tuple[torch.Tensor], MaskedLMOutput]: | |
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
output_hidden_states = ( | |
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
) | |
return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
if codebook_idx == 0: | |
raise ValueError("Cannot predict 0th codebook - 0th codebook should be predicted by the coarse model") | |
if input_ids is not None and input_embeds is not None: | |
raise ValueError("You cannot specify both input_ids and input_embeds at the same time") | |
if input_ids is None and input_embeds is None: | |
raise ValueError("You have to specify either input_ids or input_embeds") | |
if input_ids is not None: | |
            # the input embeddings are the sum of the embeddings of the codebook channels up to and
            # including the current codebook_idx channel
            # forward the GPT model itself
input_embeds = [ | |
input_embeds_layer(input_ids[:, :, i]).unsqueeze(-1) | |
for i, input_embeds_layer in enumerate(self.input_embeds_layers) | |
] # token embeddings of shape (b, t, n_embd) | |
input_embeds = torch.cat(input_embeds, dim=-1) | |
input_embeds = input_embeds[:, :, :, : codebook_idx + 1].sum(dim=-1) | |
input_shape = input_embeds.size()[:-1] | |
batch_size = input_embeds.shape[0] | |
seq_length = input_shape[1] | |
device = input_ids.device if input_ids is not None else input_embeds.device | |
if position_ids is None: | |
position_ids = torch.arange(0, seq_length, dtype=torch.long, device=device) | |
position_ids = position_ids.unsqueeze(0) # shape (1, seq_length) | |
position_embeds = self.position_embeds_layer(position_ids) # position embeddings of shape (1, t, n_embd) | |
# Attention mask. | |
if attention_mask is not None: | |
if batch_size <= 0: | |
raise ValueError("batch_size has to be defined and > 0") | |
attention_mask = attention_mask.view(batch_size, -1) | |
attention_mask = attention_mask[:, None, None, :] | |
attention_mask = attention_mask.to(dtype=self.dtype) # fp16 compatibility | |
attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min | |
head_mask = self.get_head_mask(head_mask, self.config.num_layers) | |
hidden_states = self.drop(input_embeds + position_embeds) | |
output_shape = input_shape + (hidden_states.size(-1),) | |
all_self_attentions = () if output_attentions else None | |
all_hidden_states = () if output_hidden_states else None | |
for i, block in enumerate(self.layers): | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states,) | |
outputs = block( | |
hidden_states, | |
attention_mask=attention_mask, | |
head_mask=head_mask[i], | |
output_attentions=output_attentions, | |
) | |
hidden_states = outputs[0] | |
if output_attentions: | |
all_self_attentions = all_self_attentions + (outputs[1],) | |
hidden_states = self.layernorm_final(hidden_states) | |
hidden_states = hidden_states.view(output_shape) | |
# Add last hidden state | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states,) | |
logits = self.lm_heads[codebook_idx - self.config.n_codes_given](hidden_states) | |
loss = None | |
if labels is not None: | |
raise NotImplementedError("Training is not implemented yet") | |
if not return_dict: | |
return tuple(v for v in [None, logits, all_hidden_states, all_self_attentions] if v is not None) | |
return MaskedLMOutput( | |
loss=loss, | |
logits=logits, | |
hidden_states=all_hidden_states, | |
attentions=all_self_attentions, | |
) | |
def generate( | |
self, | |
coarse_output: torch.Tensor, | |
semantic_generation_config: BarkSemanticGenerationConfig = None, | |
coarse_generation_config: BarkCoarseGenerationConfig = None, | |
fine_generation_config: BarkFineGenerationConfig = None, | |
codebook_size: int = 1024, | |
history_prompt: Optional[Dict[str, torch.Tensor]] = None, | |
**kwargs, | |
) -> torch.LongTensor: | |
""" | |
Generates fine acoustics tokens from input coarse acoustics tokens and an additional optional `Bark` speaker | |
prompt. | |
Args: | |
coarse_output (`torch.Tensor` of shape (batch_size, seq_len)): | |
Input coarse acoustics ids, i.e the output of `BarkCoarseModel.generate`. | |
semantic_generation_config (`BarkSemanticGenerationConfig`): | |
Generation config indicating how to generate the semantic tokens. | |
coarse_generation_config (`BarkCoarseGenerationConfig`): | |
Generation config indicating how to generate the coarse tokens. | |
fine_generation_config (`BarkFineGenerationConfig`): | |
Generation config indicating how to generate the fine tokens. | |
codebook_size (`int`, *optional*, defaults to 1024): | |
Codebook channel size, i.e. the size of the output vocabulary per codebook channel. | |
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*): | |
Optional `Bark` speaker prompt. | |
Returns: | |
torch.LongTensor: Output fine acoustics tokens. | |
""" | |
if semantic_generation_config is None: | |
raise ValueError("`semantic_generation_config` has to be provided") | |
if coarse_generation_config is None: | |
raise ValueError("`coarse_generation_config` has to be provided") | |
if fine_generation_config is None: | |
raise ValueError("`fine_generation_config` has to be provided") | |
        # since we don't really use GenerationConfig through the fine model (autoencoder)
        # and since only temperature is used among the classic GenerationConfig parameters,
        # manually give the kwargs priority over the generation config
temperature = kwargs.get("temperature", fine_generation_config.temperature) | |
max_fine_history_length = fine_generation_config.max_fine_history_length | |
max_fine_input_length = fine_generation_config.max_fine_input_length | |
# shape: (batch, n_coarse_codebooks * seq_len) | |
# new_shape: (batch, seq_len, n_coarse_codebooks) | |
coarse_output = coarse_output.view(coarse_output.shape[0], -1, coarse_generation_config.n_coarse_codebooks) | |
# brings ids into the range [0, codebook_size -1] | |
coarse_output = torch.remainder(coarse_output - semantic_generation_config.semantic_vocab_size, codebook_size) | |
batch_size = coarse_output.shape[0] | |
if history_prompt is not None: | |
x_fine_history = torch.repeat_interleave(history_prompt["fine_prompt"].T[None], batch_size, dim=0) | |
# transpose to get to shape (seq_len, n_fine_codebooks) | |
else: | |
x_fine_history = None | |
n_coarse = coarse_generation_config.n_coarse_codebooks | |
        # pad the remaining fine codebook channels (from n_coarse to n_fine_codebooks) with codebook_size as a placeholder id
fine_input = F.pad( | |
coarse_output, | |
(0, fine_generation_config.n_fine_codebooks - n_coarse), | |
"constant", | |
codebook_size, | |
) | |
# prepend history if available (max max_fine_history_length) | |
if x_fine_history is not None: | |
fine_input = torch.cat([x_fine_history[:, -max_fine_history_length:, :], fine_input], dim=1) | |
# len of the fine_history that has been added to fine_input | |
n_history = x_fine_history[:, -max_fine_history_length:, :].shape[1] | |
else: | |
n_history = 0 | |
n_remove_from_end = 0 | |
# need to pad if too short (since non-causal model) | |
if fine_input.shape[1] < max_fine_input_length: | |
n_remove_from_end = max_fine_input_length - fine_input.shape[1] | |
fine_input = F.pad(fine_input, (0, 0, 0, n_remove_from_end), mode="constant", value=codebook_size) | |
# we can be lazy about fractional loop and just keep overwriting codebooks. | |
# seems that coarse_output.shape[1] - (max_fine_input_length - n_history) is equal to minus n_remove_from_end | |
# So if we needed to pad because too short, n_loops is always 1 (because n_remove_from_end > 0) | |
# If not, we loop over at least twice. | |
n_loops = (coarse_output.shape[1] - (max_fine_input_length - n_history)) / max_fine_history_length | |
n_loops = int(np.ceil(n_loops)) | |
n_loops = max(0, n_loops) + 1 | |
for n_outer in range(n_loops): | |
start_idx = min([n_outer * max_fine_history_length, fine_input.shape[1] - max_fine_input_length]) | |
start_fill_idx = min( | |
[n_history + n_outer * max_fine_history_length, fine_input.shape[1] - max_fine_history_length] | |
) | |
rel_start_fill_idx = start_fill_idx - start_idx | |
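            # sliding window of length max_fine_input_length over the (speaker history + coarse) sequence;
            # only positions from rel_start_fill_idx onward are newly predicted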
input_buffer = fine_input[:, start_idx : start_idx + max_fine_input_length, :] | |
for n_inner in range(n_coarse, fine_generation_config.n_fine_codebooks): | |
logits = self.forward(n_inner, input_buffer).logits | |
if temperature is None or temperature == 1.0: | |
relevant_logits = logits[:, rel_start_fill_idx:, :codebook_size] | |
codebook_preds = torch.argmax(relevant_logits, -1) | |
else: | |
relevant_logits = logits[:, :, :codebook_size] / temperature | |
# apply softmax | |
probs = F.softmax(relevant_logits, dim=-1)[:, rel_start_fill_idx:max_fine_input_length] | |
# reshape to 2D: (batch_size, seq_len, codebook_size) -> (batch_size*seq_len, codebook_size) | |
probs = probs.reshape((-1, codebook_size)) | |
# multinomial then reshape : (batch_size*seq_len)-> (batch_size,seq_len) | |
codebook_preds = torch.multinomial(probs, num_samples=1).view(batch_size, -1) | |
codebook_preds = codebook_preds.to(torch.int32) | |
input_buffer[:, rel_start_fill_idx:, n_inner] = codebook_preds | |
del logits, codebook_preds | |
# transfer into fine_input | |
for n_inner in range(n_coarse, fine_generation_config.n_fine_codebooks): | |
fine_input[ | |
:, start_fill_idx : start_fill_idx + (max_fine_input_length - rel_start_fill_idx), n_inner | |
] = input_buffer[:, rel_start_fill_idx:, n_inner] | |
del input_buffer | |
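        # back to shape (batch_size, n_fine_codebooks, seq_len) and drop the prepended speaker history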
fine_input = fine_input.transpose(1, 2)[:, :, n_history:] | |
if n_remove_from_end > 0: | |
fine_input = fine_input[:, :, :-n_remove_from_end] | |
if fine_input.shape[-1] != coarse_output.shape[-2]: | |
raise ValueError("input and output should have the same seq_len") | |
return fine_input | |
class BarkModel(BarkPreTrainedModel): | |
config_class = BarkConfig | |
def __init__(self, config): | |
super().__init__(config) | |
self.semantic = BarkSemanticModel(config.semantic_config) | |
self.coarse_acoustics = BarkCoarseModel(config.coarse_acoustics_config) | |
self.fine_acoustics = BarkFineModel(config.fine_acoustics_config) | |
self.codec_model = AutoModel.from_config(config.codec_config) | |
self.config = config | |
    @property
    def device(self) -> torch.device:
""" | |
`torch.device`: The device on which the module is (assuming that all the module parameters are on the same | |
device). | |
""" | |
# for bark_model, device must be verified on its sub-models | |
# if has _hf_hook, has been offloaded so the device has to be found in the hook | |
if not hasattr(self.semantic, "_hf_hook"): | |
return get_parameter_device(self) | |
for module in self.semantic.modules(): | |
if ( | |
hasattr(module, "_hf_hook") | |
and hasattr(module._hf_hook, "execution_device") | |
and module._hf_hook.execution_device is not None | |
): | |
return torch.device(module._hf_hook.execution_device) | |
def enable_cpu_offload(self, gpu_id: Optional[int] = 0): | |
r""" | |
Offloads all sub-models to CPU using accelerate, reducing memory usage with a low impact on performance. This | |
method moves one whole sub-model at a time to the GPU when it is used, and the sub-model remains in GPU until | |
the next sub-model runs. | |
Args: | |
gpu_id (`int`, *optional*, defaults to 0): | |
GPU id on which the sub-models will be loaded and offloaded. | |
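        Example (a minimal usage sketch; it assumes `accelerate` is installed and a CUDA device is available):
        ```python
        >>> from transformers import BarkModel
        >>> model = BarkModel.from_pretrained("suno/bark-small")
        >>> # sub-models will now be moved to the GPU one at a time during `generate`, then offloaded back to CPU
        >>> model.enable_cpu_offload()
        ```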
""" | |
if is_accelerate_available(): | |
from accelerate import cpu_offload_with_hook | |
else: | |
            raise ImportError("`enable_cpu_offload` requires `accelerate`.")
device = torch.device(f"cuda:{gpu_id}") | |
if self.device.type != "cpu": | |
self.to("cpu") | |
torch.cuda.empty_cache() # otherwise we don't see the memory savings (but they probably exist) | |
        # this layer is used outside the first forward pass of the semantic model, so it needs to be loaded before it
self.semantic.input_embeds_layer, _ = cpu_offload_with_hook(self.semantic.input_embeds_layer, device) | |
hook = None | |
for cpu_offloaded_model in [ | |
self.semantic, | |
self.coarse_acoustics, | |
self.fine_acoustics, | |
]: | |
_, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook) | |
self.fine_acoustics_hook = hook | |
_, hook = cpu_offload_with_hook(self.codec_model, device, prev_module_hook=hook) | |
# We'll offload the last model manually. | |
self.codec_model_hook = hook | |
def codec_decode(self, fine_output): | |
"""Turn quantized audio codes into audio array using encodec.""" | |
fine_output = fine_output.transpose(0, 1) | |
emb = self.codec_model.quantizer.decode(fine_output) | |
out = self.codec_model.decoder(emb) | |
audio_arr = out.squeeze(1) # squeeze the codebook dimension | |
return audio_arr | |
def generate( | |
self, | |
input_ids: Optional[torch.Tensor] = None, | |
history_prompt: Optional[Dict[str, torch.Tensor]] = None, | |
**kwargs, | |
) -> torch.LongTensor: | |
""" | |
Generates audio from an input prompt and an additional optional `Bark` speaker prompt. | |
Args: | |
input_ids (`Optional[torch.Tensor]` of shape (batch_size, seq_len), *optional*): | |
Input ids. Will be truncated up to 256 tokens. Note that the output audios will be as long as the | |
longest generation among the batch. | |
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*): | |
Optional `Bark` speaker prompt. Note that for now, this model takes only one speaker prompt per batch. | |
kwargs (*optional*): Remaining dictionary of keyword arguments. Keyword arguments are of two types: | |
- Without a prefix, they will be entered as `**kwargs` for the `generate` method of each sub-model. | |
                - With a *semantic_*, *coarse_*, *fine_* prefix, they will be input for the `generate` method of the
                  semantic, coarse and fine sub-models respectively. They have priority over the keywords without a
                  prefix. This means you can, for example, specify a generation strategy for all sub-models except one.
Returns: | |
torch.LongTensor: Output generated audio. | |
Example: | |
```python | |
>>> from transformers import AutoProcessor, BarkModel | |
>>> processor = AutoProcessor.from_pretrained("suno/bark-small") | |
>>> model = BarkModel.from_pretrained("suno/bark-small") | |
>>> # To add a voice preset, you can pass `voice_preset` to `BarkProcessor.__call__(...)` | |
>>> voice_preset = "v2/en_speaker_6" | |
>>> inputs = processor("Hello, my dog is cute, I need him in my life", voice_preset=voice_preset) | |
>>> audio_array = model.generate(**inputs, semantic_max_new_tokens=100) | |
>>> audio_array = audio_array.cpu().numpy().squeeze() | |
``` | |
""" | |
        # TODO (joao): workaround until nested generation config is compatible with PreTrainedModel
# todo: dict | |
semantic_generation_config = BarkSemanticGenerationConfig(**self.generation_config.semantic_config) | |
coarse_generation_config = BarkCoarseGenerationConfig(**self.generation_config.coarse_acoustics_config) | |
fine_generation_config = BarkFineGenerationConfig(**self.generation_config.fine_acoustics_config) | |
kwargs_semantic = { | |
# if "attention_mask" is set, it should not be passed to CoarseModel and FineModel | |
"attention_mask": kwargs.pop("attention_mask", None) | |
} | |
kwargs_coarse = {} | |
kwargs_fine = {} | |
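        # route prefixed kwargs to their sub-model and share the un-prefixed ones with every sub-model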
for key, value in kwargs.items(): | |
if key.startswith("semantic_"): | |
key = key[len("semantic_") :] | |
kwargs_semantic[key] = value | |
elif key.startswith("coarse_"): | |
key = key[len("coarse_") :] | |
kwargs_coarse[key] = value | |
elif key.startswith("fine_"): | |
key = key[len("fine_") :] | |
kwargs_fine[key] = value | |
else: | |
                # If the key is already in a sub-model-specific kwargs dict, then it has been set with a
                # submodule-specific value and we don't override it
if key not in kwargs_semantic: | |
kwargs_semantic[key] = value | |
if key not in kwargs_coarse: | |
kwargs_coarse[key] = value | |
if key not in kwargs_fine: | |
kwargs_fine[key] = value | |
# 1. Generate from the semantic model | |
semantic_output = self.semantic.generate( | |
input_ids, | |
history_prompt=history_prompt, | |
semantic_generation_config=semantic_generation_config, | |
**kwargs_semantic, | |
) | |
# 2. Generate from the coarse model | |
coarse_output = self.coarse_acoustics.generate( | |
semantic_output, | |
history_prompt=history_prompt, | |
semantic_generation_config=semantic_generation_config, | |
coarse_generation_config=coarse_generation_config, | |
codebook_size=self.generation_config.codebook_size, | |
**kwargs_coarse, | |
) | |
# 3. "generate" from the fine model | |
output = self.fine_acoustics.generate( | |
coarse_output, | |
history_prompt=history_prompt, | |
semantic_generation_config=semantic_generation_config, | |
coarse_generation_config=coarse_generation_config, | |
fine_generation_config=fine_generation_config, | |
codebook_size=self.generation_config.codebook_size, | |
**kwargs_fine, | |
) | |
if getattr(self, "fine_acoustics_hook", None) is not None: | |
# Manually offload fine_acoustics to CPU | |
# and load codec_model to GPU | |
# since bark doesn't use codec_model forward pass | |
self.fine_acoustics_hook.offload() | |
self.codec_model = self.codec_model.to(self.device) | |
# 4. Decode the output and generate audio array | |
audio = self.codec_decode(output) | |
if getattr(self, "codec_model_hook", None) is not None: | |
# Offload codec_model to CPU | |
self.codec_model_hook.offload() | |
return audio | |