# mms-zeroshot / zeroshot.py
import os
import tempfile
import re
import librosa
import torch
import json
import numpy as np
from transformers import Wav2Vec2ForCTC, AutoProcessor
from huggingface_hub import hf_hub_download
from torchaudio.models.decoder import ctc_decoder
from utils.text_norm import text_normalize
from utils.lm import create_unigram_lm, maybe_generate_pseudo_bigram_arpa
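# uroman (https://github.com/isi-nlp/uroman) must be checked out in this directory;
# its bin/uroman.pl script romanizes text from any writing system into Latin
# characters, which is how the lexicon spellings are produced below.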
uroman_dir = "uroman"
assert os.path.exists(uroman_dir)
UROMAN_PL = os.path.join(uroman_dir, "bin", "uroman.pl")
ASR_SAMPLING_RATE = 16_000
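# Default beam-search scores for the CTC decoder; the word-insertion score differs
# depending on whether an n-gram LM is supplied.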
WORD_SCORE_DEFAULT_IF_LM = -0.18
WORD_SCORE_DEFAULT_IF_NOLM = -3.5
LM_SCORE_DEFAULT = 1.48
MODEL_ID = "upload/mms_zs"
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)
token_file = "upload/mms_zs/tokens.txt"
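# Simple accumulating log whose text is yielded back to the caller together with
# each partial transcription, so the caller (e.g. a UI) can stream progress.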
class MY_LOG:
def __init__(self):
self.text = "[START]"
def add(self, new_log, new_line=True):
self.text = self.text + ("\n" if new_line else " ") + new_log
self.text = self.text.strip()
return self.text
def error_check_file(filepath):
if not isinstance(filepath, str):
return "Expected file to be of type 'str'. Instead got {}".format(
type(filepath)
)
if not os.path.exists(filepath):
return "Input file '{}' doesn't exist".format(filepath)
def norm_uroman(text):
text = text.lower()
text = text.replace("’", "'")
text = re.sub("([^a-z' ])", " ", text)
text = re.sub(" +", " ", text)
return text.strip()
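# Romanize a list of words by piping them through uroman.pl and build lexicon
# entries in the format the decoder expects: each word maps to its space-separated
# romanized characters followed by the "|" word-boundary token,
# e.g. "hello" -> "h e l l o |".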
def uromanize(words):
iso = "xxx"
with tempfile.NamedTemporaryFile() as tf, tempfile.NamedTemporaryFile() as tf2:
with open(tf.name, "w") as f:
f.write("\n".join(words))
cmd = f"perl " + UROMAN_PL
cmd += f" -l {iso} "
cmd += f" < {tf.name} > {tf2.name}"
os.system(cmd)
lexicon = {}
with open(tf2.name) as f:
for idx, line in enumerate(f):
if not line.strip():
continue
line = re.sub(r"\s+", "", norm_uroman(line)).strip()
lexicon[words[idx]] = " ".join(line) + " |"
return lexicon
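# If several words share the same romanized spelling, keep only one word per
# spelling so each lexicon entry stays unambiguous for the decoder.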
def filter_lexicon(lexicon, word_counts):
spelling_to_words = {}
for w, s in lexicon.items():
spelling_to_words.setdefault(s, [])
spelling_to_words[s].append(w)
lexicon = {}
for s, ws in spelling_to_words.items():
if len(ws) > 1:
# prefer the word with the highest count and, to break ties, the fewest characters
ws.sort(key=lambda w: (-word_counts[w], len(w)))
lexicon[ws[0]] = s
return lexicon
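# Count normalized word frequencies in the user-supplied text file and return the
# number of lines, which the unigram LM builder uses as the sentence count.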
def load_words(filepath):
words = {}
num_sentences = 0
with open(filepath) as f:
for line in f:
line = line.strip().lower()
num_sentences += 1
line = text_normalize(line, iso_code="xxx")
# ignore invalid words.
for w in line.split():
words.setdefault(w, 0)
words[w] += 1
return words, num_sentences
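# Main entry point: a generator that yields (transcription, log text) pairs so the
# caller can stream intermediate status; the final yield carries the decoded text.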
def process(
audio_data,
words_file,
lm_path=None,
wscore=None,
lmscore=None,
wscore_usedefault=True,
lmscore_usedefault=True,
autolm=True,
reference=None,
):
transcription, logs = "", MY_LOG()
if not audio_data or not words_file:
yield "ERROR: Empty audio data or words file", logs.text
return
if isinstance(audio_data, tuple):
# microphone
sr, audio_samples = audio_data
audio_samples = (audio_samples / 32768.0).astype(float)
if sr != ASR_SAMPLING_RATE:
audio_samples = librosa.resample(audio_samples, orig_sr=sr, target_sr=ASR_SAMPLING_RATE)
else:
# file upload
assert isinstance(audio_data, str)
audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0]
yield transcription, logs.add(f"Number of audio samples: {len(audio_samples)}")
inputs = processor(
audio_samples, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt"
)
# set device
if torch.cuda.is_available():
device = torch.device("cuda")
elif (
hasattr(torch.backends, "mps")
and torch.backends.mps.is_available()
and torch.backends.mps.is_built()
):
device = torch.device("mps")
else:
device = torch.device("cpu")
model.to(device)
inputs = inputs.to(device)
yield transcription, logs.add(f"Using device: {device}")
with torch.no_grad():
outputs = model(**inputs).logits
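# outputs holds the CTC emission scores (batch x time x vocabulary) consumed by
# the beam-search decoder below.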
# Setup lexicon and decoder
yield transcription, logs.add(f"Loading words....")
try:
word_counts, num_sentences = load_words(words_file)
except Exception as e:
yield f"ERROR: Loading words failed '{str(e)}'", logs.text
return
yield transcription, logs.add(
f"Loaded {len(word_counts)} words from {num_sentences} lines.\nPreparing lexicon...."
)
try:
lexicon = uromanize(list(word_counts.keys()))
except Exception as e:
yield f"ERROR: Creating lexicon failed '{str(e)}'", logs.text
return
# for k, v in lexicon.items():
# if len(v) < 5:
# print(k, v)
yield transcription, logs.add(f"Leixcon size: {len(lexicon)}")
# Input could be sentences OR a list of words. Check if at least one word has a count > 1 to differentiate.
tmp_file = tempfile.NamedTemporaryFile() # could be used for LM
if autolm and any([cnt > 2 for cnt in word_counts.values()]):
yield transcription, logs.add(f"Creating unigram LM...", False)
lm_path = tmp_file.name
create_unigram_lm(word_counts, num_sentences, lm_path)
yield transcription, logs.add(f"OK")
if lm_path is None:
yield transcription, logs.add(f"Filtering lexicon....")
lexicon = filter_lexicon(lexicon, word_counts)
yield transcription, logs.add(
f"Ok. Leixcon size after filtering: {len(lexicon)}"
)
else:
# kenlm throws an error if unigram LM is being used
# HACK: generate a bigram LM from unigram LM and a dummy bigram to trick it
maybe_generate_pseudo_bigram_arpa(lm_path)
# for k, v in lexicon.items():
# if len(v) < 5:
# print(k, v)
# print(lexicon["the"], lexicon["\"(t)he"])
with tempfile.NamedTemporaryFile() as lexicon_file:
if lm_path is not None and not lm_path.strip():
lm_path = None
with open(lexicon_file.name, "w") as f:
for word, spelling in lexicon.items():
f.write(word + " " + spelling + "\n")
if wscore_usedefault:
wscore = (
WORD_SCORE_DEFAULT_IF_LM
if lm_path is not None
else WORD_SCORE_DEFAULT_IF_NOLM
)
if lmscore_usedefault:
lmscore = LM_SCORE_DEFAULT if lm_path is not None else 0
yield transcription, logs.add(
f"Using word score: {wscore}\nUsing lm score: {lmscore}"
)
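# torchaudio's lexicon-based CTC beam-search decoder (flashlight-text under the
# hood); note that this token set uses "<s>" as the CTC blank symbol.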
beam_search_decoder = ctc_decoder(
lexicon=lexicon_file.name,
tokens=token_file,
lm=lm_path,
nbest=1,
beam_size=500,
beam_size_token=50,
lm_weight=lmscore,
word_score=wscore,
sil_score=0,
blank_token="<s>",
)
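# The decoder consumes CPU tensors, so emissions are moved off the accelerator
# before decoding.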
beam_search_result = beam_search_decoder(outputs.to("cpu"))
transcription = " ".join(beam_search_result[0][0].words).strip()
yield transcription, logs.add(f"[DONE]")
for i in process("upload/english/english.mp3", "upload/english/c4_5k_sentences.txt"):
print(i)
# for i in process("upload/ligurian/ligurian_1.mp3", "upload/ligurian/zenamt_5k_sentences.txt"):
# print(i)