# Spaces: Running (page-header residue from the hosting site; not part of the program)
from __future__ import annotations | |
import os | |
from logging import getLogger | |
from multiprocessing import freeze_support | |
from pathlib import Path | |
from typing import Literal | |
import click | |
import torch | |
from so_vits_svc_fork import __version__ | |
from so_vits_svc_fork.utils import get_optimal_device | |
# Module-level logger named after this module; used by every function in this file.
LOG = getLogger(__name__)
# True when the name of the directory containing this file includes "test".
IS_TEST = "test" in Path(__file__).parent.stem
if IS_TEST:
    LOG.debug("Test mode is on.")
class RichHelpFormatter(click.HelpFormatter):
    """Help formatter that always renders with a width of 100.

    The ``width`` argument is accepted so the signature matches
    ``click.HelpFormatter``, but its value is never used. The package
    version is logged each time a formatter instance is created.
    """

    def __init__(
        self,
        indent_increment: int = 2,
        width: int | None = None,
        max_width: int | None = None,
    ) -> None:
        # The caller-supplied ``width`` is ignored: 100 is always passed on.
        super().__init__(indent_increment, 100, max_width)
        LOG.info(f"Version: {__version__}")
def patch_wrap_text():
    """Replace ``click.formatting.wrap_text`` with a newline-preserving wrapper.

    The wrapper doubles every newline in the input, delegates to the original
    function with ``preserve_paragraphs=True`` and then collapses the doubled
    newlines in the result. The caller's own ``preserve_paragraphs`` value is
    accepted but not forwarded.
    """
    click_wrap_text = click.formatting.wrap_text

    def wrap_text(
        text,
        width=78,
        initial_indent="",
        subsequent_indent="",
        preserve_paragraphs=False,
    ):
        # Each single newline becomes a paragraph break for the original wrapper.
        doubled = text.replace("\n", "\n\n")
        wrapped = click_wrap_text(
            doubled,
            width=width,
            initial_indent=initial_indent,
            subsequent_indent=subsequent_indent,
            preserve_paragraphs=True,
        )
        # Undo the doubling so paragraphs are separated by a single newline again.
        return wrapped.replace("\n\n", "\n")

    click.formatting.wrap_text = wrap_text
# Install the wrap_text replacement at import time.
patch_wrap_text()
# Shared click context settings: "-h" as an alias of "--help", and defaults shown in help.
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"], "show_default": True}
# Make every click Context build its help output with RichHelpFormatter.
click.Context.formatter_class = RichHelpFormatter
def cli():
    """so-vits-svc allows any folder structure for training data.
    However, the following folder structure is recommended.\n
    When training: dataset_raw/{speaker_name}/**/{wav_name}.{any_format}\n
    When inference: configs/44k/config.json, logs/44k/G_XXXX.pth\n
    If the folder structure is followed, you DO NOT NEED TO SPECIFY model path, config path, etc.
    (The latest model will be automatically loaded.)\n
    To train a model, run pre-resample, pre-config, pre-hubert, train.\n
    To infer a model, run infer.
    """
    # NOTE(review): the body consists solely of the docstring above. Presumably this
    # is the click group entry point and the docstring doubles as its --help text
    # (the decorator is not visible here) — confirm before rewording it.
def train(
    config_path: Path,
    model_path: Path,
    tensorboard: bool = False,
    reset_optimizer: bool = False,
):
    """Train model
    If D_0.pth or G_0.pth not found, automatically download from hub."""
    # Imported lazily and aliased so it does not shadow this function's own name.
    from .train import train as run_training

    config_file = Path(config_path)
    model_dir = Path(model_path)

    if tensorboard:
        import webbrowser

        from tensorboard import program

        # 30 is the numeric value of logging.WARNING.
        getLogger("tensorboard").setLevel(30)
        board = program.TensorBoard()
        # Point TensorBoard at the model directory, launch it and open the
        # returned URL in the default browser.
        board.configure(argv=[None, "--logdir", model_dir.as_posix()])
        board_url = board.launch()
        webbrowser.open(board_url)

    run_training(
        config_path=config_file,
        model_path=model_dir,
        reset_optimizer=reset_optimizer,
    )
def gui():
    """Opens GUI
    for conversion and realtime inference"""
    # Lazy import: the GUI module is only loaded when this command runs.
    from .gui import main as launch_gui

    launch_gui()
