import os
import tempfile
import re
import librosa
import torch
import json
import numpy as np
from transformers import Wav2Vec2ForCTC, AutoProcessor
from huggingface_hub import hf_hub_download
from torchaudio.models.decoder import ctc_decoder
from utils.text_norm import text_normalize
from utils.lm import create_unigram_lm, maybe_generate_pseudo_bigram_arpa
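
# uroman is an external Perl romanizer; a local checkout is expected under ./uroman so that
# words in any script can be mapped to Latin-character spellings when building the lexicon.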
uroman_dir = "uroman"
assert os.path.exists(uroman_dir)
UROMAN_PL = os.path.join(uroman_dir, "bin", "uroman.pl")

ASR_SAMPLING_RATE = 16_000
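
# Default decoder hyper-parameters: word insertion scores (with and without an external LM)
# and the LM weight used by the CTC beam-search decoder.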
WORD_SCORE_DEFAULT_IF_LM = -0.18
WORD_SCORE_DEFAULT_IF_NOLM = -3.5
LM_SCORE_DEFAULT = 1.48

MODEL_ID = "upload/mms_zs"

processor = AutoProcessor.from_pretrained(MODEL_ID)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)

token_file = "upload/mms_zs/tokens.txt"
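

# Small accumulating logger; its text is yielded alongside the transcription so the caller
# (e.g. a UI) can stream progress messages.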
class MY_LOG:
    def __init__(self):
        self.text = "[START]"

    def add(self, new_log, new_line=True):
        self.text = self.text + ("\n" if new_line else " ") + new_log
        self.text = self.text.strip()
        return self.text


def error_check_file(filepath):
    if not isinstance(filepath, str):
        return "Expected file to be of type 'str'. Instead got {}".format(
            type(filepath)
        )
    if not os.path.exists(filepath):
        return "Input file '{}' doesn't exist".format(filepath)
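

# Lower-case the romanized text and keep only a-z, apostrophes and spaces.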
def norm_uroman(text):
    text = text.lower()
    text = text.replace("’", "'")
    text = re.sub("([^a-z' ])", " ", text)
    text = re.sub(" +", " ", text)
    return text.strip()
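

# Romanize every word by shelling out to uroman.pl, then turn each result into a
# space-separated character spelling terminated by the word-boundary token "|".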
def uromanize(words):
    iso = "xxx"
    with tempfile.NamedTemporaryFile() as tf, tempfile.NamedTemporaryFile() as tf2:
        with open(tf.name, "w") as f:
            f.write("\n".join(words))
        cmd = f"perl {UROMAN_PL} -l {iso} < {tf.name} > {tf2.name}"
        os.system(cmd)
        lexicon = {}
        with open(tf2.name) as f:
            for idx, line in enumerate(f):
                if not line.strip():
                    continue
                line = re.sub(r"\s+", "", norm_uroman(line)).strip()
                lexicon[words[idx]] = " ".join(line) + " |"
    return lexicon
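

# When several words romanize to the same spelling, keep a single representative
# (the most frequent word, ties broken by fewest characters) so the lexicon stays unambiguous.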
def filter_lexicon(lexicon, word_counts):
    spelling_to_words = {}
    for w, s in lexicon.items():
        spelling_to_words.setdefault(s, [])
        spelling_to_words[s].append(w)

    lexicon = {}
    for s, ws in spelling_to_words.items():
        if len(ws) > 1:
            # use the word with the highest count and the fewest characters
            ws.sort(key=lambda w: (-word_counts[w], len(w)))
        lexicon[ws[0]] = s
    return lexicon
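

# Count normalized word frequencies in the input text file; the number of lines is also
# returned and used as the sentence count when building the unigram LM.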
def load_words(filepath):
    words = {}
    num_sentences = 0
    with open(filepath) as f:
        for line in f:
            line = line.strip().lower()
            num_sentences += 1
            line = text_normalize(line, iso_code="xxx")
            # ignore invalid words.
            for w in line.split():
                words.setdefault(w, 0)
                words[w] += 1
    return words, num_sentences
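

# Main entry point: a generator that repeatedly yields (transcription, log text) so that
# intermediate progress can be shown while the final transcription is produced.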
def process(
    audio_data,
    words_file,
    lm_path=None,
    wscore=None,
    lmscore=None,
    wscore_usedefault=True,
    lmscore_usedefault=True,
    autolm=True,
    reference=None,
):
    transcription, logs = "", MY_LOG()
    if not audio_data or not words_file:
        yield "ERROR: Empty audio data or words file", logs.text
        return

    if isinstance(audio_data, tuple):
        # microphone input: (sample_rate, int16 samples)
        sr, audio_samples = audio_data
        audio_samples = (audio_samples / 32768.0).astype(np.float32)
        if sr != ASR_SAMPLING_RATE:
            audio_samples = librosa.resample(
                audio_samples, orig_sr=sr, target_sr=ASR_SAMPLING_RATE
            )
    else:
        # file upload
        assert isinstance(audio_data, str)
        audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0]

    yield transcription, logs.add(f"Number of audio samples: {len(audio_samples)}")

    inputs = processor(
        audio_samples, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt"
    )

    # set device
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif (
        hasattr(torch.backends, "mps")
        and torch.backends.mps.is_available()
        and torch.backends.mps.is_built()
    ):
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    model.to(device)
    inputs = inputs.to(device)

    yield transcription, logs.add(f"Using device: {device}")
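
    # Single forward pass through the acoustic model; the CTC logits are decoded below
    # with the lexicon-constrained beam search.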
    with torch.no_grad():
        outputs = model(**inputs).logits

    # Setup lexicon and decoder
    yield transcription, logs.add("Loading words....")

    try:
        word_counts, num_sentences = load_words(words_file)
    except Exception as e:
        yield f"ERROR: Loading words failed '{str(e)}'", logs.text
        return

    yield transcription, logs.add(
        f"Loaded {len(word_counts)} words from {num_sentences} lines.\nPreparing lexicon...."
    )

    try:
        lexicon = uromanize(list(word_counts.keys()))
    except Exception as e:
        yield f"ERROR: Creating lexicon failed '{str(e)}'", logs.text
        return

    # for k, v in lexicon.items():
    #     if len(v) < 5:
    #         print(k, v)

    yield transcription, logs.add(f"Lexicon size: {len(lexicon)}")

    # The input could be full sentences OR a plain list of words. If any word occurs more
    # than twice, assume sentences and build a unigram LM automatically.
    tmp_file = tempfile.NamedTemporaryFile()  # could be used for the LM
    if autolm and any(cnt > 2 for cnt in word_counts.values()):
        yield transcription, logs.add("Creating unigram LM...", False)
        lm_path = tmp_file.name
        create_unigram_lm(word_counts, num_sentences, lm_path)
        yield transcription, logs.add("OK")

    if lm_path is None:
        yield transcription, logs.add("Filtering lexicon....")
        lexicon = filter_lexicon(lexicon, word_counts)
        yield transcription, logs.add(
            f"OK. Lexicon size after filtering: {len(lexicon)}"
        )
    else:
        # kenlm throws an error if a unigram LM is used.
        # HACK: turn the unigram LM into a pseudo bigram LM by adding a dummy bigram.
        maybe_generate_pseudo_bigram_arpa(lm_path)
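
    # Write the lexicon to a temporary file in the format the decoder expects
    # (word followed by its space-separated spelling), then configure the beam-search decoder.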
    with tempfile.NamedTemporaryFile() as lexicon_file:
        if lm_path is not None and not lm_path.strip():
            lm_path = None

        with open(lexicon_file.name, "w") as f:
            for word, spelling in lexicon.items():
                f.write(word + " " + spelling + "\n")

        if wscore_usedefault:
            wscore = (
                WORD_SCORE_DEFAULT_IF_LM
                if lm_path is not None
                else WORD_SCORE_DEFAULT_IF_NOLM
            )
        if lmscore_usedefault:
            lmscore = LM_SCORE_DEFAULT if lm_path is not None else 0

        yield transcription, logs.add(
            f"Using word score: {wscore}\nUsing lm score: {lmscore}"
        )

        beam_search_decoder = ctc_decoder(
            lexicon=lexicon_file.name,
            tokens=token_file,
            lm=lm_path,
            nbest=1,
            beam_size=500,
            beam_size_token=50,
            lm_weight=lmscore,
            word_score=wscore,
            sil_score=0,
            blank_token="<s>",
        )
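
        # The beam-search decoder runs on the CPU, so the logits are moved off the
        # model's device before decoding.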
        beam_search_result = beam_search_decoder(outputs.to("cpu"))
        transcription = " ".join(beam_search_result[0][0].words).strip()

        yield transcription, logs.add("[DONE]")
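

# Example run: print every (transcription, log) pair for a sample English clip and the
# accompanying text file.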
for i in process("upload/english/english.mp3", "upload/english/c4_5k_sentences.txt"):
    print(i)

# for i in process("upload/ligurian/ligurian_1.mp3", "upload/ligurian/zenamt_5k_sentences.txt"):
#     print(i)