# Page header captured with the file (not code):
# vits-simple-api-bv2 / manager/ModelManager.py -- commit 1c9751a ("init") by Artrajz, 26.7 kB
import gc
import glob
import logging
import os
import traceback
import cpuinfo
import numpy as np
import psutil
import torch
# from contants import config
from contants import config
import utils
from bert_vits2 import Bert_VITS2
from contants import ModelType
from gpt_sovits.gpt_sovits import GPT_SoVITS
from logger import logger
from manager.observer import Subject
from utils.data_utils import HParams, check_is_none
from vits import VITS
from vits.hubert_vits import HuBert_VITS
from vits.w2v2_vits import W2V2_VITS
class ModelManager(Subject):
    """Registry of loaded TTS models and their speakers.

    Models are grouped by ``ModelType``; observers attached to the manager are
    notified when models are loaded or unloaded.
    """

    def __init__(self, device=config.system.device):
        """Create an empty manager bound to ``device``.

        Nothing is loaded here: every per-type container starts out empty and
        every shared helper model starts out as ``None``.
        """
        supported_types = (
            ModelType.VITS,
            ModelType.HUBERT_VITS,
            ModelType.W2V2_VITS,
            ModelType.BERT_VITS2,
            ModelType.GPT_SOVITS,
        )

        self.device = device
        self.logger = logger

        # ModelType -> {model_id: {"model_type", "model_path", "config_path",
        #                          "sovits_path", "gpt_path", "model", "n_speakers"}}
        # model_id is an int.
        self.models = {model_type: {} for model_type in supported_types}
        # ModelType -> [{"real_id", "model", "model_id"}], one entry per speaker.
        self.sid2model = {model_type: [] for model_type in supported_types}
        # ModelType *value* -> [{"id", "name", "lang"}], one entry per speaker.
        self.voice_speakers = {model_type.value: [] for model_type in supported_types}

        # Shared helper models / data, filled in lazily while loading models.
        self.emotion_reference = None
        self.hubert = None
        self.dimensional_emotion_model = None
        self.tts_front = None
        self.bert_models = {}
        self.model_handler = None
        self.emotion_model = None
        self.processor = None

        self.voice_objs_count = 0
        self._observers = []

        # Which class implements each model type.
        self.model_class_map = {
            ModelType.VITS: VITS,
            ModelType.HUBERT_VITS: HuBert_VITS,
            ModelType.W2V2_VITS: W2V2_VITS,
            ModelType.BERT_VITS2: Bert_VITS2,
            ModelType.GPT_SOVITS: GPT_SoVITS,
        }

        self.available_tts_model = set()
def model_init(self):
    """Load the discovered or configured models, then log a summary.

    With ``auto_load`` enabled the model list comes from ``scan_path``;
    otherwise it is read from the ``models`` entry of the TTS configuration.
    The dimensional emotion model is loaded as well when its file exists and
    it has not been loaded yet.
    """
    tts_config = config.tts_config
    models = self.scan_path() if tts_config.auto_load else tts_config.asdict().get("models")

    for entry in models:
        self.load_model(
            model_path=entry.get("model_path"),
            config_path=entry.get("config_path"),
            sovits_path=entry.get("sovits_path"),
            gpt_path=entry.get("gpt_path"),
        )

    emotion_model_path = os.path.join(
        config.abs_path,
        config.system.data_path,
        config.model_config.dimensional_emotion_model,
    )
    if os.path.isfile(emotion_model_path) and self.dimensional_emotion_model is None:
        self.dimensional_emotion_model = self.load_dimensional_emotion_model(emotion_model_path)

    self.log_device_info()

    # One log line per model type that has at least one speaker.
    per_type_counts = (
        (ModelType.VITS, self.vits_speakers_count),
        (ModelType.HUBERT_VITS, self.hubert_speakers_count),
        (ModelType.W2V2_VITS, self.w2v2_speakers_count),
        (ModelType.BERT_VITS2, self.bert_vits2_speakers_count),
        (ModelType.GPT_SOVITS, self.gpt_sovits_speakers_count),
    )
    for model_type, count in per_type_counts:
        if count != 0:
            self.logger.info(f"[{model_type.value}] {count} speakers")

    total = self.speakers_count
    self.logger.info(f"{total} speakers in total.")
    if total == 0:
        self.logger.warning("No model was loaded.")