def infer(
    # paths
    input_path: Path,
    output_path: Path | None,
    model_path: Path,
    config_path: Path,
    recursive: bool,
    # svc config
    speaker: str,
    cluster_model_path: Path | None = None,
    transpose: int = 0,
    auto_predict_f0: bool = False,
    cluster_infer_ratio: float = 0,
    noise_scale: float = 0.4,
    f0_method: Literal["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"] = "dio",
    # slice config
    db_thresh: int = -40,
    pad_seconds: float = 0.5,
    chunk_seconds: float = 0.5,
    absolute_thresh: bool = False,
    max_chunk_seconds: float = 40,
    # NOTE: this default is computed once, when the function is defined.
    device: str | torch.device = get_optimal_device(),
):
    """Inference"""
    # Normalises the path arguments, then forwards everything to
    # so_vits_svc_fork.inference.main.infer.
    #
    # - output_path=None  -> "<input stem>.out<input suffix>" next to the input.
    # - model_path is a directory -> the G_*.pth file with the newest mtime is used
    #   (IndexError if the directory contains no such file).
    # - input_path is a directory and recursive is false -> ValueError.
    from so_vits_svc_fork.inference.main import infer as run_infer

    if not auto_predict_f0:
        # The trailing space keeps the two sentences from running together.
        LOG.warning(
            f"auto_predict_f0 = False, transpose = {transpose}. If you want to change the pitch, please set transpose. "
            "Generally transpose = 0 does not work because your voice pitch and target voice pitch are different."
        )

    input_path = Path(input_path)
    if output_path is None:
        output_path = input_path.parent / f"{input_path.stem}.out{input_path.suffix}"
    output_path = Path(output_path)
    if input_path.is_dir() and not recursive:
        raise ValueError(
            "input_path is a directory. Use -re or --recursive to infer recursively."
        )

    model_path = Path(model_path)
    if model_path.is_dir():
        # Pick the most recently modified generator checkpoint.
        checkpoints = sorted(
            model_path.glob("G_*.pth"), key=lambda x: x.stat().st_mtime
        )
        model_path = checkpoints[-1]
        LOG.info(f"Since model_path is a directory, use {model_path}")

    config_path = Path(config_path)
    if cluster_model_path is not None:
        cluster_model_path = Path(cluster_model_path)

    run_infer(
        # paths
        input_path=input_path,
        output_path=output_path,
        model_path=model_path,
        config_path=config_path,
        recursive=recursive,
        # svc config
        speaker=speaker,
        cluster_model_path=cluster_model_path,
        transpose=transpose,
        auto_predict_f0=auto_predict_f0,
        cluster_infer_ratio=cluster_infer_ratio,
        noise_scale=noise_scale,
        f0_method=f0_method,
        # slice config
        db_thresh=db_thresh,
        pad_seconds=pad_seconds,
        chunk_seconds=chunk_seconds,
        absolute_thresh=absolute_thresh,
        max_chunk_seconds=max_chunk_seconds,
        device=device,
    )
def vc(
    # paths
    model_path: Path,
    config_path: Path,
    # svc config
    speaker: str,
    cluster_model_path: Path | None,
    transpose: int,
    auto_predict_f0: bool,
    cluster_infer_ratio: float,
    noise_scale: float,
    f0_method: Literal["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"],
    # slice config
    db_thresh: int,
    pad_seconds: float,
    chunk_seconds: float,
    # realtime config
    crossfade_seconds: float,
    additional_infer_before_seconds: float,
    additional_infer_after_seconds: float,
    block_seconds: float,
    version: int,
    input_device: int | str | None,
    output_device: int | str | None,
    device: torch.device,
    passthrough_original: bool = False,
) -> None:
    """Realtime inference from microphone"""
    # Normalises the path arguments, then forwards everything to
    # so_vits_svc_fork.inference.main.realtime. When model_path is a directory,
    # the G_*.pth file with the newest mtime is used (IndexError if none exists).
    from so_vits_svc_fork.inference.main import realtime

    if auto_predict_f0:
        LOG.warning(
            "auto_predict_f0 = True in realtime inference will cause unstable voice pitch, use with caution"
        )
    else:
        # The trailing space keeps the two sentences from running together.
        LOG.warning(
            f"auto_predict_f0 = False, transpose = {transpose}. If you want to change the pitch, please change the transpose value. "
            "Generally transpose = 0 does not work because your voice pitch and target voice pitch are different."
        )

    model_path = Path(model_path)
    config_path = Path(config_path)
    if cluster_model_path is not None:
        cluster_model_path = Path(cluster_model_path)
    if model_path.is_dir():
        # Pick the most recently modified generator checkpoint.
        checkpoints = sorted(
            model_path.glob("G_*.pth"), key=lambda x: x.stat().st_mtime
        )
        model_path = checkpoints[-1]
        LOG.info(f"Since model_path is a directory, use {model_path}")

    realtime(
        # paths
        model_path=model_path,
        config_path=config_path,
        # svc config
        speaker=speaker,
        cluster_model_path=cluster_model_path,
        transpose=transpose,
        auto_predict_f0=auto_predict_f0,
        cluster_infer_ratio=cluster_infer_ratio,
        noise_scale=noise_scale,
        f0_method=f0_method,
        # slice config
        db_thresh=db_thresh,
        pad_seconds=pad_seconds,
        chunk_seconds=chunk_seconds,
        # realtime config
        crossfade_seconds=crossfade_seconds,
        additional_infer_before_seconds=additional_infer_before_seconds,
        additional_infer_after_seconds=additional_infer_after_seconds,
        block_seconds=block_seconds,
        version=version,
        input_device=input_device,
        output_device=output_device,
        device=device,
        passthrough_original=passthrough_original,
    )
