# Copyright (2023) Tsinghua University, Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Modifications to allow device parallelism.
# Copyright (2024) William Held
# Same Terms as as above apply

import librosa
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
from peft import LoraConfig, TaskType, get_peft_model
from transformers import (
    LlamaForCausalLM,
    LlamaTokenizer,
    WhisperFeatureExtractor,
    WhisperModel,
)

from models.beats.BEATs import BEATs, BEATsConfig
from models.qformer.Qformer import BertConfig, BertLMHeadModel


class SALMONN(nn.Module):
    """Audio-conditioned LLaMA wrapper.

    Combines a Whisper encoder and a BEATs encoder, fuses their frame-level
    features, compresses them with a window-level Q-Former, projects the
    result into the LLaMA embedding space and lets LLaMA generate text from
    the resulting embedding sequence.
    """

    # Optional marker inside ``prompt_pattern`` that pins where the audio
    # embeddings are spliced into the text prompt.
    SPEECH_PLACEHOLDER = "<SpeechHere>"

    def __init__(
        self,
        ckpt,
        whisper_path,
        beats_path,
        vicuna_path,
        speech_qformer_token_num=1,
        speech_qformer_layer=2,
        lora=True,
        device="cuda:0",
        lora_alpha=32,
        lora_rank=8,
        lora_dropout=0.1,
        second_per_frame=0.333333,
        second_stride=0.333333,
        low_resource=False,
    ):
        """Build all sub-modules and load the checkpoint.

        Args:
            ckpt: Path to a checkpoint whose ``"model"`` entry is a state
                dict; it is loaded non-strictly into this module.
            whisper_path: Path/name passed to the Whisper ``from_pretrained``
                calls (feature extractor and encoder).
            beats_path: Path to a BEATs checkpoint containing ``"cfg"`` and
                ``"model"`` entries.
            vicuna_path: Path/name passed to the LLaMA model and tokenizer
                ``from_pretrained`` calls.
            speech_qformer_token_num: Number of learned query tokens per
                window.
            speech_qformer_layer: Number of hidden layers in the Q-Former.
            lora: Whether to wrap the LLaMA model with a LoRA adapter.
            device: Device for the encoders, Q-Former and projection. The
                LLaMA model itself is placed with ``device_map="auto"``.
            lora_alpha: LoRA alpha.
            lora_rank: LoRA rank ``r``.
            lora_dropout: LoRA dropout.
            second_per_frame: Window length, in seconds, fed to the Q-Former.
            second_stride: Window stride, in seconds.
            low_resource: If true, load the LLaMA model with
                ``load_in_8bit=True``.
        """
        super().__init__()

        # feature_extractor
        self.feature_extractor = WhisperFeatureExtractor.from_pretrained(whisper_path)

        # whisper
        self.speech_encoder = WhisperModel.from_pretrained(whisper_path).encoder.to(
            device
        )
        self.ln_speech = nn.LayerNorm(self.speech_encoder.config.d_model).to(device)

        # beats
        self.beats_ckpt = beats_path
        beats_checkpoint = torch.load(self.beats_ckpt, map_location=device)
        beats_cfg = BEATsConfig(beats_checkpoint["cfg"])
        beats = BEATs(beats_cfg)
        beats.load_state_dict(beats_checkpoint["model"])
        self.beats = beats
        self.beats.to(device)
        self.ln_audio = nn.LayerNorm(self.beats.cfg.encoder_embed_dim).to(device)
        # BEATs is used as a frozen feature extractor.
        for param in self.beats.parameters():
            param.requires_grad = False
        self.beats.eval()

        # init speech Qformer
        self.speech_Qformer, self.speech_query_tokens = self.init_speech_Qformer(
            speech_qformer_token_num,
            self.speech_encoder.config.d_model + self.beats.cfg.encoder_embed_dim,
            speech_qformer_layer,
        )
        self.speech_Qformer.to(device)
        # ``Parameter.to`` returns a new tensor instead of moving in place, so
        # move the underlying storage explicitly to keep the registered
        # parameter on ``device``.
        self.speech_query_tokens.data = self.speech_query_tokens.data.to(device)
        self.second_per_frame = second_per_frame
        self.second_stride = second_stride

        # vicuna
        if not low_resource:
            self.llama_model = LlamaForCausalLM.from_pretrained(
                vicuna_path,
                torch_dtype=torch.float16,
                device_map="auto",
            )
        else:
            self.llama_model = LlamaForCausalLM.from_pretrained(
                vicuna_path,
                torch_dtype=torch.float16,
                load_in_8bit=True,
                device_map="auto",
            )

        # lora
        self.lora = lora
        if lora:
            self.peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                inference_mode=True,
                r=lora_rank,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
                target_modules=None,
            )
            self.llama_model = get_peft_model(self.llama_model, self.peft_config)

        # tokenizer
        self.llama_tokenizer = LlamaTokenizer.from_pretrained(
            vicuna_path, use_fast=False
        )
        self.llama_tokenizer.add_special_tokens({"pad_token": "[PAD]"})
        self.llama_tokenizer.padding_side = "right"

        # proj
        self.speech_llama_proj = nn.Linear(
            self.speech_Qformer.config.hidden_size, self.llama_model.config.hidden_size
        ).to(device)

        # load ckpt
        # Load onto CPU first: ``load_state_dict`` copies the values into the
        # parameters wherever they already live, so the checkpoint does not
        # need the devices it was saved from to exist on this machine.
        ckpt_dict = torch.load(ckpt, map_location="cpu")["model"]
        self.load_state_dict(ckpt_dict, strict=False)

    @staticmethod
    def _split_prompt(prompt_pattern, prompt):
        """Return the prompt text that goes before and after the audio.

        If ``prompt_pattern`` contains ``<SpeechHere>``, the formatted prompt
        is split at that marker. Otherwise the audio is placed immediately
        before the ``{}`` placeholder (or at the very start when the pattern
        has no ``{}``), i.e. ``USER: <audio> prompt\\nASSISTANT:`` for the
        default pattern.
        """
        marker = SALMONN.SPEECH_PLACEHOLDER
        if marker in prompt_pattern:
            left, _, right = prompt_pattern.format(prompt).partition(marker)
            return left, right
        head, placeholder, tail = prompt_pattern.partition("{}")
        if not placeholder:
            return "", prompt_pattern.format(prompt)
        # ``head`` has no ``{}`` field; formatting it only resolves escapes
        # such as ``{{`` exactly as formatting the full pattern would.
        return head.format(prompt), (placeholder + tail).format(prompt)

    def _embed_text(self, text, embed_tokens, device):
        """Tokenize ``text`` without special tokens and embed the ids."""
        token_ids = (
            self.llama_tokenizer(text, return_tensors="pt", add_special_tokens=False)
            .to(device)
            .input_ids
        )
        return embed_tokens(token_ids)

    def generate(
        self,
        wav_path,
        prompt,
        prompt_pattern="USER: {}\nASSISTANT:",
        device="cuda:0",
        max_length=200,
        max_new_tokens=128,
        num_beams=1,
        do_sample=True,
        min_length=1,
        top_p=0.9,
        repetition_penalty=1.0,
        length_penalty=1.0,
        temperature=1.0,
        logits_processor=None,
        streamer=None
    ):
        """Generate text for one audio file and a text prompt.

        Args:
            wav_path: Path of the audio file read with ``soundfile``. Only
                the first channel and the first 30 seconds are used; audio
                is resampled to 16 kHz when needed.
            prompt: Text inserted into ``prompt_pattern``.
            prompt_pattern: ``str.format`` template for the prompt. It may
                contain ``<SpeechHere>`` to mark where the audio embeddings
                go; otherwise they are inserted just before ``{}``.
            device: Device the audio features are computed on.
            max_length, max_new_tokens, num_beams, do_sample, min_length,
            top_p, repetition_penalty, length_penalty, temperature:
                Forwarded unchanged to ``llama_model.generate``.
            logits_processor: Optional single logits processor; it is wrapped
                in a list before being forwarded.
            streamer: Optional streamer forwarded to ``llama_model.generate``.

        Returns:
            The list of strings produced by ``batch_decode`` on the generated
            ids, with special tokens skipped.
        """
        # read wav
        wav, sr = sf.read(wav_path)
        if len(wav.shape) == 2:
            wav = wav[:, 0]
        if len(wav) > 30 * sr:
            wav = wav[: 30 * sr]
        if sr != 16000:
            wav = librosa.resample(wav, orig_sr=sr, target_sr=16000, res_type="fft")

        # whisper
        spectrogram = self.feature_extractor(
            wav, return_tensors="pt", sampling_rate=16000
        ).input_features.to(
            device
        )  # [1, 80, 3000]
        speech_embeds = self.speech_encoder(
            spectrogram, return_dict=True
        ).last_hidden_state

        # beats
        raw_wav = torch.from_numpy(wav).to(device).unsqueeze(0)
        audio_padding_mask = torch.zeros(raw_wav.shape, device=device).bool()
        audio_embeds, _ = self.beats.extract_features(
            raw_wav, padding_mask=audio_padding_mask, feature_only=True
        )

        # auditory embeds
        speech_embeds = self.ln_speech(speech_embeds)
        audio_embeds = self.ln_audio(audio_embeds)
        # Pad the BEATs sequence along time to the Whisper length so the two
        # feature streams can be concatenated channel-wise.
        audio_embeds = F.pad(
            audio_embeds, (0, 0, 0, speech_embeds.size(1) - audio_embeds.size(1))
        )
        speech_embeds = torch.cat([speech_embeds, audio_embeds], dim=-1)

        # split frames
        B, T, C = speech_embeds.shape
        # The T frames are treated as covering 30 s; convert the window
        # length/stride from seconds to frames.
        kernel = round(T * self.second_per_frame / 30.0)
        stride = round(T * self.second_stride / 30.0)
        kernel = (1, kernel)
        stride = (1, stride)
        speech_embeds_tr = speech_embeds.transpose(1, 2).unsqueeze(2)
        speech_embeds_overlap = F.unfold(
            speech_embeds_tr, kernel_size=kernel, dilation=1, padding=0, stride=stride
        )
        _, _, L = speech_embeds_overlap.shape
        speech_embeds_overlap = speech_embeds_overlap.view(B, -1, kernel[1], L)
        speech_embeds_overlap = torch.permute(speech_embeds_overlap, [0, 3, 2, 1])
        # Every window becomes its own batch item for the Q-Former.
        speech_embeds = speech_embeds_overlap.reshape(-1, kernel[1], C)
        speech_atts = torch.ones(
            speech_embeds.size()[:-1], dtype=torch.long, device=speech_embeds.device
        )

        # Qformer
        query_tokens = self.speech_query_tokens.expand(speech_embeds.shape[0], -1, -1)
        query_output = self.speech_Qformer.bert(
            query_embeds=query_tokens.to(device),
            encoder_hidden_states=speech_embeds,
            encoder_attention_mask=speech_atts,
            return_dict=True,
        )
        speech_embeds = self.speech_llama_proj(query_output.last_hidden_state)
        # Fold the per-window outputs back into one sequence per input.
        speech_embeds = speech_embeds.view(B, -1, speech_embeds.size(2)).contiguous()

        # USER: speech_embeds prompt\nASSISTANT:
        embed_tokens = (
            self.llama_model.model.model.embed_tokens
            if self.lora
            else self.llama_model.model.embed_tokens
        )
        # Splitting on an empty separator raises ValueError, so the split
        # point is resolved by ``_split_prompt`` instead.
        prompt_left, prompt_right = self._split_prompt(prompt_pattern, prompt)

        bos_ids = (
            torch.ones([1, 1], dtype=torch.long, device=device)
            * self.llama_tokenizer.bos_token_id
        )
        bos_embeds = embed_tokens(bos_ids)

        # Empty text segments are skipped rather than tokenized.
        embed_list = [bos_embeds]
        if prompt_left:
            embed_list.append(
                self._embed_text(prompt_left, embed_tokens, speech_embeds.device)
            )
        embed_list.append(speech_embeds)
        if prompt_right:
            embed_list.append(
                self._embed_text(prompt_right, embed_tokens, speech_embeds.device)
            )

        # The pieces may live on different devices; gather them where the
        # BOS embedding is.
        embeds = torch.cat(
            [embed.to(bos_embeds.device) for embed in embed_list], dim=1
        )
        atts = torch.ones(embeds.size()[:-1], dtype=torch.long).to(embeds.device)

        # generate
        output = self.llama_model.generate(
            inputs_embeds=embeds,
            max_length=max_length,
            max_new_tokens=max_new_tokens,
            num_beams=num_beams,
            do_sample=do_sample,
            min_length=min_length,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            length_penalty=length_penalty,
            temperature=temperature,
            attention_mask=atts,
            bos_token_id=self.llama_tokenizer.bos_token_id,
            eos_token_id=self.llama_tokenizer.eos_token_id,
            pad_token_id=self.llama_tokenizer.pad_token_id,
            logits_processor=(
                [logits_processor] if logits_processor is not None else None
            ),
            streamer=streamer,
        )

        output_text = self.llama_tokenizer.batch_decode(
            output, add_special_tokens=False, skip_special_tokens=True
        )

        return output_text

    def init_speech_Qformer(self, num_query_token, speech_width, num_hidden_layers=2):
        """Create the Q-Former and its learned query tokens.

        Args:
            num_query_token: Number of query tokens (also stored as the
                config's ``query_length``).
            speech_width: Feature width of the encoder states the Q-Former
                cross-attends to.
            num_hidden_layers: Number of hidden layers in the Q-Former.

        Returns:
            ``(Qformer, query_tokens)`` where ``query_tokens`` is an
            ``nn.Parameter`` of shape ``(1, num_query_token, hidden_size)``
            drawn from a normal distribution with the config's
            ``initializer_range`` as standard deviation.
        """
        encoder_config = BertConfig()
        encoder_config.num_hidden_layers = num_hidden_layers
        encoder_config.encoder_width = speech_width
        encoder_config.add_cross_attention = True
        encoder_config.cross_attention_freq = 1
        encoder_config.query_length = num_query_token
        Qformer = BertLMHeadModel(config=encoder_config)
        query_tokens = nn.Parameter(
            torch.zeros(1, num_query_token, encoder_config.hidden_size)
        )
        query_tokens.data.normal_(mean=0.0, std=encoder_config.initializer_range)
        return Qformer, query_tokens