@property
def vits_speakers(self):
    """Return the list of speaker entries registered for VITS models.

    ``voice_speakers`` is keyed by the enum *value* (see the sibling
    ``*_speakers_count`` properties), so the lookup uses
    ``ModelType.VITS.value`` rather than the enum member itself.
    """
    return self.voice_speakers[ModelType.VITS.value]
@property
def speakers_count(self):
    """Total number of speakers across all model types."""
    per_type = (
        self.vits_speakers_count,
        self.hubert_speakers_count,
        self.w2v2_speakers_count,
        self.bert_vits2_speakers_count,
        self.gpt_sovits_speakers_count,
    )
    return sum(per_type)
@property
def vits_speakers_count(self):
    """Number of speakers registered for VITS models."""
    vits_entries = self.voice_speakers[ModelType.VITS.value]
    return len(vits_entries)
@property
def hubert_speakers_count(self):
    """Number of speakers registered for HuBert-VITS models."""
    hubert_entries = self.voice_speakers[ModelType.HUBERT_VITS.value]
    return len(hubert_entries)
@property
def w2v2_speakers_count(self):
    """Number of speakers registered for W2V2-VITS models."""
    w2v2_entries = self.voice_speakers[ModelType.W2V2_VITS.value]
    return len(w2v2_entries)
@property
def w2v2_emotion_count(self):
    """Number of rows in the emotion reference, or 0 when none is loaded."""
    reference = self.emotion_reference
    if reference is None:
        return 0
    return len(reference)
@property
def bert_vits2_speakers_count(self):
    """Number of speakers registered for Bert-VITS2 models."""
    bert_vits2_entries = self.voice_speakers[ModelType.BERT_VITS2.value]
    return len(bert_vits2_entries)
@property
def gpt_sovits_speakers_count(self):
    """Number of speakers registered for GPT-SoVITS models."""
    gpt_sovits_entries = self.voice_speakers[ModelType.GPT_SOVITS.value]
    return len(gpt_sovits_entries)
def attach(self, observer):
    """Register ``observer`` so it receives future notifications."""
    registered = self._observers
    registered.append(observer)
def detach(self, observer):
    """Remove the first registration of ``observer``.

    Raises ``ValueError`` (from ``list.remove``) when it is not registered.
    """
    registered = self._observers
    registered.remove(observer)
def notify(self, event_type, **kwargs):
    """Call ``update(event_type, **kwargs)`` on every registered observer, in order."""
    for listener in self._observers:
        listener.update(event_type, **kwargs)
def log_device_info(self):
    """Log the PyTorch version and a short description of the device in use.

    For a ``cuda`` device the GPU name, total memory (rounded to GB) and
    device index are logged, or a warning when CUDA is unavailable. For any
    other device type the CPU name, core/thread counts and total RAM are
    logged instead.
    """
    device = self.device
    has_cuda = torch.cuda.is_available()
    self.logger.info(
        f"PyTorch Version: {torch.__version__} Cuda available:{has_cuda} Device type:{device.type}")

    if device.type != 'cuda':
        cpu_name = cpuinfo.get_cpu_info().get("brand_raw")
        core_count = psutil.cpu_count(logical=False)
        thread_count = psutil.cpu_count(logical=True)
        total_memory = round(psutil.virtual_memory().total / (1024 ** 3))  # GB
        self.logger.info(
            f"Using CPU on {cpu_name} with {core_count} cores and {thread_count} threads. Total memory: {total_memory}GB")
        return

    if not has_cuda:
        self.logger.warning("GPU device specified, but CUDA is not available.")
        return

    device_name = torch.cuda.get_device_name(device.index)
    gpu_memory = round(torch.cuda.get_device_properties(device).total_memory / 1024 ** 3)  # GB
    self.logger.info(
        f"Using GPU on {device_name} {gpu_memory}GB, GPU Device Index: {device.index}")