def pre_resample(
    input_dir: Path,
    output_dir: Path,
    sampling_rate: int,
    n_jobs: int,
    top_db: int,
    frame_seconds: float,
    hop_seconds: float,
) -> None:
    """Preprocessing part 1: resample"""
    # Thin wrapper: converts both directories to Path and forwards every other
    # argument unchanged to preprocess_resample.
    from so_vits_svc_fork.preprocessing.preprocess_resample import (
        preprocess_resample as run_resample,
    )

    source_dir = Path(input_dir)
    target_dir = Path(output_dir)
    run_resample(
        input_dir=source_dir,
        output_dir=target_dir,
        sampling_rate=sampling_rate,
        n_jobs=n_jobs,
        top_db=top_db,
        frame_seconds=frame_seconds,
        hop_seconds=hop_seconds,
    )
from so_vits_svc_fork.preprocessing.preprocess_flist_config import CONFIG_TEMPLATE_DIR | |
def pre_config(
    input_dir: Path,
    filelist_path: Path,
    config_path: Path,
    config_type: str,
):
    """Preprocessing part 2: config"""
    # filelist_path is treated as a directory: the train/val/test list files are
    # placed directly inside it. config_type is passed on as ``config_name``.
    from so_vits_svc_fork.preprocessing.preprocess_flist_config import (
        preprocess_config as write_config,
    )

    filelist_dir = Path(filelist_path)
    write_config(
        input_dir=Path(input_dir),
        train_list_path=filelist_dir / "train.txt",
        val_list_path=filelist_dir / "val.txt",
        test_list_path=filelist_dir / "test.txt",
        config_path=Path(config_path),
        config_name=config_type,
    )
def pre_hubert(
    input_dir: Path,
    config_path: Path,
    # Annotated as int, matching the n_jobs parameter of the sibling preprocessing commands.
    n_jobs: int,
    force_rebuild: bool,
    f0_method: Literal["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"],
) -> None:
    """Preprocessing part 3: hubert
    If the HuBERT model is not found, it will be downloaded automatically."""
    # Thin wrapper: converts both paths to Path and forwards the remaining
    # arguments unchanged to preprocess_hubert_f0.
    from so_vits_svc_fork.preprocessing.preprocess_hubert_f0 import preprocess_hubert_f0

    input_dir = Path(input_dir)
    config_path = Path(config_path)
    preprocess_hubert_f0(
        input_dir=input_dir,
        config_path=config_path,
        n_jobs=n_jobs,
        force_rebuild=force_rebuild,
        f0_method=f0_method,
    )
def pre_sd(
    input_dir: Path | str,
    output_dir: Path | str,
    min_speakers: int,
    max_speakers: int,
    huggingface_token: str | None,
    n_jobs: int,
    sr: int,
):
    """Speech diarization using pyannote.audio"""
    # Token resolution order: explicit argument, then the HUGGINGFACE_TOKEN
    # environment variable, then an interactive hidden-input prompt.
    token = huggingface_token
    if token is None:
        token = os.environ.get("HUGGINGFACE_TOKEN", None)
    if token is None:
        token = click.prompt("Please enter your HuggingFace token", hide_input=True)
        if os.environ.get("HUGGINGFACE_TOKEN", None) is None:
            LOG.info("You can also set the HUGGINGFACE_TOKEN environment variable.")
    assert token is not None

    # Strip trailing whitespace and NUL characters, then reject tokens of
    # length 0 or 1.
    token = token.rstrip(" \n\r\t\0")
    if len(token) <= 1:
        raise ValueError("HuggingFace token is empty: " + token)

    if max_speakers == 1:
        LOG.warning("Consider using pre-split if max_speakers == 1")

    # Imported only after the token has been validated.
    from so_vits_svc_fork.preprocessing.preprocess_speaker_diarization import (
        preprocess_speaker_diarization as run_diarization,
    )

    run_diarization(
        input_dir=input_dir,
        output_dir=output_dir,
        min_speakers=min_speakers,
        max_speakers=max_speakers,
        huggingface_token=token,
        n_jobs=n_jobs,
        sr=sr,
    )
