import gradio as gr
import torch
from transformers import AutoModel, AutoTokenizer, pipeline
from PIL import Image
from decord import VideoReader, cpu
import base64
import io
import spaces
import subprocess
import time
import os
from transformers.pipelines.audio_utils import ffmpeg_read
import moviepy.editor as mp

# Load the MiniCPM-V-2.6 vision-language model
model_path = 'openbmb/MiniCPM-V-2_6'
model = AutoModel.from_pretrained(model_path, trust_remote_code=True, torch_dtype=torch.bfloat16)
model = model.to(device='cuda')
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model.eval()

# Load the Whisper ASR model
whisper_model = "openai/whisper-large-v3"
asr_pipeline = pipeline(
    task="automatic-speech-recognition",
    model=whisper_model,
    chunk_length_s=30,
    device="cuda" if torch.cuda.is_available() else "cpu",
)

MAX_NUM_FRAMES = 64


def encode_image(image):
    # Resize the image so its longest side fits the model's visual input budget.
    if not isinstance(image, Image.Image):
        image = Image.open(image).convert("RGB")
    max_size = 448 * 16
    if max(image.size) > max_size:
        w, h = image.size
        if w > h:
            new_w = max_size
            new_h = int(h * max_size / w)
        else:
            new_h = max_size
            new_w = int(w * max_size / h)
        image = image.resize((new_w, new_h), resample=Image.BICUBIC)
    return image


def encode_video(video_path):
    # Sample roughly one frame per second, capped at MAX_NUM_FRAMES.
    vr = VideoReader(video_path, ctx=cpu(0))
    sample_fps = round(vr.get_avg_fps() / 1)
    frame_idx = [i for i in range(0, len(vr), sample_fps)]
    if len(frame_idx) > MAX_NUM_FRAMES:
        frame_idx = frame_idx[:MAX_NUM_FRAMES]
    video = vr.get_batch(frame_idx).asnumpy()
    video = [Image.fromarray(v.astype('uint8')) for v in video]
    video = [encode_image(v) for v in video]
    return video


def extract_audio(video_path):
    # Dump the audio track to a temporary WAV file with ffmpeg (-y overwrites a stale file).
    audio_path = "temp_audio.wav"
    subprocess.call(['ffmpeg', '-y', '-i', video_path, '-ab', '160k', '-ac', '2', '-ar', '44100', '-vn', audio_path])
    return audio_path


def transcribe_audio(audio_file):
    # Run Whisper over the extracted audio; "translate" returns English text regardless of the spoken language.
    with open(audio_file, "rb") as f:
        inputs = f.read()
    inputs = ffmpeg_read(inputs, asr_pipeline.feature_extractor.sampling_rate)
    inputs = {"array": inputs, "sampling_rate": asr_pipeline.feature_extractor.sampling_rate}
    transcription = asr_pipeline(inputs, batch_size=8, generate_kwargs={"task": "translate"}, return_timestamps=False)["text"]
    return transcription


@spaces.GPU
def analyze_video(prompt, video, progress=gr.Progress()):
    # Full pipeline: sample frames, transcribe the audio, then ask MiniCPM-V to analyze both.
    start_time = time.time()
    progress(0, desc="Initializing")

    if isinstance(video, str):
        video_path = video
    else:
        video_path = video.name

    progress(0.1, desc="Encoding video")
    encoded_video = encode_video(video_path)

    progress(0.3, desc="Extracting audio")
    audio_path = extract_audio(video_path)
    progress(0.5, desc="Transcribing audio")
    transcription = transcribe_audio(audio_path)
    print(f"Transcription: {transcription}")

    # Clean up the temporary audio file
    os.remove(audio_path)

    progress(0.7, desc="Preparing context")
    context = [
        {"role": "user", "content": encoded_video},
        {"role": "assistant", "content": f"Transcription of the video: {transcription}"},
        {"role": "user", "content": prompt}
    ]

    params = {
        'sampling': True,
        'top_p': 0.8,
        'top_k': 100,
        'temperature': 0.7,
        'repetition_penalty': 1.05,
        "max_new_tokens": 2048,
        "max_inp_length": 4352,
        "use_image_id": False,
        "max_slice_nums": 1 if len(encoded_video) > 16 else 2
    }

    progress(0.8, desc="Generating response")
    response = model.chat(image=None, msgs=context, tokenizer=tokenizer, **params)

    progress(0.9, desc="Finalizing")
    end_time = time.time()
    processing_time = end_time - start_time

    analysis_result = f"Analysis Result:\n{response}\n\n"
    processing_time = f"Processing Time: {processing_time:.2f} seconds"

    progress(1, desc="Complete")
    return analysis_result, processing_time


with gr.Blocks(theme="NoCrypt/miku") as demo:
    gr.Label("Video Analyzer with MiniCPM-V-2_6 and Whisper")
    with gr.Accordion("Input (Works best with English videos)"):
        with gr.Row():
            video_input = gr.Video(label="Upload Video")
            prompt_input = gr.Textbox(label="Prompt", value="Analyze this video, give me advice on how to improve it and score from 0 to 100 for each point")
    with gr.Accordion("Output"):
        with gr.Row():
            analysis_result = gr.Textbox(label="Analysis Result")
            processing_time = gr.Textbox(label="Processing Time")
    analyze_button = gr.Button("Analyze Video")
    analyze_button.click(fn=analyze_video, inputs=[prompt_input, video_input], outputs=[analysis_result, processing_time])

demo.launch()
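
# --- Usage sketch (illustrative only) ---
# The commented snippet below shows how the same pipeline could be driven without the
# Gradio UI. "sample.mp4" is a hypothetical path, and the @spaces.GPU decorator assumes
# a ZeroGPU Space; outside of Spaces you may need to remove it. Use this in place of
# demo.launch() if you want to script the analysis:
#
#   result, timing = analyze_video(
#       "Summarize this video in three bullet points.",
#       "sample.mp4",
#   )
#   print(result)
#   print(timing)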