def relative_to_absolute_path(self, *paths):
    """Resolve model paths against the configured data directory.

    Each path is normalised first. A path that then starts with ``models`` is
    joined onto ``<abs_path>/<data_path>``; any other path is joined onto
    ``<abs_path>/<data_path>/<models_path>``.

    Returns:
        A list with exactly one entry per argument, in order. ``None``
        arguments are passed through as ``None`` (mirroring
        ``absolute_to_relative_path``) instead of collapsing the whole result
        to ``None``, so callers can always unpack the result.
    """
    data_root = os.path.join(config.abs_path, config.system.data_path)
    absolute_paths = []
    for path in paths:
        if path is None:
            absolute_paths.append(None)
            continue
        path = os.path.normpath(path)
        if path.startswith('models'):
            path = os.path.join(data_root, path)
        else:
            path = os.path.join(data_root, config.tts_config.models_path, path)
        absolute_paths.append(path)
    return absolute_paths
def absolute_to_relative_path(self, *paths):
    """Express each path relative to the configured models directory.

    Returns a list with one entry per argument; ``None`` arguments stay
    ``None``.
    """

    def to_relative(path):
        if path is None:
            return None
        # Relative to <abs_path>/<data_path>/<models_path>.
        models_root = os.path.join(config.abs_path, config.system.data_path,
                                   config.tts_config.models_path)
        return os.path.relpath(path, models_root)

    return [to_relative(path) for path in paths]