def pre_split(
    input_dir: Path | str,
    output_dir: Path | str,
    max_length: float,
    top_db: int,
    frame_seconds: float,
    hop_seconds: float,
    n_jobs: int,
    sr: int,
):
    """Split audio files into multiple files"""
    # Thin wrapper: every argument is forwarded unchanged to preprocess_split.
    from so_vits_svc_fork.preprocessing.preprocess_split import (
        preprocess_split as run_split,
    )

    split_options = {
        "input_dir": input_dir,
        "output_dir": output_dir,
        "max_length": max_length,
        "top_db": top_db,
        "frame_seconds": frame_seconds,
        "hop_seconds": hop_seconds,
        "n_jobs": n_jobs,
        "sr": sr,
    }
    run_split(**split_options)
def pre_classify(
    input_dir: Path | str,
    output_dir: Path | str | None,
    create_new: bool,
) -> None:
    """Classify multiple audio files into multiple files"""
    from so_vits_svc_fork.preprocessing.preprocess_classify import (
        preprocess_classify as run_classify,
    )

    # With no output directory given, the input directory is used as the output.
    target_dir = input_dir if output_dir is None else output_dir
    run_classify(
        input_dir=input_dir,
        output_dir=target_dir,
        create_new=create_new,
    )
def clean():
    """Clean up files, only useful if you are using the default file structure"""
    # Asks for confirmation on stdin, then recursively deletes the "dataset",
    # "filelists" and "logs" folders (relative to the current working directory)
    # that exist. Any answer other than "yes"/"y" aborts without deleting.
    import shutil

    folders = ["dataset", "filelists", "logs"]
    # f-string so the prompt shows the actual folder list instead of a literal
    # "{folders}" placeholder.
    answer = input(f"Are you sure you want to delete files in {folders}?")
    # Accept the answer regardless of surrounding whitespace or letter case.
    if answer.strip().lower() not in ("yes", "y"):
        LOG.info("Aborted")
        return

    for folder in folders:
        if Path(folder).exists():
            shutil.rmtree(folder)
    LOG.info("Cleaned up files")
def onnx(
    input_path: Path, output_path: Path, config_path: Path, device: torch.device | str
) -> None:
    """Export model to onnx (currently not working)"""
    # Export is disabled: this raise is the first statement, so every call fails
    # here and nothing below it executes.
    raise NotImplementedError("ONNX export is not yet supported")

    # ---- unreachable while the raise above is in place ----
    checkpoint = Path(input_path)
    if checkpoint.is_dir():
        # First *.pth file returned by glob.
        checkpoint = list(checkpoint.glob("*.pth"))[0]
    # Default output: same path as the checkpoint with an ".onnx" suffix.
    target = checkpoint.with_suffix(".onnx") if output_path is None else output_path
    target = Path(target)
    if target.is_dir():
        target = target / (checkpoint.stem + ".onnx")
    config_file = Path(config_path)
    export_device = torch.device(device)
    from so_vits_svc_fork.modules.onnx._export import onnx_export

    onnx_export(
        input_path=checkpoint,
        output_path=target,
        config_path=config_file,
        device=export_device,
    )
def train_cluster(
    input_dir: Path,
    output_path: Path,
    n_clusters: int,
    minibatch: bool,
    batch_size: int,
    partial_fit: bool,
) -> None:
    """Train k-means clustering"""
    # Thin wrapper around cluster.train_cluster.main: ``minibatch`` is passed as
    # ``use_minibatch`` and ``verbose`` is always True.
    from .cluster.train_cluster import main as run_cluster_training

    cluster_options = {
        "input_dir": input_dir,
        "output_path": output_path,
        "n_clusters": n_clusters,
        "verbose": True,
        "use_minibatch": minibatch,
        "batch_size": batch_size,
        "partial_fit": partial_fit,
    }
    run_cluster_training(**cluster_options)
# Script entry point: runs only when this file is executed directly.
if __name__ == "__main__":
    # multiprocessing.freeze_support() is called first, before the CLI starts.
    freeze_support()
    cli()