# Dinov2-Video / utils.py
from typing import List
import cv2
import torch
import numpy as np
from tqdm import tqdm
import supervision as sv
import torch.nn.functional as F
from transformers import AutoModel
from sklearn.decomposition import PCA
from torchvision import transforms as T
from sklearn.preprocessing import MinMaxScaler


def load_video_frames(video_path: str) -> List[np.ndarray]:
    """Read a video from disk and return its frames as RGB arrays."""
    frames = []
    for frame in tqdm(sv.get_video_frames_generator(source_path=video_path), unit=" frames"):
        # supervision yields BGR frames (OpenCV convention); convert to RGB
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return frames


def preprocess(image: np.ndarray, n_patches: int, device: str, patch_size: int = 14) -> torch.Tensor:
    """Resize an RGB frame to n_patches * patch_size per side and normalize it."""
    IMAGENET_DEFAULT_MEAN = (0.485, 0.456, 0.406)
    IMAGENET_DEFAULT_STD = (0.229, 0.224, 0.225)
    transform = T.Compose([
        T.Resize((n_patches * patch_size, n_patches * patch_size)),
        T.Normalize(mean=IMAGENET_DEFAULT_MEAN, std=IMAGENET_DEFAULT_STD),
    ])
    # HWC uint8 -> CHW float in [0, 1]
    img = torch.from_numpy(image).type(torch.float).permute(2, 0, 1) / 255
    img_tensor = transform(img).unsqueeze(0).to(device)
    return img_tensor


def process_video(
    model: AutoModel,
    video: str | List[np.ndarray],
    is_larger: bool = True,
    batch_size: int = 4,
    threshold: float = 0.5,
    n_patches: int = 40,
    interpolate: bool = False,
    device: str = "cpu"
) -> List[np.ndarray]:
    """Run DINOv2 over a video and return per-frame PCA visualizations."""
    # Shape legend: NP = N_PATCHES, P = PATCH_SIZE
    if isinstance(video, str):
        frames = load_video_frames(video)
    else:
        frames = video
    patch_size = model.config.patch_size
    original_height = frames[0].shape[0]  # frames are H, W, C
    original_width = frames[0].shape[1]
    final_frames = []
    pca = PCA(n_components=3)
    scaler = MinMaxScaler(clip=True)
    # Ceil-divide so a trailing partial batch is not silently dropped
    for i in range((len(frames) + batch_size - 1) // batch_size):
        batch = frames[i * batch_size:(i + 1) * batch_size]
        pixel_values = [
            preprocess(f, n_patches, device, patch_size).squeeze(0) for f in batch
        ]
        pixel_values = torch.stack(pixel_values)  # B, C, NP * P, NP * P
        bs = pixel_values.shape[0]  # B; smaller than batch_size on the last batch
        with torch.no_grad():
            out = model(pixel_values=pixel_values)
        features = out.last_hidden_state[:, 1:]  # drop CLS token -> B, NP * NP, HIDDEN_DIM
        features = features.cpu().numpy()
        features = features.reshape(bs * n_patches * n_patches, -1)  # B * NP * NP, HIDDEN_DIM
        pca_features = pca.fit_transform(features)
        pca_features = scaler.fit_transform(pca_features)
        # Threshold the first PCA component to split background from foreground
        if is_larger:
            pca_features_bg = pca_features[:, 0] > threshold
        else:
            pca_features_bg = pca_features[:, 0] < threshold
        pca_features_fg = ~pca_features_bg
        # Re-fit PCA on foreground patches only for a cleaner RGB embedding
        pca_features_fg_seg = pca.fit_transform(features[pca_features_fg])
        pca_features_fg_seg = scaler.fit_transform(pca_features_fg_seg)
        pca_features_rgb = np.zeros((bs * n_patches * n_patches, 3))
        pca_features_rgb[pca_features_bg] = 0
        pca_features_rgb[pca_features_fg] = pca_features_fg_seg
        pca_features_rgb = pca_features_rgb.reshape(bs, n_patches, n_patches, 3)
        if interpolate:
            pca_features_rgb = torch.from_numpy(pca_features_rgb)  # B, NP, NP, 3
            # channels-last -> channels-first for F.interpolate
            pca_features_rgb = pca_features_rgb.permute(0, 3, 1, 2)
            # Upsample to the original frame size, then convert back to a list
            # of H, W, C numpy arrays (range fixing to uint8 happens in the writer)
            pca_features_rgb = F.interpolate(
                pca_features_rgb,
                size=(original_height, original_width),
                mode='bilinear',
                align_corners=False
            ).permute(0, 2, 3, 1).unbind(0)
            pca_features_rgb = [f.numpy() for f in pca_features_rgb]
        else:
            pca_features_rgb = [f for f in pca_features_rgb]
        # Adding to final_frames list
        final_frames.extend(pca_features_rgb)
    return final_frames


def create_video_from_frames_rgb(
    frame_list: List[np.ndarray],
    output_filename: str = "animation.mp4",
    fps: int = 15
) -> str:
    # Get the shape of the frames to determine video dimensions
    frame_height, frame_width, _ = frame_list[0].shape
    # Define the codec and create a VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # change the codec as needed
    out = cv2.VideoWriter(output_filename, fourcc, fps, (frame_width, frame_height))
    for frame in frame_list:
        # Scale [0, 1] floats to uint8, clipping bilinear overshoot, and
        # convert RGB -> BGR since OpenCV expects BGR frames
        frame = (np.clip(frame, 0, 1) * 255).astype(np.uint8)
        out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    # Release the VideoWriter object
    out.release()
    return output_filename
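

# Example usage: a minimal sketch, not part of the original Space. The
# checkpoint name and input path below are assumptions; any DINOv2
# checkpoint exposing config.patch_size should work.
if __name__ == "__main__":
    model = AutoModel.from_pretrained("facebook/dinov2-base")  # assumed checkpoint
    model.eval()
    pca_frames = process_video(model, "input.mp4", batch_size=4, interpolate=True)  # assumed input path
    print(create_video_from_frames_rgb(pca_frames, "pca_visualization.mp4"))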