# NOTE: removed page-scrape residue (hosting-page status text, file size,
# commit hashes, and a line-number gutter) that was not part of the script.
##STREAMLINK CODE
import cv2
import streamlink
import streamlit as st
import time
import tempfile
import base64
import os
from dotenv import load_dotenv
from openai import OpenAI
import assemblyai as aai
# Load environment variables and configure API clients.
load_dotenv()
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
# Pass the key to the constructor: assigning to the class attribute
# `OpenAI.api_key` (legacy v0-style configuration) has no effect on the
# v1 `OpenAI()` client, so the original assignment was dead config.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def extract_recent_frames(video_url, output_folder, duration=10, frames_per_second=1):
    """Sample frames from a stream's best-quality variant for *duration* seconds.

    Args:
        video_url: URL of the stream (e.g. a YouTube live video).
        output_folder: Unused; retained for backward compatibility with callers.
        duration: Number of seconds of the stream to sample.
        frames_per_second: Target sampling rate for extracted frames.

    Returns:
        A list of base64-encoded JPEG frames (possibly empty), or None when
        the stream could not be resolved or opened.
    """
    streams = streamlink.streams(video_url)
    if not streams:
        st.error("Error: Unable to retrieve streams. Make sure the YouTube video URL is valid.")
        return
    stream_url = streams['best'].url
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        # Surface a clear failure instead of silently reading zero frames.
        st.error("Error: Unable to open the video stream.")
        return
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Some live streams report an fps of 0 (or NaN); fall back to a common
    # default so the interval computation below never divides by zero.
    if not fps or fps <= 0:
        fps = 30
    # Keep every Nth frame; clamp to 1 so frames_per_second > fps still works
    # (the original int(fps / frames_per_second) could be 0 -> ZeroDivisionError).
    frame_interval = max(1, int(fps / frames_per_second))
    frame_count = 0
    start_time = time.time()
    extracted_frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                st.error("Error: Couldn't read frame.")
                break
            elapsed_time = time.time() - start_time
            if elapsed_time > duration:
                break
            if frame_count % frame_interval == 0:
                # Encode as JPEG then base64 so frames travel as plain text.
                _, buffer = cv2.imencode(".jpg", frame)
                extracted_frames.append(base64.b64encode(buffer).decode("utf-8"))
            frame_count += 1
    finally:
        # Release the capture handle even if reading/encoding raised.
        cap.release()
    return extracted_frames
def main():
    """Streamlit entry point: collect inputs and run frame extraction."""
    st.title("Insightly Live Video Analysis")
    youtube_video_url = st.text_input("Enter YouTube Video URL:")
    duration = st.slider("Select Duration (seconds):", min_value=1, max_value=60, value=10)
    frames_per_second = st.slider("Select Frames per Second:", min_value=1, max_value=10, value=1)
    # Guard clause: nothing to do until the user clicks the button.
    if not st.button("Extract Frames"):
        return
    st.info("Extracting frames. Please wait...")
    frames = extract_recent_frames(youtube_video_url, "temp_frames", duration, frames_per_second)
    if not frames:
        st.error("Failed to extract frames.")
        return
    st.success("Frames extracted successfully!")
    # Display frames in a grid format with frame description on click
    display_frame_grid(frames)
#####################33
def generate_description(base64_frames):
    """Ask the GPT-4 Vision model to describe a sampled subset of frames.

    Every 30th frame is sent (with a resize hint) alongside a fixed
    instruction prompt. Returns the model's text, or None on any failure.
    """
    try:
        # Thin the frame list so the request stays within payload limits.
        sampled = base64_frames[0::30]
        content = [
            "1. Generate a description for this sequence of video frames in about 90 words. Return the following: 1. List of objects in the video 2. Any restrictive content or sensitive content and if so which frame.",
        ]
        for frame in sampled:
            content.append({"image": frame, "resize": 428})
        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[{"role": "user", "content": content}],
            max_tokens=3000,
        )
        return response.choices[0].message.content
    except Exception as e:
        # Best-effort: log server-side and signal failure to the caller.
        print(f"Error in generate_description: {e}")
        return None
#########################################3333
def display_frame_grid(extracted_frames):
    """Render base64-encoded JPEG frames in a 3-column Streamlit grid."""
    cols_per_row = 3
    total = len(extracted_frames)
    for row_start in range(0, total, cols_per_row):
        row_cols = st.columns(cols_per_row)
        for offset, column in enumerate(row_cols):
            frame_idx = row_start + offset
            if frame_idx >= total:
                # Last row may be partially filled; stop at the final frame.
                break
            with column:
                # Frames were stored as base64 JPEG; decode back to raw bytes.
                frame_bytes = base64.b64decode(extracted_frames[frame_idx])
                st.image(frame_bytes, channels="BGR", caption=f'Frame {frame_idx + 1}', use_column_width=True, output_format="JPEG")
# Run the Streamlit app when this file is executed as a script.
if __name__ == "__main__":
    main()