Spaces:
Sleeping
Sleeping
import streamlit as st | |
import torch | |
import copy | |
import os | |
from PIL import Image | |
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig | |
from typing import Tuple, Optional | |
from my_model.utilities.gen_utilities import free_gpu_resources | |
from my_model.captioner.image_captioning import ImageCaptioningModel | |
from my_model.object_detection import ObjectDetector | |
import my_model.config.kbvqa_config as config | |
class KBVQA: | |
""" | |
The KBVQA class encapsulates the functionality for the Knowledge-Based Visual Question Answering (KBVQA) model. | |
It integrates various components such as an image captioning model, object detection model, and a fine-tuned | |
language model (LLAMA2) on OK-VQA dataset for generating answers to visual questions. | |
Attributes: | |
kbvqa_model_name (str): Name of the fine-tuned language model used for KBVQA. | |
quantization (str): The quantization setting for the model (e.g., '4bit', '8bit'). | |
max_context_window (int): The maximum number of tokens allowed in the model's context window. | |
add_eos_token (bool): Flag to indicate whether to add an end-of-sentence token to the tokenizer. | |
trust_remote (bool): Flag to indicate whether to trust remote code when using the tokenizer. | |
use_fast (bool): Flag to indicate whether to use the fast version of the tokenizer. | |
low_cpu_mem_usage (bool): Flag to optimize model loading for low CPU memory usage. | |
kbvqa_tokenizer (Optional[AutoTokenizer]): The tokenizer for the KBVQA model. | |
captioner (Optional[ImageCaptioningModel]): The model used for generating image captions. | |
detector (Optional[ObjectDetector]): The object detection model. | |
detection_model (Optional[str]): The name of the object detection model. | |
detection_confidence (Optional[float]): The confidence threshold for object detection. | |
kbvqa_model (Optional[AutoModelForCausalLM]): The fine-tuned language model for KBVQA. | |
bnb_config (BitsAndBytesConfig): Configuration for BitsAndBytes optimized model. | |
access_token (str): Access token for Hugging Face API. | |
current_prompt_length (int): Prompt length. | |
Methods: | |
create_bnb_config: Creates a BitsAndBytes configuration based on the quantization setting. | |
load_caption_model: Loads the image captioning model. | |
get_caption: Generates a caption for a given image. | |
load_detector: Loads the object detection model. | |
detect_objects: Detects objects in a given image. | |
load_fine_tuned_model: Loads the fine-tuned KBVQA model along with its tokenizer. | |
all_models_loaded: Checks if all the required models are loaded. | |
force_reload_model: Forces a reload of all models, freeing up GPU resources. | |
format_prompt: Formats the prompt for the KBVQA model. | |
generate_answer: Generates an answer to a given question using the KBVQA model. | |
""" | |
def __init__(self): | |
# self.col1, self.col2, self.col3 = st.columns([0.2, 0.6, 0.2]) | |
if st.session_state["method"] == "7b-Fine-Tuned Model": | |
self.kbvqa_model_name: str = config.KBVQA_MODEL_NAME_7b | |
elif st.session_state["method"] == "13b-Fine-Tuned Model": | |
self.kbvqa_model_name: str = config.KBVQA_MODEL_NAME_13b | |
self.quantization: str = config.QUANTIZATION | |
self.max_context_window: int = config.MAX_CONTEXT_WINDOW | |
self.add_eos_token: bool = config.ADD_EOS_TOKEN | |
self.trust_remote: bool = config.TRUST_REMOTE | |
self.use_fast: bool = config.USE_FAST | |
self.low_cpu_mem_usage: bool = config.LOW_CPU_MEM_USAGE | |
self.kbvqa_tokenizer: Optional[AutoTokenizer] = None | |
self.captioner: Optional[ImageCaptioningModel] = None | |
self.detector: Optional[ObjectDetector] = None | |
self.detection_model: Optional[str] = None | |
self.detection_confidence: Optional[float] = None | |
self.kbvqa_model: Optional[AutoModelForCausalLM] = None | |
self.bnb_config: BitsAndBytesConfig = self.create_bnb_config() | |
self.access_token: str = config.HUGGINGFACE_TOKEN | |
self.current_prompt_length = None | |
def create_bnb_config(self) -> BitsAndBytesConfig: | |
""" | |
Creates a BitsAndBytes configuration based on the quantization setting. | |
Returns: | |
BitsAndBytesConfig: Configuration for BitsAndBytes optimized model. | |
""" | |
if self.quantization == '4bit': | |
return BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_use_double_quant=True, | |
bnb_4bit_quant_type="nf4", | |
bnb_4bit_compute_dtype=torch.bfloat16 | |
) | |
elif self.quantization == '8bit': | |
return BitsAndBytesConfig( | |
load_in_8bit=True, | |
bnb_8bit_use_double_quant=True, | |
bnb_8bit_quant_type="nf4", | |
bnb_8bit_compute_dtype=torch.bfloat16 | |
) | |
def load_caption_model(self) -> None: | |
""" | |
Loads the image captioning model into the KBVQA instance. | |
""" | |
self.captioner = ImageCaptioningModel() | |
self.captioner.load_model() | |
free_gpu_resources() | |
def get_caption(self, img: Image.Image) -> str: | |
""" | |
Generates a caption for a given image using the image captioning model. | |
Args: | |
img (PIL.Image.Image): The image for which to generate a caption. | |
Returns: | |
str: The generated caption for the image. | |
""" | |
caption = self.captioner.generate_caption(img) | |
free_gpu_resources() | |
return caption | |
def load_detector(self, model: str) -> None: | |
""" | |
Loads the object detection model. | |
Args: | |
model (str): The name of the object detection model to load. | |
""" | |
self.detector = ObjectDetector() | |
self.detector.load_model(model) | |
free_gpu_resources() | |
def detect_objects(self, img: Image.Image) -> Tuple[Image.Image, str]: | |
""" | |
Detects objects in a given image using the loaded object detection model. | |
Args: | |
img (PIL.Image.Image): The image in which to detect objects. | |
Returns: | |
tuple: A tuple containing the image with detected objects drawn and a string representation of detected objects. | |
""" | |
image = self.detector.process_image(img) | |
free_gpu_resources() | |
detected_objects_string, detected_objects_list = self.detector.detect_objects(image, threshold=st.session_state['confidence_level']) | |
free_gpu_resources() | |
image_with_boxes = self.detector.draw_boxes(img, detected_objects_list) | |
free_gpu_resources() | |
return image_with_boxes, detected_objects_string | |
def load_fine_tuned_model(self) -> None: | |
""" | |
Loads the fine-tuned KBVQA model along with its tokenizer. | |
""" | |
self.kbvqa_model = AutoModelForCausalLM.from_pretrained(self.kbvqa_model_name, | |
device_map="auto", | |
low_cpu_mem_usage=True, | |
quantization_config=self.bnb_config, | |
token=self.access_token) | |
free_gpu_resources() | |
self.kbvqa_tokenizer = AutoTokenizer.from_pretrained(self.kbvqa_model_name, | |
use_fast=self.use_fast, | |
low_cpu_mem_usage=True, | |
trust_remote_code=self.trust_remote, | |
add_eos_token=self.add_eos_token, | |
token=self.access_token) | |
free_gpu_resources() | |
def all_models_loaded(self): | |
""" | |
Checks if all the required models (KBVQA, captioner, detector) are loaded. | |
Returns: | |
bool: True if all models are loaded, False otherwise. | |
""" | |
return self.kbvqa_model is not None and self.captioner is not None and self.detector is not None | |
def format_prompt(self, current_query: str, history: Optional[str] = None, sys_prompt: Optional[str] = None, caption: str = None, objects: Optional[str] = None) -> str: | |
""" | |
Formats the prompt for the KBVQA model based on the provided parameters. | |
Args: | |
current_query (str): The current question to be answered. | |
history (str, optional): The history of previous interactions. | |
sys_prompt (str, optional): The system prompt or instructions for the model. | |
caption (str, optional): The caption of the image. | |
objects (str, optional): The detected objects in the image. | |
Returns: | |
str: The formatted prompt for the KBVQA model. | |
""" | |
B_SENT = '<s>' | |
E_SENT = '</s>' | |
B_INST = '[INST]' | |
E_INST = '[/INST]' | |
B_SYS = '<<SYS>>\n' | |
E_SYS = '\n<</SYS>>\n\n' | |
B_CAP = '[CAP]' | |
E_CAP = '[/CAP]' | |
B_QES = '[QES]' | |
E_QES = '[/QES]' | |
B_OBJ = '[OBJ]' | |
E_OBJ = '[/OBJ]' | |
current_query = current_query.strip() | |
if sys_prompt is None: | |
sys_prompt = config.SYSTEM_PROMPT.strip() | |
if history is None: | |
if objects is None: | |
p = f"""{B_SENT}{B_INST} {B_SYS}{sys_prompt}{E_SYS}{B_CAP}{caption}{E_CAP}{B_QES}{current_query}{E_QES}{E_INST}""" | |
else: | |
p = f"""{B_SENT}{B_INST} {B_SYS}{sys_prompt}{E_SYS}{B_CAP}{caption}{E_CAP}{B_OBJ}{objects}{E_OBJ}{B_QES}taking into consideration the objects with high certainty, {current_query}{E_QES}{E_INST}""" | |
else: | |
p = f"""{history}\n{B_SENT}{B_INST} {B_QES}{current_query}{E_QES}{E_INST}""" | |
return p | |
def trim_objects(detected_objects_str): | |
""" | |
Trim the last object from the detected objects string. | |
Args: | |
- detected_objects_str (str): String containing detected objects. | |
Returns: | |
- (str): The string with the last object removed. | |
""" | |
objects = detected_objects_str.strip().split("\n") | |
if len(objects) >= 1: | |
return "\n".join(objects[:-1]) | |
return "" | |
def generate_answer(self, question: str, caption: str, detected_objects_str: str) -> str: | |
""" | |
Generates an answer to a given question using the KBVQA model. | |
Args: | |
question (str): The question to be answered. | |
caption (str): The caption of the image related to the question. | |
detected_objects_str (str): The string representation of detected objects in the image. | |
Returns: | |
str: The generated answer to the question. | |
""" | |
free_gpu_resources() | |
prompt = self.format_prompt(question, caption=caption, objects=detected_objects_str) | |
num_tokens = len(self.kbvqa_tokenizer.tokenize(prompt)) | |
self.current_prompt_length = num_tokens | |
if self.current_prompt_length > self.max_context_window: | |
trim = True | |
st.warning(f"Prompt length is {self.current_prompt_length} which is larger than the maximum context window of LLaMA-2, objects detected with low confidence will be removed one at a time until the prompt length is within the maximum context window ...") | |
while self.current_prompt_length > self.max_context_window: | |
detected_objects_str = self.trim_objects(detected_objects_str) | |
prompt = self.format_prompt(question, caption=caption, objects=detected_objects_str) | |
self.current_prompt_length = len(self.kbvqa_tokenizer.tokenize(prompt)) | |
if detected_objects_str == "": | |
break # Break if no objects are left | |
if trim: | |
st.warning(f"New prompt length is: {self.current_prompt_length}") | |
trim = False | |
model_inputs = self.kbvqa_tokenizer(prompt, add_special_tokens=False, return_tensors="pt").to('cuda') | |
free_gpu_resources() | |
input_ids = model_inputs["input_ids"] | |
output_ids = self.kbvqa_model.generate(input_ids) | |
free_gpu_resources() | |
index = input_ids.shape[1] # needed to avoid printing the input prompt | |
history = self.kbvqa_tokenizer.decode(output_ids[0], skip_special_tokens=False) | |
output_text = self.kbvqa_tokenizer.decode(output_ids[0][index:], skip_special_tokens=True) | |
return output_text.capitalize() | |
def prepare_kbvqa_model(only_reload_detection_model: bool = False, force_reload: bool = False) -> KBVQA: | |
""" | |
Prepares the KBVQA model for use, including loading necessary sub-models. | |
Args: | |
only_reload_detection_model (bool): If True, only the object detection model is reloaded. | |
Returns: | |
KBVQA: An instance of the KBVQA model ready for inference. | |
""" | |
if force_reload: | |
loading_message = 'Force Reloading model.. this should take no more than a few minutes!' | |
try: | |
del kbvqa | |
except: | |
free_gpu_resources() | |
pass | |
free_gpu_resources() | |
else: loading_message = 'Looading model.. this should take no more than 2 or 3 minutes!' | |
free_gpu_resources() | |
kbvqa = KBVQA() | |
kbvqa.detection_model = st.session_state.detection_model | |
# Progress bar for model loading | |
with st.spinner(loading_message): | |
if not only_reload_detection_model: | |
progress_bar = st.progress(0) | |
kbvqa.load_detector(kbvqa.detection_model) | |
progress_bar.progress(33) | |
kbvqa.load_caption_model() | |
free_gpu_resources() | |
progress_bar.progress(75) | |
st.text('Almost there :)') | |
kbvqa.load_fine_tuned_model() | |
free_gpu_resources() | |
progress_bar.progress(100) | |
else: | |
free_gpu_resources() | |
progress_bar = st.progress(0) | |
kbvqa.load_detector(kbvqa.detection_model) | |
progress_bar.progress(100) | |
if kbvqa.all_models_loaded: | |
st.success('Model loaded successfully and ready for inferecne!') | |
kbvqa.kbvqa_model.eval() | |
free_gpu_resources() | |
return kbvqa | |