import copy
import time
from typing import Optional

import pandas as pd
import streamlit as st
from PIL import Image

from my_model.KBVQA import KBVQA, prepare_kbvqa_model
from my_model.utilities.gen_utilities import free_gpu_resources


class StateManager:
    """
    Manages the Streamlit session state and the widgets of the KBVQA application.

    The class keeps all persistent values (loaded model, per-image data, widget values,
    bookkeeping flags) in `st.session_state` and renders its widgets into three columns
    created at construction time.
    """

    def __init__(self) -> None:
        """
        Creates the three layout columns used by the widgets of this manager.
        """
        # Create three columns with different widths
        self.col1, self.col2, self.col3 = st.columns([0.2, 0.6, 0.2])

    def initialize_state(self) -> None:
        """
        Populates `st.session_state` with default values for every key this class relies on.

        Keys that already exist are left untouched, so calling this method on every script
        run does not reset the application state.
        """
        # Built on every call so that mutable defaults ({}), are never shared between sessions.
        defaults = {
            'images_data': {},
            'kbvqa': None,
            'button_label': "Load Model",
            'previous_state': {},
            'loading_in_progress': False,
            'load_button_clicked': False,
            'force_reload_button_clicked': False,
            'time_taken_to_load_model': None,
            'confidence_level_changed': False,
        }
        for key, default in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = default

        # These two are computed from the state above ('previous_state' and 'kbvqa'),
        # so they must be initialised after the plain defaults.
        if 'settings_changed' not in st.session_state:
            st.session_state['settings_changed'] = self.settings_changed
        if 'model_loaded' not in st.session_state:
            st.session_state['model_loaded'] = self.is_model_loaded

    def set_up_widgets(self) -> None:
        """
        Sets up user interface widgets for selecting models, settings, and displaying model settings conditionally.
        """
        self.col1.selectbox("Choose a method:", ["Fine-Tuned Model", "In-Context Learning (n-shots)"],
                            index=0, key='method', disabled=self.is_widget_disabled)
        # The selected value is read back through st.session_state.detection_model (the widget key).
        self.col1.selectbox("Choose a model for objects detection:", ["yolov5", "detic"],
                            index=0, key='detection_model', disabled=self.is_widget_disabled)
        default_confidence = 0.2 if st.session_state.detection_model == "yolov5" else 0.4
        self.set_slider_value(text="Select minimum detection confidence level", min_value=0.1, max_value=0.9,
                              value=default_confidence, step=0.1, slider_key_name='confidence_level',
                              col=self.col1)

        # Conditional display of model settings
        show_model_settings = self.col3.checkbox("Show Model Settings", True, disabled=self.is_widget_disabled)
        if show_model_settings:
            # `display_model_settings` is a property whose getter renders the table;
            # the bare attribute access below is what triggers the rendering.
            self.display_model_settings

    def set_slider_value(self, text, min_value, max_value, value, step, slider_key_name, col=None):
        """
        Creates a slider widget with the specified parameters, optionally placing it in a specific column.

        Args:
            text (str): Text to display next to the slider.
            min_value (float): Minimum value for the slider.
            max_value (float): Maximum value for the slider.
            value (float): Initial value for the slider.
            step (float): Step size for the slider.
            slider_key_name (str): Unique key for the slider.
            col (streamlit.columns.Column, optional): Column to place the slider in.
                Defaults to None (displayed in main area).

        Returns:
            The value returned by the underlying `slider` call.
        """
        # Both the `st` module and a column object expose `slider`, so a single call covers
        # both placements and keeps the `disabled` handling identical for them.
        container = st if col is None else col
        return container.slider(text, min_value, max_value, value, step,
                                key=slider_key_name, disabled=self.is_widget_disabled)

    @property
    def is_widget_disabled(self):
        """
        Tells whether widgets should be rendered disabled.

        Returns:
            The value of the 'loading_in_progress' session-state flag.
        """
        return st.session_state['loading_in_progress']

    def disable_widgets(self) -> None:
        """
        Marks loading as in progress, which makes `is_widget_disabled` report True.
        """
        st.session_state['loading_in_progress'] = True

    @property
    def settings_changed(self) -> bool:
        """
        Checks if any model settings have changed compared to the previous state.

        Returns:
            bool: True if any setting has changed, False otherwise.
        """
        return self.has_state_changed()

    @property
    def display_model_settings(self):
        """
        Displays a table of current model settings in the third column.

        Note: this is a property with a rendering side effect; accessing the attribute
        writes the header and the table to the third column.
        """
        shown_keys = ("confidence_level", 'detection_model', 'method', 'kbvqa', 'previous_state',
                      'settings_changed', 'loading_in_progress', 'model_loaded', 'time_taken_to_load_model')
        self.col3.write("##### Current Model Settings:")
        data = [{'Setting': key, 'Value': str(value)}
                for key, value in st.session_state.items() if key in shown_keys]
        df = pd.DataFrame(data).reset_index(drop=True)
        return self.col3.write(df)

    def display_session_state(self) -> None:
        """
        Displays a table of the complete application state.
        """
        st.write("Current Model:")
        data = [{'Key': key, 'Value': str(value)} for key, value in st.session_state.items()]
        df = pd.DataFrame(data).reset_index(drop=True)
        st.write(df)

    def _snapshot_settings(self) -> None:
        """
        Stores the current method, detection model and confidence level as 'previous_state'.

        `has_state_changed` compares the live widget values against this snapshot.
        """
        st.session_state['previous_state'] = {
            'method': st.session_state.method,
            'detection_model': st.session_state.detection_model,
            'confidence_level': st.session_state.confidence_level,
        }

    def load_model(self) -> None:
        """
        Loads the KBVQA model based on the chosen method and settings.

        - Calls `free_gpu_resources` before and after loading.
        - Calls `prepare_kbvqa_model` to create the model.
        - Sets the detection confidence level on the model object.
        - Updates previous state with current settings for change detection.
        - Updates the button label to "Reload Model".

        Any exception is reported through `st.error` instead of being raised.
        """
        try:
            free_gpu_resources()
            st.session_state['kbvqa'] = prepare_kbvqa_model()
            st.session_state['kbvqa'].detection_confidence = st.session_state.confidence_level
            # Update the previous state with current session state values
            self._snapshot_settings()
            st.session_state['model_loaded'] = True
            st.session_state['button_label'] = "Reload Model"
            free_gpu_resources()
        except Exception as e:
            st.error(f"Error loading model: {e}")

    def force_reload_model(self) -> None:
        """
        Deletes the current model and loads it again with `force_reload=True`.

        - Calls `delete_model` and `free_gpu_resources` before reloading.
        - Calls `prepare_kbvqa_model(force_reload=True)` and stores the result in the session state.
        - Sets the detection confidence level on the model object.
        - Updates previous state with current settings for change detection.

        Any exception is reported through `st.error` instead of being raised;
        `free_gpu_resources` is called afterwards in both the success and the failure case.
        """
        try:
            self.delete_model()
            free_gpu_resources()
            st.session_state['kbvqa'] = prepare_kbvqa_model(force_reload=True)
            st.session_state['kbvqa'].detection_confidence = st.session_state.confidence_level
            # Update the previous state with current session state values
            self._snapshot_settings()
            st.session_state['model_loaded'] = True
        except Exception as e:
            st.error(f"Error reloading model: {e}")
        finally:
            free_gpu_resources()

    def delete_model(self) -> None:
        """
        Removes the current model from the session state.

        `free_gpu_resources` is called first; if a model is loaded, its 'kbvqa' entry is
        deleted from the session state and `free_gpu_resources` is called again.
        """
        free_gpu_resources()
        if self.is_model_loaded:
            try:
                del st.session_state['kbvqa']
            except KeyError:
                # Best effort: the entry is already gone, nothing left to delete.
                pass
            finally:
                free_gpu_resources()

    # Function to check if any session state values have changed
    def has_state_changed(self) -> bool:
        """
        Compares current session state with the previous state to identify changes.

        Every key recorded in 'previous_state' is compared with its live session-state value.

        Returns:
            bool: True if any change is found, False otherwise (including when
            'previous_state' is empty).
        """
        previous = st.session_state['previous_state']
        return any(st.session_state[key] != previous[key] for key in previous)

    def get_model(self) -> Optional[KBVQA]:
        """
        Retrieve the KBVQA model from the session state.

        Returns:
            KBVQA object: The loaded KBVQA model, or None if not loaded.
        """
        return st.session_state.get('kbvqa', None)

    @property
    def is_model_loaded(self):
        """
        Checks if the KBVQA model is loaded in the session state.

        Returns:
            False when no model is stored; otherwise the model's `all_models_loaded` value.
        """
        model = st.session_state['kbvqa'] if 'kbvqa' in st.session_state else None
        return model is not None and model.all_models_loaded

    def reload_detection_model(self) -> None:
        """
        Reloads only the detection model of the KBVQA model with updated settings.

        - Calls `free_gpu_resources` before reloading.
        - Does nothing else unless the model is already loaded.
        - If only the confidence level differs from the previous state, updates the confidence
          on the model, sets 'confidence_level_changed' and returns without reloading.
        - Otherwise calls `prepare_kbvqa_model` with `only_reload_detection_model=True`,
          updates the detection confidence level on the model object, displays a success
          message, refreshes the previous state and sets the button label to "Reload Model".

        Any exception is reported through `st.error` instead of being raised.
        """
        try:
            free_gpu_resources()
            if self.is_model_loaded:
                previous = st.session_state['previous_state']
                same_detector = st.session_state['detection_model'] == previous['detection_model']
                same_method = st.session_state['method'] == previous['method']
                confidence_differs = st.session_state['confidence_level'] != previous['confidence_level']

                # NOTE(review): the three writes below look like diagnostic output left in the UI;
                # kept to preserve the current behaviour - consider removing.
                st.write(same_detector)
                st.write(same_method)
                st.write(confidence_differs)

                # check if only confidence is changed
                if same_detector and same_method and confidence_differs:
                    st.session_state['kbvqa'].detection_confidence = st.session_state.confidence_level
                    # NOTE(review): diagnostic output, see above.
                    st.write("reload func", st.session_state['kbvqa'].detection_confidence)
                    st.session_state['confidence_level_changed'] = True
                    return  # only update the confidence level

                prepare_kbvqa_model(only_reload_detection_model=True)
                st.session_state['kbvqa'].detection_confidence = st.session_state.confidence_level
                self.col1.success("Model reloaded with updated settings and ready for inference.")
                self._snapshot_settings()
                st.session_state['button_label'] = "Reload Model"
            free_gpu_resources()
        except Exception as e:
            st.error(f"Error reloading detection model: {e}")

    def process_new_image(self, image_key, image) -> None:
        """
        Processes a new uploaded image by creating an entry in the `images_data` dictionary in the
        application session state.

        This dictionary stores information about each processed image, including:
            - `image`: The original image data.
            - `caption`: Generated caption for the image.
            - `detected_objects_str`: String representation of detected objects.
            - `qa_history`: List of questions and answers related to the image.
            - `analysis_done`: Flag indicating if analysis is complete.

        An existing entry for `image_key` is left untouched.

        Args:
            image_key (str): Unique key for the image.
            image (obj): The uploaded image data.
        """
        if image_key not in st.session_state['images_data']:
            st.session_state['images_data'][image_key] = {
                'image': image,
                'caption': '',
                'detected_objects_str': '',
                'qa_history': [],
                'analysis_done': False
            }

    def analyze_image(self, image) -> tuple:
        """
        Analyzes the image using the KBVQA model.

        - Creates a copy of the image to avoid modifying the original.
        - Displays a "Analyzing the image .." message.
        - Calls KBVQA methods to generate a caption and detect objects.
        - Returns the generated caption, detected objects string, and image with bounding boxes.

        Args:
            image (obj): The image data to analyze.

        Returns:
            tuple: A tuple containing the generated caption, detected objects string, and image
            with bounding boxes.
        """
        img = copy.deepcopy(image)
        model = st.session_state['kbvqa']
        st.text("Analyzing the image .. \n")
        caption = model.get_caption(img)
        # NOTE(review): looks like diagnostic output left in the UI; kept to preserve behaviour.
        st.write("analyze func", model.detection_confidence)
        image_with_boxes, detected_objects_str = model.detect_objects(img)
        return caption, detected_objects_str, image_with_boxes

    def add_to_qa_history(self, image_key, question, answer, prompt_length) -> None:
        """
        Adds a question-answer pair to the QA history of a specific image, to be used as history tracker.

        Nothing is recorded when `image_key` has no entry in `images_data`.

        Args:
            image_key (str): Unique key for the image.
            question (str): The question asked about the image.
            answer (str): The answer generated by the KBVQA model.
            prompt_length: Value stored alongside the question and answer as the third
                element of the history tuple.
        """
        if image_key in st.session_state['images_data']:
            st.session_state['images_data'][image_key]['qa_history'].append((question, answer, prompt_length))

    def get_images_data(self) -> dict:
        """
        Returns the dictionary containing processed image data from the session state.

        Returns:
            dict: The dictionary storing information about processed images.
        """
        return st.session_state['images_data']

    def update_image_data(self, image_key, caption, detected_objects_str, analysis_done) -> None:
        """
        Updates the information stored for a specific image in the `images_data` dictionary in the
        application session state.

        Nothing happens when `image_key` has no entry in `images_data`.

        Args:
            image_key (str): Unique key for the image.
            caption (str): The generated caption for the image.
            detected_objects_str (str): String representation of detected objects.
            analysis_done (bool): Flag indicating if analysis of the image is complete.
        """
        images_data = st.session_state['images_data']
        # Only existing entries can be updated. The previous additional
        # `or confidence_level_changed` clause could not change the outcome for a known key
        # and raised KeyError for an unknown one.
        if image_key in images_data:
            images_data[image_key].update({
                'caption': caption,
                'detected_objects_str': detected_objects_str,
                'analysis_done': analysis_done
            })

    def resize_image(self, image_input, new_width: Optional[int] = None, new_height: Optional[int] = None):
        """
        Resize an image.

        If only one of new_width / new_height is provided, the other dimension is computed to
        maintain the aspect ratio. If both are provided, the image is resized to exactly those
        dimensions.

        Args:
            image_input (str or PIL.Image.Image): File path of the image, or the image itself.
            new_width (int, optional): The target width of the image.
            new_height (int, optional): The target height of the image.

        Returns:
            PIL.Image.Image: The resized image (a new object; the input image is not replaced).

        Raises:
            ValueError: If `image_input` is neither a file path nor a PIL image, or if neither
                `new_width` nor `new_height` is provided.
        """
        if isinstance(image_input, str):
            # Open the image from a file path
            image = Image.open(image_input)
        elif isinstance(image_input, Image.Image):
            # `Image.resize` returns a new image, so no defensive copy of the input is needed.
            image = image_input
        else:
            raise ValueError("image_input must be a file path or a PIL Image object")

        if new_width is None and new_height is None:
            raise ValueError("At least one of new_width or new_height must be provided")

        original_width, original_height = image.size
        if new_height is None:
            # Calculate new height to maintain aspect ratio
            new_height = int(original_height * (new_width / original_width))
        elif new_width is None:
            # Calculate new width to maintain aspect ratio
            new_width = int(original_width * (new_height / original_height))

        # Resize the image
        return image.resize((new_width, new_height))

    def display_message(self, message, message_type) -> None:
        """
        Displays a message with the Streamlit element matching `message_type`.

        Args:
            message (str): The message to display.
            message_type (str): One of "warning", "text", "success" or "write". Any other
                value results in an "Message type unknown" error element.
        """
        renderers = {
            "warning": st.warning,
            "text": st.text,
            "success": st.success,
            "write": st.write,
        }
        renderer = renderers.get(message_type)
        if renderer is None:
            st.error("Message type unknown")
        else:
            renderer(message)