from __future__ import annotations

import sys

import gradio as gr
import numpy as np

from sample_wav import sample_wav
from simuleval_transcoder import SimulevalTranscoder, logger

# Unused directly; presumably imported for its agent-registration side effect.
from seamless_communication.cli.streaming.agents.tt_waitk_unity_s2t_m4t import (
    TestTimeWaitKUnityS2TM4T,
)

# Print full numpy arrays when debugging audio buffers.
np.set_printoptions(threshold=sys.maxsize)
language_code_to_name = {
"cmn": "Mandarin Chinese",
"deu": "German",
"eng": "English",
"fra": "French",
"spa": "Spanish",
}
S2ST_TARGET_LANGUAGE_NAMES = list(language_code_to_name.values())
LANGUAGE_NAME_TO_CODE = {v: k for k, v in language_code_to_name.items()}
DEFAULT_TARGET_LANGUAGE = "English"
# TODO: Update this so it takes in target langs from input, refactor sample rate
transcoder = SimulevalTranscoder(
sample_rate=48_000,
debug=False,
buffer_limit=1,
)
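# The transcoder flow used below: start() once per recording,
# process_incoming_bytes() for each streamed chunk, then poll
# get_buffered_output() for any translated speech/text that is ready.
# (This summarizes how this file uses SimulevalTranscoder, not its full API.)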
def start_recording():
logger.debug(f"start_recording: starting transcoder")
transcoder.start()
def translate_audio_segment(audio):
    logger.debug("translate_audio_segment: incoming audio")
    # Gradio streams microphone audio as a (sample_rate, np.ndarray) tuple.
    sample_rate, data = audio
    # The target language is hardcoded for now; see the TODO above.
    transcoder.process_incoming_bytes(data.tobytes(), "eng", sample_rate)
    speech_and_text_output = transcoder.get_buffered_output()
    if speech_and_text_output is None:
        logger.debug("No output from transcoder.get_buffered_output()")
        return None, None
    logger.debug(f"We DID get output from the transcoder! {speech_and_text_output}")
    text = None
    speech = None
    if speech_and_text_output.speech_samples:
        # (sample_rate, samples), matching gradio's Audio format and the
        # unpacking in streaming_input_callback.
        speech = (
            speech_and_text_output.speech_sample_rate,
            speech_and_text_output.speech_samples,
        )
    if speech_and_text_output.text:
        text = speech_and_text_output.text
        if speech_and_text_output.final:
            text += "\n"
    return speech, text
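
# The TODO in streaming_input_callback mentions converting model output to
# 16-bit integers for playback. A minimal sketch of that conversion, assuming
# the model emits float samples in [-1.0, 1.0] (illustrative helper, not yet
# wired into the callback):
def to_int16(samples: np.ndarray) -> np.ndarray:
    """Convert float audio in [-1.0, 1.0] to 16-bit PCM."""
    clipped = np.clip(np.asarray(samples, dtype=np.float32), -1.0, 1.0)
    return (clipped * 32767.0).astype(np.int16)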
def dummy_output():
    # Currently unused placeholder.
    return np.array([])
def streaming_input_callback(
    audio_file, translated_audio_bytes_state, translated_text_state
):
    translated_wav_segment, translated_text = translate_audio_segment(audio_file)
    logger.debug(f"translated_audio_bytes_state {translated_audio_bytes_state}")
    logger.debug(f"translated_wav_segment {translated_wav_segment}")
    # TODO: accumulate each segment to provide a continuous audio segment
    # (see the append_translated_segment sketch below this function).
    # TEMP: override with a canned wav so the UI pipeline can be exercised
    # end to end while the model output is stubbed out.
    translated_wav_segment = (46_000, sample_wav())
    if translated_wav_segment is not None:
        sample_rate, audio_np_array = translated_wav_segment
        # TODO: convert to 16 bit int (see the to_int16 sketch above).
        # Combine the translated wav segments.
        if not isinstance(translated_audio_bytes_state, tuple):
            translated_audio_bytes_state = (sample_rate, audio_np_array)
        else:
            translated_audio_bytes_state = (
                translated_audio_bytes_state[0],
                np.append(translated_audio_bytes_state[1], audio_np_array),
            )
    if translated_text is not None:
        translated_text_state += " | " + str(translated_text)
    # Not necessary, but named for readability.
    most_recent_input_audio_segment = audio_file
    output_translation_combined = translated_audio_bytes_state
    stream_output_text = translated_text_state
    return [
        most_recent_input_audio_segment,
        translated_wav_segment,
        output_translation_combined,
        stream_output_text,
        translated_audio_bytes_state,
        translated_text_state,
    ]
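
# A sketch of the accumulation described by the TODO above, assuming all
# segments share one sample rate (hypothetical helper, not yet called):
def append_translated_segment(state, segment):
    """Append a (sample_rate, samples) segment onto the accumulated state."""
    sample_rate, samples = segment
    if not isinstance(state, tuple):
        return (sample_rate, samples)
    assert state[0] == sample_rate, "mixed sample rates would need resampling"
    return (state[0], np.append(state[1], samples))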
def clear():
    logger.debug("Clearing state")
    # Reset to the initial values of the two gr.State components.
    return [None, ""]
def blocks():
with gr.Blocks() as demo:
with gr.Row():
# Hook this up once supported
target_language = gr.Dropdown(
label="Target language",
choices=S2ST_TARGET_LANGUAGE_NAMES,
value=DEFAULT_TARGET_LANGUAGE,
)
translated_audio_bytes_state = gr.State(None)
translated_text_state = gr.State("")
input_audio = gr.Audio(
label="Input Audio",
# source="microphone", # gradio==3.41.0
sources=["microphone"], # new gradio seems to call this less often...
streaming=True,
)
        most_recent_input_audio_segment = gr.Audio(
            label="Most recent input audio segment",
            streaming=True,
        )
# Force translate
stream_as_bytes_btn = gr.Button("Force translate most recent recording segment (ask for model output)")
output_translation_segment = gr.Audio(
label="Translated audio segment",
autoplay=False,
streaming=True,
type="numpy",
)
output_translation_combined = gr.Audio(
label="Translated audio combined",
autoplay=False,
streaming=True,
type="numpy",
)
# Could add output text segment
stream_output_text = gr.Textbox(label="Translated text")
stream_as_bytes_btn.click(
streaming_input_callback,
[input_audio, translated_audio_bytes_state, translated_text_state],
[
most_recent_input_audio_segment,
output_translation_segment,
output_translation_combined,
stream_output_text,
translated_audio_bytes_state,
translated_text_state,
],
)
input_audio.stream(
streaming_input_callback,
[input_audio, translated_audio_bytes_state, translated_text_state],
[
most_recent_input_audio_segment,
output_translation_segment,
output_translation_combined,
stream_output_text,
translated_audio_bytes_state,
translated_text_state,
],
)
input_audio.start_recording(
start_recording,
)
input_audio.clear(
clear, None, [translated_audio_bytes_state, translated_text_state]
)
        # Registered separately from start_recording above: also reset the
        # accumulated translation state whenever a new recording starts.
        input_audio.start_recording(
            clear, None, [translated_audio_bytes_state, translated_text_state]
        )
demo.queue().launch()
blocks()