TangoFlux / audioldm /pipeline.py
hungchiayu1
initial commit
ffead1e
raw
history blame
10.7 kB
import os
import argparse
import yaml
import torch
from torch import autocast
from tqdm import tqdm, trange
from audioldm import LatentDiffusion, seed_everything
from audioldm.utils import default_audioldm_config, get_duration, get_bit_depth, get_metadata, download_checkpoint
from audioldm.audio import wav_to_fbank, TacotronSTFT, read_wav_file
from audioldm.latent_diffusion.ddim import DDIMSampler
from einops import repeat
import os
def make_batch_for_text_to_audio(text, waveform=None, fbank=None, batchsize=1):
text = [text] * batchsize
if batchsize < 1:
print("Warning: Batchsize must be at least 1. Batchsize is set to .")
if(fbank is None):
fbank = torch.zeros((batchsize, 1024, 64)) # Not used, here to keep the code format
else:
fbank = torch.FloatTensor(fbank)
fbank = fbank.expand(batchsize, 1024, 64)
assert fbank.size(0) == batchsize
stft = torch.zeros((batchsize, 1024, 512)) # Not used
if(waveform is None):
waveform = torch.zeros((batchsize, 160000)) # Not used
else:
waveform = torch.FloatTensor(waveform)
waveform = waveform.expand(batchsize, -1)
assert waveform.size(0) == batchsize
fname = [""] * batchsize # Not used
batch = (
fbank,
stft,
None,
fname,
waveform,
text,
)
return batch
def round_up_duration(duration):
return int(round(duration/2.5) + 1) * 2.5
def build_model(
ckpt_path=None,
config=None,
model_name="audioldm-s-full"
):
print("Load AudioLDM: %s", model_name)
if(ckpt_path is None):
ckpt_path = get_metadata()[model_name]["path"]
if(not os.path.exists(ckpt_path)):
download_checkpoint(model_name)
if torch.cuda.is_available():
device = torch.device("cuda:0")
else:
device = torch.device("cpu")
if config is not None:
assert type(config) is str
config = yaml.load(open(config, "r"), Loader=yaml.FullLoader)
else:
config = default_audioldm_config(model_name)
# Use text as condition instead of using waveform during training
config["model"]["params"]["device"] = device
config["model"]["params"]["cond_stage_key"] = "text"
# No normalization here
latent_diffusion = LatentDiffusion(**config["model"]["params"])
resume_from_checkpoint = ckpt_path
checkpoint = torch.load(resume_from_checkpoint, map_location=device)
latent_diffusion.load_state_dict(checkpoint["state_dict"])
latent_diffusion.eval()
latent_diffusion = latent_diffusion.to(device)
latent_diffusion.cond_stage_model.embed_mode = "text"
return latent_diffusion
def duration_to_latent_t_size(duration):
return int(duration * 25.6)
def set_cond_audio(latent_diffusion):
latent_diffusion.cond_stage_key = "waveform"
latent_diffusion.cond_stage_model.embed_mode="audio"
return latent_diffusion
def set_cond_text(latent_diffusion):
latent_diffusion.cond_stage_key = "text"
latent_diffusion.cond_stage_model.embed_mode="text"
return latent_diffusion
def text_to_audio(
latent_diffusion,
text,
original_audio_file_path = None,
seed=42,
ddim_steps=200,
duration=10,
batchsize=1,
guidance_scale=2.5,
n_candidate_gen_per_text=3,
config=None,
):
seed_everything(int(seed))
waveform = None
if(original_audio_file_path is not None):
waveform = read_wav_file(original_audio_file_path, int(duration * 102.4) * 160)
batch = make_batch_for_text_to_audio(text, waveform=waveform, batchsize=batchsize)
latent_diffusion.latent_t_size = duration_to_latent_t_size(duration)
if(waveform is not None):
print("Generate audio that has similar content as %s" % original_audio_file_path)
latent_diffusion = set_cond_audio(latent_diffusion)
else:
print("Generate audio using text %s" % text)
latent_diffusion = set_cond_text(latent_diffusion)
with torch.no_grad():
waveform = latent_diffusion.generate_sample(
[batch],
unconditional_guidance_scale=guidance_scale,
ddim_steps=ddim_steps,
n_candidate_gen_per_text=n_candidate_gen_per_text,
duration=duration,
)
return waveform
def style_transfer(
latent_diffusion,
text,
original_audio_file_path,
transfer_strength,
seed=42,
duration=10,
batchsize=1,
guidance_scale=2.5,
ddim_steps=200,
config=None,
):
if torch.cuda.is_available():
device = torch.device("cuda:0")
else:
device = torch.device("cpu")
assert original_audio_file_path is not None, "You need to provide the original audio file path"
audio_file_duration = get_duration(original_audio_file_path)
assert get_bit_depth(original_audio_file_path) == 16, "The bit depth of the original audio file %s must be 16" % original_audio_file_path
# if(duration > 20):
# print("Warning: The duration of the audio file %s must be less than 20 seconds. Longer duration will result in Nan in model output (we are still debugging that); Automatically set duration to 20 seconds")
# duration = 20
if(duration >= audio_file_duration):
print("Warning: Duration you specified %s-seconds must equal or smaller than the audio file duration %ss" % (duration, audio_file_duration))
duration = round_up_duration(audio_file_duration)
print("Set new duration as %s-seconds" % duration)
# duration = round_up_duration(duration)
latent_diffusion = set_cond_text(latent_diffusion)
if config is not None:
assert type(config) is str
config = yaml.load(open(config, "r"), Loader=yaml.FullLoader)
else:
config = default_audioldm_config()
seed_everything(int(seed))
# latent_diffusion.latent_t_size = duration_to_latent_t_size(duration)
latent_diffusion.cond_stage_model.embed_mode = "text"
fn_STFT = TacotronSTFT(
config["preprocessing"]["stft"]["filter_length"],
config["preprocessing"]["stft"]["hop_length"],
config["preprocessing"]["stft"]["win_length"],
config["preprocessing"]["mel"]["n_mel_channels"],
config["preprocessing"]["audio"]["sampling_rate"],
config["preprocessing"]["mel"]["mel_fmin"],
config["preprocessing"]["mel"]["mel_fmax"],
)
mel, _, _ = wav_to_fbank(
original_audio_file_path, target_length=int(duration * 102.4), fn_STFT=fn_STFT
)
mel = mel.unsqueeze(0).unsqueeze(0).to(device)
mel = repeat(mel, "1 ... -> b ...", b=batchsize)
init_latent = latent_diffusion.get_first_stage_encoding(
latent_diffusion.encode_first_stage(mel)
) # move to latent space, encode and sample
if(torch.max(torch.abs(init_latent)) > 1e2):
init_latent = torch.clip(init_latent, min=-10, max=10)
sampler = DDIMSampler(latent_diffusion)
sampler.make_schedule(ddim_num_steps=ddim_steps, ddim_eta=1.0, verbose=False)
t_enc = int(transfer_strength * ddim_steps)
prompts = text
with torch.no_grad():
with autocast("cuda"):
with latent_diffusion.ema_scope():
uc = None
if guidance_scale != 1.0:
uc = latent_diffusion.cond_stage_model.get_unconditional_condition(
batchsize
)
c = latent_diffusion.get_learned_conditioning([prompts] * batchsize)
z_enc = sampler.stochastic_encode(
init_latent, torch.tensor([t_enc] * batchsize).to(device)
)
samples = sampler.decode(
z_enc,
c,
t_enc,
unconditional_guidance_scale=guidance_scale,
unconditional_conditioning=uc,
)
# x_samples = latent_diffusion.decode_first_stage(samples) # Will result in Nan in output
# print(torch.sum(torch.isnan(samples)))
x_samples = latent_diffusion.decode_first_stage(samples)
# print(x_samples)
x_samples = latent_diffusion.decode_first_stage(samples[:,:,:-3,:])
# print(x_samples)
waveform = latent_diffusion.first_stage_model.decode_to_waveform(
x_samples
)
return waveform
def super_resolution_and_inpainting(
latent_diffusion,
text,
original_audio_file_path = None,
seed=42,
ddim_steps=200,
duration=None,
batchsize=1,
guidance_scale=2.5,
n_candidate_gen_per_text=3,
time_mask_ratio_start_and_end=(0.10, 0.15), # regenerate the 10% to 15% of the time steps in the spectrogram
# time_mask_ratio_start_and_end=(1.0, 1.0), # no inpainting
# freq_mask_ratio_start_and_end=(0.75, 1.0), # regenerate the higher 75% to 100% mel bins
freq_mask_ratio_start_and_end=(1.0, 1.0), # no super-resolution
config=None,
):
seed_everything(int(seed))
if config is not None:
assert type(config) is str
config = yaml.load(open(config, "r"), Loader=yaml.FullLoader)
else:
config = default_audioldm_config()
fn_STFT = TacotronSTFT(
config["preprocessing"]["stft"]["filter_length"],
config["preprocessing"]["stft"]["hop_length"],
config["preprocessing"]["stft"]["win_length"],
config["preprocessing"]["mel"]["n_mel_channels"],
config["preprocessing"]["audio"]["sampling_rate"],
config["preprocessing"]["mel"]["mel_fmin"],
config["preprocessing"]["mel"]["mel_fmax"],
)
# waveform = read_wav_file(original_audio_file_path, None)
mel, _, _ = wav_to_fbank(
original_audio_file_path, target_length=int(duration * 102.4), fn_STFT=fn_STFT
)
batch = make_batch_for_text_to_audio(text, fbank=mel[None,...], batchsize=batchsize)
# latent_diffusion.latent_t_size = duration_to_latent_t_size(duration)
latent_diffusion = set_cond_text(latent_diffusion)
with torch.no_grad():
waveform = latent_diffusion.generate_sample_masked(
[batch],
unconditional_guidance_scale=guidance_scale,
ddim_steps=ddim_steps,
n_candidate_gen_per_text=n_candidate_gen_per_text,
duration=duration,
time_mask_ratio_start_and_end=time_mask_ratio_start_and_end,
freq_mask_ratio_start_and_end=freq_mask_ratio_start_and_end
)
return waveform