import streamlit as st
import torch
import bitsandbytes
import accelerate
import scipy
import copy
from PIL import Image
import torch.nn as nn
import pandas as pd
from my_model.object_detection import detect_and_draw_objects
from my_model.captioner.image_captioning import get_caption
from my_model.gen_utilities import free_gpu_resources
from my_model.KBVQA import KBVQA, prepare_kbvqa_model
from my_model.utilities.st_utils import UIManager, StateManager

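# Note: several of these imports (torch, bitsandbytes, accelerate, scipy, torch.nn) are not
# referenced directly below; they appear to be imported so the model backend's dependencies
# are loaded up front (an assumption based on this file alone).
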
def answer_question(caption, detected_objects_str, question, model):
    """Generate an answer to a question about an image from its caption and detected objects."""
    answer = model.generate_answer(question, caption, detected_objects_str)
    return answer

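# Sample images offered in the UI; the paths are relative to the app's working directory
# and are assumed to exist in a local Files/ folder.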
sample_images = ["Files/sample1.jpg", "Files/sample2.jpg", "Files/sample3.jpg",
                 "Files/sample4.jpg", "Files/sample5.jpg", "Files/sample6.jpg",
                 "Files/sample7.jpg"]

def analyze_image(image, model):
    """Caption the image and run object detection on it, freeing GPU memory afterwards."""
    img = copy.deepcopy(image)
    caption = model.get_caption(img)
    image_with_boxes, detected_objects_str = model.detect_objects(img)
    st.text("I am ready, let's talk!")
    free_gpu_resources()
    return caption, detected_objects_str, image_with_boxes

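# Each selected or uploaded image is tracked in st.session_state['images_data'] under a key
# (sample path or uploaded file name) that maps to a record created by process_new_image():
# 'image', 'caption', 'detected_objects_str', 'qa_history', 'analysis_done'.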
def image_qa_app(kbvqa):
    """Streamlit UI for selecting or uploading images, analyzing them, and asking questions."""
    if 'images_data' not in st.session_state:
        st.session_state['images_data'] = {}

    # Sample image gallery
    st.write("Choose from sample images:")
    cols = st.columns(len(sample_images))
    for idx, sample_image_path in enumerate(sample_images):
        with cols[idx]:
            image = Image.open(sample_image_path)
            st.image(image, use_column_width=True)
            if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx}'):
                process_new_image(sample_image_path, image, kbvqa)

    # Image upload
    uploaded_image = st.file_uploader("Or upload an image", type=["png", "jpg", "jpeg"])
    if uploaded_image is not None:
        process_new_image(uploaded_image.name, Image.open(uploaded_image), kbvqa)

    # Display each tracked image with its analysis controls and Q&A history
    for image_key, image_data in st.session_state['images_data'].items():
        st.image(image_data['image'], caption=f'Uploaded Image: {image_key[-11:]}', use_column_width=True)
        if not image_data['analysis_done']:
            st.text("Cool image, please click 'Analyze Image'..")
            if st.button('Analyze Image', key=f'analyze_{image_key}'):
                caption, detected_objects_str, image_with_boxes = analyze_image(image_data['image'], kbvqa)
                image_data['caption'] = caption
                image_data['detected_objects_str'] = detected_objects_str
                image_data['analysis_done'] = True

        qa_history = image_data.get('qa_history', [])

        if image_data['analysis_done']:
            question = st.text_input(f"Ask a question about this image ({image_key[-11:]}):",
                                     key=f'question_{image_key}')
            if st.button('Get Answer', key=f'answer_{image_key}'):
                if question not in [q for q, _ in qa_history]:
                    answer = answer_question(image_data['caption'], image_data['detected_objects_str'],
                                             question, kbvqa)
                    qa_history.append((question, answer))
                    image_data['qa_history'] = qa_history
                else:
                    st.info("This question has already been asked.")

        for q, a in qa_history:
            st.text(f"Q: {q}\nA: {a}\n")

def process_new_image(image_key, image, kbvqa):
    """Process a new image and update the session state."""
    if image_key not in st.session_state['images_data']:
        st.session_state['images_data'][image_key] = {
            'image': image,
            'caption': '',
            'detected_objects_str': '',
            'qa_history': [],
            'analysis_done': False
        }

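# run_inference() drives the page: choose the method, detector and confidence level,
# load (or reload) the KBVQA model into session state, then hand off to image_qa_app().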
def run_inference():
    """Render the inference page: method and model selection, model loading, and the image Q&A app."""
    st.title("Run Inference")
    st.write("Please note that this is not a general-purpose model; it is specifically trained on the "
             "OK-VQA dataset and is designed to give direct, short answers to the given questions.")

    method = st.selectbox(
        "Choose a method:",
        ["Fine-Tuned Model", "In-Context Learning (n-shots)"],
        index=0
    )

    detection_model = st.selectbox(
        "Choose a model for object detection:",
        ["yolov5", "detic"],
        index=1
    )

    # Default confidence depends on the selected detector
    default_confidence = 0.2 if detection_model == "yolov5" else 0.4
    confidence_level = st.slider(
        "Select minimum detection confidence level",
        min_value=0.1,
        max_value=0.9,
        value=default_confidence,
        step=0.1
    )

    if 'model_settings' not in st.session_state:
        st.session_state['model_settings'] = {'detection_model': detection_model,
                                              'confidence_level': confidence_level}

    settings_changed = (st.session_state['model_settings']['detection_model'] != detection_model or
                        st.session_state['model_settings']['confidence_level'] != confidence_level)

    need_model_reload = (settings_changed and 'kbvqa' in st.session_state
                         and st.session_state['kbvqa'] is not None)

    if need_model_reload:
        st.text("Model settings have changed, please reload the model; this will only take a moment :)")

    button_label = "Reload Model" if need_model_reload else "Load Model"

    if method == "Fine-Tuned Model":
        if 'kbvqa' not in st.session_state:
            st.session_state['kbvqa'] = None

        if st.button(button_label):
            free_gpu_resources()
            if st.session_state['kbvqa'] is not None:
                if not settings_changed:
                    st.write("Model already loaded.")
                else:
                    # Settings changed: reload only the detection model with the newly selected
                    # detector and confidence level, then record the new settings.
                    free_gpu_resources()
                    prepare_kbvqa_model(detection_model, only_reload_detection_model=True)
                    st.session_state['kbvqa'].detection_confidence = confidence_level
                    st.session_state['model_settings'] = {'detection_model': detection_model,
                                                          'confidence_level': confidence_level}
                    free_gpu_resources()
            else:
                st.text("Loading the model will take no more than a few minutes...")
                st.session_state['kbvqa'] = prepare_kbvqa_model(detection_model)
                st.session_state['kbvqa'].detection_confidence = confidence_level
                st.session_state['model_settings'] = {'detection_model': detection_model,
                                                      'confidence_level': confidence_level}
                st.write("Model is ready for inference.")
                free_gpu_resources()

        if st.session_state['kbvqa']:
            display_model_settings()
            display_session_state()
            image_qa_app(st.session_state['kbvqa'])

    else:
        st.write('Model is not ready yet, will be updated later.')

def display_model_settings():
    """Show the current model settings as a one-row table."""
    st.write("### Current Model Settings:")
    st.table(pd.DataFrame(st.session_state['model_settings'], index=[0]))

def display_session_state():
    """Show the full Streamlit session state as a key/value table."""
    st.write("### Current Session State:")
    data = [{'Key': key, 'Value': str(value)} for key, value in st.session_state.items()]
    df = pd.DataFrame(data)
    st.table(df)
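
# How this module is launched is not shown here. A minimal sketch, assuming this file is the
# Streamlit entry point (i.e. executed directly with `streamlit run`); if a separate app.py
# imports and calls run_inference() instead, this guard simply never fires.
if __name__ == "__main__":
    run_inference()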