import gradio as gr import cv2 import os import subprocess from ultralytics import YOLO from ultralytics.solutions import object_counter import spaces # Import spaces for ZeroGPU integration # Initialize the YOLO model MODEL = "yolov8n.pt" model = YOLO(MODEL) model.fuse() dict_classes = model.model.names # Auxiliary function to resize frame def resize_frame(frame, scale_percent): width = int(frame.shape[1] * scale_percent / 100) height = int(frame.shape[0] * scale_percent / 100) dim = (width, height) resized = cv2.resize(frame, dim, interpolation=cv2.INTER_AREA) return resized def count_frames(video_file): cap = cv2.VideoCapture(video_file) if not cap.isOpened(): raise ValueError("Failed to open video file") total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) cap.release() return total_frames def file_size_check(file_path, max_size_mb=500): if os.path.getsize(file_path) > max_size_mb * 1024 * 1024: raise ValueError("File size exceeds the maximum limit of {} MB".format(max_size_mb)) @spaces.GPU def process_video(video_file, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness, draw_tracks, view_img, view_in_counts, view_out_counts, track_thickness, region_thickness, line_dist_thresh, persist, conf, iou, classes, verbose, progress): # Check file size file_size_check(video_file) # Ensure classes is a list of integers classes = [int(x) for x in classes.split(',') if x.strip().isdigit()] if classes else None line_points = [(line_start_x, line_start_y), (line_end_x, line_end_y)] cap = cv2.VideoCapture(video_file) if not cap.isOpened(): raise ValueError("Failed to open video file") total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) tmp_output_path = "processed_output_temp.mp4" w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) * scale_percent / 100) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) * scale_percent / 100) fps = int(cap.get(cv2.CAP_PROP_FPS)) video_writer = cv2.VideoWriter(tmp_output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) counter = object_counter.ObjectCounter( classes_names=model.names, view_img=view_img, reg_pts=line_points, draw_tracks=draw_tracks, line_thickness=int(line_thickness), track_thickness=int(track_thickness), region_thickness=int(region_thickness), line_dist_thresh=line_dist_thresh, view_in_counts=view_in_counts, view_out_counts=view_out_counts, count_reg_color=(255, 0, 255), # Magenta track_color=(0, 255, 0), # Green count_txt_color=(255, 255, 255), # White count_bg_color=(50, 50, 50) # Dark gray ) prev_frame = None prev_keypoints = None processed_frames = 0 try: while cap.isOpened(): ret, frame = cap.read() if not ret: break frame = resize_frame(frame, scale_percent) # Adjust line points based on scaling scaled_line_points = [(int(x * scale_percent / 100), int(y * scale_percent / 100)) for x, y in line_points] for point1, point2 in zip(scaled_line_points[:-1], scaled_line_points[1:]): cv2.line(frame, tuple(map(int, point1)), tuple(map(int, point2)), (255, 255, 0), int(line_thickness)) tracks = model.track(frame, persist=persist, conf=conf, iou=iou, classes=classes, verbose=verbose) # Update the counter with the current frame and tracks frame = counter.start_counting(frame, tracks) # Check if the previous frame is initialized for optical flow calculation if prev_frame is not None: try: prev_frame_resized = resize_frame(prev_frame, scale_percent) matched_keypoints, status, _ = cv2.calcOpticalFlowPyrLK(prev_frame_resized, frame, prev_keypoints, None) prev_keypoints = matched_keypoints except cv2.error as e: print(f"Error in optical flow calculation: {e}") prev_frame = frame.copy() prev_keypoints = cv2.goodFeaturesToTrack(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7) video_writer.write(frame) # Update progress processed_frames += 1 progress(processed_frames / total_frames) finally: cap.release() video_writer.release() # Reduce the resolution of the video for download output_path = "processed_output.mp4" if h > 1080: resolution = "1920x1080" else: resolution = "1280x720" try: subprocess.run( ["ffmpeg", "-y", "-i", tmp_output_path, "-vf", f"scale={resolution}", "-crf", "18", "-preset", "veryfast", "-hide_banner", "-loglevel", "error", output_path], check=True ) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error during video processing: {e}") if os.path.exists(tmp_output_path): os.remove(tmp_output_path) return output_path def preview_line(video_file, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness): cap = cv2.VideoCapture(video_file) if not cap.isOpened(): raise ValueError("Failed to read video frame") ret, frame = cap.read() if not ret: raise ValueError("Failed to read video frame") frame = resize_frame(frame, scale_percent) line_points = [(line_start_x, line_start_y), (line_end_x, line_end_y)] scaled_line_points = [(int(x * scale_percent / 100), int(y * scale_percent / 100)) for x, y in line_points] for point1, point2 in zip(scaled_line_points[:-1], scaled_line_points[1:]): cv2.line(frame, tuple(map(int, point1)), tuple(map(int, point2)), (255, 255, 0), int(line_thickness)) preview_path = "preview_line.jpg" cv2.imwrite(preview_path, frame) return preview_path def gradio_app(video, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness, draw_tracks, view_img, view_in_counts, view_out_counts, track_thickness, region_thickness, line_dist_thresh, persist, conf, iou, classes_to_track, verbose): # Save the uploaded video to the main folder main_folder_video_path = "vehicle-counting.mp4" with open(main_folder_video_path, "wb") as f: f.write(open(video.name, "rb").read()) progress = gr.Progress() total_frames = count_frames(main_folder_video_path) output_path = process_video(main_folder_video_path, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, int(line_thickness), draw_tracks, view_img, view_in_counts, view_out_counts, int(track_thickness), int(region_thickness), line_dist_thresh, persist, conf, iou, classes_to_track, verbose, progress) return output_path, output_path def update_preview(video, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness): return preview_line(video.name, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, int(line_thickness)) def set_4k_coordinates(): return 0, 1500, 3840, 1500 def set_1080p_coordinates(): return 0, 700, 1920, 700 def use_example_video(): example_video_path = "vehicle-counting.mp4" return example_video_path with gr.Blocks() as demo: with gr.Row(): with gr.Column(scale=1): video_input = gr.File(label="Upload your video") # Removed max_size parameter example_button = gr.Button("Use Example Video") with gr.Row(): set_4k_button = gr.Button("4K") set_1080p_button = gr.Button("1080p") line_start_x = gr.Number(label="Line Start X", value=500, precision=0) line_start_y = gr.Number(label="Line Start Y", value=1500, precision=0) line_end_x = gr.Number(label="Line End X", value=3400, precision=0) line_end_y = gr.Number(label="Line End Y", value=1500, precision=0) line_thickness = gr.Slider(minimum=1, maximum=10, value=2, label="Line Thickness") draw_tracks = gr.Checkbox(label="Draw Tracks", value=True) view_img = gr.Checkbox(label="Display Image with Annotations", value=True) view_in_counts = gr.Checkbox(label="Display In-Counts", value=True) view_out_counts = gr.Checkbox(label="Display Out-Counts", value=True) track_thickness = gr.Slider(minimum=1, maximum=10, value=2, label="Track Thickness") region_thickness = gr.Slider(minimum=1, maximum=10, value=5, label="Region Thickness") line_dist_thresh = gr.Slider(minimum=5, maximum=50, value=15, label="Line Distance Threshold") persist = gr.Checkbox(label="Persist Tracks", value=True) conf = gr.Slider(minimum=0.0, maximum=1.0, value=0.1, step=0.05, label="Confidence Threshold") iou = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.05, label="IOU Threshold") classes_to_track = gr.Textbox(label="Classes to Track (comma-separated ids)", value="2,3,5,7") verbose = gr.Checkbox(label="Verbose Tracking", value=True) scale_percent = gr.Slider(minimum=10, maximum=100, value=100, step=10, label="Scale Percentage") process_button = gr.Button("Process Video") with gr.Column(scale=2): preview_image = gr.Image(label="Preview Line") video_output = gr.Video(label="Processed Video") download_button = gr.File(label="Download Processed Video") def update_preview_and_display(video, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness): preview_path = update_preview(video, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness) return preview_path video_input.change(update_preview_and_display, inputs=[video_input, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness], outputs=preview_image) for component in [scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness, draw_tracks, view_img, view_in_counts, view_out_counts, track_thickness, region_thickness, line_dist_thresh, persist, conf, iou, classes_to_track, verbose]: component.change(update_preview_and_display, inputs=[video_input, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness], outputs=preview_image) set_4k_button.click(lambda: set_4k_coordinates(), outputs=[line_start_x, line_start_y, line_end_x, line_end_y]) set_1080p_button.click(lambda: set_1080p_coordinates(), outputs=[line_start_x, line_start_y, line_end_x, line_end_y]) def clear_previous_video(): return None, None process_button.click(clear_previous_video, outputs=[video_output, download_button], queue=False) process_button.click(gradio_app, inputs=[video_input, scale_percent, line_start_x, line_start_y, line_end_x, line_end_y, line_thickness, draw_tracks, view_img, view_in_counts, view_out_counts, track_thickness, region_thickness, line_dist_thresh, persist, conf, iou, classes_to_track, verbose], outputs=[video_output, download_button]) # Add the example button right below the video input example_button.click(use_example_video, outputs=[video_input]) demo.launch()