import json
import logging
import os
from datetime import datetime, timedelta

import gradio as gr
import replicate
import requests
from dotenv import load_dotenv
from flask import Flask, jsonify
from moviepy.editor import AudioFileClip, VideoFileClip, concatenate_videoclips

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Constants
CHUNK_SIZE = 1024
XI_API_KEY = os.getenv("XI_API_KEY")
VOICE_ID = os.getenv("VOICE_ID")
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")
MESSAGE_LIMIT = 45
TIME_LIMIT = timedelta(hours=2)

# Usage tracking
usage_data = {
    'message_count': 0,
    'last_reset': datetime.now()
}

app = Flask(__name__)


@app.route('/reset_usage', methods=['POST'])
def reset_usage():
    """Reset the message counter and the last-reset timestamp."""
    global usage_data
    usage_data = {
        'message_count': 0,
        'last_reset': datetime.now()
    }
    return jsonify({"success": "Usage reset."}), 200


def text_to_speech(text):
    """Convert text to speech with the ElevenLabs API, enforcing a simple rate limit."""
    global usage_data
    current_time = datetime.now()

    # Reset the counter once the time window has elapsed
    if current_time - usage_data['last_reset'] > TIME_LIMIT:
        usage_data = {
            'message_count': 0,
            'last_reset': current_time
        }

    if usage_data['message_count'] >= MESSAGE_LIMIT:
        return "Error: Message limit reached. Try again in 2 hours."

    tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
    headers = {
        "Accept": "application/json",
        "xi-api-key": XI_API_KEY
    }
    data = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.8,
            "style": 0.0,
            "use_speaker_boost": True
        }
    }

    try:
        response = requests.post(tts_url, headers=headers, json=data, stream=True)
        response.raise_for_status()
        output_path = "output.mp3"
        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                f.write(chunk)
        usage_data['message_count'] += 1
        return output_path
    except requests.RequestException as e:
        logger.error(f"Error in text_to_speech: {e}")
        return f"Error: {str(e)}"


def save_transcript_to_json(text):
    """Write the transcript to a local JSON file and return its path."""
    transcript = {"text": text}
    json_path = "transcript.json"
    with open(json_path, "w") as json_file:
        json.dump(transcript, json_file)
    return json_path


def upload_to_temp_storage(file_path):
    """Upload a file to temporary storage (file.io) and return the public link."""
    url = "https://file.io"
    try:
        with open(file_path, 'rb') as f:
            response = requests.post(url, files={'file': f})
        response.raise_for_status()
        return response.json()['link']
    except requests.RequestException as e:
        logger.error(f"Error uploading file to temporary storage: {e}")
        raise


def split_video(video_file_path, segment_duration=30):
    """Split the video into segments of at most `segment_duration` seconds."""
    video = VideoFileClip(video_file_path)
    duration = video.duration
    segments = []
    for start in range(0, int(duration), segment_duration):
        end = min(start + segment_duration, duration)
        segment = video.subclip(start, end)
        segment_path = f"segment_{start}_{end}.mp4"
        segment.write_videofile(segment_path, codec='libx264', audio_codec='aac')
        segments.append(segment_path)
    return segments


def download_video(uri, output_path):
    """Download a video from a URL to `output_path`."""
    try:
        response = requests.get(uri, stream=True)
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                f.write(chunk)
        return output_path
    except requests.RequestException as e:
        logger.error(f"Error downloading video from {uri}: {e}")
        raise


def generate_video_with_subtitles(text, video_file_path):
    """Caption each video segment with the Replicate autocaption model and rejoin them."""
    transcript_json_path = save_transcript_to_json(text)
    transcript_url = upload_to_temp_storage(transcript_json_path)

    segments = split_video(video_file_path)
    processed_segments = []

    for segment_path in segments:
        try:
            with open(segment_path, "rb") as video_file:
                output = replicate.run(
                    "fictions-ai/autocaption:18a45ff0d95feb4449d192bbdc06b4a6df168fa33def76dfc51b78ae224b599b",
                    input={
                        "font": "Poppins/Poppins-ExtraBold.ttf",
                        "color": "white",
                        "kerning": -5,
                        "opacity": 0,
                        "MaxChars": 20,
                        "fontsize": 4,
                        "translate": False,
                        "output_video": True,
                        "stroke_color": "black",
                        "stroke_width": 2.6,
                        "right_to_left": False,
                        "subs_position": "bottom75",
                        "highlight_color": "yellow",
                        "video_file_input": video_file,
                        "output_transcript": True,
                        "transcript_file_input": transcript_url
                    }
                )

            logger.debug(f"Output type: {type(output)}")
            logger.debug(f"Output content: {output}")

            if isinstance(output, list):
                for item in output:
                    if isinstance(item, str):
                        # Assume each item is a URL
                        video_output_path = f"processed_{os.path.basename(segment_path)}"
                        processed_segments.append(download_video(item, video_output_path))
                    else:
                        logger.warning(f"Unexpected item in output: {item}")
            elif isinstance(output, dict):
                # Handle the case where the output is a dictionary
                if 'url' in output:
                    video_output_path = f"processed_{os.path.basename(segment_path)}"
                    processed_segments.append(download_video(output['url'], video_output_path))
                else:
                    logger.error(f"Unexpected output format: {output}")
            else:
                raise ValueError(f"Unexpected output format: {output}")
        except Exception as e:
            logger.error(f"Error processing segment {segment_path}: {e}")
            raise

    return concatenate_videos(processed_segments)


def concatenate_videos(video_paths):
    """Concatenate the processed segments into a single video."""
    clips = [VideoFileClip(path) for path in video_paths]
    final_clip = concatenate_videoclips(clips)
    output_path = "final_combined_video.mp4"
    final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    return output_path


def combine_audio_and_video(video_path, audio_path, output_path="final_output.mp4"):
    """Replace the video's audio track with the generated speech."""
    video_clip = VideoFileClip(video_path)
    audio_clip = AudioFileClip(audio_path)
    final_clip = video_clip.set_audio(audio_clip)
    final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    return output_path


def process_text_and_generate_video(text, video_file):
    """Gradio callback: generate speech, subtitle the video, and merge both."""
    if video_file is None:
        return None, None
    try:
        audio_path = text_to_speech(text)
        if audio_path.startswith("Error"):
            return audio_path, None

        logger.debug(f"Generating subtitled video for file: {video_file.name}")
        video_output = generate_video_with_subtitles(text, video_file.name)
        logger.debug(f"Subtitled video generated: {video_output}")

        final_output = combine_audio_and_video(video_output, audio_path)
        logger.debug(f"Final video generated: {final_output}")

        return audio_path, final_output
    except Exception as e:
        logger.error(f"Error in process_text_and_generate_video: {e}")
        return None, f"Error: {str(e)}"


iface = gr.Interface(
    fn=process_text_and_generate_video,
    inputs=["text", "file"],
    outputs=[
        gr.Audio(label="Generated audio"),
        gr.File(label="Generated video")
    ],
    title="Video Generation with Subtitles and Audio",
    description="Enter text and upload a video file to generate a video with subtitles and audio."
)

if __name__ == "__main__":
    iface.launch()