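# NOTE(review): the text "Spaces: Runtime error / Runtime error" preceded this
# file when it was captured; it appears to be page-status text, not program code.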
import html
import os

import faiss
import gradio as gr
import numpy as np
import pandas as pd
import torch
from IPython import display
from transformers import CLIPTokenizer, CLIPTextModelWithProjection
# Static page header rendered through gr.HTML: a logo image floated left of
# the "Video Retrieval" title.
HTML = """
<!DOCTYPE html>
<html>
<style>
.container {
align-items: center;
justify-content: center;
}
img {
max-width: 10%;
max-height:10%;
float: left;
}
.text {
font-size: 20px;
padding-top: 12%;
padding-left: 20px;
float: left;
}
</style>
<body>
<div class="container">
<div class="image">
<img src="https://huggingface.co/spaces/Diangle/Clip4Clip-webvid/resolve/main/Searchium.png" width="333" height="216">
</div>
<div class="text">
<h1 style="font-size: 64px;"> Video Retrieval </h1>
</div>
</div>
</body>
</html>
"""

# Markdown blurb shown under the header; links to the model this demo uses.
DESCRIPTION = (
    "This is a video retrieval demo using "
    "[Diangle/clip4clip-webvid](https://huggingface.co/Diangle/clip4clip-webvid)."
)
# Directory that holds the precomputed feature matrix and the metadata table.
DATA_PATH = './data'

# Load database features. Built with os.path.join so both data files are
# located the same way.
ft_visual_features_file = os.path.join(DATA_PATH, 'dataset_v1_visual_features_database.npy')
ft_visual_features_database = np.load(ft_visual_features_file)

# Metadata table. search() looks rows up by the positions returned from the
# feature index and reads the 'contentUrl' column, so row order is presumably
# aligned with the feature matrix — TODO confirm against the data export.
database_csv_path = os.path.join(DATA_PATH, 'dataset_v1.csv')
database_df = pd.read_csv(database_csv_path)
class NearestNeighbors:
    """Top-k nearest-neighbour search backed by a flat faiss index.

    Supported metrics:

    * ``'cosine'`` - rows are L2-normalised and searched with an
      inner-product index (``faiss.IndexFlatIP``), which then scores by
      cosine similarity.
    * ``'binary'`` - rows are bit-packed codes searched with
      ``faiss.IndexBinaryFlat``.  If ``rerank_from > n_neighbors`` the best
      ``rerank_from`` binary hits are re-scored by cosine similarity against
      the original (unpacked) vectors and only the best ``n_neighbors`` are
      returned.
    """

    def __init__(self, n_neighbors=10, metric='cosine', rerank_from=-1):
        """Store the search configuration; no index is built yet.

        Args:
            n_neighbors: number of results returned per query.
            metric: ``'cosine'`` or ``'binary'``.
            rerank_from: size of the binary candidate pool to re-rank by
                cosine similarity.  Only used when ``metric`` is not
                ``'cosine'`` and ``rerank_from > n_neighbors``.
        """
        self.n_neighbors = n_neighbors
        self.metric = metric
        self.rerank_from = rerank_from

    def normalize(self, a):
        """Return ``a`` with every row scaled to unit L2 norm.

        Args:
            a: 2-D array, one vector per row.

        Returns:
            Array of the same shape.  All-zero rows are returned unchanged
            instead of turning into NaN.
        """
        norms = np.linalg.norm(a, axis=1, keepdims=True)
        # Cosine similarity needs division by the norm itself (not the
        # squared norm); clamp so a zero row does not divide by zero.
        return a / np.maximum(norms, np.finfo(norms.dtype).tiny)

    def fit(self, data, o_data=None):
        """Build the faiss index from ``data``.

        Args:
            data: for ``'cosine'``, a 2-D float array of vectors; for
                ``'binary'``, a 2-D array of already bit-packed codes
                (8 bits per stored byte).
            o_data: binary metric only - the original unpacked vectors used
                for cosine re-ranking.  Defaults to ``data``.

        Raises:
            ValueError: if ``self.metric`` is not a supported metric.
        """
        if self.metric == 'cosine':
            # faiss float indexes operate on float32 data.
            data = np.ascontiguousarray(self.normalize(data), dtype=np.float32)
            self.index = faiss.IndexFlatIP(data.shape[1])
        elif self.metric == 'binary':
            self.o_data = data if o_data is None else o_data
            # Assuming data is already packed: each byte holds 8 code bits.
            self.index = faiss.IndexBinaryFlat(data.shape[1] * 8)
        else:
            raise ValueError(
                "unsupported metric {!r}; expected 'cosine' or 'binary'".format(self.metric))
        self.index.add(np.ascontiguousarray(data))

    def kneighbors(self, q_data):
        """Search the fitted index for the neighbours of each query row.

        Args:
            q_data: 2-D float array of query vectors (unpacked, even for the
                binary metric - the sign bits are packed here).

        Returns:
            ``(sim, idx)``.  For ``'cosine'`` and for a re-ranked binary
            search, ``sim`` holds inner products of unit vectors and ``idx``
            the matching row positions, both of shape
            ``(len(q_data), n_neighbors)``.  For a binary search without
            re-ranking, the arrays are exactly what the binary faiss index
            returns.

        Raises:
            ValueError: if ``self.metric`` is not a supported metric.
        """
        if self.metric == 'cosine':
            queries = np.ascontiguousarray(self.normalize(q_data), dtype=np.float32)
            # Re-ranking is defined for non-cosine metrics only, so return
            # here even if rerank_from happens to exceed n_neighbors.
            return self.index.search(queries, self.n_neighbors)
        if self.metric != 'binary':
            raise ValueError(
                "unsupported metric {!r}; expected 'cosine' or 'binary'".format(self.metric))

        print('This is binary search.')
        bq_data = np.packbits((q_data > 0.0).astype(bool), axis=1)
        # Fetch the larger of the two sizes so a re-rank has a full pool.
        sim, idx = self.index.search(bq_data, max(self.rerank_from, self.n_neighbors))
        if self.rerank_from <= self.n_neighbors:
            return sim, idx

        n_queries = len(q_data)
        re_sims = np.zeros([n_queries, self.n_neighbors], dtype=float)
        # Keep the index dtype of the first-stage search: these are row
        # positions, not floating-point scores.
        re_idxs = np.zeros([n_queries, self.n_neighbors], dtype=idx.dtype)
        for i, q in enumerate(q_data):
            candidates = idx[i]
            rerank_search = NearestNeighbors(n_neighbors=self.n_neighbors, metric='cosine')
            rerank_search.fit(self.o_data[candidates])
            re_sim, re_idx = rerank_search.kneighbors(np.asarray([q]))
            re_sims[i, :] = re_sim[0]
            # re_idx indexes into the candidate pool; map back to rows of the
            # full database.
            re_idxs[i, :] = candidates[re_idx[0]]
        return re_sims, re_idxs
# Text encoder and its tokenizer are both loaded from the same pretrained
# checkpoint name, once at import time.
_CHECKPOINT = "Diangle/clip4clip-webvid"
tokenizer = CLIPTokenizer.from_pretrained(_CHECKPOINT)
model = CLIPTextModelWithProjection.from_pretrained(_CHECKPOINT)
# Number of videos returned per query; the UI wires search() to exactly this
# many gr.HTML outputs.
_NUM_RESULTS = 5
# Size of the binary candidate pool that is re-ranked by cosine similarity.
_RERANK_POOL = 100
# Lazily built index over the video features, shared by all queries.
_nn_search_cache = None


def _get_nn_search():
    """Return the binary index over the video features, building it once."""
    global _nn_search_cache
    if _nn_search_cache is None:
        nn_search = NearestNeighbors(
            n_neighbors=_NUM_RESULTS, metric='binary', rerank_from=_RERANK_POOL)
        # Binary codes are the sign bits of the float features; the float
        # features themselves are kept for the cosine re-rank.
        nn_search.fit(
            np.packbits((ft_visual_features_database > 0.0).astype(bool), axis=1),
            o_data=ft_visual_features_database)
        _nn_search_cache = nn_search
    return _nn_search_cache


def search(search_sentence):
    """Return HTML ``<video>`` snippets for the videos best matching a sentence.

    Args:
        search_sentence: free-text query typed by the user.

    Returns:
        A list of HTML strings, one per retrieved video.  For an empty or
        whitespace-only query, a list of ``_NUM_RESULTS`` empty strings so
        every output slot is cleared.
    """
    if not search_sentence or not search_sentence.strip():
        return [""] * _NUM_RESULTS

    inputs = tokenizer(text=search_sentence, return_tensors="pt", padding=True)
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = model(input_ids=inputs["input_ids"],
                        attention_mask=inputs["attention_mask"],
                        return_dict=False)
        # NOTE(review): outputs[1] is presumably the per-token hidden states;
        # they are projected manually with the model's text projection
        # weights - confirm against the model card.
        text_projection = model.state_dict()['text_projection.weight']
        text_embeds = outputs[1] @ text_projection
        # One vector per sentence: the position holding the largest token id
        # (presumably the end-of-text token - TODO confirm).
        final_output = text_embeds[torch.arange(text_embeds.shape[0]),
                                   inputs["input_ids"].argmax(dim=-1)]
        # Normalization to unit length.
        final_output = final_output / final_output.norm(dim=-1, keepdim=True)
    sequence_output = final_output.cpu().numpy()

    sims, idxs = _get_nn_search().kneighbors(sequence_output)
    # Row positions must be integers for positional lookup.
    rows = np.asarray(idxs[0], dtype=np.int64)
    urls = database_df.iloc[rows]['contentUrl'].to_list()
    # Quote and escape the URL so it cannot break out of the src attribute.
    return [
        '<video controls muted autoplay>\n'
        '<source src="{}" type="video/mp4">\n'
        '</video>'.format(html.escape(str(url), quote=True))
        for url in urls
    ]
# Page layout: header and description on top, then a two-column row with the
# query controls on the left and five result slots on the right.
with gr.Blocks() as demo:
    gr.HTML(HTML)
    gr.Markdown(DESCRIPTION)
    gr.Markdown("Retrieval of top 5 videos relevant to the input sentence: ")

    with gr.Row():
        # Left column: query box, trigger button and clickable sample queries.
        with gr.Column():
            inp = gr.Textbox(placeholder="Write a sentence.")
            btn = gr.Button(value="Retrieve")
            ex = [
                ["mind-blowing magic tricks"],
                ["baking chocolate cake"],
                ["birds fly in the sky"],
                ["natural wonders of the world"],
            ]
            gr.Examples(examples=ex, inputs=[inp])

        # Right column: one HTML component per retrieved video.
        with gr.Column():
            out = [gr.HTML() for _ in range(5)]

    # search() returns a list of HTML snippets, one for each output slot.
    btn.click(search, inputs=inp, outputs=out)

demo.launch()