# Insightly Live Video Analysis (Streamlit + streamlink)
# NOTE: "Spaces: Running" status lines were page-scrape artifacts, not code.
# STREAMLINK CODE
import base64
import os
import tempfile
import time

import assemblyai as aai
import cv2
import streamlink
import streamlit as st
from dotenv import load_dotenv
from openai import OpenAI

# Load environment variables from a local .env file so API keys stay out of source.
load_dotenv()
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
# The v1 OpenAI SDK reads the key from the client constructor (or the
# OPENAI_API_KEY env var); assigning `OpenAI.api_key` is the legacy v0
# pattern and has no effect on the client instance below.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def extract_recent_frames(video_url, output_folder, duration=10, frames_per_second=1):
    """Sample frames from a (live) video stream for `duration` seconds.

    Args:
        video_url: URL accepted by streamlink (e.g. a YouTube watch URL).
        output_folder: Unused; kept for backward compatibility with callers.
        duration: Wall-clock seconds of video to sample.
        frames_per_second: How many frames per second to keep.

    Returns:
        A list of base64-encoded JPEG frames (possibly empty), or None when
        no stream could be resolved (matching the original error path).
    """
    streams = streamlink.streams(video_url)
    if not streams:
        st.error("Error: Unable to retrieve streams. Make sure the YouTube video URL is valid.")
        return None

    stream_url = streams["best"].url
    cap = cv2.VideoCapture(stream_url)
    try:
        # Some live streams report fps == 0; fall back to the requested rate
        # and clamp the interval to >= 1 to avoid a modulo-by-zero below.
        fps = cap.get(cv2.CAP_PROP_FPS) or frames_per_second
        frame_interval = max(1, int(fps / frames_per_second))

        extracted_frames = []
        frame_count = 0
        start_time = time.time()
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                st.error("Error: Couldn't read frame.")
                break
            if time.time() - start_time > duration:
                break
            if frame_count % frame_interval == 0:
                # Keep this frame: encode to JPEG, then to base64 text.
                ok, buffer = cv2.imencode(".jpg", frame)
                if ok:  # skip frames OpenCV failed to encode
                    extracted_frames.append(base64.b64encode(buffer).decode("utf-8"))
            frame_count += 1
        return extracted_frames
    finally:
        # Release the capture even if reading/encoding raised.
        cap.release()
def main():
    """Streamlit entry point: collect inputs, extract frames, show the grid."""
    st.title("Insightly Live Video Analysis")

    url = st.text_input("Enter YouTube Video URL:")
    duration = st.slider("Select Duration (seconds):", min_value=1, max_value=60, value=10)
    fps_choice = st.slider("Select Frames per Second:", min_value=1, max_value=10, value=1)

    if not st.button("Extract Frames"):
        return

    st.info("Extracting frames. Please wait...")
    frames = extract_recent_frames(url, "temp_frames", duration, fps_choice)
    if not frames:
        st.error("Failed to extract frames.")
        return

    st.success("Frames extracted successfully!")
    # Display frames in a grid format with frame description on click
    display_frame_grid(frames)
# --------------------------------------------------------------------------
def generate_description(base64_frames):
    """Ask GPT-4 Vision to describe a sampled subset of video frames.

    Every 30th frame from `base64_frames` is attached to a single user
    message. Returns the model's text response, or None on any API error.
    """
    try:
        content = [
            "1. Generate a description for this sequence of video frames in about 90 words. Return the following: 1. List of objects in the video 2. Any restrictive content or sensitive content and if so which frame.",
        ]
        # Attach only every 30th frame to keep the request size bounded.
        for frame in base64_frames[0::30]:
            content.append({"image": frame, "resize": 428})

        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[{"role": "user", "content": content}],
            max_tokens=3000,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error in generate_description: {e}")
        return None
# --------------------------------------------------------------------------
def display_frame_grid(extracted_frames):
    """Render base64-encoded JPEG frames in a three-column Streamlit grid."""
    cols_per_row = 3
    total = len(extracted_frames)
    for row_start in range(0, total, cols_per_row):
        columns = st.columns(cols_per_row)
        for offset, column in enumerate(columns):
            frame_idx = row_start + offset
            if frame_idx >= total:
                break  # last row may be partially filled
            with column:
                # Decode the base64 payload back to JPEG bytes for display.
                jpeg_bytes = base64.b64decode(extracted_frames[frame_idx])
                st.image(jpeg_bytes, channels="BGR", caption=f'Frame {frame_idx + 1}', use_column_width=True, output_format="JPEG")
# Run the Streamlit app only when executed as a script.
if __name__ == "__main__":
    main()