def _load_model_from_path(self, model_path, config_path, sovits_path, gpt_path):
    """Instantiate one model, load what it depends on, and describe its speakers.

    When ``check_is_none(sovits_path, gpt_path)`` is false the model is treated
    as GPT-SoVITS; otherwise the hyper-parameters are read from ``config_path``
    and the type is derived with ``recognition_model_type``.

    Returns:
        A dict with the keys ``model``, ``model_type``, ``model_id``,
        ``model_path``, ``config``, ``sovits_path``, ``gpt_path``,
        ``sid2model`` and ``speakers``. The manager's registries are not
        updated here; that is left to the caller.
    """

    def data_file(name):
        # Files shipped with the application live under <abs_path>/<data_path>.
        return os.path.join(config.abs_path, config.system.data_path, name)

    if not check_is_none(sovits_path, gpt_path):
        hps = None
        model_type = ModelType.GPT_SOVITS
    else:
        hps = utils.get_hparams_from_file(config_path)
        model_type = self.recognition_model_type(hps)

    model = self.model_class_map[model_type](
        model_type=model_type,
        model_path=model_path,
        config_path=config_path,
        sovits_path=sovits_path,
        gpt_path=gpt_path,
        config=hps,
        device=self.device,
    )

    # Type-specific dependencies; shared helpers are created only once.
    if model_type == ModelType.VITS:
        bert_embedding = getattr(hps.data, 'bert_embedding', getattr(hps.model, 'bert_embedding', False))
        if bert_embedding and self.tts_front is None:
            self.load_VITS_PinYin_model(data_file(config.model_config.vits_chinese_bert))
        if not config.vits_config.dynamic_loading:
            model.load_model()
        self.available_tts_model.add(ModelType.VITS.value)

    elif model_type == ModelType.W2V2_VITS:
        if self.emotion_reference is None:
            self.emotion_reference = self.load_npy(data_file(config.model_config.dimensional_emotion_npy))
        model.load_model(emotion_reference=self.emotion_reference,
                         dimensional_emotion_model=self.dimensional_emotion_model)
        self.available_tts_model.add(ModelType.W2V2_VITS.value)

    elif model_type == ModelType.HUBERT_VITS:
        if self.hubert is None:
            self.hubert = self.load_hubert_model(data_file(config.model_config.hubert_soft_0d54a1f4))
        model.load_model(hubert=self.hubert)

    elif model_type == ModelType.BERT_VITS2:
        for required_bert in model.bert_model_names.values():
            if self.model_handler is None:
                from manager.model_handler import ModelHandler
                self.model_handler = ModelHandler(self.device)
            self.model_handler.load_bert(required_bert)
        emotion_embedding = model.hps_ms.model.emotion_embedding
        if emotion_embedding == 1:
            self.model_handler.load_emotion()
        elif emotion_embedding == 2:
            self.model_handler.load_clap()
        model.load_model(self.model_handler)
        self.available_tts_model.add(ModelType.BERT_VITS2.value)

    elif model_type == ModelType.GPT_SOVITS:
        if self.model_handler is None:
            from manager.model_handler import ModelHandler
            self.model_handler = ModelHandler(self.device)
        self.model_handler.load_ssl()
        self.model_handler.load_bert("CHINESE_ROBERTA_WWM_EXT_LARGE")
        model.load_model(self.model_handler)

    # Public speaker ids continue after the speakers already registered for
    # this type; the model id is one past the highest id currently in use.
    first_speaker_id = len(self.voice_speakers[model_type.value])
    model_id = max([-1, *self.models[model_type].keys()]) + 1

    sid2model, speakers = [], []
    for real_id, name in enumerate(model.speakers):
        sid2model.append({"real_id": real_id, "model": model, "model_id": model_id})
        speakers.append({"id": first_speaker_id + real_id, "name": name, "lang": model.lang})

    if model_type == ModelType.GPT_SOVITS:
        summary = f"model_type:{model_type.value} model_id:{model_id} sovits_path:{sovits_path} gpt_path:{gpt_path}"
    else:
        summary = f"model_type:{model_type.value} model_id:{model_id} n_speakers:{len(speakers)} model_path:{model_path}"
    logging.info(summary)

    return {
        "model": model,
        "model_type": model_type,
        "model_id": model_id,
        "model_path": model_path,
        "config": hps,
        "sovits_path": sovits_path,
        "gpt_path": gpt_path,
        "sid2model": sid2model,
        "speakers": speakers,
    }
def load_model(self, model_path: str, config_path: str, sovits_path: str, gpt_path: str):
    """Load one model and register it together with its speakers.

    The ``model_path``/``config_path`` pair is made absolute when
    ``check_is_none`` reports it as usable; otherwise the
    ``sovits_path``/``gpt_path`` pair is. Observers receive a
    ``"model_loaded"`` event after a successful registration.

    Returns:
        ``True`` on success, ``False`` when any step raised (the error and
        its traceback are logged).
    """
    try:
        if check_is_none(model_path, config_path):
            sovits_path, gpt_path = self.relative_to_absolute_path(sovits_path, gpt_path)
        else:
            model_path, config_path = self.relative_to_absolute_path(model_path, config_path)

        loaded = self._load_model_from_path(model_path, config_path, sovits_path, gpt_path)
        model_id = loaded["model_id"]
        speaker_map = loaded["sid2model"]
        model_type = loaded["model_type"]

        registry_entry = {
            "model_type": loaded.get("model_type"),
            "model_path": model_path,
            "config_path": config_path,
            "sovits_path": sovits_path,
            "gpt_path": gpt_path,
            "model": loaded.get("model"),
            "n_speakers": len(loaded["speakers"]),
        }
        self.models[model_type][model_id] = registry_entry
        self.sid2model[model_type].extend(speaker_map)
        self.voice_speakers[model_type.value].extend(loaded["speakers"])

        self.notify("model_loaded", model_manager=self)
    except Exception as e:
        self.logger.info(f"Loading failed. {e}")
        self.logger.error(traceback.format_exc())
        return False
    return True
