Spaces:
Runtime error
Runtime error
from typing import List | |
import cv2 | |
import torch | |
import numpy as np | |
from tqdm import tqdm | |
import supervision as sv | |
import torch.nn.functional as F | |
from transformers import AutoModel | |
from sklearn.decomposition import PCA | |
from torchvision import transforms as T | |
from sklearn.preprocessing import MinMaxScaler | |
def load_video_frames(video_path: str) -> List[np.ndarray]: | |
frames = [] | |
for frame in tqdm(sv.get_video_frames_generator(source_path=video_path), unit=" frames"): | |
frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
return frames | |
def preprocess(image: np.ndarray, n_patches: int, device: str, patch_size: int = 14) -> torch.Tensor: | |
IMAGENET_DEFAULT_MEAN = (0.485, 0.456, 0.406) | |
IMAGENET_DEFAULT_STD = (0.229, 0.224, 0.225) | |
transform = T.Compose([ | |
T.Resize((n_patches * patch_size, n_patches * patch_size)), | |
T.Normalize(mean=IMAGENET_DEFAULT_MEAN, std=IMAGENET_DEFAULT_STD), | |
]) | |
img = torch.from_numpy(image).type(torch.float).permute(2, 0, 1) / 255 | |
img_tensor = transform(img).unsqueeze(0).to(device) | |
return img_tensor | |
def process_video( | |
model: AutoModel, | |
video: str | List[np.ndarray], | |
is_larger: bool = True, | |
batch_size: int = 4, | |
threshold: float = 0.5, | |
n_patches: int = 40, | |
interpolate: bool = False, | |
device: str = "cpu" | |
) -> List[np.ndarray]: | |
# NP = N_PATCHES | |
# P = PATCH_SIZE | |
if isinstance(video, str): | |
frames = load_video_frames(video) | |
else: | |
frames = video | |
patch_size = model.config.patch_size | |
original_height = frames[0].shape[0] # C, H, W | |
original_width = frames[0].shape[1] # C, H, W | |
final_frames = [] | |
pca = PCA(n_components=3) | |
scaler = MinMaxScaler(clip=True) | |
for i in range(len(frames)//batch_size): | |
batch = frames[i*batch_size:batch_size*(i+1)] | |
pixel_values = [ | |
preprocess(f, n_patches, device, patch_size).squeeze(0) for f in batch | |
] | |
pixel_values = torch.stack(pixel_values) # B, C, NP * P, NP * P | |
with torch.no_grad(): | |
out = model(pixel_values=pixel_values) | |
features = out.last_hidden_state[:, 1:] # B, P * P, HIDDEN_DIM | |
features = features.cpu().numpy() | |
features = features.reshape(batch_size * n_patches * n_patches, -1) # B * P * P, HIDDEN_DIM | |
pca_features = pca.fit_transform(features) | |
pca_features = scaler.fit_transform(pca_features) | |
if is_larger: | |
pca_features_bg = pca_features[:, 0] > threshold | |
else: | |
pca_features_bg = pca_features[:, 0] < threshold | |
pca_features_fg = ~pca_features_bg | |
pca_features_fg_seg = pca.fit_transform(features[pca_features_fg]) | |
pca_features_fg_seg = scaler.fit_transform(pca_features_fg_seg) | |
pca_features_rgb = np.zeros((batch_size * n_patches * n_patches, 3)) | |
pca_features_rgb[pca_features_bg] = 0 | |
pca_features_rgb[pca_features_fg] = pca_features_fg_seg | |
pca_features_rgb = pca_features_rgb.reshape(batch_size, n_patches, n_patches, 3) | |
if interpolate: | |
# transformed into torch tensor | |
pca_features_rgb = torch.from_numpy(pca_features_rgb) # B, P, P, 3 | |
# reshaped to B, C, P, P | |
pca_features_rgb = pca_features_rgb.permute(0, 3, 1, 2) | |
# interpolate to B, C, H, W | |
# reshaped to B, H, W, C | |
# unbind to a list of len B with np.ndarray of shape H, W, C | |
pca_features_rgb = F.interpolate( | |
pca_features_rgb, | |
size=(original_height, original_width), | |
mode='bilinear', | |
align_corners=False | |
).permute(0, 2, 3, 1).unbind(0) | |
# Fixing range to np.uint8 | |
else: | |
pca_features_rgb = [f for f in pca_features_rgb] | |
# Adding to final_frames list | |
final_frames.extend(pca_features_rgb) | |
return final_frames | |
def create_video_from_frames_rgb( | |
frame_list: List[np.ndarray], | |
output_filename: str = "animation.mp4", | |
fps: int = 15 | |
) -> str: | |
# Get the shape of the frames to determine video dimensions | |
frame_height, frame_width, _ = frame_list[0].shape | |
# Define the codec and create a VideoWriter object | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # You can change the codec as needed | |
out = cv2.VideoWriter(output_filename, fourcc, fps, (frame_width, frame_height)) | |
for frame in frame_list: | |
# Write the frame to the video file | |
out.write(np.uint8(frame*255)) | |
# Release the VideoWriter object | |
out.release() | |
return output_filename |