import gradio as gr
from transformers import (
    WhisperProcessor,
    WhisperForConditionalGeneration,
    Wav2Vec2Processor,
    Wav2Vec2ForCTC,
)
from transformers import logging as transformers_logging
import torch
import librosa
import subprocess
from langdetect import detect_langs
import os
import warnings
import math
import json
import tempfile
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Suppress warnings
warnings.filterwarnings("ignore")

# Reduce transformers verbosity
transformers_logging.set_verbosity_error()

# Current models per language
MODELS = {
    "es": [
        "openai/whisper-large-v3",
        "facebook/wav2vec2-large-xlsr-53-spanish",
        "jonatasgrosman/wav2vec2-xls-r-1b-spanish"
    ],
    "en": [
        "openai/whisper-large-v3",
        "facebook/wav2vec2-large-960h",
        "facebook/wav2vec2-base-960h"
    ],
    "pt": [
        "facebook/wav2vec2-large-xlsr-53-portuguese",
        "openai/whisper-medium",
        "jonatasgrosman/wav2vec2-large-xlsr-53-portuguese"
    ],
    "fr": [
        "jonatasgrosman/wav2vec2-large-xlsr-53-french"
    ]
}

# Model cache to avoid loading the same weights more than once
model_cache = {}

def get_model(model_name):
    # Whisper checkpoints use a seq2seq head; the wav2vec2 checkpoints listed
    # in MODELS use a CTC head, so branch on the checkpoint family.
    if model_name not in model_cache:
        if "whisper" in model_name.lower():
            model_cache[model_name] = WhisperForConditionalGeneration.from_pretrained(model_name)
        else:
            model_cache[model_name] = Wav2Vec2ForCTC.from_pretrained(model_name)
    return model_cache[model_name]

# Check that ffmpeg is installed
def verify_ffmpeg_installation():
    try:
        subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # FileNotFoundError covers the case where ffmpeg is missing entirely
        logging.error("ffmpeg is not installed or cannot be executed.")
        raise e

def convert_audio_to_wav(audio_path):
    if os.path.isdir(audio_path):
        raise ValueError(f"The given path is a directory, not a file: {audio_path}")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        wav_path = tmp.name
    # '-y' overwrites an existing output file without prompting
    command = ["ffmpeg", "-y", "-i", audio_path, "-ac", "1", "-ar", "16000", wav_path]
    process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # ffmpeg writes its status output to stderr, so log both streams at debug level
    logging.debug(process.stdout.decode())
    logging.debug(process.stderr.decode())
    if process.returncode != 0:
        raise ValueError(f"Error converting the audio file to wav: {process.stderr.decode()}")
    return wav_path

def detect_language(audio_path):
    try:
        # 30 seconds of audio are enough for language identification
        speech, _ = librosa.load(audio_path, sr=16000, duration=30)
    except Exception as e:
        raise ValueError(f"Error loading the audio file with librosa: {e}")

    processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    model = get_model("openai/whisper-base")

    input_features = processor(speech, sampling_rate=16000, return_tensors="pt").input_features
    predicted_ids = model.generate(input_features)
    transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

    langs = detect_langs(transcription)

    es_confidence = next((lang.prob for lang in langs if lang.lang == 'es'), 0)
    pt_confidence = next((lang.prob for lang in langs if lang.lang == 'pt'), 0)

    # Spanish and Portuguese are easily confused; prefer Spanish on a near tie
    if abs(es_confidence - pt_confidence) < 0.2:
        return 'es'

    return max(langs, key=lambda x: x.prob).lang

def transcribe_audio_stream(audio, model_name):
    wav_audio = convert_audio_to_wav(audio)
    speech, rate = librosa.load(wav_audio, sr=16000)
    duration = len(speech) / rate

    transcriptions = []

    is_whisper = "whisper" in model_name.lower()
    processor_cls = WhisperProcessor if is_whisper else Wav2Vec2Processor
    processor = processor_cls.from_pretrained(model_name)
    model = get_model(model_name)

    chunk_duration = 30  # seconds

    for i in range(0, int(duration), chunk_duration):
        end = min(i + chunk_duration, duration)
        chunk = speech[int(i * rate):int(end * rate)]
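        # Each chunk spans [i, end) seconds; indexing by sample count keeps the
        # slice aligned with the 16 kHz resampled signal.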
        if is_whisper:
            input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features
            predicted_ids = model.generate(input_features)
            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        else:
            # wav2vec2 models are CTC-based: greedy-decode the argmax of the logits
            inputs = processor(chunk, sampling_rate=16000, return_tensors="pt")
            with torch.no_grad():
                logits = model(inputs.input_values).logits
            predicted_ids = torch.argmax(logits, dim=-1)
            transcription = processor.batch_decode(predicted_ids)[0]

        progress = min(100, (end / duration) * 100)

        transcriptions.append({
            "start_time": i,
            "end_time": end,
            "text": transcription
        })

        yield transcriptions, progress

    # The converted wav is a temporary file; remove it once transcription is done
    os.remove(wav_audio)

def detect_and_select_model(audio):
    wav_audio = convert_audio_to_wav(audio)
    try:
        language = detect_language(wav_audio)
    finally:
        os.remove(wav_audio)
    model_options = MODELS.get(language, MODELS["en"])
    return language, model_options

def save_transcription(transcriptions, file_format):
    if file_format == "JSON":
        # Text mode is required here: json.dump writes str, not bytes
        with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False, suffix=".json") as tmp:
            json.dump(transcriptions, tmp, ensure_ascii=False, indent=4)
            file_path = tmp.name
    elif file_format == "TXT":
        with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False, suffix=".txt") as tmp:
            for entry in transcriptions:
                tmp.write(f"{entry['start_time']:.2f},{entry['end_time']:.2f},{entry['text']}\n")
            file_path = tmp.name
    else:
        raise ValueError(f"Unsupported output format: {file_format}")
    logging.info(f"Transcription file saved at: {file_path}")
    return file_path

def combined_interface(audio, file_format, confirmed_language, chosen_model):
    try:
        logging.info(f"Path of the uploaded audio file: {audio}")
        verify_ffmpeg_installation()

        language, model_options = detect_and_select_model(audio)

        # Use the language confirmed by the user if given; otherwise the detected one
        if not confirmed_language:
            confirmed_language = language

        # Suggest a model, but let the user pick a different one
        if not chosen_model:
            chosen_model = model_options[0]

        logging.info(f"Detected language: {confirmed_language}")
        logging.info(f"Available models: {model_options}")
        logging.info(f"Selected model: {chosen_model}")

        # First yield: None for the seventh output (the download file)
        yield confirmed_language, gr.update(choices=model_options), chosen_model, "", 0, "Initializing...", None

        transcriptions = []
        full_transcription = ""
        for partial_transcriptions, progress in transcribe_audio_stream(audio, chosen_model):
            transcriptions = partial_transcriptions
            full_transcription = " ".join([t["text"] for t in transcriptions])
            progress_int = math.floor(progress)
            status = f"Transcribing... {progress_int}% complete"
            logging.info(f"Progress: {progress_int}%")
            yield confirmed_language, gr.update(choices=model_options), chosen_model, full_transcription.strip(), progress_int, status, None

        logging.info("Saving transcription.")
        file_path = save_transcription(transcriptions, file_format)

        if os.path.isdir(file_path):
            raise ValueError(f"The transcription output should be a file, but it is a directory: {file_path}")

        if not os.path.isfile(file_path):
            raise ValueError(f"The transcription file does not exist: {file_path}")

        # The temporary wav files were already removed by the helpers above
        logging.info("Temporary files cleaned up.")

        yield confirmed_language, gr.update(choices=model_options), chosen_model, full_transcription.strip(), 100, "Transcription complete! Download the file below.", file_path
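        # Normal completion ends the generator above; any failure falls through
        # to the handler below, which surfaces the error in the UI.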
    except Exception as e:
        logging.error(f"Error: {e}")
        yield str(e), gr.update(choices=[]), "", "An error occurred during processing.", 0, "Error", None

iface = gr.Interface(
    fn=combined_interface,
    inputs=[
        gr.Audio(type="filepath", label="Upload Audio File"),
        gr.Radio(choices=["JSON", "TXT"], label="Choose output format"),
        gr.Dropdown(choices=["", "es", "en", "pt", "fr"], label="Confirm detected language (optional)"),
        gr.Dropdown(
            choices=[
                "",
                "openai/whisper-large-v3",
                "facebook/wav2vec2-large-xlsr-53-spanish",
                "jonatasgrosman/wav2vec2-xls-r-1b-spanish",
                "facebook/wav2vec2-base-960h"
            ],
            label="Choose model (optional)"
        )
    ],
    outputs=[
        gr.Textbox(label="Detected Language"),
        gr.Dropdown(label="Available Models", choices=[]),
        gr.Textbox(label="Selected Model"),
        gr.Textbox(label="Transcription", lines=10),
        gr.Slider(minimum=0, maximum=100, label="Progress", interactive=False),
        gr.Textbox(label="Status"),
        gr.File(label="Download Transcription")
    ],
    title="Multilingual Audio Transcriber with Real-time Display and Progress Indicator",
    description="Upload an audio file to detect the language, confirm the detection or choose a model, and get the transcription in real-time. Optimized for Spanish, English, and Portuguese.",
    live=True
)

if __name__ == "__main__":
    iface.queue().launch()
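# Usage sketch (assumptions, not part of the original app): the script needs
# ffmpeg on the PATH plus the Python packages imported above; "app.py" and
# "sample.mp3" below are hypothetical names.
#
#   pip install gradio transformers torch librosa langdetect
#   python app.py          # Gradio serves on http://127.0.0.1:7860 by default
#
# Programmatic use of the streaming transcriber, bypassing the UI:
#
#   for chunks, pct in transcribe_audio_stream("sample.mp3", "openai/whisper-large-v3"):
#       print(f"{pct:.0f}%", chunks[-1]["text"])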