def unload_model(self, model_type_value: str, model_id: str):
    """Unload one model and remove its speakers from the registries.

    Args:
        model_type_value: The ``ModelType`` value naming the model's type.
        model_id: Id of the model within that type (converted with ``int``).

    Returns:
        ``True`` when the model was found and unloaded; ``False`` when it is
        unknown, when the arguments cannot be parsed, or when any step raised.
    """
    state = False
    try:
        # Parsed inside the try block so malformed input yields False instead
        # of an exception escaping a method that otherwise reports via `state`.
        model_type = ModelType(model_type_value)
        model_id = int(model_id)

        if model_id in self.models[model_type]:
            model_data = self.models[model_type][model_id]
            model = model_data.get("model")
            n_speakers = model_data.get("n_speakers")

            # Speakers are stored in model load order, so this model's slice
            # starts after the speakers of every model registered before it.
            start = 0
            for key, entry in self.models[model_type].items():
                if key == model_id:
                    break
                start += entry.get("n_speakers")

            if model_type == ModelType.BERT_VITS2:
                for bert_model_name in model.bert_model_names.values():
                    self.model_handler.release_bert(bert_model_name)
                if model.version == "2.1":
                    self.model_handler.release_emotion()
                elif model.version in ["2.2", "extra", "2.4"]:
                    self.model_handler.release_clap()
            elif model_type == ModelType.GPT_SOVITS:
                self.model_handler.release_bert("CHINESE_ROBERTA_WWM_EXT_LARGE")
                self.model_handler.release_ssl_model()

            del self.sid2model[model_type][start:start + n_speakers]
            del self.voice_speakers[model_type.value][start:start + n_speakers]
            del self.models[model_type][model_id]

            # Public speaker ids are positional, so renumber what is left.
            for new_id, speaker in enumerate(self.voice_speakers[model_type.value]):
                speaker["id"] = new_id

            # Drop the local references first: while they are alive the model
            # cannot be collected, and empty_cache() only returns memory that
            # is no longer occupied by live tensors.
            model = model_data = entry = None
            gc.collect()
            torch.cuda.empty_cache()

            state = True
            self.notify("model_unloaded", model_manager=self)
            self.logger.info("Unloading success.")
        else:
            self.logger.warning(f"Unloading skipped. model_id {model_id} is not loaded.")
    except Exception as e:
        # format_exc() returns the traceback text; print_exc() returns None,
        # which used to be logged as the literal string "None".
        self.logger.error(traceback.format_exc())
        self.logger.error(f"Unloading failed. {e}")
        state = False
    return state
def load_dimensional_emotion_model(self, model_path):
    """Load the dimensional emotion model with ``audonnx``.

    Observers receive a ``"model_loaded"`` event after a successful load.

    Returns:
        The loaded model, or ``None`` when loading failed (including when
        ``audonnx`` cannot be imported). The failure is logged as a warning.
    """
    # Pre-bound so a failed load returns None instead of raising
    # UnboundLocalError at the return statement.
    dimensional_emotion_model = None
    try:
        import audonnx
        root = os.path.dirname(model_path)
        dimensional_emotion_model = audonnx.load(root=root, model_file=model_path)
        self.notify("model_loaded", model_manager=self)
    except Exception as e:
        self.logger.warning(f"Load DIMENSIONAL_EMOTION_MODEL failed {e}")
    return dimensional_emotion_model
def unload_dimensional_emotion_model(self):
    """Drop the dimensional emotion model and notify observers."""
    # Only the manager's reference is cleared here.
    self.dimensional_emotion_model = None
    self.notify("model_unloaded", model_manager=self)
def load_hubert_model(self, model_path):
    """Load the soft HuBERT model used by HuBERT-VITS.

    Returns:
        The loaded model, or ``None`` when loading failed (the failure is
        logged as a warning).
    """
    # Pre-bound so a failed load returns None instead of raising
    # UnboundLocalError at the return statement.
    hubert = None
    try:
        from vits.hubert_model import hubert_soft
        hubert = hubert_soft(model_path)
    except Exception as e:
        self.logger.warning(f"Load HUBERT_SOFT_MODEL failed {e}")
    return hubert
