import gradio as gr
import torch
from transformers import AutoModel, AutoTokenizer, pipeline
from PIL import Image
from decord import VideoReader, cpu
import subprocess
import time
import os
import spaces
from transformers.pipelines.audio_utils import ffmpeg_read

# Load the MiniCPM-V-2.6 vision-language model
device = "cuda" if torch.cuda.is_available() else "cpu"
model_path = 'openbmb/MiniCPM-V-2_6'
model = AutoModel.from_pretrained(model_path, trust_remote_code=True, torch_dtype=torch.bfloat16)
model = model.to(device=device)
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model.eval()
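# Note: trust_remote_code=True is required above because MiniCPM-V ships its
# own modeling code on the Hub rather than living inside the transformers library.
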
# Load the Whisper ASR model for transcription
whisper_model = "openai/whisper-large-v3"
asr_pipeline = pipeline(
    task="automatic-speech-recognition",
    model=whisper_model,
    chunk_length_s=30,
    device=device,
)
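# chunk_length_s=30 matches Whisper's 30-second context window, letting the
# pipeline split and batch audio that runs longer than one window.
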
MAX_NUM_FRAMES = 64

def encode_image(image):
    if not isinstance(image, Image.Image):
        image = Image.open(image).convert("RGB")
    # Downscale so the longest side is at most 448*16 px, preserving aspect ratio
    max_size = 448 * 16
    if max(image.size) > max_size:
        w, h = image.size
        if w > h:
            new_w = max_size
            new_h = int(h * max_size / w)
        else:
            new_h = max_size
            new_w = int(w * max_size / h)
        image = image.resize((new_w, new_h), resample=Image.BICUBIC)
    return image
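# MiniCPM-V slices large images into 448-px tiles, so capping the longest side
# at 448*16 = 7168 px bounds the number of tiles per frame (an assumption about
# why this constant was chosen; smaller images pass through untouched).
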
def encode_video(video_path):
    vr = VideoReader(video_path, ctx=cpu(0))
    # Step through the video at a stride of roughly one frame per second
    sample_fps = round(vr.get_avg_fps())
    frame_idx = list(range(0, len(vr), sample_fps))
    if len(frame_idx) > MAX_NUM_FRAMES:
        frame_idx = frame_idx[:MAX_NUM_FRAMES]
    video = vr.get_batch(frame_idx).asnumpy()
    video = [Image.fromarray(v.astype('uint8')) for v in video]
    video = [encode_image(v) for v in video]
    return video
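# Note: the truncation above keeps only the first MAX_NUM_FRAMES sampled frames,
# i.e. roughly the first 64 seconds of a long video. A minimal alternative
# (a sketch, not part of the original app) spreads the frame budget uniformly
# over the whole clip:
#
#   def uniform_sample(indices, n):
#       gap = len(indices) / n
#       return [indices[int(i * gap + gap / 2)] for i in range(n)]
#
#   frame_idx = uniform_sample(list(range(len(vr))), MAX_NUM_FRAMES)
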
def extract_audio(video_path):
    audio_path = "temp_audio.wav"
    # -vn drops the video stream; -y overwrites a leftover temp file from a
    # previous run instead of blocking on ffmpeg's interactive prompt
    subprocess.run(
        ['ffmpeg', '-y', '-i', video_path, '-ab', '160k', '-ac', '2', '-ar', '44100', '-vn', audio_path],
        check=True,
    )
    return audio_path
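# If ffmpeg is not on PATH, the same extraction could be done with moviepy,
# which the original script imported but never used (a sketch, assuming
# moviepy 1.x is installed):
#
#   import moviepy.editor as mp
#   mp.VideoFileClip(video_path).audio.write_audiofile(audio_path)
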
def transcribe_audio(audio_file):
    with open(audio_file, "rb") as f:
        inputs = f.read()
    # Decode the raw bytes into a float array at Whisper's expected sampling rate
    inputs = ffmpeg_read(inputs, asr_pipeline.feature_extractor.sampling_rate)
    inputs = {"array": inputs, "sampling_rate": asr_pipeline.feature_extractor.sampling_rate}
    transcription = asr_pipeline(inputs, batch_size=8, generate_kwargs={"task": "translate"}, return_timestamps=False)["text"]
    return transcription
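# generate_kwargs={"task": "translate"} asks Whisper for an English translation
# of whatever language is spoken; use {"task": "transcribe"} instead to keep
# the source language verbatim.
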
@spaces.GPU  # request a GPU for this call when running on ZeroGPU hardware
def analyze_video(prompt, video, progress=gr.Progress()):
    start_time = time.time()
    progress(0, desc="Initializing")
    video_path = video if isinstance(video, str) else video.name
    progress(0.1, desc="Encoding video")
    encoded_video = encode_video(video_path)
    progress(0.3, desc="Extracting audio")
    audio_path = extract_audio(video_path)
    progress(0.5, desc="Transcribing audio")
    transcription = transcribe_audio(audio_path)
    print(f"Transcription: {transcription}")
    # Clean up the temporary audio file
    os.remove(audio_path)
    progress(0.7, desc="Preparing context")
    # Frames go in as one user turn, the transcript as an assistant turn, and
    # the actual question last, so the model sees video, audio context, and
    # prompt in order
    context = [
        {"role": "user", "content": encoded_video},
        {"role": "assistant", "content": f"Transcription of the video: {transcription}"},
        {"role": "user", "content": prompt}
    ]
    params = {
        'sampling': True,
        'top_p': 0.8,
        'top_k': 100,
        'temperature': 0.7,
        'repetition_penalty': 1.05,
        "max_new_tokens": 2048,
        "max_inp_length": 4352,
        "use_image_id": False,
        # Tile each frame less aggressively for long videos to stay within max_inp_length
        "max_slice_nums": 1 if len(encoded_video) > 16 else 2
    }
    progress(0.8, desc="Generating response")
    response = model.chat(image=None, msgs=context, tokenizer=tokenizer, **params)
    progress(0.9, desc="Finalizing")
    processing_time = time.time() - start_time
    analysis_result = f"Analysis Result:\n{response}\n\n"
    processing_time = f"Processing Time: {processing_time:.2f} seconds"
    progress(1, desc="Complete")
    return analysis_result, processing_time
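# Example call for local testing without the UI (hypothetical file name):
#
#   result, elapsed = analyze_video("Summarize this clip", "sample.mp4")
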
with gr.Blocks(theme="NoCrypt/miku") as demo: | |
gr.Label("Video Analyzer with MiniCPM-V-2_6 and Whisper") | |
with gr.Accordion("Input (Work best with English videos)"): | |
with gr.Row(): | |
video_input = gr.Video(label="Upload Video") | |
prompt_input = gr.Textbox(label="Prompt", value="Analyze this video, give me advice on how to improve it and score from 0 to 100 for each point") | |
with gr.Accordion("Output"): | |
with gr.Row(): | |
analysis_result = gr.Textbox(label="Analysis Result") | |
processing_time = gr.Textbox(label="Processing Time") | |
analyze_button = gr.Button("Analyze Video") | |
analyze_button.click(fn=analyze_video, inputs=[prompt_input, video_input], outputs=[analysis_result, processing_time]) | |
demo.launch() | |