|
from gtts import gTTS |
|
import edge_tts, asyncio, json, glob |
|
from tqdm import tqdm |
|
import librosa, os, re, torch, gc, subprocess |
|
from .language_configuration import ( |
|
fix_code_language, |
|
BARK_VOICES_LIST, |
|
VITS_VOICES_LIST, |
|
) |
|
from .utils import ( |
|
download_manager, |
|
create_directories, |
|
copy_files, |
|
rename_file, |
|
remove_directory_contents, |
|
remove_files, |
|
run_command, |
|
) |
|
import numpy as np |
|
from typing import Any, Dict |
|
from pathlib import Path |
|
import soundfile as sf |
|
import platform |
|
import logging |
|
import traceback |
|
from .logging_setup import logger |
|
|
|
|
|
class TTS_OperationError(Exception): |
|
def __init__(self, message="The operation did not complete successfully."): |
|
self.message = message |
|
super().__init__(self.message) |
|
|
|
|
|
def verify_saved_file_and_size(filename): |
|
if not os.path.exists(filename): |
|
raise TTS_OperationError(f"File '{filename}' was not saved.") |
|
if os.path.getsize(filename) == 0: |
|
        raise TTS_OperationError(
            f"File '{filename}' has a zero size. "
            "This is usually caused by a TTS voice that does not match "
            "the target language."
        )
|
|
|
|
|
def error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename): |
|
traceback.print_exc() |
|
logger.error(f"Error: {str(error)}") |
|
try: |
|
from tempfile import TemporaryFile |
|
|
|
tts = gTTS(segment["text"], lang=fix_code_language(TRANSLATE_AUDIO_TO)) |
|
|
|
f = TemporaryFile() |
|
tts.write_to_fp(f) |
|
|
|
|
|
f.seek(0) |
|
|
|
|
|
audio_data, samplerate = sf.read(f) |
|
f.close() |
|
sf.write( |
|
filename, audio_data, samplerate, format="ogg", subtype="vorbis" |
|
) |
|
|
|
        logger.warning(
            "The auxiliary gTTS voice will be used "
            f"instead of the selected TTS: {segment['tts_name']}"
        )
|
verify_saved_file_and_size(filename) |
|
except Exception as error: |
|
logger.critical(f"Error: {str(error)}") |
|
sample_rate_aux = 22050 |
|
duration = float(segment["end"]) - float(segment["start"]) |
|
data = np.zeros(int(sample_rate_aux * duration)).astype(np.float32) |
|
sf.write( |
|
filename, data, sample_rate_aux, format="ogg", subtype="vorbis" |
|
) |
|
logger.error("Audio will be replaced -> [silent audio].") |
|
verify_saved_file_and_size(filename) |
|
|
|
|
|
def pad_array(array, sr):
    """Trim leading and trailing silence from the generated audio,
    keeping a margin of roughly 0.1 seconds on each side."""
    if isinstance(array, list):
        array = np.array(array)

    if not array.shape[0]:
        raise ValueError("The generated audio does not contain any data")

    valid_indices = np.where(np.abs(array) > 0.001)[0]

    if len(valid_indices) == 0:
        logger.debug(f"No valid indices: {array}")
        return array

    try:
        margin = int(0.1 * sr)  # keep ~0.1 s of samples on each side
        start_pad = max(0, valid_indices[0] - margin)
        end_pad = min(len(array), valid_indices[-1] + 1 + margin)
        padded_array = array[start_pad:end_pad]
        return padded_array
    except Exception as error:
        logger.error(str(error))
        return array
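
# A minimal usage sketch of pad_array (not called anywhere): a 440 Hz tone
# surrounded by one second of silence on each side is trimmed down to the
# tone plus ~0.1 s of margin. The 16 kHz rate is only illustrative.
def _example_pad_array():
    sr = 16000
    tone = np.sin(2 * np.pi * 440 * np.arange(sr) / sr).astype(np.float32)
    silence = np.zeros(sr, dtype=np.float32)
    trimmed = pad_array(np.concatenate([silence, tone, silence]), sr)
    assert len(trimmed) < 3 * sr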
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def edge_tts_voices_list(): |
|
try: |
|
completed_process = subprocess.run( |
|
["edge-tts", "--list-voices"], capture_output=True, text=True |
|
) |
|
lines = completed_process.stdout.strip().split("\n") |
|
except Exception as error: |
|
logger.debug(str(error)) |
|
lines = [] |
|
|
|
voices = [] |
|
for line in lines: |
|
if line.startswith("Name: "): |
|
voice_entry = {} |
|
voice_entry["Name"] = line.split(": ")[1] |
|
elif line.startswith("Gender: "): |
|
voice_entry["Gender"] = line.split(": ")[1] |
|
voices.append(voice_entry) |
|
|
|
formatted_voices = [ |
|
f"{entry['Name']}-{entry['Gender']}" for entry in voices |
|
] |
|
|
|
if not formatted_voices: |
|
logger.warning( |
|
"The list of Edge TTS voices could not be obtained, " |
|
"switching to an alternative method" |
|
) |
|
tts_voice_list = asyncio.new_event_loop().run_until_complete( |
|
edge_tts.list_voices() |
|
) |
|
formatted_voices = sorted( |
|
[f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list] |
|
) |
|
|
|
if not formatted_voices: |
|
logger.error("Can't get EDGE TTS - list voices") |
|
|
|
return formatted_voices |
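
# Hedged note: each returned entry has the form "<ShortName>-<Gender>", e.g.
# "en-US-AriaNeural-Female". The gender suffix is for display only and is
# stripped again before synthesis in segments_edge_tts.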
|
|
|
|
|
def segments_edge_tts(filtered_edge_segments, TRANSLATE_AUDIO_TO, is_gui):
|
for segment in tqdm(filtered_edge_segments["segments"]): |
|
speaker = segment["speaker"] |
|
text = segment["text"] |
|
start = segment["start"] |
|
tts_name = segment["tts_name"] |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
temp_file = filename[:-3] + "mp3" |
|
|
|
logger.info(f"{text} >> {filename}") |
|
try: |
|
if is_gui: |
|
asyncio.run( |
|
edge_tts.Communicate( |
|
text, "-".join(tts_name.split("-")[:-1]) |
|
).save(temp_file) |
|
) |
|
else: |
|
|
|
command = f'edge-tts -t "{text}" -v "{tts_name.replace("-Male", "").replace("-Female", "")}" --write-media "{temp_file}"' |
|
run_command(command) |
|
verify_saved_file_and_size(temp_file) |
|
|
|
data, sample_rate = sf.read(temp_file) |
|
data = pad_array(data, sample_rate) |
|
|
|
|
|
|
|
sf.write( |
|
file=filename, |
|
samplerate=sample_rate, |
|
data=data, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
|
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def segments_bark_tts( |
|
filtered_bark_segments, TRANSLATE_AUDIO_TO, model_id_bark="suno/bark-small" |
|
): |
|
from transformers import AutoProcessor, BarkModel |
|
from optimum.bettertransformer import BetterTransformer |
|
|
|
device = os.environ.get("SONITR_DEVICE") |
|
torch_dtype_env = torch.float16 if device == "cuda" else torch.float32 |
|
|
|
|
|
    model = BarkModel.from_pretrained(
        model_id_bark, torch_dtype=torch_dtype_env
    ).to(device)
|
processor = AutoProcessor.from_pretrained( |
|
model_id_bark, return_tensors="pt" |
|
) |
|
if device == "cuda": |
|
|
|
model = BetterTransformer.transform(model, keep_original_model=False) |
|
|
|
|
|
sampling_rate = model.generation_config.sample_rate |
|
|
|
|
|
|
|
|
|
|
|
|
|
for segment in tqdm(filtered_bark_segments["segments"]): |
|
speaker = segment["speaker"] |
|
text = segment["text"] |
|
start = segment["start"] |
|
tts_name = segment["tts_name"] |
|
|
|
inputs = processor(text, voice_preset=BARK_VOICES_LIST[tts_name]).to( |
|
device |
|
) |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
logger.info(f"{text} >> {filename}") |
|
try: |
|
|
|
with torch.inference_mode(): |
|
speech_output = model.generate( |
|
**inputs, |
|
do_sample=True, |
|
fine_temperature=0.4, |
|
coarse_temperature=0.8, |
|
pad_token_id=processor.tokenizer.pad_token_id, |
|
) |
|
|
|
data_tts = pad_array( |
|
speech_output.cpu().numpy().squeeze().astype(np.float32), |
|
sampling_rate, |
|
) |
|
sf.write( |
|
file=filename, |
|
samplerate=sampling_rate, |
|
data=data_tts, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
try: |
|
del processor |
|
del model |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def uromanize(input_string): |
|
"""Convert non-Roman strings to Roman using the `uroman` perl package.""" |
|
|
|
|
|
if not os.path.exists("./uroman"): |
|
        logger.info(
            "Cloning the uroman repository "
            "https://github.com/isi-nlp/uroman.git to romanize the text"
        )
|
process = subprocess.Popen( |
|
["git", "clone", "https://github.com/isi-nlp/uroman.git"], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
) |
|
stdout, stderr = process.communicate() |
|
script_path = os.path.join("./uroman", "uroman", "uroman.pl") |
|
|
|
command = ["perl", script_path] |
|
|
|
process = subprocess.Popen( |
|
command, |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
) |
|
|
|
stdout, stderr = process.communicate(input=input_string.encode()) |
|
|
|
if process.returncode != 0: |
|
raise ValueError(f"Error {process.returncode}: {stderr.decode()}") |
|
|
|
|
|
    return stdout.decode().rstrip("\n")  # uroman emits a trailing newline
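
# Hedged usage sketch (requires perl on PATH; the uroman repository is
# cloned on first use):
# >>> uromanize("こんにちは")
# 'konnichiha'  # approximate; exact output depends on the uroman version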
|
|
|
|
|
def segments_vits_tts(filtered_vits_segments, TRANSLATE_AUDIO_TO): |
|
from transformers import VitsModel, AutoTokenizer |
|
|
|
filtered_segments = filtered_vits_segments["segments"] |
|
|
|
sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) |
|
logger.debug(sorted_segments) |
|
|
|
model_name_key = None |
|
for segment in tqdm(sorted_segments): |
|
speaker = segment["speaker"] |
|
text = segment["text"] |
|
start = segment["start"] |
|
tts_name = segment["tts_name"] |
|
|
|
if tts_name != model_name_key: |
|
model_name_key = tts_name |
|
model = VitsModel.from_pretrained(VITS_VOICES_LIST[tts_name]) |
|
tokenizer = AutoTokenizer.from_pretrained( |
|
VITS_VOICES_LIST[tts_name] |
|
) |
|
sampling_rate = model.config.sampling_rate |
|
|
|
if tokenizer.is_uroman: |
|
            romanized_text = uromanize(text)
            logger.debug(f"Romanized text: {romanized_text}")
            inputs = tokenizer(romanized_text, return_tensors="pt")
|
else: |
|
inputs = tokenizer(text, return_tensors="pt") |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
logger.info(f"{text} >> {filename}") |
|
try: |
|
|
|
with torch.no_grad(): |
|
speech_output = model(**inputs).waveform |
|
|
|
data_tts = pad_array( |
|
speech_output.cpu().numpy().squeeze().astype(np.float32), |
|
sampling_rate, |
|
) |
|
|
|
sf.write( |
|
file=filename, |
|
samplerate=sampling_rate, |
|
data=data_tts, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
try: |
|
del tokenizer |
|
del model |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def coqui_xtts_voices_list(): |
|
main_folder = "_XTTS_" |
|
pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") |
|
pattern_automatic_speaker = re.compile(r"AUTOMATIC_SPEAKER_\d+\.wav$") |
|
|
|
|
|
|
|
wav_voices = [ |
|
"_XTTS_/" + f |
|
for f in os.listdir(main_folder) |
|
if os.path.isfile(os.path.join(main_folder, f)) |
|
and pattern_coqui.match(f) |
|
and not pattern_automatic_speaker.match(f) |
|
] |
|
|
|
return ["_XTTS_/AUTOMATIC.wav"] + wav_voices |
|
|
|
|
|
def seconds_to_hhmmss_ms(seconds): |
|
hours = seconds // 3600 |
|
minutes = (seconds % 3600) // 60 |
|
seconds = seconds % 60 |
|
milliseconds = int((seconds - int(seconds)) * 1000) |
|
return "%02d:%02d:%02d.%03d" % (hours, minutes, int(seconds), milliseconds) |
|
|
|
|
|
def audio_trimming(audio_path, destination, start, end): |
|
if isinstance(start, (int, float)): |
|
start = seconds_to_hhmmss_ms(start) |
|
if isinstance(end, (int, float)): |
|
end = seconds_to_hhmmss_ms(end) |
|
|
|
if destination: |
|
file_directory = destination |
|
else: |
|
file_directory = os.path.dirname(audio_path) |
|
|
|
file_name = os.path.splitext(os.path.basename(audio_path))[0] |
|
file_ = f"{file_name}_trim.wav" |
|
|
|
output_path = os.path.join(file_directory, file_) |
|
|
|
|
|
command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ss {start} -to {end} -acodec pcm_s16le -f wav "{output_path}"' |
|
run_command(command) |
|
|
|
return output_path |
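
# Hedged usage sketch (requires ffmpeg on PATH): writes "audio_trim.wav"
# next to the source file and returns its path.
# >>> audio_trimming("audio.wav", "", start=12.5, end=20)
# 'audio_trim.wav'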
|
|
|
|
|
def convert_to_xtts_good_sample(audio_path: str = "", destination: str = ""): |
|
if destination: |
|
file_directory = destination |
|
else: |
|
file_directory = os.path.dirname(audio_path) |
|
|
|
file_name = os.path.splitext(os.path.basename(audio_path))[0] |
|
file_ = f"{file_name}_good_sample.wav" |
|
|
|
mono_path = os.path.join(file_directory, file_) |
|
|
|
command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 1 -ar 22050 -sample_fmt s16 -f wav "{mono_path}"' |
|
run_command(command) |
|
|
|
return mono_path |
|
|
|
|
|
def sanitize_file_name(file_name): |
|
import unicodedata |
|
|
|
|
|
|
|
normalized_name = unicodedata.normalize("NFKD", file_name) |
|
|
|
sanitized_name = re.sub(r"[^\w\s.-]", "_", normalized_name) |
|
return sanitized_name |
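
# A small sanity check (not called anywhere): characters outside [\w\s.-]
# are replaced with "_", while spaces, dots and hyphens are preserved.
def _example_sanitize_file_name():
    assert sanitize_file_name("voice*01?.wav") == "voice_01_.wav"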
|
|
|
|
|
def create_wav_file_vc( |
|
sample_name="", |
|
audio_wav="", |
|
start=None, |
|
end=None, |
|
output_final_path="_XTTS_", |
|
get_vocals_dereverb=True, |
|
): |
|
sample_name = sample_name if sample_name else "default_name" |
|
sample_name = sanitize_file_name(sample_name) |
|
audio_wav = audio_wav if isinstance(audio_wav, str) else audio_wav.name |
|
|
|
    BASE_DIR = "."
|
|
|
output_dir = os.path.join(BASE_DIR, "clean_song_output") |
|
|
|
|
|
if start or end: |
|
|
|
audio_segment = audio_trimming(audio_wav, output_dir, start, end) |
|
else: |
|
|
|
audio_segment = audio_wav |
|
|
|
from .mdx_net import process_uvr_task |
|
|
|
try: |
|
_, _, _, _, audio_segment = process_uvr_task( |
|
orig_song_path=audio_segment, |
|
main_vocals=True, |
|
dereverb=get_vocals_dereverb, |
|
) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
|
|
sample = convert_to_xtts_good_sample(audio_segment) |
|
|
|
sample_name = f"{sample_name}.wav" |
|
sample_rename = rename_file(sample, sample_name) |
|
|
|
copy_files(sample_rename, output_final_path) |
|
|
|
final_sample = os.path.join(output_final_path, sample_name) |
|
if os.path.exists(final_sample): |
|
logger.info(final_sample) |
|
return final_sample |
|
else: |
|
raise Exception(f"Error wav: {final_sample}") |
|
|
|
|
|
def create_new_files_for_vc( |
|
speakers_coqui, |
|
segments_base, |
|
dereverb_automatic=True |
|
): |
|
|
|
output_dir = os.path.join(".", "clean_song_output") |
|
remove_directory_contents(output_dir) |
|
|
|
for speaker in speakers_coqui: |
|
filtered_speaker = [ |
|
segment |
|
for segment in segments_base |
|
if segment["speaker"] == speaker |
|
] |
|
if len(filtered_speaker) > 4: |
|
filtered_speaker = filtered_speaker[1:] |
|
if filtered_speaker[0]["tts_name"] == "_XTTS_/AUTOMATIC.wav": |
|
name_automatic_wav = f"AUTOMATIC_{speaker}" |
|
if os.path.exists(f"_XTTS_/{name_automatic_wav}.wav"): |
|
logger.info(f"WAV automatic {speaker} exists") |
|
|
|
pass |
|
else: |
|
|
|
wav_ok = False |
|
for seg in filtered_speaker: |
|
duration = float(seg["end"]) - float(seg["start"]) |
|
                    if 7.0 < duration < 12.0:
|
logger.info( |
|
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' |
|
) |
|
create_wav_file_vc( |
|
sample_name=name_automatic_wav, |
|
audio_wav="audio.wav", |
|
start=(float(seg["start"]) + 1.0), |
|
end=(float(seg["end"]) - 1.0), |
|
get_vocals_dereverb=dereverb_automatic, |
|
) |
|
wav_ok = True |
|
break |
|
|
|
if not wav_ok: |
|
logger.info("Taking the first segment") |
|
seg = filtered_speaker[0] |
|
logger.info( |
|
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' |
|
) |
|
max_duration = float(seg["end"]) - float(seg["start"]) |
|
max_duration = max(2.0, min(max_duration, 9.0)) |
|
|
|
create_wav_file_vc( |
|
sample_name=name_automatic_wav, |
|
audio_wav="audio.wav", |
|
start=(float(seg["start"])), |
|
end=(float(seg["start"]) + max_duration), |
|
get_vocals_dereverb=dereverb_automatic, |
|
) |
|
|
|
|
|
def segments_coqui_tts( |
|
filtered_coqui_segments, |
|
TRANSLATE_AUDIO_TO, |
|
model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", |
|
speakers_coqui=None, |
|
delete_previous_automatic=True, |
|
dereverb_automatic=True, |
|
emotion=None, |
|
): |
|
"""XTTS |
|
Install: |
|
pip install -q TTS==0.21.1 |
|
pip install -q numpy==1.23.5 |
|
|
|
    Notes:
        - tts_name is the wav|mp3|ogg|m4a file used as the speaker
          reference for voice cloning.
    """
|
from TTS.api import TTS |
|
|
|
TRANSLATE_AUDIO_TO = fix_code_language(TRANSLATE_AUDIO_TO, syntax="coqui") |
|
supported_lang_coqui = [ |
|
"zh-cn", |
|
"en", |
|
"fr", |
|
"de", |
|
"it", |
|
"pt", |
|
"pl", |
|
"tr", |
|
"ru", |
|
"nl", |
|
"cs", |
|
"ar", |
|
"es", |
|
"hu", |
|
"ko", |
|
"ja", |
|
] |
|
if TRANSLATE_AUDIO_TO not in supported_lang_coqui: |
|
raise TTS_OperationError( |
|
f"'{TRANSLATE_AUDIO_TO}' is not a supported language for Coqui XTTS" |
|
) |
|
|
|
|
|
|
|
    if delete_previous_automatic:
        for spk in speakers_coqui or []:
            remove_files(f"_XTTS_/AUTOMATIC_{spk}.wav")
|
|
|
directory_audios_vc = "_XTTS_" |
|
create_directories(directory_audios_vc) |
|
create_new_files_for_vc( |
|
speakers_coqui, |
|
filtered_coqui_segments["segments"], |
|
dereverb_automatic, |
|
) |
|
|
|
|
|
device = os.environ.get("SONITR_DEVICE") |
|
model = TTS(model_id_coqui).to(device) |
|
sampling_rate = 24000 |
|
|
|
|
|
|
|
|
|
|
|
|
|
for segment in tqdm(filtered_coqui_segments["segments"]): |
|
speaker = segment["speaker"] |
|
text = segment["text"] |
|
start = segment["start"] |
|
tts_name = segment["tts_name"] |
|
if tts_name == "_XTTS_/AUTOMATIC.wav": |
|
tts_name = f"_XTTS_/AUTOMATIC_{speaker}.wav" |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
logger.info(f"{text} >> {filename}") |
|
try: |
|
|
|
wav = model.tts( |
|
text=text, speaker_wav=tts_name, language=TRANSLATE_AUDIO_TO |
|
) |
|
data_tts = pad_array( |
|
wav, |
|
sampling_rate, |
|
) |
|
|
|
sf.write( |
|
file=filename, |
|
samplerate=sampling_rate, |
|
data=data_tts, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
try: |
|
del model |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def piper_tts_voices_list(): |
|
file_path = download_manager( |
|
url="https://huggingface.co/rhasspy/piper-voices/resolve/main/voices.json", |
|
path="./PIPER_MODELS", |
|
) |
|
|
|
with open(file_path, "r", encoding="utf8") as file: |
|
data = json.load(file) |
|
piper_id_models = [key + " VITS-onnx" for key in data.keys()] |
|
|
|
return piper_id_models |
|
|
|
|
|
def replace_text_in_json(file_path, key_to_replace, new_text, condition=None):
    """Replace the value of a key in a JSON file. When a condition is
    given, the value is only replaced if it currently equals it."""
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    if key_to_replace in data:
        if condition:
            value_condition = condition
        else:
            value_condition = data[key_to_replace]

        if data[key_to_replace] == value_condition:
            data[key_to_replace] = new_text
|
|
|
|
|
with open(file_path, "w") as file: |
|
json.dump( |
|
data, file, indent=2 |
|
) |
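
# A minimal sketch (not called anywhere) of the conditional replacement:
# the key is only rewritten when its current value equals the condition,
# mirroring how load_piper_model fixes the "phoneme_type" field.
def _example_replace_text_in_json():
    import tempfile

    with tempfile.NamedTemporaryFile(
        "w", suffix=".json", delete=False
    ) as tmp:
        json.dump({"phoneme_type": "PhonemeType.ESPEAK"}, tmp)
        path = tmp.name

    replace_text_in_json(path, "phoneme_type", "espeak", "PhonemeType.ESPEAK")

    with open(path, "r", encoding="utf-8") as file:
        assert json.load(file)["phoneme_type"] == "espeak"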
|
|
|
|
|
def load_piper_model( |
|
model: str, |
|
data_dir: list, |
|
download_dir: str = "", |
|
update_voices: bool = False, |
|
): |
|
from piper import PiperVoice |
|
from piper.download import ensure_voice_exists, find_voice, get_voices |
|
|
|
try: |
|
import onnxruntime as rt |
|
|
|
if rt.get_device() == "GPU" and os.environ.get("SONITR_DEVICE") == "cuda": |
|
logger.debug("onnxruntime device > GPU") |
|
cuda = True |
|
else: |
|
logger.info( |
|
"onnxruntime device > CPU" |
|
) |
|
cuda = False |
|
except Exception as error: |
|
raise TTS_OperationError(f"onnxruntime error: {str(error)}") |
|
|
|
|
|
if platform.system() == "Windows": |
|
logger.info("Employing CPU exclusivity with Piper TTS") |
|
cuda = False |
|
|
|
if not download_dir: |
|
|
|
download_dir = data_dir[0] |
|
else: |
|
data_dir = [os.path.join(data_dir[0], download_dir)] |
|
|
|
|
|
model_path = Path(model) |
|
if not model_path.exists(): |
|
|
|
voices_info = get_voices(download_dir, update_voices=update_voices) |
|
|
|
|
|
aliases_info: Dict[str, Any] = {} |
|
for voice_info in voices_info.values(): |
|
for voice_alias in voice_info.get("aliases", []): |
|
aliases_info[voice_alias] = {"_is_alias": True, **voice_info} |
|
|
|
voices_info.update(aliases_info) |
|
ensure_voice_exists(model, data_dir, download_dir, voices_info) |
|
model, config = find_voice(model, data_dir) |
|
|
|
replace_text_in_json( |
|
config, "phoneme_type", "espeak", "PhonemeType.ESPEAK" |
|
) |
|
|
|
|
|
voice = PiperVoice.load(model, config_path=config, use_cuda=cuda) |
|
|
|
return voice |
|
|
|
|
|
def synthesize_text_to_audio_np_array(voice, text, synthesize_args):
    # Piper streams raw 16-bit PCM; collect the chunks and view them as an
    # int16 array. soundfile converts the dtype when writing Ogg/Vorbis.
    audio_stream = voice.synthesize_stream_raw(text, **synthesize_args)
    audio_data = b"".join(audio_stream)

    audio_np = np.frombuffer(audio_data, dtype=np.int16)
    return audio_np
|
|
|
|
|
def segments_vits_onnx_tts(filtered_onnx_vits_segments, TRANSLATE_AUDIO_TO): |
|
""" |
|
Install: |
|
pip install -q piper-tts==1.2.0 onnxruntime-gpu # for cuda118 |
|
""" |
|
|
|
data_dir = [ |
|
str(Path.cwd()) |
|
] |
|
download_dir = "PIPER_MODELS" |
|
|
|
update_voices = True |
|
|
|
synthesize_args = { |
|
"speaker_id": None, |
|
"length_scale": 1.0, |
|
"noise_scale": 0.667, |
|
"noise_w": 0.8, |
|
"sentence_silence": 0.0, |
|
} |
|
|
|
filtered_segments = filtered_onnx_vits_segments["segments"] |
|
|
|
sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"]) |
|
logger.debug(sorted_segments) |
|
|
|
model_name_key = None |
|
for segment in tqdm(sorted_segments): |
|
speaker = segment["speaker"] |
|
text = segment["text"] |
|
start = segment["start"] |
|
tts_name = segment["tts_name"].replace(" VITS-onnx", "") |
|
|
|
if tts_name != model_name_key: |
|
model_name_key = tts_name |
|
model = load_piper_model( |
|
tts_name, data_dir, download_dir, update_voices |
|
) |
|
sampling_rate = model.config.sample_rate |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
logger.info(f"{text} >> {filename}") |
|
try: |
|
|
|
speech_output = synthesize_text_to_audio_np_array( |
|
model, text, synthesize_args |
|
) |
|
data_tts = pad_array( |
|
speech_output, |
|
sampling_rate, |
|
) |
|
|
|
sf.write( |
|
file=filename, |
|
samplerate=sampling_rate, |
|
data=data_tts, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
try: |
|
del model |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def segments_openai_tts( |
|
filtered_openai_tts_segments, TRANSLATE_AUDIO_TO |
|
): |
|
from openai import OpenAI |
|
|
|
client = OpenAI() |
|
sampling_rate = 24000 |
|
|
|
|
|
|
|
|
|
|
|
for segment in tqdm(filtered_openai_tts_segments["segments"]): |
|
speaker = segment["speaker"] |
|
text = segment["text"].strip() |
|
start = segment["start"] |
|
tts_name = segment["tts_name"] |
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
logger.info(f"{text} >> {filename}") |
|
|
|
try: |
|
|
|
            response = client.audio.speech.create(
                model="tts-1-hd" if "HD" in tts_name else "tts-1",
                # The first token of tts_name carries a one-character
                # prefix before the voice id, which is stripped here.
                voice=tts_name.split()[0][1:],
                response_format="wav",
                input=text
            )
|
|
|
audio_bytes = b'' |
|
for data in response.iter_bytes(chunk_size=4096): |
|
audio_bytes += data |
|
|
|
            speech_output = np.frombuffer(audio_bytes, dtype=np.int16)

            # Skip the first 240 samples, which cover the WAV header and a
            # short initial artifact.
            data_tts = pad_array(
                speech_output[240:],
                sampling_rate,
            )
|
|
|
sf.write( |
|
file=filename, |
|
samplerate=sampling_rate, |
|
data=data_tts, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
verify_saved_file_and_size(filename) |
|
|
|
except Exception as error: |
|
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_spkr(pattern, speaker_to_voice, segments): |
|
return [ |
|
speaker |
|
for speaker, voice in speaker_to_voice.items() |
|
if pattern.match(voice) and any( |
|
segment["speaker"] == speaker for segment in segments |
|
) |
|
] |
|
|
|
|
|
def filter_by_speaker(speakers, segments): |
|
return { |
|
"segments": [ |
|
segment |
|
for segment in segments |
|
if segment["speaker"] in speakers |
|
] |
|
} |
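
# A small sketch (not called anywhere) of how segments are routed to a TTS
# engine: the voice-name pattern selects the speakers, and
# filter_by_speaker collects their segments.
def _example_voice_routing():
    segments = [
        {"speaker": "SPEAKER_00", "text": "hi", "start": 0.0, "end": 1.0},
        {"speaker": "SPEAKER_01", "text": "yo", "start": 1.0, "end": 2.0},
    ]
    speaker_to_voice = {
        "SPEAKER_00": "en-GB-ThomasNeural-Male",
        "SPEAKER_01": "en-facebook-mms VITS",
    }
    pattern_edge = re.compile(r".*-(Male|Female)$")
    speakers_edge = find_spkr(pattern_edge, speaker_to_voice, segments)
    assert speakers_edge == ["SPEAKER_00"]
    assert len(filter_by_speaker(speakers_edge, segments)["segments"]) == 1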
|
|
|
|
|
def audio_segmentation_to_voice( |
|
result_diarize, |
|
TRANSLATE_AUDIO_TO, |
|
is_gui, |
|
tts_voice00, |
|
tts_voice01="", |
|
tts_voice02="", |
|
tts_voice03="", |
|
tts_voice04="", |
|
tts_voice05="", |
|
tts_voice06="", |
|
tts_voice07="", |
|
tts_voice08="", |
|
tts_voice09="", |
|
tts_voice10="", |
|
tts_voice11="", |
|
dereverb_automatic=True, |
|
model_id_bark="suno/bark-small", |
|
model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2", |
|
delete_previous_automatic=True, |
|
): |
|
|
|
remove_directory_contents("audio") |
|
|
|
|
|
speaker_to_voice = { |
|
"SPEAKER_00": tts_voice00, |
|
"SPEAKER_01": tts_voice01, |
|
"SPEAKER_02": tts_voice02, |
|
"SPEAKER_03": tts_voice03, |
|
"SPEAKER_04": tts_voice04, |
|
"SPEAKER_05": tts_voice05, |
|
"SPEAKER_06": tts_voice06, |
|
"SPEAKER_07": tts_voice07, |
|
"SPEAKER_08": tts_voice08, |
|
"SPEAKER_09": tts_voice09, |
|
"SPEAKER_10": tts_voice10, |
|
"SPEAKER_11": tts_voice11, |
|
} |
|
|
|
|
|
for segment in result_diarize["segments"]: |
|
if "speaker" not in segment: |
|
segment["speaker"] = "SPEAKER_00" |
|
            logger.warning(
                "No speaker detected in segment; the first TTS voice will"
                f" be used for the segment starting at {segment['start']}:"
                f" {segment['text']}"
            )
|
|
|
segment["tts_name"] = speaker_to_voice[segment["speaker"]] |
|
|
|
|
|
pattern_edge = re.compile(r".*-(Male|Female)$") |
|
pattern_bark = re.compile(r".* BARK$") |
|
pattern_vits = re.compile(r".* VITS$") |
|
pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$") |
|
pattern_vits_onnx = re.compile(r".* VITS-onnx$") |
|
pattern_openai_tts = re.compile(r".* OpenAI-TTS$") |
|
|
|
all_segments = result_diarize["segments"] |
|
|
|
speakers_edge = find_spkr(pattern_edge, speaker_to_voice, all_segments) |
|
speakers_bark = find_spkr(pattern_bark, speaker_to_voice, all_segments) |
|
speakers_vits = find_spkr(pattern_vits, speaker_to_voice, all_segments) |
|
speakers_coqui = find_spkr(pattern_coqui, speaker_to_voice, all_segments) |
|
speakers_vits_onnx = find_spkr( |
|
pattern_vits_onnx, speaker_to_voice, all_segments |
|
) |
|
speakers_openai_tts = find_spkr( |
|
pattern_openai_tts, speaker_to_voice, all_segments |
|
) |
|
|
|
|
|
filtered_edge = filter_by_speaker(speakers_edge, all_segments) |
|
filtered_bark = filter_by_speaker(speakers_bark, all_segments) |
|
filtered_vits = filter_by_speaker(speakers_vits, all_segments) |
|
filtered_coqui = filter_by_speaker(speakers_coqui, all_segments) |
|
filtered_vits_onnx = filter_by_speaker(speakers_vits_onnx, all_segments) |
|
filtered_openai_tts = filter_by_speaker(speakers_openai_tts, all_segments) |
|
|
|
|
|
if filtered_edge["segments"]: |
|
logger.info(f"EDGE TTS: {speakers_edge}") |
|
        segments_edge_tts(filtered_edge, TRANSLATE_AUDIO_TO, is_gui)
|
if filtered_bark["segments"]: |
|
logger.info(f"BARK TTS: {speakers_bark}") |
|
segments_bark_tts( |
|
filtered_bark, TRANSLATE_AUDIO_TO, model_id_bark |
|
) |
|
if filtered_vits["segments"]: |
|
logger.info(f"VITS TTS: {speakers_vits}") |
|
segments_vits_tts(filtered_vits, TRANSLATE_AUDIO_TO) |
|
if filtered_coqui["segments"]: |
|
logger.info(f"Coqui TTS: {speakers_coqui}") |
|
segments_coqui_tts( |
|
filtered_coqui, |
|
TRANSLATE_AUDIO_TO, |
|
model_id_coqui, |
|
speakers_coqui, |
|
delete_previous_automatic, |
|
dereverb_automatic, |
|
) |
|
if filtered_vits_onnx["segments"]: |
|
logger.info(f"PIPER TTS: {speakers_vits_onnx}") |
|
segments_vits_onnx_tts(filtered_vits_onnx, TRANSLATE_AUDIO_TO) |
|
if filtered_openai_tts["segments"]: |
|
logger.info(f"OpenAI TTS: {speakers_openai_tts}") |
|
segments_openai_tts(filtered_openai_tts, TRANSLATE_AUDIO_TO) |
|
|
|
[result.pop("tts_name", None) for result in result_diarize["segments"]] |
|
return [ |
|
speakers_edge, |
|
speakers_bark, |
|
speakers_vits, |
|
speakers_coqui, |
|
speakers_vits_onnx, |
|
speakers_openai_tts |
|
] |
|
|
|
|
|
def accelerate_segments( |
|
result_diarize, |
|
max_accelerate_audio, |
|
valid_speakers, |
|
acceleration_rate_regulation=False, |
|
folder_output="audio2", |
|
): |
|
logger.info("Apply acceleration") |
|
|
|
( |
|
speakers_edge, |
|
speakers_bark, |
|
speakers_vits, |
|
speakers_coqui, |
|
speakers_vits_onnx, |
|
speakers_openai_tts |
|
) = valid_speakers |
|
|
|
create_directories(f"{folder_output}/audio/") |
|
remove_directory_contents(f"{folder_output}/audio/") |
|
|
|
audio_files = [] |
|
speakers_list = [] |
|
|
|
max_count_segments_idx = len(result_diarize["segments"]) - 1 |
|
|
|
for i, segment in tqdm(enumerate(result_diarize["segments"])): |
|
text = segment["text"] |
|
start = segment["start"] |
|
end = segment["end"] |
|
speaker = segment["speaker"] |
|
|
|
|
|
|
|
filename = f"audio/{start}.ogg" |
|
|
|
|
|
|
|
|
|
duration_true = end - start |
|
duration_tts = librosa.get_duration(filename=filename) |
|
|
|
|
|
acc_percentage = duration_tts / duration_true |
|
|
|
|
|
if acceleration_rate_regulation and acc_percentage >= 1.3: |
|
try: |
|
next_segment = result_diarize["segments"][ |
|
min(max_count_segments_idx, i + 1) |
|
] |
|
next_start = next_segment["start"] |
|
next_speaker = next_segment["speaker"] |
|
duration_with_next_start = next_start - start |
|
|
|
if duration_with_next_start > duration_true: |
|
extra_time = duration_with_next_start - duration_true |
|
|
|
if speaker == next_speaker: |
|
|
|
smoth_duration = duration_true + (extra_time * 0.5) |
|
else: |
|
|
|
smoth_duration = duration_true + (extra_time * 0.7) |
|
logger.debug( |
|
f"Base acc: {acc_percentage}, " |
|
f"smoth acc: {duration_tts / smoth_duration}" |
|
) |
|
acc_percentage = max(1.2, (duration_tts / smoth_duration)) |
|
|
|
except Exception as error: |
|
logger.error(str(error)) |
|
|
|
if acc_percentage > max_accelerate_audio: |
|
acc_percentage = max_accelerate_audio |
|
        elif 0.8 <= acc_percentage <= 1.15:
|
acc_percentage = 1.0 |
|
elif acc_percentage <= 0.79: |
|
acc_percentage = 0.8 |
|
|
|
|
|
        acc_percentage = round(acc_percentage, 1)
|
|
|
|
|
if speaker in speakers_edge: |
|
info_enc = sf.info(filename).format |
|
else: |
|
info_enc = "OGG" |
|
|
|
|
|
if acc_percentage == 1.0 and info_enc == "OGG": |
|
copy_files(filename, f"{folder_output}{os.sep}audio") |
|
else: |
|
os.system( |
|
f"ffmpeg -y -loglevel panic -i {filename} -filter:a atempo={acc_percentage} {folder_output}/{filename}" |
|
) |
|
|
|
if logger.isEnabledFor(logging.DEBUG): |
|
duration_create = librosa.get_duration( |
|
filename=f"{folder_output}/{filename}" |
|
) |
|
logger.debug( |
|
f"acc_percen is {acc_percentage}, tts duration " |
|
f"is {duration_tts}, new duration is {duration_create}" |
|
f", for {filename}" |
|
) |
|
|
|
audio_files.append(f"{folder_output}/{filename}") |
|
speaker = "TTS Speaker {:02d}".format(int(speaker[-2:]) + 1) |
|
speakers_list.append(speaker) |
|
|
|
return audio_files, speakers_list |
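
# A small sketch (not called anywhere) of the acceleration-factor rules
# applied above: ratios close to 1.0 are snapped to 1.0, slowdowns are
# floored at 0.8, and the result is rounded for ffmpeg's atempo filter.
def _example_acceleration_factor(
    duration_tts=3.6, duration_true=3.0, max_accelerate_audio=2.1
):
    acc = duration_tts / duration_true
    if acc > max_accelerate_audio:
        acc = max_accelerate_audio
    elif 0.8 <= acc <= 1.15:
        acc = 1.0
    elif acc <= 0.79:
        acc = 0.8
    return round(acc, 1)  # 1.2 for the default arguments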
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def se_process_audio_segments( |
|
source_seg, tone_color_converter, device, remove_previous_processed=True |
|
): |
|
|
|
source_audio_segs = glob.glob(f"{source_seg}/*.wav") |
|
    if not source_audio_segs:
        raise ValueError(
            f"No audio segments found in {source_seg}"
        )
|
|
|
source_se_path = os.path.join(source_seg, "se.pth") |
|
|
|
|
|
if os.path.isfile(source_se_path): |
|
se = torch.load(source_se_path).to(device) |
|
logger.debug(f"Previous created {source_se_path}") |
|
else: |
|
se = tone_color_converter.extract_se(source_audio_segs, source_se_path) |
|
|
|
return se |
|
|
|
|
|
def create_wav_vc( |
|
valid_speakers, |
|
segments_base, |
|
audio_name, |
|
max_segments=10, |
|
target_dir="processed", |
|
get_vocals_dereverb=False, |
|
): |
|
|
|
|
|
|
|
output_dir = os.path.join(".", target_dir) |
|
|
|
|
|
path_source_segments = [] |
|
path_target_segments = [] |
|
for speaker in valid_speakers: |
|
filtered_speaker = [ |
|
segment |
|
for segment in segments_base |
|
if segment["speaker"] == speaker |
|
] |
|
if len(filtered_speaker) > 4: |
|
filtered_speaker = filtered_speaker[1:] |
|
|
|
dir_name_speaker = speaker + audio_name |
|
dir_name_speaker_tts = "tts" + speaker + audio_name |
|
dir_path_speaker = os.path.join(output_dir, dir_name_speaker) |
|
dir_path_speaker_tts = os.path.join(output_dir, dir_name_speaker_tts) |
|
create_directories([dir_path_speaker, dir_path_speaker_tts]) |
|
|
|
path_target_segments.append(dir_path_speaker) |
|
path_source_segments.append(dir_path_speaker_tts) |
|
|
|
|
|
max_segments_count = 0 |
|
for seg in filtered_speaker: |
|
duration = float(seg["end"]) - float(seg["start"]) |
|
            if 3.0 < duration < 18.0:
|
logger.info( |
|
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}' |
|
) |
|
name_new_wav = str(seg["start"]) |
|
|
|
check_segment_audio_target_file = os.path.join( |
|
dir_path_speaker, f"{name_new_wav}.wav" |
|
) |
|
|
|
                if os.path.exists(check_segment_audio_target_file):
                    logger.debug(
                        "VC segment already exists: "
                        f"{check_segment_audio_target_file}"
                    )
|
else: |
|
create_wav_file_vc( |
|
sample_name=name_new_wav, |
|
audio_wav="audio.wav", |
|
start=(float(seg["start"]) + 1.0), |
|
end=(float(seg["end"]) - 1.0), |
|
output_final_path=dir_path_speaker, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
) |
|
|
|
file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" |
|
|
|
convert_to_xtts_good_sample( |
|
file_name_tts, dir_path_speaker_tts |
|
) |
|
|
|
max_segments_count += 1 |
|
if max_segments_count == max_segments: |
|
break |
|
|
|
if max_segments_count == 0: |
|
logger.info("Taking the first segment") |
|
seg = filtered_speaker[0] |
|
logger.info( |
|
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}' |
|
) |
|
max_duration = float(seg["end"]) - float(seg["start"]) |
|
max_duration = max(1.0, min(max_duration, 18.0)) |
|
|
|
name_new_wav = str(seg["start"]) |
|
create_wav_file_vc( |
|
sample_name=name_new_wav, |
|
audio_wav="audio.wav", |
|
start=(float(seg["start"])), |
|
end=(float(seg["start"]) + max_duration), |
|
output_final_path=dir_path_speaker, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
) |
|
|
|
file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg" |
|
|
|
convert_to_xtts_good_sample(file_name_tts, dir_path_speaker_tts) |
|
|
|
logger.debug(f"Base: {str(path_source_segments)}") |
|
logger.debug(f"Target: {str(path_target_segments)}") |
|
|
|
return path_source_segments, path_target_segments |
|
|
|
|
|
def toneconverter_openvoice( |
|
result_diarize, |
|
preprocessor_max_segments, |
|
remove_previous_process=True, |
|
get_vocals_dereverb=False, |
|
model="openvoice", |
|
): |
|
audio_path = "audio.wav" |
|
|
|
target_dir = "processed" |
|
create_directories(target_dir) |
|
|
|
from openvoice import se_extractor |
|
from openvoice.api import ToneColorConverter |
|
|
|
audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" |
|
|
|
|
|
|
|
|
|
valid_speakers = list( |
|
{item["speaker"] for item in result_diarize["segments"]} |
|
) |
|
|
|
logger.info("Openvoice preprocessor...") |
|
|
|
if remove_previous_process: |
|
remove_directory_contents(target_dir) |
|
|
|
path_source_segments, path_target_segments = create_wav_vc( |
|
valid_speakers, |
|
result_diarize["segments"], |
|
audio_name, |
|
max_segments=preprocessor_max_segments, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
) |
|
|
|
logger.info("Openvoice loading model...") |
|
model_path_openvoice = "./OPENVOICE_MODELS" |
|
url_model_openvoice = "https://huggingface.co/myshell-ai/OpenVoice/resolve/main/checkpoints/converter" |
|
|
|
if "v2" in model: |
|
model_path = os.path.join(model_path_openvoice, "v2") |
|
url_model_openvoice = url_model_openvoice.replace( |
|
"OpenVoice", "OpenVoiceV2" |
|
).replace("checkpoints/", "") |
|
else: |
|
model_path = os.path.join(model_path_openvoice, "v1") |
|
create_directories(model_path) |
|
|
|
config_url = f"{url_model_openvoice}/config.json" |
|
checkpoint_url = f"{url_model_openvoice}/checkpoint.pth" |
|
|
|
config_path = download_manager(url=config_url, path=model_path) |
|
checkpoint_path = download_manager( |
|
url=checkpoint_url, path=model_path |
|
) |
|
|
|
device = os.environ.get("SONITR_DEVICE") |
|
tone_color_converter = ToneColorConverter(config_path, device=device) |
|
tone_color_converter.load_ckpt(checkpoint_path) |
|
|
|
logger.info("Openvoice tone color converter:") |
|
global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") |
|
|
|
for source_seg, target_seg, speaker in zip( |
|
path_source_segments, path_target_segments, valid_speakers |
|
): |
|
|
|
source_se = se_process_audio_segments(source_seg, tone_color_converter, device) |
|
|
|
target_se = se_process_audio_segments(target_seg, tone_color_converter, device) |
|
|
|
|
|
encode_message = "@MyShell" |
|
filtered_speaker = [ |
|
segment |
|
for segment in result_diarize["segments"] |
|
if segment["speaker"] == speaker |
|
] |
|
for seg in filtered_speaker: |
|
src_path = ( |
|
save_path |
|
) = f"audio2/audio/{str(seg['start'])}.ogg" |
|
logger.debug(f"{src_path}") |
|
|
|
tone_color_converter.convert( |
|
audio_src_path=src_path, |
|
src_se=source_se, |
|
tgt_se=target_se, |
|
output_path=save_path, |
|
message=encode_message, |
|
) |
|
|
|
global_progress_bar.update(1) |
|
|
|
global_progress_bar.close() |
|
|
|
try: |
|
del tone_color_converter |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
def toneconverter_freevc( |
|
result_diarize, |
|
remove_previous_process=True, |
|
get_vocals_dereverb=False, |
|
): |
|
audio_path = "audio.wav" |
|
target_dir = "processed" |
|
create_directories(target_dir) |
|
|
|
from openvoice import se_extractor |
|
|
|
audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}" |
|
|
|
|
|
valid_speakers = list( |
|
{item["speaker"] for item in result_diarize["segments"]} |
|
) |
|
|
|
logger.info("FreeVC preprocessor...") |
|
|
|
if remove_previous_process: |
|
remove_directory_contents(target_dir) |
|
|
|
path_source_segments, path_target_segments = create_wav_vc( |
|
valid_speakers, |
|
result_diarize["segments"], |
|
audio_name, |
|
max_segments=1, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
) |
|
|
|
logger.info("FreeVC loading model...") |
|
device_id = os.environ.get("SONITR_DEVICE") |
|
device = None if device_id == "cpu" else device_id |
|
try: |
|
from TTS.api import TTS |
|
tts = TTS( |
|
model_name="voice_conversion_models/multilingual/vctk/freevc24", |
|
progress_bar=False |
|
).to(device) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
logger.error("Error loading the FreeVC model.") |
|
return |
|
|
|
logger.info("FreeVC process:") |
|
global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress") |
|
|
|
for source_seg, target_seg, speaker in zip( |
|
path_source_segments, path_target_segments, valid_speakers |
|
): |
|
|
|
filtered_speaker = [ |
|
segment |
|
for segment in result_diarize["segments"] |
|
if segment["speaker"] == speaker |
|
] |
|
|
|
files_and_directories = os.listdir(target_seg) |
|
wav_files = [file for file in files_and_directories if file.endswith(".wav")] |
|
original_wav_audio_segment = os.path.join(target_seg, wav_files[0]) |
|
|
|
for seg in filtered_speaker: |
|
|
|
src_path = ( |
|
save_path |
|
) = f"audio2/audio/{str(seg['start'])}.ogg" |
|
logger.debug(f"{src_path} - {original_wav_audio_segment}") |
|
|
|
wav = tts.voice_conversion( |
|
source_wav=src_path, |
|
target_wav=original_wav_audio_segment, |
|
) |
|
|
|
sf.write( |
|
file=save_path, |
|
samplerate=tts.voice_converter.vc_config.audio.output_sample_rate, |
|
data=wav, |
|
format="ogg", |
|
subtype="vorbis", |
|
) |
|
|
|
global_progress_bar.update(1) |
|
|
|
global_progress_bar.close() |
|
|
|
try: |
|
del tts |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
except Exception as error: |
|
logger.error(str(error)) |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
def toneconverter( |
|
result_diarize, |
|
preprocessor_max_segments, |
|
remove_previous_process=True, |
|
get_vocals_dereverb=False, |
|
method_vc="freevc" |
|
): |
|
|
|
if method_vc == "freevc": |
|
if preprocessor_max_segments > 1: |
|
logger.info("FreeVC only uses one segment.") |
|
return toneconverter_freevc( |
|
result_diarize, |
|
remove_previous_process=remove_previous_process, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
) |
|
elif "openvoice" in method_vc: |
|
return toneconverter_openvoice( |
|
result_diarize, |
|
preprocessor_max_segments, |
|
remove_previous_process=remove_previous_process, |
|
get_vocals_dereverb=get_vocals_dereverb, |
|
model=method_vc, |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
from segments import result_diarize |
|
|
|
audio_segmentation_to_voice( |
|
result_diarize, |
|
TRANSLATE_AUDIO_TO="en", |
|
|
is_gui=True, |
|
tts_voice00="en-facebook-mms VITS", |
|
tts_voice01="en-CA-ClaraNeural-Female", |
|
tts_voice02="en-GB-ThomasNeural-Male", |
|
tts_voice03="en-GB-SoniaNeural-Female", |
|
tts_voice04="en-NZ-MitchellNeural-Male", |
|
tts_voice05="en-GB-MaisieNeural-Female", |
|
) |
|
|