def unload_hubert_model(self):
    """Drop the shared HuBERT model and notify observers."""
    # Only the manager's reference is cleared here.
    self.hubert = None
    self.notify("model_unloaded", model_manager=self)
def load_VITS_PinYin_model(self, bert_path):
    """Create the shared ``VITS_PinYin`` front end (vits_chinese) once.

    Does nothing when ``tts_front`` is already set.
    """
    from vits.text.vits_pinyin import VITS_PinYin

    if self.tts_front is not None:
        return
    self.tts_front = VITS_PinYin(bert_path, self.device)
def reorder_model(self, old_index, new_index):
    """Reorder models: move the model at ``old_index`` to ``new_index``.

    Nothing happens when either index is outside ``range(len(self.models))``.
    """
    # NOTE(review): this looks like leftover code from when `self.models` was
    # a list. `__init__` now builds it as a dict keyed by ModelType, so for
    # in-range integers the lookup below would raise KeyError, and dicts have
    # no `insert`. Confirm whether this method is still called anywhere.
    if 0 <= old_index < len(self.models) and 0 <= new_index < len(self.models):
        model = self.models[old_index]
        del self.models[old_index]
        self.models.insert(new_index, model)
def get_models_path(self):
    """Return the type and path fields of every loaded model as a flat list.

    Each item is a dict with the keys ``model_type``, ``model_path``,
    ``config_path``, ``sovits_path`` and ``gpt_path``; fields a model does not
    have are ``None``.
    """
    path_keys = ("model_type", "model_path", "config_path", "sovits_path", "gpt_path")
    return [
        {key: entry.get(key) for key in path_keys}
        for per_type in self.models.values()
        for entry in per_type.values()
    ]
def get_models_path_by_type(self):
    """Return the checkpoint path of every loaded model, grouped by type.

    Returns:
        A dict mapping each ``ModelType`` value to a list of paths, one per
        loaded model of that type. The path is the model's ``model_path``;
        models that have none (GPT-SoVITS entries are registered with
        ``sovits_path``/``gpt_path`` instead) contribute their
        ``sovits_path``.
    """
    info = {
        ModelType.VITS.value: [],
        ModelType.HUBERT_VITS.value: [],
        ModelType.W2V2_VITS.value: [],
        ModelType.BERT_VITS2.value: [],
        ModelType.GPT_SOVITS.value: [],
    }
    for model_type, models in self.models.items():
        # `info` is keyed by the enum value, and each registry entry is a
        # dict, so it must be read by key rather than by position.
        bucket = info[model_type.value]
        for entry in models.values():
            bucket.append(entry.get("model_path") or entry.get("sovits_path"))
    return info
def get_models_info(self):
    """Describe every loaded model, grouped by model type value.

    Each item holds ``model_id``, two paths relative to the models directory
    (with ``/`` separators) and ``n_speakers``. GPT-SoVITS items carry
    ``sovits_path``/``gpt_path``; all other types carry
    ``model_path``/``config_path``.
    """
    info = {
        ModelType.VITS.value: [],
        ModelType.HUBERT_VITS.value: [],
        ModelType.W2V2_VITS.value: [],
        ModelType.BERT_VITS2.value: [],
        ModelType.GPT_SOVITS.value: [],
    }
    for model_type, model_data in self.models.items():
        # Only the pair of path fields differs between the two layouts.
        if model_type == ModelType.GPT_SOVITS:
            path_keys = ("sovits_path", "gpt_path")
        else:
            path_keys = ("model_path", "config_path")

        for model_id, model in model_data.items():
            stored = [model.get(key) for key in path_keys]
            entry = {"model_id": model_id}
            for key, absolute in zip(path_keys, stored):
                entry[key] = self.absolute_to_relative_path(absolute)[0].replace("\\", "/")
            entry["n_speakers"] = model.get("n_speakers")
            info[model_type.value].append(entry)
    return info
