import sqlite3 import uuid import streamlit as st from utilities import ( load_local_css, set_header, ) import os import datetime import shutil import pandas as pd import pickle from pathlib import Path import re st.set_page_config(layout="wide") load_local_css("styles.css") set_header() # Define the path to the database file database_file = r"DB/User.db" # Establish a connection to the SQLite database specified by database_file conn = sqlite3.connect(database_file, check_same_thread=False) # Create a cursor object using the connection c = conn.cursor() def get_excel_files(directory="API_data"): excel_files = {} for filename in os.listdir(directory): if filename.endswith(".xlsx"): file_path = os.path.join(directory, filename) file_key = os.path.splitext(filename)[0] # Remove the .xlsx extension excel_files[file_key] = file_path return excel_files def update_summary_df(): # print("[DEBUG]: Running update_summary_df") """Function to fetch the project details everytime user changes the slection box this function is being called on change in username select box""" # Execute a SQL query to select distinct project names, the last edited page, # and the last updated time from the 'sessions' table where the owner matches the user's name c.execute( """ SELECT project_name, last_edited_page, updated_time as last_updated FROM ( SELECT project_name, last_edited_page, updated_time FROM sessions WHERE owner=? ORDER BY updated_time DESC ) sub GROUP BY project_name """, (st.session_state["username"],), ) # Fetch all the results of the query project_summary = c.fetchall() # This will hold the user's owned sessions # Create a DataFrame from the fetched data with specified column names project_summary_df = pd.DataFrame( project_summary, columns=["Project Name", "Last Page Edited", "Modified Date"], ) # Convert the 'Modified Date' column to datetime format project_summary_df["Modified Date"] = project_summary_df["Modified Date"].map( lambda x: pd.to_datetime(x).date() ) # Sort the DataFrame by 'Modified Date' in descending order session_summary_df = project_summary_df.sort_values( by=["Modified Date"], ascending=False ) session_summary_df["Last Page Modified"] = session_summary_df[ "Last Page Edited" ].map(lambda x: re.sub(r"[_1-9]", " ", x).replace(".py", "")) # Save the resulting DataFrame to the session state st.session_state["session_summary_df"] = session_summary_df # Add a 'selected' column to the DataFrame and initialize it with False for all rows if "selected" not in st.session_state.session_summary_df.columns: st.session_state.session_summary_df["selected"] = [False] * len( st.session_state.session_summary_df ) # Reset the index of the DataFrame and save it back to the session state st.session_state["session_summary_df"] = ( st.session_state["session_summary_df"].reset_index(drop=True).copy() ) st.header("Manage Projects") users = { "ioannis": "Ioannis Papadopoulos", "sharon": "Sharon Sheng", "herman": "Herman Kwong", "ismail": "Ismail Mohammed", "geetha": "Geetha Krishna", "srishti": "Srishti Verma", "samkeet": "Samkeet Sangai", "manoj": "Manoj P", "loveesh": "Loveesh Bhatt", "bhavya": "Bhavya Jayantilal Kanzariya", "pritisha": "Pritisha Punukollu", "ashish": "Ashish Sharma", "swarupa": "Swarupa Parepalli", } if "username" not in st.session_state: st.session_state["username"] = "" # first_name_value = [key for key, value in users.items() if value == st.session_state['username']] # # Extract the first key from the list if the list is not empty # first_name_value = first_name_value[0] if first_name_value else '' first_name = st.text_input("Enter Name").lower() if st.button("Login"): if first_name not in users.keys(): st.warning("Please enter a valid name") st.stop() name = users[first_name] st.session_state.name = name # storing in session state st.session_state["username"] = name update_summary_df() # function call to fetch user saved projects # st.success('Projects sucessfully loaded') if len(first_name) == 0 or first_name not in users.keys(): st.stop() # name=st.session_state['Username'] # c.execute('Delete from sessions') # conn.commit() # c.execute("PRAGMA table_info(users);") # c.execute("SELECT * FROM users;") # st.write(c.fetchall()) # Delete all existing data in the users table # c.execute("DELETE FROM users") # # Insert new data into the users table # for idx, (username, full_name) in enumerate(users.items(), start=1): # user_id = idx # email = f"{username}@mastercard.com" # user_type = "technical" # c.execute( # "INSERT INTO users (user_id, username, email, user_type) VALUES (?, ?, ?, ?)", # (user_id, full_name, email, user_type) # ) # # Commit the changes and close the connection # conn.commit() if st.session_state["username"] in users.values(): if "session_summary_df" not in st.session_state: st.session_state.session_summary_df = pd.DataFrame() if "project_name" not in st.session_state: st.session_state["project_name"] = None cols1 = st.columns([2, 1]) with cols1[0]: st.markdown(f"**Welcome {st.session_state['username']}**") with cols1[1]: st.markdown(f"**Current Project: {st.session_state['project_name']}**") # Execute a SQL query to select all project names from the 'sessions' table # where the owner matches the user's name c.execute( "SELECT project_name FROM sessions WHERE owner=?", (st.session_state["username"],), ) # Fetch all the results and create a list of project names user_projects = [project[0] for project in c.fetchall()] c.execute("SELECT DISTINCT username FROM users") # Fetch all the results and create a list of usernames excluding the current user's name allowed_users_db = [ user[0] for user in c.fetchall() if user[0] != st.session_state["username"] ] page_name = "Home Page" # Execute a SQL query to select the email, user_id, and user_type from the 'users' table # where the username matches the current user's name c.execute( "SELECT email, user_id, user_type FROM users WHERE username = ?", (st.session_state["username"],), ) # Fetch the result of the query (assume there is only one matching row) user_data = c.fetchone() # Unpack the fetched data into corresponding variables email, user_id, user_type = user_data folder_path = r"Users" user_folder_path = os.path.join(folder_path, email) if not os.path.exists(user_folder_path): os.makedirs(user_folder_path) def dump_session_details_db(allowed_users, project_name): "Function to dump details of project in db when a project is created/modified/cloned" created_time = datetime.datetime.now().strftime( "%Y-%m-%d %H:%M" ) # Get the current time session_id = str(uuid.uuid4()) # Generate a unique session ID if len(allowed_users) == 0: # Insert a new session into the database with no allowed users c.execute( "INSERT INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?,?)", ( user_id, st.session_state["username"], session_id, project_name, page_name, created_time, created_time, None, ), ) conn.commit() # Commit the transaction else: # Insert new sessions for each allowed user for allowed_user in allowed_users: c.execute( "INSERT INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?,?)", ( user_id, st.session_state["username"], session_id, project_name, "1_Home.py", created_time, created_time, allowed_user, ), ) conn.commit() # Commit the transaction st.markdown( """ * **Delete Project:** If you wish to delete a project, select it and click 'Delete Project'. * **Modify User Access:** Make changes to user access permissions as needed. push """ ) session_col = st.columns([5, 5]) # data editor if "selected_row_index" not in st.session_state: st.session_state["selected_row_index"] = None def selection_change(): # Get the edited rows from the session state # print(st.session_state['session_summary_df']) edited_rows: dict = st.session_state["project_selection"]["edited_rows"] # print(edited_rows) # Set the selected row index in the session state st.session_state["selected_row_index"] = next(iter(edited_rows)) # # Set all 'selected' flags to False in the DataFrame # st.session_state["session_summary_df"] = st.session_state[ # "session_summary_df" # ].assign(selected=False) # Create a dictionary to update the DataFrame update_dict = {idx: values for idx, values in edited_rows.items()} # Update the DataFrame with the edited rows st.session_state["session_summary_df"].update( pd.DataFrame.from_dict(update_dict, orient="index") ) # Reset the DataFrame index st.session_state["session_summary_df"] = st.session_state[ "session_summary_df" ].reset_index(drop=True) st.markdown("Select Project") if len(st.session_state["session_summary_df"]) != 0: with st.container(): # Display an editable data table using Streamlit's data editor component table = st.data_editor( st.session_state["session_summary_df"] .drop(["Last Page Edited"], axis=1) .reindex( columns=[ "selected", "Project Name", "Last Page Modified", "Modified Date", ] ), hide_index=True, on_change=selection_change, # Function to call when data is edited key="project_selection", # Key for the data editor component in the session state use_container_width=False, ) if ( len(st.session_state["session_summary_df"]) > 0 and st.session_state["selected_row_index"] is not None ): selected_row_index = st.session_state["session_summary_df"]["selected"] # st.write(st.session_state['selected_row_index']) if len(selected_row_index) != 0: try: project_name = st.session_state["session_summary_df"].at[ st.session_state["selected_row_index"], "Project Name" ] except Exception as e: st.session_state["selected_row_index"] = None st.rerun() last_edited_page = st.session_state["session_summary_df"].at[ st.session_state["selected_row_index"], "Last Page Edited" ] st.session_state["project_name"] = project_name project_col = st.columns(2) with project_col[0]: if st.button("Load Project", use_container_width=True): st.session_state["project_name"] = project_name st.rerun() project_path = os.path.join(user_folder_path, project_name) st.session_state["project_path"] = project_path # load project dct project_dct_path = os.path.join(project_path, "project_dct.pkl") with open(project_dct_path, "rb") as f: try: st.session_state["project_dct"] = pickle.load(f) st.success("Project Loded") except Exception as e: st.warning( "Something went wrong unable to load saved details / data is lost due to app refresh. Please uncheck the check box and create a new project." ) st.stop() with project_col[1]: if st.button( f"Delete Project - **{project_name}**", use_container_width=True ): project_name_to_delete = project_name st.warning( f"{project_name_to_delete} will be deleted permanentaly and all the information regarding the project will be lost" ) try: c.execute( "Delete FROM sessions WHERE project_name =? AND owner =?", ( project_name_to_delete, st.session_state["name"], ), ) if os.path.exists(project_path): shutil.rmtree(project_path) conn.commit() update_summary_df() st.rerun() except: st.warning( "Failed to Delete project try refreshing the page or try after some time" ) st.stop() with st.expander("Add users with access to the selected project"): c.execute( "SELECT DISTINCT allowed_users FROM sessions WHERE project_name = ?", (project_name,), ) present_users = c.fetchall() present_users = [ user[0] for user in present_users if user[0] != st.session_state["username"] and user[0] is not None ] present_users = None if len(present_users) == 0 else present_users if present_users is not None: allowed_users = st.multiselect( "", list(set(allowed_users_db) - set(present_users)), ) else: allowed_users = st.multiselect( "", list(set(allowed_users_db)), ) if st.button("Save Changes", use_container_width=True): dump_session_details_db(allowed_users, project_name) c.execute("SELECT * from sessions") with st.expander("Create New Project"): st.markdown( "To create a new project, Enter Project name below, select user who you want to give access of this project and click **Create New Project**" ) project_col1 = st.columns(3) with project_col1[0]: project_name = st.text_input("Enter Project Name", key="project_name_box") if project_name in user_projects: st.warning("Project already exists please enter new name") with project_col1[1]: allowed_users = st.multiselect( "Select Users who can access to this Project", allowed_users_db ) allowed_users = list(allowed_users) with project_col1[2]: API_path_dict = get_excel_files() api_name = st.selectbox("Select API data", API_path_dict.keys(), index=0) api_path = API_path_dict[api_name] Create = st.button("Create New Project", use_container_width=True) if Create: if len(project_name) == 0: st.error("Plase enter a valid project name") st.stop() if project_name in user_projects: st.warning("Project already exists please enter new name") st.stop() project_path = os.path.join(user_folder_path, project_name) if not os.path.exists(project_path): os.makedirs(project_path) else: st.warning("Project already exists please enter new name") st.stop() dump_session_details_db(allowed_users, project_name) project_dct = { "data_import": { "granularity_selection": 0, "cat_dct": {}, "merged_df": None, "edited_df": None, "numeric_columns": None, "files_dict": None, "formatted_panel1_values": None, "formatted_panel2_values": None, "missing_stats_df": None, "edited_stats_df": None, "default_df": None, "final_df": None, "edited_df": None, "api_path": api_path, "api_name": api_name, }, "data_validation": { "target_column": 0, "selected_panels": None, "selected_feature": 0, "validated_variables": [], "Non_media_variables": 0, }, "transformations": {"Media": {}, "Exogenous": {}}, "model_build": { "sel_target_col": None, "all_iters_check": False, "iterations": 0, "build_button": False, "show_results_check": False, "session_state_saved": {}, }, "model_tuning": { "sel_target_col": None, "sel_model": {}, "flag_expander": False, "start_date_default": None, "end_date_default": None, "repeat_default": "No", "flags": {}, "select_all_flags_check": {}, "selected_flags": {}, "trend_check": False, "week_num_check": False, "sine_cosine_check": False, "session_state_saved": {}, }, "saved_model_results": { "selected_options": None, "model_grid_sel": [1], }, "model_result_overview": {}, "build_response_curves": { "response_metrics_selectbox": 0, "panel_selected_selectbox": 0, "selected_channel_name_selectbox": 0, "K_number_input": "default", "b_number_input": "default", "a_number_input": "default", "x0_number_input": "default", }, "scenario_planner": { "panel_selected": 0, "metrics_selected": 0, "scenario": None, "optimization_key_value": None, "total_spends_change": None, "optimze_all_channels": False, }, "saved_scenarios": { "selected_scenario_selectbox_key": 0, }, "optimized_result_analysis": { "selected_scenario_selectbox_visualize": 0, "metric_selectbox_visualize": 0, }, } st.session_state["project_dct"] = project_dct st.session_state["project_path"] = project_path st.session_state["project_name"] = project_name project_dct_path = os.path.join(project_path, "project_dct.pkl") with open(project_dct_path, "wb") as f: pickle.dump(project_dct, f) st.success("Project Created") update_summary_df() st.rerun() # st.header('Clone Project') with st.expander("**Clone saved projects**"): c.execute( "SELECT DISTINCT owner FROM sessions WHERE allowed_users=?", (st.session_state["username"],), ) # owner owners = c.fetchall() owners = [ owner[0] for owner in owners if owner[0] != st.session_state["username"] ] if len(owners) == 0: st.warning("You dont have any shared project yet!") st.stop() cols = st.columns(2) with cols[0]: owner = st.selectbox("Select Owner", owners) c.execute("SELECT email FROM users WHERE username=?", (owner,)) owner_email = c.fetchone()[0] owner_folder_path = os.path.join(folder_path, owner_email) with cols[1]: c.execute( "SELECT project_name FROM sessions WHERE owner=? AND allowed_users = ?", (owner, st.session_state["username"]), ) # available sessions for user project_names = c.fetchall() project_name_owner = st.selectbox( "Select a saved Project available for you", [project_name[0] for project_name in project_names], ) owner_project_path = os.path.join(owner_folder_path, project_name) project_name_user = st.text_input( "Enter Project Name", value=project_name_owner ) if project_name in user_projects: st.warning( "This Project name already exists in your directory Please enter a different name" ) project_path = os.path.join(user_folder_path, project_name_user) owner_project_path = os.path.join(owner_folder_path, project_name_owner) if st.button("Load Project", use_container_width=True, key="load_project_button_key"): if os.path.exists(project_path): st.warning( "This Project name already exists in your directory Please enter a different name" ) st.stop() shutil.copytree(owner_project_path, project_path) project_dct_path = os.path.join(project_path, "project_dct.pkl") with open(project_dct_path, "rb") as f: st.session_state["project_dct"] = pickle.load(f) st.session_state["project_path"] = project_path dump_session_details_db( [], project_name_user ) # passing empty list for allowed users st.success("Project Cloned") st.rerun()