# Spaces: Running
import json
import os
import re
import subprocess
import tempfile

import librosa
import numpy as np
import torch
from huggingface_hub import hf_hub_download
from torchaudio.models.decoder import ctc_decoder
from transformers import Wav2Vec2ForCTC, AutoProcessor
# --- Decoding constants ------------------------------------------------------
# Sampling rate (Hz) used when loading audio files and required of microphone
# input in `process`.
ASR_SAMPLING_RATE = 16_000
# Default decoder scores; `process` picks between them depending on whether an
# LM path was supplied.
# NOTE: "DEAULT" is a misspelling of "DEFAULT"; the names are left untouched
# because `process` refers to them by these exact identifiers.
WORD_SCORE_DEAULT_IF_LM = -0.18
WORD_SCORE_DEAULT_IF_NOLM = -3.5
LM_SCORE_DEAULT = 1.48

# --- uroman (Perl romanizer) -------------------------------------------------
# The uroman checkout must sit next to this script; fail at import otherwise.
uroman_dir = "uroman"
assert os.path.exists(uroman_dir)
UROMAN_PL = os.path.join(uroman_dir, "bin", "uroman.pl")

# --- Acoustic model ----------------------------------------------------------
# Local model directory, plus the token list handed to the CTC decoder.
MODEL_ID = "upload/mms_zs"
token_file = "upload/mms_zs/tokens.txt"
# Both objects are loaded once at import time and shared by every call.
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)
def error_check_file(filepath):
    """Check that *filepath* is a string naming an existing path.

    Args:
        filepath: Candidate path to validate.

    Returns:
        A human-readable error message when the check fails, otherwise
        ``None``.
    """
    if not isinstance(filepath, str):
        return "Expected file to be of type 'str'. Instead got {}".format(
            type(filepath)
        )
    if not os.path.exists(filepath):
        # Report the offending path itself; formatting type(filepath) here
        # would always print "<class 'str'>" and hide which file is missing.
        return "Input file '{}' doesn't exist".format(filepath)
    return None
def norm_uroman(text):
    """Normalize romanized text to the alphabet ``a-z``, ``'`` and space.

    The text is lower-cased, typographic apostrophes are mapped to the ASCII
    apostrophe, every other character outside ``a-z``, ``'`` and space is
    replaced by a space, runs of spaces are collapsed, and the result is
    stripped.

    Args:
        text: String to normalize.

    Returns:
        The normalized string (possibly empty).
    """
    text = text.lower()
    # U+2019 (right single quotation mark) is written as an escape so the
    # literal cannot be corrupted by a re-encoding of this file; a mis-decoded
    # Greek beta had ended up in its place, which turned curly apostrophes
    # into spaces instead of "'".
    text = text.replace("\u2019", "'")
    text = re.sub(r"[^a-z' ]", " ", text)
    text = re.sub(r" +", " ", text)
    return text.strip()
def uromanize(words):
    """Romanize *words* with the uroman Perl script and build a lexicon.

    The words are sent to ``perl UROMAN_PL -l xxx`` one per line; each output
    line is normalized with ``norm_uroman`` and turned into a spelling made of
    its characters separated by spaces and terminated by ``" |"``.

    Args:
        words: List of words, one lexicon entry each. Must not contain
            newlines, since output lines are matched to words by position.

    Returns:
        Dict mapping each input word to its spelling string. Words whose
        uroman output line is blank are omitted.

    Raises:
        subprocess.CalledProcessError: If the uroman process exits with a
            non-zero status.
        OSError: If ``perl`` cannot be started.
    """
    if not words:
        # Nothing to romanize; skip spawning perl altogether.
        return {}

    iso = "xxx"
    # Argument list (no shell) and explicit UTF-8 pipes: nothing is
    # interpolated into a shell command and the data does not depend on the
    # locale encoding. check=True surfaces a failing uroman run instead of
    # silently producing an empty lexicon.
    completed = subprocess.run(
        ["perl", UROMAN_PL, "-l", iso],
        input="\n".join(words),
        stdout=subprocess.PIPE,
        encoding="utf-8",
        check=True,
    )

    lexicon = {}
    # Output line i corresponds to words[i]; zip also guards against uroman
    # emitting more lines than it was given.
    for word, line in zip(words, completed.stdout.split("\n")):
        if not line.strip():
            continue
        line = re.sub(r"\s+", " ", norm_uroman(line)).strip()
        lexicon[word] = " ".join(line) + " |"
    return lexicon