def get_model_by_index(self, model_type, model_id):
    """Return the model registered under ``model_type`` / ``model_id``.

    Args:
        model_type: Key into ``self.models``.
        model_id: Id of the model within that type.

    Returns:
        The model object, or ``None`` when no such model is registered.
    """
    # Registry entries are dicts keyed by model_id. Ids are not contiguous
    # positions, so membership is tested directly rather than with a range
    # check against the number of model types.
    entry = self.models.get(model_type, {}).get(model_id)
    if entry is None:
        return None
    return entry.get("model")
# def get_bert_model(self, bert_model_name):
# if bert_model_name not in self.bert_models:
# raise ValueError(f"Model {bert_model_name} not loaded!")
# return self.bert_models[bert_model_name]
def clear_all(self):
    """Forget every registered model and speaker.

    The per-type containers are emptied in place rather than removed:
    the rest of the class indexes ``models``, ``sid2model`` and
    ``voice_speakers`` by type without checking, so the keys must survive.
    """
    for per_type_models in self.models.values():
        per_type_models.clear()
    # Speaker bookkeeping would otherwise keep referencing the dropped models.
    for speaker_map in self.sid2model.values():
        speaker_map.clear()
    for speakers in self.voice_speakers.values():
        speakers.clear()
def recognition_model_type(self, hps: HParams) -> "ModelType":
    """Derive the model type from its hyper-parameters.

    Rules, in order:
      * ``use_spk_conditioned_encoder`` present in ``hps.model`` -> BERT_VITS2
      * no ``symbols`` attribute (or it is ``None``) -> HUBERT_VITS
      * truthy ``hps.data.emotion_embedding`` -> W2V2_VITS
      * otherwise -> VITS

    Returns:
        The matching ``ModelType`` member (not a string).
    """
    symbols = getattr(hps, "symbols", None)
    emotion_embedding = getattr(hps.data, "emotion_embedding", False)

    if "use_spk_conditioned_encoder" in hps.model:
        return ModelType.BERT_VITS2
    if symbols is None:
        return ModelType.HUBERT_VITS
    if emotion_embedding:
        return ModelType.W2V2_VITS
    return ModelType.VITS
def _load_npy_from_path(self, path):
model_extention = os.path.splitext(path)[1]
if model_extention != ".npy":
raise ValueError(f"Unsupported model type: {model_extention}")
return np.load(path).reshape(-1, 1024)
def load_npy(self, emotion_reference_npy):
    """Build the emotion reference array from one or more ``.npy`` files.

    ``emotion_reference_npy`` may be a list of file paths, a directory
    (walked recursively for ``.npy`` files) or a single file path. Anything
    else yields an empty ``(0, 1024)`` array.
    """
    seed = np.empty((0, 1024))

    if isinstance(emotion_reference_npy, list):
        chunks = [self._load_npy_from_path(item) for item in emotion_reference_npy]
        emotion_reference = np.concatenate([seed, *chunks], axis=0)
    elif os.path.isdir(emotion_reference_npy):
        chunks = [
            self._load_npy_from_path(os.path.join(root, file_name))
            for root, _dirs, files in os.walk(emotion_reference_npy)
            for file_name in files
            if file_name.endswith(".npy")
        ]
        emotion_reference = np.concatenate([seed, *chunks], axis=0)
    elif os.path.isfile(emotion_reference_npy):
        # A single file is returned as loaded, without joining onto the seed.
        emotion_reference = self._load_npy_from_path(emotion_reference_npy)
    else:
        emotion_reference = seed

    logging.info(f"Loaded emotional dimention npy range: {len(emotion_reference)}")
    return emotion_reference
