lighthouse_demo / app.py
awkrail's picture
fix lib
5bd932e
"""
Copyright $today.year LY Corporation
LY Corporation licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
"""
import os
import torch
import subprocess
import ffmpeg
import pandas as pd
import gradio as gr
from tqdm import tqdm
from lighthouse.models import *
# use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAMES = ['cg_detr', 'moment_detr', 'eatr', 'qd_detr', 'tr_detr', 'uvcom']
FEATURES = ['clip', 'clip_slowfast']
TOPK_MOMENT = 5
TOPK_HIGHLIGHT = 5
"""
Helper functions
"""
def load_pretrained_weights():
file_urls = []
for model_name in MODEL_NAMES:
for feature in FEATURES:
file_urls.append(
"https://zenodo.org/records/13363606/files/{}_{}_qvhighlight.ckpt".format(feature, model_name)
)
for file_url in tqdm(file_urls):
if not os.path.exists('weights/' + os.path.basename(file_url)):
command = 'wget -P weights/ {}'.format(file_url)
subprocess.run(command, shell=True)
# Slowfast weights
if not os.path.exists('SLOWFAST_8x8_R50.pkl'):
subprocess.run('wget https://dl.fbaipublicfiles.com/pyslowfast/model_zoo/kinetics400/SLOWFAST_8x8_R50.pkl', shell=True)
return file_urls
def flatten(array2d):
list1d = []
for elem in array2d:
list1d += elem
return list1d
"""
Model initialization
"""
load_pretrained_weights()
model = CGDETRPredictor('weights/clip_cg_detr_qvhighlight.ckpt', device=device,
feature_name='clip', slowfast_path='SLOWFAST_8x8_R50.pkl')
js_codes = ["""() => {{
let moment_text = document.getElementById('result_{}').textContent;
var replaced_text = moment_text.replace(/moment..../, '').replace(/\ Score.*/, '');
let start_end = JSON.parse(replaced_text);
document.getElementsByTagName("video")[0].currentTime = start_end[0];
document.getElementsByTagName("video")[0].play();
}}""".format(i) for i in range(TOPK_MOMENT)]
"""
Gradio functions
"""
def video_upload(video):
if video is None:
model.video_feats = None
model.video_mask = None
model.video_path = None
yield gr.update(value="Removed the video", visible=True)
else:
yield gr.update(value="Processing the video. Wait for a minute...", visible=True)
model.encode_video(video)
yield gr.update(value="Finished video processing!", visible=True)
def model_load(radio):
if radio is not None:
yield gr.update(value="Loading new model. Wait for a minute...", visible=True)
global model
feature, model_name = radio.split('+')
feature, model_name = feature.strip(), model_name.strip()
if model_name == 'moment_detr':
model_class = MomentDETRPredictor
elif model_name == 'qd_detr':
model_class = QDDETRPredictor
elif model_name == 'eatr':
model_class = EaTRPredictor
elif model_name == 'tr_detr':
model_class = TRDETRPredictor
elif model_name == 'uvcom':
model_class = UVCOMPredictor
elif model_name == 'taskweave':
model_class = TaskWeavePredictor
elif model_name == 'cg_detr':
model_class = CGDETRPredictor
else:
raise gr.Error("Select from the models")
model = model_class('weights/{}_{}_qvhighlight.ckpt'.format(feature, model_name),
device=device, feature_name='{}'.format(feature), slowfast_path='SLOWFAST_8x8_R50.pkl')
yield gr.update(value="Model loaded: {}".format(radio), visible=True)
def predict(textbox, line, gallery):
prediction = model.predict(textbox)
if prediction is None:
raise gr.Error('Upload the video before pushing the `Retrieve moment & highlight detection` button.')
else:
mr_results = prediction['pred_relevant_windows']
hl_results = prediction['pred_saliency_scores']
buttons = []
for i, pred in enumerate(mr_results[:TOPK_MOMENT]):
buttons.append(gr.Button(value='moment {}: [{}, {}] Score: {}'.format(i+1, pred[0], pred[1], pred[2]), visible=True))
# Visualize the HD score
seconds = [model.clip_len * i for i in range(len(hl_results))]
hl_data = pd.DataFrame({ 'second': seconds, 'saliency_score': hl_results })
min_val, max_val = min(hl_results), max(hl_results) + 1
min_x, max_x = min(seconds), max(seconds)
line = gr.LinePlot(value=hl_data, x='second', y='saliency_score', visible=True, y_lim=[min_val, max_val], x_lim=[min_x, max_x])
# Show highlight frames
n_largest_df = hl_data.nlargest(columns='saliency_score', n=TOPK_HIGHLIGHT)
highlighted_seconds = n_largest_df.second.tolist()
highlighted_scores = n_largest_df.saliency_score.tolist()
output_image_paths = []
for i, (second, score) in enumerate(zip(highlighted_seconds, highlighted_scores)):
output_path = "highlight_frames/highlight_{}.png".format(i)
(
ffmpeg
.input(model.video_path, ss=second)
.output(output_path, vframes=1, qscale=2)
.global_args('-loglevel', 'quiet', '-y')
.run()
)
output_image_paths.append((output_path, "Highlight: {} - score: {:.02f}".format(i+1, score)))
gallery = gr.Gallery(value=output_image_paths, label='gradio', columns=5, show_download_button=True, visible=True)
return buttons + [line, gallery]
def main():
title = """# Moment Retrieval & Highlight Detection Demo"""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown(title)
with gr.Row():
with gr.Column():
with gr.Group():
gr.Markdown("## Model selection")
radio_list = flatten([["{} + {}".format(feature, model_name) for model_name in MODEL_NAMES] for feature in FEATURES])
radio = gr.Radio(radio_list, label="models", value="clip + cg_detr", info="Which model do you want to use?")
load_status_text = gr.Textbox(label='Model load status', value='Model loaded: clip + cg_detr')
with gr.Group():
gr.Markdown("## Video and query")
video_input = gr.Video(elem_id='video', height=600)
output = gr.Textbox(label='Video processing progress')
query_input = gr.Textbox(label='query')
button = gr.Button("Retrieve moment & highlight detection", variant="primary")
with gr.Column():
with gr.Group():
gr.Markdown("## Retrieved moments")
button_1 = gr.Button(value='moment 1', visible=False, elem_id='result_0')
button_2 = gr.Button(value='moment 2', visible=False, elem_id='result_1')
button_3 = gr.Button(value='moment 3', visible=False, elem_id='result_2')
button_4 = gr.Button(value='moment 4', visible=False, elem_id='result_3')
button_5 = gr.Button(value='moment 5', visible=False, elem_id='result_4')
button_1.click(None, None, None, js=js_codes[0])
button_2.click(None, None, None, js=js_codes[1])
button_3.click(None, None, None, js=js_codes[2])
button_4.click(None, None, None, js=js_codes[3])
button_5.click(None, None, None, js=js_codes[4])
# dummy
with gr.Group():
gr.Markdown("## Saliency score")
line = gr.LinePlot(value=pd.DataFrame({'x': [], 'y': []}), x='x', y='y', visible=False)
gr.Markdown("### Highlighted frames")
gallery = gr.Gallery(value=[], label="highlight", columns=5, visible=False)
video_input.change(video_upload, inputs=[video_input], outputs=output)
radio.select(model_load, inputs=[radio], outputs=load_status_text)
button.click(predict,
inputs=[query_input, line, gallery],
outputs=[button_1, button_2, button_3, button_4, button_5, line, gallery])
demo.launch()
if __name__ == "__main__":
main()