def load_lexicon(filepath):
    """Read a word list from *filepath* and return its romanized lexicon.

    Lines are stripped; a line is skipped when it is empty, contains a space,
    or is longer than 50 characters. Surviving entries are lower-cased and
    de-duplicated in first-seen order, then passed to ``uromanize``.

    Args:
        filepath: Path to a text file with one word per line.

    Returns:
        Whatever ``uromanize`` returns for the collected words.
    """
    # A dict is used as an insertion-ordered set.
    unique_words = {}
    with open(filepath) as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            keep = bool(entry) and " " not in entry and len(entry) <= 50
            if keep:
                # An entry has no spaces here, but may still hold other
                # whitespace (e.g. tabs), hence the split.
                for token in entry.split():
                    unique_words[token.lower()] = True
    return uromanize(list(unique_words))
def process(
    audio_data,
    words_file,
    lm_path=None,
    wscore=None,
    lmscore=None,
    wscore_usedefault=True,
    lmscore_usedefault=True,
):
    """Transcribe audio with lexicon-constrained CTC beam search.

    Args:
        audio_data: Either a ``(sample_rate, samples)`` tuple (microphone
            capture) or a path to an audio file.
        words_file: Path to the word list given to ``load_lexicon``; only
            these words can appear in the transcription.
        lm_path: Optional language model forwarded to the decoder's ``lm``
            argument. Also selects which default scores apply.
        wscore: Word score for the decoder. Ignored when
            ``wscore_usedefault`` is true; falls back to the default when
            ``None``.
        lmscore: LM weight for the decoder. Ignored when
            ``lmscore_usedefault`` is true; falls back to the default when
            ``None``.
        wscore_usedefault: Use the module default word score.
        lmscore_usedefault: Use the module default LM weight.

    Returns:
        The words of the best hypothesis joined by single spaces.

    Raises:
        AssertionError: If microphone audio is not sampled at
            ``ASR_SAMPLING_RATE`` or ``audio_data`` is neither a tuple nor a
            string.
    """
    if isinstance(audio_data, tuple):
        # microphone
        sr, audio_samples = audio_data
        # The divisor presumably rescales 16-bit PCM to [-1, 1) -- TODO confirm
        # against the caller. `np.float` (an alias of the builtin float, i.e.
        # float64) was removed in NumPy 1.24, so the dtype is spelled out.
        audio_samples = (audio_samples / 32768.0).astype(np.float64)
        assert sr == ASR_SAMPLING_RATE, "Invalid sampling rate"
    else:
        # file upload
        assert isinstance(audio_data, str)
        audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0]

    inputs = processor(
        audio_samples, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt"
    )

    # Inference is pinned to the CPU. The code used to probe for CUDA/MPS and
    # then unconditionally overwrite the result with the CPU device, so the
    # probe had no effect and has been dropped.
    device = torch.device("cpu")
    model.to(device)
    inputs = inputs.to(device)

    with torch.no_grad():
        outputs = model(**inputs).logits

    # Setup lexicon and decoder
    lexicon = load_lexicon(words_file)

    # Fall back to the defaults when the caller opted out of them but did not
    # actually supply a value; the decoder cannot work with None scores.
    if wscore_usedefault or wscore is None:
        wscore = (
            WORD_SCORE_DEAULT_IF_LM
            if lm_path is not None
            else WORD_SCORE_DEAULT_IF_NOLM
        )
    if lmscore_usedefault or lmscore is None:
        lmscore = LM_SCORE_DEAULT if lm_path is not None else 0

    with tempfile.NamedTemporaryFile() as lexicon_file:
        # Lexicon keys are the original (possibly non-ASCII) words, so the
        # file is written as UTF-8 regardless of the locale.
        with open(lexicon_file.name, "w", encoding="utf-8") as f:
            f.writelines(
                f"{word} {spelling}\n" for word, spelling in lexicon.items()
            )

        # The decoder is built and run while the temporary lexicon file still
        # exists. `lm_path` is forwarded so that a supplied language model is
        # actually used, matching the LM-dependent scores chosen above.
        beam_search_decoder = ctc_decoder(
            lexicon=lexicon_file.name,
            tokens=token_file,
            lm=lm_path,
            nbest=1,
            beam_size=500,
            beam_size_token=50,
            lm_weight=lmscore,
            word_score=wscore,
            sil_score=0,
            blank_token="<s>",
        )
        beam_search_result = beam_search_decoder(outputs.to("cpu"))

    transcription = " ".join(beam_search_result[0][0].words).strip()
    return transcription
# Example inputs: each entry is an [audio file, word list] pair.
ZS_EXAMPLES = [["upload/english.mp3", "upload/words_top10k.txt"]]

# Smoke run on the first example. This executes whenever the module is loaded.
# NOTE(review): consider moving it under `if __name__ == "__main__":` if this
# file is ever imported by another module.
print(process(*ZS_EXAMPLES[0]))