import PySimpleGUI as sg import sounddevice as sd import torch, librosa, threading, pickle from enhancer import Enhancer import numpy as np from torch.nn import functional as F from torchaudio.transforms import Resample from ddsp.vocoder import load_model, F0_Extractor, Volume_Extractor, Units_Encoder from ddsp.core import upsample import time import gui_locale def phase_vocoder(a, b, fade_out, fade_in): fa = torch.fft.rfft(a) fb = torch.fft.rfft(b) absab = torch.abs(fa) + torch.abs(fb) n = a.shape[0] if n % 2 == 0: absab[1:-1] *= 2 else: absab[1:] *= 2 phia = torch.angle(fa) phib = torch.angle(fb) deltaphase = phib - phia deltaphase = deltaphase - 2 * np.pi * torch.floor(deltaphase / 2 / np.pi + 0.5) w = 2 * np.pi * torch.arange(n // 2 + 1).to(a) + deltaphase t = torch.arange(n).unsqueeze(-1).to(a) / n result = a * (fade_out ** 2) + b * (fade_in ** 2) + torch.sum(absab * torch.cos(w * t + phia), -1) * fade_out * fade_in / n return result class SvcDDSP: def __init__(self) -> None: self.model = None self.units_encoder = None self.encoder_type = None self.encoder_ckpt = None self.enhancer = None self.enhancer_type = None self.enhancer_ckpt = None def update_model(self, model_path): self.device = 'cuda' if torch.cuda.is_available() else 'cpu' # load ddsp model if self.model is None or self.model_path != model_path: self.model, self.args = load_model(model_path, device=self.device) self.model_path = model_path # load units encoder if self.units_encoder is None or self.args.data.encoder != self.encoder_type or self.args.data.encoder_ckpt != self.encoder_ckpt: if self.args.data.encoder == 'cnhubertsoftfish': cnhubertsoft_gate = self.args.data.cnhubertsoft_gate else: cnhubertsoft_gate = 10 self.units_encoder = Units_Encoder( self.args.data.encoder, self.args.data.encoder_ckpt, self.args.data.encoder_sample_rate, self.args.data.encoder_hop_size, cnhubertsoft_gate=cnhubertsoft_gate, device=self.device) self.encoder_type = self.args.data.encoder self.encoder_ckpt = self.args.data.encoder_ckpt # load enhancer if self.enhancer is None or self.args.enhancer.type != self.enhancer_type or self.args.enhancer.ckpt != self.enhancer_ckpt: self.enhancer = Enhancer(self.args.enhancer.type, self.args.enhancer.ckpt, device=self.device) self.enhancer_type = self.args.enhancer.type self.enhancer_ckpt = self.args.enhancer.ckpt def infer(self, audio, sample_rate, spk_id=1, threhold=-45, pitch_adjust=0, use_spk_mix=False, spk_mix_dict=None, use_enhancer=True, enhancer_adaptive_key='auto', pitch_extractor_type='crepe', f0_min=50, f0_max=1100, safe_prefix_pad_length=0, ): print("Infering...") # load input # audio, sample_rate = librosa.load(input_wav, sr=None, mono=True) hop_size = self.args.data.block_size * sample_rate / self.args.data.sampling_rate # safe front silence if safe_prefix_pad_length > 0.03: silence_front = safe_prefix_pad_length - 0.03 else: silence_front = 0 # extract f0 pitch_extractor = F0_Extractor( pitch_extractor_type, sample_rate, hop_size, float(f0_min), float(f0_max)) f0 = pitch_extractor.extract(audio, uv_interp=True, device=self.device, silence_front=silence_front) f0 = torch.from_numpy(f0).float().to(self.device).unsqueeze(-1).unsqueeze(0) f0 = f0 * 2 ** (float(pitch_adjust) / 12) # extract volume volume_extractor = Volume_Extractor(hop_size) volume = volume_extractor.extract(audio) mask = (volume > 10 ** (float(threhold) / 20)).astype('float') mask = np.pad(mask, (4, 4), constant_values=(mask[0], mask[-1])) mask = np.array([np.max(mask[n: n + 9]) for n in range(len(mask) - 8)]) mask = torch.from_numpy(mask).float().to(self.device).unsqueeze(-1).unsqueeze(0) mask = upsample(mask, self.args.data.block_size).squeeze(-1) volume = torch.from_numpy(volume).float().to(self.device).unsqueeze(-1).unsqueeze(0) # extract units audio_t = torch.from_numpy(audio).float().unsqueeze(0).to(self.device) units = self.units_encoder.encode(audio_t, sample_rate, hop_size) # spk_id or spk_mix_dict spk_id = torch.LongTensor(np.array([[spk_id]])).to(self.device) dictionary = None if use_spk_mix: dictionary = spk_mix_dict # forward and return the output with torch.no_grad(): output, _, (s_h, s_n) = self.model(units, f0, volume, spk_id=spk_id, spk_mix_dict=dictionary) output *= mask if use_enhancer: output, output_sample_rate = self.enhancer.enhance( output, self.args.data.sampling_rate, f0, self.args.data.block_size, adaptive_key=enhancer_adaptive_key, silence_front=silence_front) else: output_sample_rate = self.args.data.sampling_rate output = output.squeeze() return output, output_sample_rate class Config: def __init__(self) -> None: self.samplerate = 44100 # Hz self.block_time = 1.5 # s self.f_pitch_change: float = 0.0 # float(request_form.get("fPitchChange", 0)) self.spk_id = 1 # 默认说话人。 self.spk_mix_dict = None # {1:0.5, 2:0.5} 表示1号说话人和2号说话人的音色按照0.5:0.5的比例混合 self.use_vocoder_based_enhancer = True self.use_phase_vocoder = True self.checkpoint_path = '' self.threhold = -35 self.buffer_num = 2 self.crossfade_time = 0.03 self.select_pitch_extractor = 'harvest' # F0预测器["parselmouth", "dio", "harvest", "crepe"] self.use_spk_mix = False self.sounddevices = ['', ''] def save(self, path): with open(path + '\\config.pkl', 'wb') as f: pickle.dump(vars(self), f) def load(self, path) -> bool: try: with open(path + '\\config.pkl', 'rb') as f: self.update(pickle.load(f)) return True except: print('config.pkl does not exist') return False def update(self, data_dict): for key, value in data_dict.items(): setattr(self, key, value) class GUI: def __init__(self) -> None: self.config = Config() self.flag_vc: bool = False # 变声线程flag self.block_frame = 0 self.crossfade_frame = 0 self.sola_search_frame = 0 self.device = 'cuda' if torch.cuda.is_available() else 'cpu' self.svc_model: SvcDDSP = SvcDDSP() self.fade_in_window: np.ndarray = None # crossfade计算用numpy数组 self.fade_out_window: np.ndarray = None # crossfade计算用numpy数组 self.input_wav: np.ndarray = None # 输入音频规范化后的保存地址 self.output_wav: np.ndarray = None # 输出音频规范化后的保存地址 self.sola_buffer: torch.Tensor = None # 保存上一个output的crossfade self.f0_mode_list = ["parselmouth", "dio", "harvest", "crepe"] # F0预测器 self.f_safe_prefix_pad_length: float = 0.0 self.resample_kernel = {} self.launcher() # start def launcher(self): '''窗口加载''' input_devices, output_devices, _, _ = self.get_devices() sg.theme('DarkAmber') # 设置主题 # 界面布局 layout = [ [sg.Frame(layout=[ [sg.Input(key='sg_model', default_text='exp\\multi_speaker\\model_300000.pt'), sg.FileBrowse(i18n('选择模型文件'), key='choose_model')] ], title=i18n('模型:.pt格式(自动识别同目录下config.yaml)')), sg.Frame(layout=[ [sg.Text(i18n('选择配置文件所在目录')), sg.Input(key='config_file_dir', default_text='exp'), sg.FolderBrowse(i18n('打开文件夹'), key='choose_config')], [sg.Button(i18n('读取配置文件'), key='load_config'), sg.Button(i18n('保存配置文件'), key='save_config')] ], title=i18n('快速配置文件')) ], [sg.Frame(layout=[ [sg.Text(i18n("输入设备")), sg.Combo(input_devices, key='sg_input_device', default_value=input_devices[sd.default.device[0]], enable_events=True)], [sg.Text(i18n("输出设备")), sg.Combo(output_devices, key='sg_output_device', default_value=output_devices[sd.default.device[1]], enable_events=True)] ], title=i18n('音频设备')) ], [sg.Frame(layout=[ [sg.Text(i18n("说话人id")), sg.Input(key='spk_id', default_text='1')], [sg.Text(i18n("响应阈值")), sg.Slider(range=(-60, 0), orientation='h', key='threhold', resolution=1, default_value=-45, enable_events=True)], [sg.Text(i18n("变调")), sg.Slider(range=(-24, 24), orientation='h', key='pitch', resolution=1, default_value=0, enable_events=True)], [sg.Text(i18n("采样率")), sg.Input(key='samplerate', default_text='44100')], [sg.Checkbox(text=i18n('启用捏音色功能'), default=False, key='spk_mix', enable_events=True), sg.Button(i18n("设置混合音色"), key='set_spk_mix')] ], title=i18n('普通设置')), sg.Frame(layout=[ [sg.Text(i18n("音频切分大小")), sg.Slider(range=(0.05, 3.0), orientation='h', key='block', resolution=0.01, default_value=0.3, enable_events=True)], [sg.Text(i18n("交叉淡化时长")), sg.Slider(range=(0.01, 0.15), orientation='h', key='crossfade', resolution=0.01, default_value=0.04, enable_events=True)], [sg.Text(i18n("使用历史区块数量")), sg.Slider(range=(1, 20), orientation='h', key='buffernum', resolution=1, default_value=4, enable_events=True)], [sg.Text(i18n("f0预测模式")), sg.Combo(values=self.f0_mode_list, key='f0_mode', default_value=self.f0_mode_list[2], enable_events=True)], [sg.Checkbox(text=i18n('启用增强器'), default=True, key='use_enhancer', enable_events=True), sg.Checkbox(text=i18n('启用相位声码器'), default=False, key='use_phase_vocoder', enable_events=True)] ], title=i18n('性能设置')), ], [sg.Button(i18n("开始音频转换"), key="start_vc"), sg.Button(i18n("停止音频转换"), key="stop_vc"), sg.Text(i18n('推理所用时间(ms):')), sg.Text('0', key='infer_time')] ] # 创造窗口 self.window = sg.Window('DDSP - GUI', layout, finalize=True) self.window['spk_id'].bind('', '') self.window['samplerate'].bind('', '') self.event_handler() def event_handler(self): '''事件处理''' while True: # 事件处理循环 event, values = self.window.read() print('event: ' + event) if event == sg.WINDOW_CLOSED: # 如果用户关闭窗口 self.flag_vc = False exit() elif event == 'start_vc' and self.flag_vc == False: # set values 和界面布局layout顺序一一对应 self.set_values(values) print('crossfade_time:' + str(self.config.crossfade_time)) print("buffer_num:" + str(self.config.buffer_num)) print("samplerate:" + str(self.config.samplerate)) print('block_time:' + str(self.config.block_time)) print("prefix_pad_length:" + str(self.f_safe_prefix_pad_length)) print("mix_mode:" + str(self.config.spk_mix_dict)) print("enhancer:" + str(self.config.use_vocoder_based_enhancer)) print('using_cuda:' + str(torch.cuda.is_available())) self.start_vc() elif event == 'spk_id': self.config.spk_id = int(values['spk_id']) elif event == 'threhold': self.config.threhold = values['threhold'] elif event == 'pitch': self.config.f_pitch_change = values['pitch'] elif event == 'spk_mix': self.config.use_spk_mix = values['spk_mix'] elif event == 'set_spk_mix': spk_mix = sg.popup_get_text(message='示例:1:0.3,2:0.5,3:0.2', title="设置混合音色,支持多人") if spk_mix != None: self.config.spk_mix_dict = eval("{" + spk_mix.replace(',', ',').replace(':', ':') + "}") elif event == 'f0_mode': self.config.select_pitch_extractor = values['f0_mode'] elif event == 'use_enhancer': self.config.use_vocoder_based_enhancer = values['use_enhancer'] elif event == 'use_phase_vocoder': self.config.use_phase_vocoder = values['use_phase_vocoder'] elif event == 'load_config' and self.flag_vc == False: if self.config.load(values['config_file_dir']): self.update_values() elif event == 'save_config' and self.flag_vc == False: self.set_values(values) self.config.save(values['config_file_dir']) elif event != 'start_vc' and self.flag_vc == True: self.flag_vc = False def set_values(self, values): self.set_devices(values["sg_input_device"], values['sg_output_device']) self.config.sounddevices = [values["sg_input_device"], values['sg_output_device']] self.config.checkpoint_path = values['sg_model'] self.config.spk_id = int(values['spk_id']) self.config.threhold = values['threhold'] self.config.f_pitch_change = values['pitch'] self.config.samplerate = int(values['samplerate']) self.config.block_time = float(values['block']) self.config.crossfade_time = float(values['crossfade']) self.config.buffer_num = int(values['buffernum']) self.config.select_pitch_extractor = values['f0_mode'] self.config.use_vocoder_based_enhancer = values['use_enhancer'] self.config.use_phase_vocoder = values['use_phase_vocoder'] self.config.use_spk_mix = values['spk_mix'] self.block_frame = int(self.config.block_time * self.config.samplerate) self.crossfade_frame = int(self.config.crossfade_time * self.config.samplerate) self.sola_search_frame = int(0.01 * self.config.samplerate) self.last_delay_frame = int(0.02 * self.config.samplerate) self.input_frames = max( self.block_frame + self.crossfade_frame + self.sola_search_frame + 2 * self.last_delay_frame, (1 + self.config.buffer_num) * self.block_frame) self.f_safe_prefix_pad_length = self.config.block_time * self.config.buffer_num - self.config.crossfade_time - 0.01 - 0.02 def update_values(self): self.window['sg_model'].update(self.config.checkpoint_path) self.window['sg_input_device'].update(self.config.sounddevices[0]) self.window['sg_output_device'].update(self.config.sounddevices[1]) self.window['spk_id'].update(self.config.spk_id) self.window['threhold'].update(self.config.threhold) self.window['pitch'].update(self.config.f_pitch_change) self.window['samplerate'].update(self.config.samplerate) self.window['spk_mix'].update(self.config.use_spk_mix) self.window['block'].update(self.config.block_time) self.window['crossfade'].update(self.config.crossfade_time) self.window['buffernum'].update(self.config.buffer_num) self.window['f0_mode'].update(self.config.select_pitch_extractor) self.window['use_enhancer'].update(self.config.use_vocoder_based_enhancer) def start_vc(self): '''开始音频转换''' torch.cuda.empty_cache() self.flag_vc = True self.input_wav = np.zeros(self.input_frames, dtype='float32') self.sola_buffer = torch.zeros(self.crossfade_frame, device=self.device) self.fade_in_window = torch.sin( np.pi * torch.arange(0, 1, 1 / self.crossfade_frame, device=self.device) / 2) ** 2 self.fade_out_window = 1 - self.fade_in_window self.svc_model.update_model(self.config.checkpoint_path) thread_vc = threading.Thread(target=self.soundinput) thread_vc.start() def soundinput(self): ''' 接受音频输入 ''' with sd.Stream(callback=self.audio_callback, blocksize=self.block_frame, samplerate=self.config.samplerate, dtype='float32'): while self.flag_vc: time.sleep(self.config.block_time) print('Audio block passed.') print('ENDing VC') def audio_callback(self, indata: np.ndarray, outdata: np.ndarray, frames, times, status): ''' 音频处理 ''' start_time = time.perf_counter() print("\nStarting callback") self.input_wav[:] = np.roll(self.input_wav, -self.block_frame) self.input_wav[-self.block_frame:] = librosa.to_mono(indata.T) # infer _audio, _model_sr = self.svc_model.infer( self.input_wav, self.config.samplerate, spk_id=self.config.spk_id, threhold=self.config.threhold, pitch_adjust=self.config.f_pitch_change, use_spk_mix=self.config.use_spk_mix, spk_mix_dict=self.config.spk_mix_dict, use_enhancer=self.config.use_vocoder_based_enhancer, pitch_extractor_type=self.config.select_pitch_extractor, safe_prefix_pad_length=self.f_safe_prefix_pad_length, ) # debug sola ''' _audio, _model_sr = self.input_wav, self.config.samplerate rs = int(np.random.uniform(-200,200)) print('debug_random_shift: ' + str(rs)) _audio = np.roll(_audio, rs) _audio = torch.from_numpy(_audio).to(self.device) ''' if _model_sr != self.config.samplerate: key_str = str(_model_sr) + '_' + str(self.config.samplerate) if key_str not in self.resample_kernel: self.resample_kernel[key_str] = Resample(_model_sr, self.config.samplerate, lowpass_filter_width=128).to(self.device) _audio = self.resample_kernel[key_str](_audio) temp_wav = _audio[ - self.block_frame - self.crossfade_frame - self.sola_search_frame - self.last_delay_frame: - self.last_delay_frame] # sola shift conv_input = temp_wav[None, None, : self.crossfade_frame + self.sola_search_frame] cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :]) cor_den = torch.sqrt( F.conv1d(conv_input ** 2, torch.ones(1, 1, self.crossfade_frame, device=self.device)) + 1e-8) sola_shift = torch.argmax(cor_nom[0, 0] / cor_den[0, 0]) temp_wav = temp_wav[sola_shift: sola_shift + self.block_frame + self.crossfade_frame] print('sola_shift: ' + str(int(sola_shift))) # phase vocoder if self.config.use_phase_vocoder: temp_wav[: self.crossfade_frame] = phase_vocoder( self.sola_buffer, temp_wav[: self.crossfade_frame], self.fade_out_window, self.fade_in_window) else: temp_wav[: self.crossfade_frame] *= self.fade_in_window temp_wav[: self.crossfade_frame] += self.sola_buffer * self.fade_out_window self.sola_buffer = temp_wav[- self.crossfade_frame:] outdata[:] = temp_wav[: - self.crossfade_frame, None].repeat(1, 2).cpu().numpy() end_time = time.perf_counter() print('infer_time: ' + str(end_time - start_time)) self.window['infer_time'].update(int((end_time - start_time) * 1000)) def get_devices(self, update: bool = True): '''获取设备列表''' if update: sd._terminate() sd._initialize() devices = sd.query_devices() hostapis = sd.query_hostapis() for hostapi in hostapis: for device_idx in hostapi["devices"]: devices[device_idx]["hostapi_name"] = hostapi["name"] input_devices = [ f"{d['name']} ({d['hostapi_name']})" for d in devices if d["max_input_channels"] > 0 ] output_devices = [ f"{d['name']} ({d['hostapi_name']})" for d in devices if d["max_output_channels"] > 0 ] input_devices_indices = [d["index"] for d in devices if d["max_input_channels"] > 0] output_devices_indices = [ d["index"] for d in devices if d["max_output_channels"] > 0 ] return input_devices, output_devices, input_devices_indices, output_devices_indices def set_devices(self, input_device, output_device): '''设置输出设备''' input_devices, output_devices, input_device_indices, output_device_indices = self.get_devices() sd.default.device[0] = input_device_indices[input_devices.index(input_device)] sd.default.device[1] = output_device_indices[output_devices.index(output_device)] print("input device:" + str(sd.default.device[0]) + ":" + str(input_device)) print("output device:" + str(sd.default.device[1]) + ":" + str(output_device)) if __name__ == "__main__": i18n = gui_locale.I18nAuto() gui = GUI()