def scan_path(self):
    """Discover loadable models under the configured models directory.

    Every ``.pth`` file found recursively is a candidate, except files whose
    name starts with ``D_`` or ``DUR_``. A candidate with a ``.json`` file in
    the same folder is reported as ``model_path``/``config_path``; otherwise
    one with a ``.ckpt`` file in the same folder is reported as a GPT-SoVITS
    ``sovits_path``/``gpt_path`` pair. Candidates with neither are skipped.

    Returns:
        A list of dicts with the keys ``model_id``, ``model_type``,
        ``model_path``, ``config_path``, ``sovits_path`` and ``gpt_path``.
        ``model_id`` is the candidate's position among all ``.pth`` files
        found, including the skipped ones.
    """
    models_root = os.path.join(config.abs_path, config.system.data_path, config.tts_config.models_path)
    discovered = []

    for index, pth_path in enumerate(glob.glob(models_root + "/**/*.pth", recursive=True)):
        if os.path.basename(pth_path).startswith(("D_", "DUR_")):
            continue

        folder = os.path.dirname(pth_path)
        config_paths = glob.glob(folder + "/*.json", recursive=True)
        gpt_paths = glob.glob(folder + "/*.ckpt", recursive=True)

        entry = {
            "model_id": index,
            "model_type": None,
            "model_path": None,
            "config_path": None,
            "sovits_path": None,
            "gpt_path": None,
        }
        if len(config_paths) > 0:
            entry["model_path"] = pth_path
            entry["config_path"] = config_paths[0]
        elif len(gpt_paths) > 0:
            entry["gpt_path"] = gpt_paths[0]
            entry["sovits_path"] = pth_path
            entry["model_type"] = ModelType.GPT_SOVITS
        else:
            continue

        discovered.append(entry)

    return discovered
def scan_unload_path(self):
    """Return the discovered models that are not currently loaded.

    Paths in the returned entries are relative to the models directory and
    use ``/`` separators. GPT-SoVITS entries are matched on their
    (``sovits_path``, ``gpt_path``) pair and get ``model_type`` replaced by
    its value; all other entries are matched on ``model_path`` only.
    """

    def normalized(*paths):
        # Relative to the models directory, with '/' as the separator.
        return [path.replace("\\", "/") for path in self.absolute_to_relative_path(*paths)]

    all_paths = self.scan_path()

    # Paths of the models that are already loaded.
    loaded_single = []
    loaded_pairs = []
    for model in self.get_models_path():
        if model.get("model_type") == ModelType.GPT_SOVITS:
            sovits_path, gpt_path = normalized(model.get("sovits_path"), model.get("gpt_path"))
            loaded_pairs.append((sovits_path, gpt_path))
        else:
            loaded_single.append(normalized(model.get("model_path"))[0])

    unload_paths = []
    for info in all_paths:
        if info.get("model_type") == ModelType.GPT_SOVITS:
            sovits_path, gpt_path = normalized(info.get("sovits_path"), info.get("gpt_path"))
            if self.is_path_loaded((sovits_path, gpt_path), loaded_pairs):
                continue
            info.update(
                {"model_type": info.get("model_type").value, "sovits_path": sovits_path, "gpt_path": gpt_path})
        else:
            model_path, config_path = normalized(info.get("model_path"), info.get("config_path"))
            if self.is_path_loaded(model_path, loaded_single):
                continue
            info.update({"model_path": model_path, "config_path": config_path})
        unload_paths.append(info)

    return unload_paths
def is_path_loaded(self, paths, loaded_paths):
    """Tell whether ``paths`` already appears in ``loaded_paths``.

    Args:
        paths: Either a single path string, or a two-item
            ``(sovits_path, gpt_path)`` sequence.
        loaded_paths: Loaded entries of the same shape as ``paths``.

    Returns:
        ``True`` when an equal entry exists, ``False`` otherwise.
    """
    # A string is always one path, even if it happens to be two characters
    # long; only a non-string of length 2 is a (sovits, gpt) pair.
    if not isinstance(paths, str) and len(paths) == 2:
        sovits_path, gpt_path = paths
        return any(
            sovits_path == loaded[0] and gpt_path == loaded[1]
            for loaded in loaded_paths
        )
    return any(paths == loaded for loaded in loaded_paths)