# Importing necessary libraries import streamlit as st st.set_page_config( page_title="Data Import", page_icon=":shark:", layout="wide", initial_sidebar_state="collapsed", ) import os import re import pickle import sqlite3 import pandas as pd from utilities import set_header, load_local_css, update_db, project_selection load_local_css("styles.css") set_header() if "project_name" not in st.session_state: st.session_state["project_name"] = None if "project_dct" not in st.session_state: project_selection() st.stop() if "username" in st.session_state and st.session_state["username"] is not None: cols1 = st.columns([2, 1]) with cols1[0]: st.markdown(f"**Welcome {st.session_state['username']}**") with cols1[1]: st.markdown(f"**Current Project: {st.session_state['project_name']}**") # Function to validate date column in dataframe def validate_date_column(df): try: # Attempt to convert the 'Date' column to datetime df["date"] = pd.to_datetime(df["date"], format="%d-%m-%Y") return True except: return False # Function to determine data interval def determine_data_interval(common_freq): if common_freq == 1: return "daily" elif common_freq == 7: return "weekly" elif 28 <= common_freq <= 31: return "monthly" else: return "irregular" # Function to read each uploaded Excel file into a pandas DataFrame and stores them in a dictionary st.cache_resource(show_spinner=False) def files_to_dataframes(uploaded_files): df_dict = {} for uploaded_file in uploaded_files: # Extract file name without extension file_name = uploaded_file.name.rsplit(".", 1)[0] # Check for duplicate file names if file_name in df_dict: st.warning( f"Duplicate File: {file_name}. This file will be skipped.", icon="⚠️", ) continue # Read the file into a DataFrame df = pd.read_excel(uploaded_file) # Convert all column names to lowercase df.columns = df.columns.str.lower().str.strip() # Separate numeric and non-numeric columns numeric_cols = list(df.select_dtypes(include=["number"]).columns) non_numeric_cols = [ col for col in df.select_dtypes(exclude=["number"]).columns if col.lower() != "date" ] # Check for 'Date' column if not (validate_date_column(df) and len(numeric_cols) > 0): st.warning( f"File Name: {file_name} ➜ Please upload data with Date column in 'DD-MM-YYYY' format and at least one media/exogenous column. This file will be skipped.", icon="⚠️", ) continue # Check for interval common_freq = common_freq = ( pd.Series(df["date"].unique()).diff().dt.days.dropna().mode()[0] ) # Calculate the data interval (daily, weekly, monthly or irregular) interval = determine_data_interval(common_freq) if interval == "irregular": st.warning( f"File Name: {file_name} ➜ Please upload data in daily, weekly or monthly interval. This file will be skipped.", icon="⚠️", ) continue # Store both DataFrames in the dictionary under their respective keys df_dict[file_name] = { "numeric": numeric_cols, "non_numeric": non_numeric_cols, "interval": interval, "df": df, } return df_dict # Function to adjust dataframe granularity def adjust_dataframe_granularity(df, current_granularity, target_granularity): # Set index df.set_index("date", inplace=True) # Define aggregation rules for resampling aggregation_rules = { col: "sum" if pd.api.types.is_numeric_dtype(df[col]) else "first" for col in df.columns } # Initialize resampled_df resampled_df = df if current_granularity == "daily" and target_granularity == "weekly": resampled_df = df.resample("W-MON", closed="left", label="left").agg( aggregation_rules ) elif current_granularity == "daily" and target_granularity == "monthly": resampled_df = df.resample("MS", closed="left", label="left").agg( aggregation_rules ) elif current_granularity == "daily" and target_granularity == "daily": resampled_df = df.resample("D").agg(aggregation_rules) elif ( current_granularity in ["weekly", "monthly"] and target_granularity == "daily" ): # For higher to lower granularity, distribute numeric and replicate non-numeric values equally across the new period expanded_data = [] for _, row in df.iterrows(): if current_granularity == "weekly": period_range = pd.date_range(start=row.name, periods=7) elif current_granularity == "monthly": period_range = pd.date_range( start=row.name, periods=row.name.days_in_month ) for date in period_range: new_row = {} for col in df.columns: if pd.api.types.is_numeric_dtype(df[col]): if current_granularity == "weekly": new_row[col] = row[col] / 7 elif current_granularity == "monthly": new_row[col] = row[col] / row.name.days_in_month else: new_row[col] = row[col] expanded_data.append((date, new_row)) resampled_df = pd.DataFrame( [data for _, data in expanded_data], index=[date for date, _ in expanded_data], ) # Reset index resampled_df = resampled_df.reset_index().rename(columns={"index": "date"}) return resampled_df # Function to clean and extract unique values of Panel_1 and Panel_2 st.cache_resource(show_spinner=False) def clean_and_extract_unique_values(files_dict, selections): all_panel1_values = set() all_panel2_values = set() for file_name, file_data in files_dict.items(): df = file_data["df"] # 'Panel_1' and 'Panel_2' selections selected_panel1 = selections[file_name].get("Panel_1") selected_panel2 = selections[file_name].get("Panel_2") # Clean and standardize Panel_1 column if it exists and is selected if ( selected_panel1 and selected_panel1 != "N/A" and selected_panel1 in df.columns ): df[selected_panel1] = ( df[selected_panel1].str.lower().str.strip().str.replace("_", " ") ) all_panel1_values.update(df[selected_panel1].dropna().unique()) # Clean and standardize Panel_2 column if it exists and is selected if ( selected_panel2 and selected_panel2 != "N/A" and selected_panel2 in df.columns ): df[selected_panel2] = ( df[selected_panel2].str.lower().str.strip().str.replace("_", " ") ) all_panel2_values.update(df[selected_panel2].dropna().unique()) # Update the processed DataFrame back in the dictionary files_dict[file_name]["df"] = df return all_panel1_values, all_panel2_values # Function to format values for display st.cache_resource(show_spinner=False) def format_values_for_display(values_list): # Capitalize the first letter of each word and replace underscores with spaces formatted_list = [value.replace("_", " ").title() for value in values_list] # Join values with commas and 'and' before the last value if len(formatted_list) > 1: return ", ".join(formatted_list[:-1]) + ", and " + formatted_list[-1] elif formatted_list: return formatted_list[0] return "No values available" # Function to normalizes all data within files_dict to a daily granularity st.cache(show_spinner=False, allow_output_mutation=True) def standardize_data_to_daily(files_dict, selections): # Normalize all data to a daily granularity using a provided function files_dict = apply_granularity_to_all(files_dict, "daily", selections) # Update the "interval" attribute for each dataset to indicate the new granularity for files_name, files_data in files_dict.items(): files_data["interval"] = "daily" return files_dict # Function to apply granularity transformation to all DataFrames in files_dict st.cache_resource(show_spinner=False) def apply_granularity_to_all(files_dict, granularity_selection, selections): for file_name, file_data in files_dict.items(): df = file_data["df"].copy() # Handling when Panel_1 or Panel_2 might be 'N/A' selected_panel1 = selections[file_name].get("Panel_1") selected_panel2 = selections[file_name].get("Panel_2") # Correcting the segment selection logic & handling 'N/A' if selected_panel1 != "N/A" and selected_panel2 != "N/A": unique_combinations = df[ [selected_panel1, selected_panel2] ].drop_duplicates() elif selected_panel1 != "N/A": unique_combinations = df[[selected_panel1]].drop_duplicates() selected_panel2 = None # Ensure Panel_2 is ignored if N/A elif selected_panel2 != "N/A": unique_combinations = df[[selected_panel2]].drop_duplicates() selected_panel1 = None # Ensure Panel_1 is ignored if N/A else: # If both are 'N/A', process the entire dataframe as is df = adjust_dataframe_granularity( df, file_data["interval"], granularity_selection ) files_dict[file_name]["df"] = df continue # Skip to the next file transformed_segments = [] for _, combo in unique_combinations.iterrows(): if selected_panel1 and selected_panel2: segment = df[ (df[selected_panel1] == combo[selected_panel1]) & (df[selected_panel2] == combo[selected_panel2]) ] elif selected_panel1: segment = df[df[selected_panel1] == combo[selected_panel1]] elif selected_panel2: segment = df[df[selected_panel2] == combo[selected_panel2]] # Adjust granularity of the segment transformed_segment = adjust_dataframe_granularity( segment, file_data["interval"], granularity_selection ) transformed_segments.append(transformed_segment) # Combine all transformed segments into a single DataFrame for this file transformed_df = pd.concat(transformed_segments, ignore_index=True) files_dict[file_name]["df"] = transformed_df return files_dict # Function to create main dataframe structure st.cache_resource(show_spinner=False) def create_main_dataframe( files_dict, all_panel1_values, all_panel2_values, granularity_selection ): # Determine the global start and end dates across all DataFrames global_start = min(df["df"]["date"].min() for df in files_dict.values()) global_end = max(df["df"]["date"].max() for df in files_dict.values()) # Adjust the date_range generation based on the granularity_selection if granularity_selection == "weekly": # Generate a weekly range, with weeks starting on Monday date_range = pd.date_range(start=global_start, end=global_end, freq="W-MON") elif granularity_selection == "monthly": # Generate a monthly range, starting from the first day of each month date_range = pd.date_range(start=global_start, end=global_end, freq="MS") else: # Default to daily if not weekly or monthly date_range = pd.date_range(start=global_start, end=global_end, freq="D") # Collect all unique Panel_1 and Panel_2 values, excluding 'N/A' all_panel1s = all_panel1_values all_panel2s = all_panel2_values # Dynamically build the list of dimensions (Panel_1, Panel_2) to include in the main DataFrame based on availability dimensions, merge_keys = [], [] if all_panel1s: dimensions.append(all_panel1s) merge_keys.append("Panel_1") if all_panel2s: dimensions.append(all_panel2s) merge_keys.append("Panel_2") dimensions.append(date_range) # Date range is always included merge_keys.append("date") # Date range is always included # Create a main DataFrame template with the dimensions main_df = pd.MultiIndex.from_product( dimensions, names=[name for name, _ in zip(merge_keys, dimensions)], ).to_frame(index=False) return main_df.reset_index(drop=True) # Function to prepare and merge dataFrames st.cache_resource(show_spinner=False) def merge_into_main_df(main_df, files_dict, selections): for file_name, file_data in files_dict.items(): df = file_data["df"].copy() # Rename selected Panel_1 and Panel_2 columns if not 'N/A' selected_panel1 = selections[file_name].get("Panel_1", "N/A") selected_panel2 = selections[file_name].get("Panel_2", "N/A") if selected_panel1 != "N/A": df.rename(columns={selected_panel1: "Panel_1"}, inplace=True) if selected_panel2 != "N/A": df.rename(columns={selected_panel2: "Panel_2"}, inplace=True) # Merge current DataFrame into main_df based on 'date', and where applicable, 'Panel_1' and 'Panel_2' merge_keys = ["date"] if "Panel_1" in df.columns: merge_keys.append("Panel_1") if "Panel_2" in df.columns: merge_keys.append("Panel_2") main_df = pd.merge(main_df, df, on=merge_keys, how="left") # After all merges, sort by 'date' and reset index for cleanliness sort_by = ["date"] if "Panel_1" in main_df.columns: sort_by.append("Panel_1") if "Panel_2" in main_df.columns: sort_by.append("Panel_2") main_df.sort_values(by=sort_by, inplace=True) main_df.reset_index(drop=True, inplace=True) return main_df # Function to categorize column def categorize_column(column_name): # Define keywords for each category internal_keywords = [ "Internal", "Price", "Discount", "product_price", "cost", "margin", "inventory", "sales", "revenue", "turnover", "expense", ] exogenous_keywords = [ "Exogenous", "GDP", "Tax", "Inflation", "interest_rate", "employment_rate", "exchange_rate", "consumer_spending", "retail_sales", "oil_prices", "weather", ] # Check if the column name matches any of the keywords for Internal or Exogenous categories if ( column_name in st.session_state["project_dct"]["data_import"]["cat_dct"].keys() and st.session_state["project_dct"]["data_import"]["cat_dct"][column_name] is not None ): return st.session_state["project_dct"]["data_import"]["cat_dct"][ column_name ] else: for keyword in ["Response", "Metric"]: if keyword.lower() in column_name.lower(): return "Response Metrics" for keyword in ["Spend", "Cost"]: if keyword.lower() in column_name.lower(): return "Spends" for keyword in internal_keywords: if keyword.lower() in column_name.lower(): return "Internal" for keyword in exogenous_keywords: if keyword.lower() in column_name.lower(): return "Exogenous" # Default to Media if no match found return "Media" # Function to calculate missing stats and prepare for editable DataFrame st.cache_resource(show_spinner=False) def prepare_missing_stats_df(df): missing_stats = [] for column in df.columns: if ( column == "date" or column == "Panel_2" or column == "Panel_1" ): # Skip Date, Panel_1 and Panel_2 column continue missing = df[column].isnull().sum() pct_missing = round((missing / len(df)) * 100, 2) # Dynamically assign category based on column name category = categorize_column(column) # category = "Media" # Keep default bin as Media missing_stats.append( { "Column": column, "Missing Values": missing, "Missing Percentage": pct_missing, "Impute Method": "Fill with 0", # Default value "Category": category, } ) stats_df = pd.DataFrame(missing_stats) return stats_df # Function to add API DataFrame details to the files dictionary st.cache_resource(show_spinner=False) def add_api_dataframe_to_dict(main_df, files_dict): files_dict["API"] = { "numeric": list(main_df.select_dtypes(include=["number"]).columns), "non_numeric": [ col for col in main_df.select_dtypes(exclude=["number"]).columns if col.lower() != "date" ], "interval": determine_data_interval( pd.Series(main_df["date"].unique()).diff().dt.days.dropna().mode()[0] ), "df": main_df, } return files_dict # Function to reads an API into a DataFrame, parsing specified columns as datetime # @st.cache_resource(show_spinner=False) def read_API_data(project_folder_path, file_path, file_name): # Paths using os.path file_path_os = os.path.join(os.getcwd(), "API_data", file_name) project_folder_path_os = os.path.normpath(project_folder_path) # Construct the full path of the file in the project folder project_file_path = os.path.join(project_folder_path_os, file_name) # Check if the file with the same name exists in the project path if os.path.exists(project_file_path): # If the file exists, load and return the existing file return pd.read_excel(project_file_path, parse_dates=["Date"]) else: # If the file does not exist, read the new file data = pd.read_excel(file_path_os, parse_dates=["Date"]) # Save the new file to the project folder data.to_excel(project_file_path, index=False) # Return the data return data # Function to set the 'Panel_1_Panel_2_Selected' session state variable to False def set_Panel_1_Panel_2_Selected_false(): st.session_state["Panel_1_Panel_2_Selected"] = False # Restoring project_dct to default values when user modify any widgets st.session_state["project_dct"]["data_import"]["edited_stats_df"] = None st.session_state["project_dct"]["data_import"]["merged_df"] = None st.session_state["project_dct"]["data_import"]["missing_stats_df"] = None st.session_state["project_dct"]["data_import"]["cat_dct"] = {} st.session_state["project_dct"]["data_import"]["numeric_columns"] = None st.session_state["project_dct"]["data_import"]["default_df"] = None st.session_state["project_dct"]["data_import"]["final_df"] = None st.session_state["project_dct"]["data_import"]["edited_df"] = None # Function to serialize and save the objects into a pickle file @st.cache_resource(show_spinner=False) def save_to_pickle(file_path, final_df, bin_dict): # Open the file in write-binary mode and dump the objects with open(file_path, "wb") as f: pickle.dump( {"final_df": final_df, "bin_dict": bin_dict}, f ) # Data is now saved to file # Function to processes the merged_df DataFrame based on operations defined in edited_df @st.cache_resource(show_spinner=False) def process_dataframes(merged_df, edited_df, edited_stats_df): # Ensure there are operations defined by the user if edited_df.empty: return merged_df, edited_stats_df # No operations to apply # Perform operations as defined by the user else: for index, row in edited_df.iterrows(): result_column_name = ( f"{row['Column 1']}{row['Operator']}{row['Column 2']}" ) col1 = row["Column 1"] col2 = row["Column 2"] op = row["Operator"] # Apply the specified operation if op == "+": merged_df[result_column_name] = merged_df[col1] + merged_df[col2] elif op == "-": merged_df[result_column_name] = merged_df[col1] - merged_df[col2] elif op == "*": merged_df[result_column_name] = merged_df[col1] * merged_df[col2] elif op == "/": merged_df[result_column_name] = merged_df[col1] / merged_df[ col2 ].replace(0, 1e-9) # Add summary of operation to edited_stats_df new_row = { "Column": result_column_name, "Missing Values": None, "Missing Percentage": None, "Impute Method": None, "Category": row["Category"], } new_row_df = pd.DataFrame([new_row]) # Use pd.concat to add the new_row_df to edited_stats_df edited_stats_df = pd.concat( [edited_stats_df, new_row_df], ignore_index=True, axis=0 ) # Combine column names from edited_df for cleanup combined_columns = set(edited_df["Column 1"]).union( set(edited_df["Column 2"]) ) # Filter out rows in edited_stats_df and drop columns from merged_df edited_stats_df = edited_stats_df[ ~edited_stats_df["Column"].isin(combined_columns) ] merged_df.drop( columns=list(combined_columns), errors="ignore", inplace=True ) return merged_df, edited_stats_df # Function to prepare a list of numeric column names and initialize an empty DataFrame with predefined structure st.cache_resource(show_spinner=False) def prepare_numeric_columns_and_default_df(merged_df, edited_stats_df): # Get columns categorized as 'Response Metrics' columns_response_metrics = edited_stats_df[ edited_stats_df["Category"] == "Response Metrics" ]["Column"].tolist() # Filter numeric columns, excluding those categorized as 'Response Metrics' numeric_columns = [ col for col in merged_df.select_dtypes(include=["number"]).columns if col not in columns_response_metrics ] # Define the structure of the empty DataFrame data = { "Column 1": pd.Series([], dtype="str"), "Operator": pd.Series([], dtype="str"), "Column 2": pd.Series([], dtype="str"), "Category": pd.Series([], dtype="str"), } default_df = pd.DataFrame(data) return numeric_columns, default_df # function to reset to default values in project_dct: # Initialize 'final_df' in session state if "final_df" not in st.session_state: st.session_state["final_df"] = pd.DataFrame() # Initialize 'bin_dict' in session state if "bin_dict" not in st.session_state: st.session_state["bin_dict"] = {} # Initialize 'Panel_1_Panel_2_Selected' in session state if "Panel_1_Panel_2_Selected" not in st.session_state: st.session_state["Panel_1_Panel_2_Selected"] = False # Page Title st.write("") # Top padding st.title("Data Import") conn = sqlite3.connect( r"DB\User.db", check_same_thread=False ) # connection with sql db c = conn.cursor() ######################################################################################################################################################### # Create a dictionary to hold all DataFrames and collect user input to specify "Panel_2" and "Panel_1" columns for each file ######################################################################################################################################################### # Read the Excel file, parsing 'Date' column as datetime main_df = read_API_data( project_folder_path=st.session_state["project_path"], file_path=st.session_state["project_dct"]["data_import"]["api_path"], file_name=st.session_state["project_dct"]["data_import"]["api_name"] + ".xlsx", ) # Convert all column names to lowercase main_df.columns = main_df.columns.str.lower().str.strip() # File uploader uploaded_files = st.file_uploader( "Upload additional data", type=["xlsx"], accept_multiple_files=True, on_change=set_Panel_1_Panel_2_Selected_false, ) # Custom HTML for upload instructions recommendation_html = f"""
Recommendation: For optimal processing, please ensure that all uploaded datasets including panel, media, internal, and exogenous data adhere to the following guidelines: Each dataset must include a Date column formatted as DD-MM-YYYY, be free of missing values.
""" st.markdown(recommendation_html, unsafe_allow_html=True) # RAW API DATA st.markdown("#### API Data") with st.expander("API Data", expanded=False): st.dataframe(main_df, hide_index=True) # Choose Desired Granularity st.markdown("#### Choose Desired Granularity") # Granularity Selection granularity_selection = st.selectbox( "Choose Date Granularity", ["Daily", "Weekly", "Monthly"], label_visibility="collapsed", on_change=set_Panel_1_Panel_2_Selected_false, index=st.session_state["project_dct"]["data_import"][ "granularity_selection" ], # resume ) # st.write(st.session_state['project_dct']['data_import']['granularity_selection']) st.session_state["project_dct"]["data_import"]["granularity_selection"] = [ "Daily", "Weekly", "Monthly", ].index(granularity_selection) # st.write(st.session_state['project_dct']['data_import']['granularity_selection']) granularity_selection = str(granularity_selection).lower() # Convert files to dataframes files_dict = files_to_dataframes(uploaded_files) # Add API Dataframe if main_df is not None: files_dict = add_api_dataframe_to_dict(main_df, files_dict) # Display a warning message if no files have been uploaded and halt further execution if not files_dict: st.warning( "Please upload at least one file to proceed.", icon="⚠️", ) st.stop() # Halts further execution until file is uploaded # Select Panel_1 and Panel_2 columns st.markdown("#### Select Panel columns") selections = {} with st.expander("Select Panel columns", expanded=False): count = 0 # Initialize counter to manage the visibility of labels and keys unique_numeric_columns = ( set() ) # Initialize a set to keep track of unique numeric column names for file_name, file_data in files_dict.items(): # Extract the numeric column names from the current file numeric_columns = file_data["numeric"] # Regular expression pattern to match valid column names (letters, numbers, and underscores) valid_column_pattern = re.compile(r"^[A-Za-z0-9_]+$") # Check for duplicates for column in numeric_columns: if column in unique_numeric_columns: # If a duplicate is found, display a warning and halt execution st.warning( f"Duplicate column name '{column}' found in file '{file_name}'. Each column name must be unique across all files.", icon="⚠️", ) st.stop() # Add the column to the set if it's not already there unique_numeric_columns.add(column) # Check if the column name is valid if not valid_column_pattern.match(column): st.warning( f"Column name '{column}' in file '{file_name}' contains invalid characters. " f"Column names should only contain letters (A-Z, a-z), numbers (0-9), and underscores (_).", icon="⚠️", ) st.stop() # Generatimg project dct keys dynamically if ( f"Panel_1_selectbox{file_name}" not in st.session_state["project_dct"]["data_import"].keys() ): st.session_state["project_dct"]["data_import"][ f"Panel_1_selectbox{file_name}" ] = 0 if ( f"Panel_2_selectbox{file_name}" not in st.session_state["project_dct"]["data_import"].keys() ): st.session_state["project_dct"]["data_import"][ f"Panel_2_selectbox{file_name}" ] = 0 # Determine visibility of the label based on the count if count == 0: label_visibility = "visible" else: label_visibility = "collapsed" # Extract non-numeric columns non_numeric_cols = file_data["non_numeric"] # Prepare Panel_1 and Panel_2 values for dropdown, adding "N/A" as an option panel1_values = non_numeric_cols + ["N/A"] panel2_values = non_numeric_cols + ["N/A"] # Skip if only one option is available if len(panel1_values) == 1 and len(panel2_values) == 1: selected_panel1, selected_panel2 = "N/A", "N/A" # Update the selections for Panel_1 and Panel_2 for the current file selections[file_name] = { "Panel_1": selected_panel1, "Panel_2": selected_panel2, } continue # Create layout columns for File Name, Panel_2, and Panel_1 selections file_name_col, Panel_1_col, Panel_2_col = st.columns([2, 4, 4]) with file_name_col: # Display "File Name" label only for the first file if count == 0: st.write("File Name") else: st.write("") st.write(file_name) # Display the file name with Panel_1_col: # Display a selectbox for Panel_1 values selected_panel1 = st.selectbox( "Select Panel Level 1", panel2_values, on_change=set_Panel_1_Panel_2_Selected_false, label_visibility=label_visibility, # Control visibility of the label key=f"Panel_1_selectbox{count}", # Ensure unique key for each selectbox index=st.session_state["project_dct"]["data_import"][ f"Panel_1_selectbox{file_name}" ], ) st.session_state["project_dct"]["data_import"][ f"Panel_1_selectbox{file_name}" ] = panel2_values.index(selected_panel1) with Panel_2_col: # Display a selectbox for Panel_2 values selected_panel2 = st.selectbox( "Select Panel Level 2", panel1_values, on_change=set_Panel_1_Panel_2_Selected_false, label_visibility=label_visibility, # Control visibility of the label key=f"Panel_2_selectbox{count}", # Ensure unique key for each selectbox index=st.session_state["project_dct"]["data_import"][ f"Panel_2_selectbox{file_name}" ], ) st.session_state["project_dct"]["data_import"][ f"Panel_2_selectbox{file_name}" ] = panel1_values.index(selected_panel2) # Check for potential data integrity issues if selected_panel2 == selected_panel1 and not ( selected_panel2 == "N/A" and selected_panel1 == "N/A" ): st.warning( f"File: {file_name} → The same column cannot serve as both Panel_1 and Panel_2. Please adjust your selections.", ) selected_panel1, selected_panel2 = "N/A", "N/A" st.stop() # Check for potential data integrity issues if len(non_numeric_cols) > 2: st.warning( f"File: {file_name} → The input file contains more than two non-numeric/panel columns. Please verify the file's contents.", ) st.stop() # Total selected panel level selected_panels_count = (1 if selected_panel1 != "N/A" else 0) + ( 1 if selected_panel2 != "N/A" else 0 ) # Check for potential data integrity issues if len(non_numeric_cols) != selected_panels_count: st.warning( f"File: {file_name} → The number of non-numeric columns selected does not match the expected panel count. Please ensure all required columns are selected.", ) st.stop() # Update the selections for Panel_1 and Panel_2 for the current file selections[file_name] = { "Panel_1": selected_panel1, "Panel_2": selected_panel2, } count += 1 # Increment the counter after processing each file st.write() # Accept Panel_1 and Panel_2 selection accept = st.button("Accept and Process", use_container_width=True) if ( accept == False and st.session_state["project_dct"]["data_import"]["edited_stats_df"] is not None ): # st.write(st.session_state['project_dct']) st.markdown("#### Unique Panel values") # Display Panel_1 and Panel_2 values with st.expander("Unique Panel values"): st.write("") st.markdown( f"""
Panel Level 1 Values: {st.session_state['project_dct']['data_import']['formatted_panel1_values']}
Panel Level 2 Values: {st.session_state['project_dct']['data_import']['formatted_panel2_values']}
""", unsafe_allow_html=True, ) # Display total Panel_1 and Panel_2 st.write("") st.markdown( f"""
Number of Level 1 Panels detected: {len(st.session_state['project_dct']['data_import']['formatted_panel2_values'])}
Number of Level 2 Panels detected: {len(st.session_state['project_dct']['data_import']['formatted_panel2_values'])}
""", unsafe_allow_html=True, ) st.write("") # Create an editable DataFrame in Streamlit st.markdown("#### Select Variables Category & Impute Missing Values") merged_df = st.session_state["project_dct"]["data_import"]["merged_df"].copy() missing_stats_df = st.session_state["project_dct"]["data_import"][ "missing_stats_df" ] editable_df = st.session_state["project_dct"]["data_import"]["edited_stats_df"] sorted_editable_df = editable_df.sort_values( by="Missing Values", ascending=False, na_position="first" ) edited_stats_df = st.data_editor( sorted_editable_df, column_config={ "Impute Method": st.column_config.SelectboxColumn( options=[ "Drop Column", "Fill with Mean", "Fill with Median", "Fill with 0", ], required=True, default="Fill with 0", ), "Category": st.column_config.SelectboxColumn( options=[ "Spends", "Media", "Exogenous", "Internal", "Response Metrics", ], required=True, default="Media", ), }, disabled=["Column", "Missing Values", "Missing Percentage"], hide_index=True, use_container_width=True, key="data-editor-1", ) st.session_state["project_dct"]["data_import"]["cat_dct"] = { col: cat for col, cat in zip(edited_stats_df["Column"], edited_stats_df["Category"]) if col in merged_df.columns } for i, row in edited_stats_df.iterrows(): column = row["Column"] if ( column not in st.session_state["project_dct"]["data_import"]["cat_dct"].keys() ): continue if row["Impute Method"] == "Drop Column": merged_df.drop(columns=[column], inplace=True) elif row["Impute Method"] == "Fill with Mean": merged_df[column].fillna( st.session_state["project_dct"]["data_import"]["merged_df"][ column ].mean(), inplace=True, ) elif row["Impute Method"] == "Fill with Median": merged_df[column].fillna( st.session_state["project_dct"]["data_import"]["merged_df"][ column ].median(), inplace=True, ) elif row["Impute Method"] == "Fill with 0": merged_df[column].fillna(0, inplace=True) ######################################################################################################################################################### # Group columns ######################################################################################################################################################### # Display Group columns header numeric_columns = st.session_state["project_dct"]["data_import"][ "numeric_columns" ] default_df = st.session_state["project_dct"]["data_import"]["default_df"] st.markdown("#### Feature engineering") edited_df = st.data_editor( st.session_state["project_dct"]["data_import"]["edited_df"], column_config={ "Column 1": st.column_config.SelectboxColumn( options=numeric_columns, required=True, width=400, ), "Operator": st.column_config.SelectboxColumn( options=["+", "-", "*", "/"], required=True, default="+", width=100, ), "Column 2": st.column_config.SelectboxColumn( options=numeric_columns, required=True, default=numeric_columns[0], width=400, ), "Category": st.column_config.SelectboxColumn( options=[ "Media", "Exogenous", "Internal", "Response Metrics", ], required=True, default="Media", width=200, ), }, num_rows="dynamic", key="data-editor-4", ) final_df, edited_stats_df = process_dataframes( merged_df, edited_df, edited_stats_df ) st.markdown("#### Final DataFrame") sort_col = [] for col in final_df.columns: if col in ["Panel_1", "Panel_2", "date"]: sort_col.append(col) sorted_final_df = final_df.sort_values( by=sort_col, ascending=True, na_position="first" ) st.dataframe(sorted_final_df, hide_index=True) # Initialize an empty dictionary to hold categories and their variables category_dict = { "Spends": [], "Media": [], "Exogenous": [], "Internal": [], "Response Metrics": [], } # Iterate over each row in the edited DataFrame to populate the dictionary for i, row in edited_stats_df.iterrows(): column = row["Column"] category = row[ "Category" ] # The category chosen by the user for this variable if column not in list(final_df.columns): # Skip columns that are dropped continue # Check if the category already exists in the dictionary if category not in category_dict: # If not, initialize it with the current column as its first element category_dict[category] = [column] else: # If it exists, append the current column to the list of variables under this category category_dict[category].append(column) # Add Date, Panel_1 and Panel_12 in category dictionary category_dict.update({"Date": ["date"]}) if "Panel_1" in final_df.columns: category_dict["Panel Level 1"] = ["Panel_1"] if "Panel_2" in final_df.columns: category_dict["Panel Level 2"] = ["Panel_2"] ###################################### Group Media Channels ###################################### with st.expander("Group Media Channels"): media_channels = category_dict["Media"] spends_channels = category_dict["Spends"] allowed_channels_bin = media_channels + spends_channels group_selection_placeholder = st.container() total_groups = st.number_input("Total Groups", value=0) try: total_groups = int(total_groups) except: total_groups = 0 group_dict = {} channels_added = set() with group_selection_placeholder: for i in range(total_groups): group_name_inp_col, group_col = st.columns([1, 4]) group_name = group_name_inp_col.text_input( "Group name", key=f"group_name_{i}" ) # Filter the allowed channels by removing those already added, then sort the list allowed_channels = sorted( [ channel for channel in allowed_channels_bin if channel not in channels_added ], key=lambda x: x.split("_")[ 0 ], # Split each string by '_' and sort by the first part ) selected_channels = group_col.multiselect( "Select channels to group", options=allowed_channels, key=f"selected_channels_key_{i}", ) if ((group_name is not None) and (group_name != "")) and ( len(selected_channels) > 0 ): group_dict[group_name] = selected_channels channels_added.update(selected_channels) ###################################### Group Media Channels ###################################### # Display the dictionary st.markdown("#### Variable Category") for category, variables in category_dict.items(): # Check if there are multiple variables to handle "and" insertion correctly if len(variables) > 1: # Join all but the last variable with ", ", then add " and " before the last variable variables_str = ", ".join(variables[:-1]) + " and " + variables[-1] else: # Skip empty category if len(variables) == 0: variables_str = "" else: # If there's only one variable, no need for "and" variables_str = variables[0] # Display the category and its variables in the desired format st.markdown( f"
{category}: {variables_str}
", unsafe_allow_html=True, ) # Function to check if Response Metrics is selected st.write("") # Define the required column categories to check. required_categories = ["Response Metrics", "Spends", "Media"] # Iterate over the required categories to check for missing columns. for category in required_categories: category_columns = category_dict.get(category, []) if len(category_columns) == 0: st.warning( f"Please select at least one column for the {category} category", icon="⚠️", ) st.stop() # Filter channels that are in category_dict["Media"] / category_dict["Spends"] and in final_df.columns filtered_channels = [ channel for channel in category_dict["Media"] + category_dict["Spends"] if channel in final_df.columns ] # Combine all channels into a single list using a list comprehension all_added_channels = [] for channels in group_dict: all_added_channels += group_dict[channels] # Check for duplicated channels across groups if len(all_added_channels) != len(set(all_added_channels)): st.warning( "A channel can only be grouped once, and duplicate groupings are not permitted", icon="⚠️", ) st.stop() # Check if all filtered channels are present in group_dict if not set(filtered_channels) == set(all_added_channels): st.warning("Please group all media channels", icon="⚠️") st.stop() # Store final dataframe and bin dictionary into session state st.session_state["final_df"], st.session_state["bin_dict"] = ( final_df, category_dict, ) # Save the DataFrame and dictionary from the session state to the pickle file if st.button( "Accept and Save", use_container_width=True, key="data-editor-button", ): update_db("1_Data_Import.py") final_df = final_df.loc[:, ~final_df.columns.duplicated()] project_dct_path = os.path.join( st.session_state["project_path"], "project_dct.pkl" ) with open(project_dct_path, "wb") as f: pickle.dump(st.session_state["project_dct"], f) data_path = os.path.join( st.session_state["project_path"], "data_import.pkl" ) st.session_state["data_path"] = data_path save_to_pickle( data_path, st.session_state["final_df"], st.session_state["bin_dict"], ) st.session_state["project_dct"]["data_import"][ "edited_stats_df" ] = edited_stats_df st.session_state["project_dct"]["data_import"]["merged_df"] = merged_df st.session_state["project_dct"]["data_import"][ "missing_stats_df" ] = missing_stats_df st.session_state["project_dct"]["data_import"]["cat_dct"] = { col: cat for col, cat in zip( edited_stats_df["Column"], edited_stats_df["Category"] ) } st.session_state["project_dct"]["data_import"][ "numeric_columns" ] = numeric_columns st.session_state["project_dct"]["data_import"]["default_df"] = default_df st.session_state["project_dct"]["data_import"]["final_df"] = final_df st.session_state["project_dct"]["data_import"]["edited_df"] = edited_df st.toast("💾 Saved Successfully!") if accept: # Normalize all data to a daily granularity. This initial standardization simplifies subsequent conversions to other levels of granularity with st.spinner("Processing..."): files_dict = standardize_data_to_daily(files_dict, selections) # Convert all data to daily level granularity files_dict = apply_granularity_to_all( files_dict, granularity_selection, selections ) # Update the 'files_dict' in the session state st.session_state["files_dict"] = files_dict # Set a flag in the session state to indicate that selection has been made st.session_state["Panel_1_Panel_2_Selected"] = True ######################################################################################################################################################### # Display unique Panel_1 and Panel_2 values ######################################################################################################################################################### # Halts further execution until Panel_1 and Panel_2 columns are selected if st.session_state["project_dct"]["data_import"]["edited_stats_df"] is None: if ( "files_dict" in st.session_state and st.session_state["Panel_1_Panel_2_Selected"] ): files_dict = st.session_state["files_dict"] st.session_state["project_dct"]["data_import"][ "files_dict" ] = files_dict # resume else: st.stop() # Set to store unique values of Panel_1 and Panel_2 with st.spinner("Fetching Panel values..."): all_panel1_values, all_panel2_values = clean_and_extract_unique_values( files_dict, selections ) # List of Panel_1 and Panel_2 columns unique values list_of_all_panel1_values = list(all_panel1_values) list_of_all_panel2_values = list(all_panel2_values) # Format Panel_1 and Panel_2 values for display formatted_panel1_values = format_values_for_display( list_of_all_panel1_values ) formatted_panel2_values = format_values_for_display( list_of_all_panel2_values ) st.session_state["project_dct"]["data_import"][ "formatted_panel1_values" ] = formatted_panel1_values st.session_state["project_dct"]["data_import"][ "formatted_panel2_values" ] = formatted_panel2_values # Unique Panel_1 and Panel_2 values st.markdown("#### Unique Panel values") # Display Panel_1 and Panel_2 values with st.expander("Unique Panel values"): st.write("") st.markdown( f"""
Panel Level 1 Values: {formatted_panel1_values}
Panel Level 2 Values: {formatted_panel2_values}
""", unsafe_allow_html=True, ) # Display total Panel_1 and Panel_2 st.write("") st.markdown( f"""
Number of Level 1 Panels detected: {len(list_of_all_panel1_values)}
Number of Level 2 Panels detected: {len(list_of_all_panel2_values)}
""", unsafe_allow_html=True, ) st.write("") ######################################################################################################################################################### # Merge all DataFrames ######################################################################################################################################################### # Merge all DataFrames selected main_df = create_main_dataframe( files_dict, all_panel1_values, all_panel2_values, granularity_selection, ) merged_df = merge_into_main_df(main_df, files_dict, selections) ######################################################################################################################################################### # Categorize Variables and Impute Missing Values ######################################################################################################################################################### # Create an editable DataFrame in Streamlit st.markdown("#### Select Variables Category & Impute Missing Values") # Prepare missing stats DataFrame for editing missing_stats_df = prepare_missing_stats_df(merged_df) sorted_missing_stats_df = missing_stats_df.sort_values( by="Missing Values", ascending=False, na_position="first" ) edited_stats_df = st.data_editor( sorted_missing_stats_df, column_config={ "Impute Method": st.column_config.SelectboxColumn( options=[ "Drop Column", "Fill with Mean", "Fill with Median", "Fill with 0", ], required=True, default="Fill with 0", ), "Category": st.column_config.SelectboxColumn( options=[ "Spends", "Media", "Exogenous", "Internal", "Response Metrics", ], required=True, default="Media", ), }, disabled=["Column", "Missing Values", "Missing Percentage"], hide_index=True, use_container_width=True, key="data-editor-2", ) # Apply changes based on edited DataFrame for i, row in edited_stats_df.iterrows(): column = row["Column"] if row["Impute Method"] == "Drop Column": merged_df.drop(columns=[column], inplace=True) elif row["Impute Method"] == "Fill with Mean": merged_df[column].fillna(merged_df[column].mean(), inplace=True) elif row["Impute Method"] == "Fill with Median": merged_df[column].fillna(merged_df[column].median(), inplace=True) elif row["Impute Method"] == "Fill with 0": merged_df[column].fillna(0, inplace=True) ######################################################################################################################################################### # Group columns ######################################################################################################################################################### # Display Group columns header st.markdown("#### Feature engineering") # Prepare the numeric columns and an empty DataFrame for user input numeric_columns, default_df = prepare_numeric_columns_and_default_df( merged_df, edited_stats_df ) # Display editable Dataframe edited_df = st.data_editor( default_df, column_config={ "Column 1": st.column_config.SelectboxColumn( options=numeric_columns, required=True, width=400, ), "Operator": st.column_config.SelectboxColumn( options=["+", "-", "*", "/"], required=True, default="+", width=100, ), "Column 2": st.column_config.SelectboxColumn( options=numeric_columns, required=True, default=numeric_columns[0], width=400, ), "Category": st.column_config.SelectboxColumn( options=[ "Media", "Exogenous", "Internal", "Response Metrics", ], required=True, default="Media", width=200, ), }, num_rows="dynamic", key="data-editor-3", ) # Process the DataFrame based on user inputs and operations specified in edited_df final_df, edited_stats_df = process_dataframes( merged_df, edited_df, edited_stats_df ) ######################################################################################################################################################### # Display the Final DataFrame and variables ######################################################################################################################################################### # Display the Final DataFrame and variables st.markdown("#### Final DataFrame") sort_col = [] for col in final_df.columns: if col in ["Panel_1", "Panel_2", "date"]: sort_col.append(col) sorted_final_df = final_df.sort_values( by=sort_col, ascending=True, na_position="first" ) st.dataframe(sorted_final_df, hide_index=True) # Initialize an empty dictionary to hold categories and their variables category_dict = { "Spends": [], "Media": [], "Exogenous": [], "Internal": [], "Response Metrics": [], } # Iterate over each row in the edited DataFrame to populate the dictionary for i, row in edited_stats_df.iterrows(): column = row["Column"] category = row[ "Category" ] # The category chosen by the user for this variable if column not in list(final_df.columns): # Skip columns that are dropped continue # Check if the category already exists in the dictionary if category not in category_dict: # If not, initialize it with the current column as its first element category_dict[category] = [column] else: # If it exists, append the current column to the list of variables under this category category_dict[category].append(column) # Add Date, Panel_1 and Panel_12 in category dictionary category_dict.update({"Date": ["date"]}) if "Panel_1" in final_df.columns: category_dict["Panel Level 1"] = ["Panel_1"] if "Panel_2" in final_df.columns: category_dict["Panel Level 2"] = ["Panel_2"] ###################################### Group Media Channels ###################################### with st.expander("Group Media Channels"): media_channels = category_dict["Media"] spends_channels = category_dict["Spends"] allowed_channels_bin = media_channels + spends_channels group_selection_placeholder = st.container() total_groups = st.number_input("Total Groups", value=0) try: total_groups = int(total_groups) except: total_groups = 0 group_dict = {} channels_added = set() with group_selection_placeholder: for i in range(total_groups): group_name_inp_col, group_col = st.columns([1, 4]) group_name = group_name_inp_col.text_input( "Group name", key=f"group_name_{i}" ) # Filter the allowed channels by removing those already added, then sort the list allowed_channels = sorted( [ channel for channel in allowed_channels_bin if channel not in channels_added ], key=lambda x: x.split("_")[ 0 ], # Split each string by '_' and sort by the first part ) selected_channels = group_col.multiselect( "Select channels to group", options=allowed_channels, key=f"selected_channels_key_{i}", ) if ((group_name is not None) and (group_name != "")) and ( len(selected_channels) > 0 ): group_dict[group_name] = selected_channels channels_added.update(selected_channels) ###################################### Group Media Channels ###################################### # Display the dictionary st.markdown("#### Variable Category") for category, variables in category_dict.items(): # Skip empty category if len(variables) == 0: variables_str = "" # Check if there are multiple variables to handle "and" insertion correctly if len(variables) > 1: # Join all but the last variable with ", ", then add " and " before the last variable variables_str = ", ".join(variables[:-1]) + " and " + variables[-1] else: # Skip empty category if len(variables) == 0: variables_str = "" else: # If there's only one variable, no need for "and" variables_str = variables[0] # Display the category and its variables in the desired format st.markdown( f"
{category}: {variables_str}
", unsafe_allow_html=True, ) # Function to check if Response Metrics is selected st.write("") # Define the required column categories to check. required_categories = ["Response Metrics", "Spends", "Media"] # Iterate over the required categories to check for missing columns. for category in required_categories: category_columns = category_dict.get(category, []) if len(category_columns) == 0: st.warning( f"Please select at least one column for the {category} category", icon="⚠️", ) st.stop() # Filter channels that are in category_dict["Media"] / category_dict["Spends"] and in final_df.columns filtered_channels = [ channel for channel in category_dict["Media"] + category_dict["Spends"] if channel in final_df.columns ] # Combine all channels into a single list using a list comprehension all_added_channels = [] for channels in group_dict: all_added_channels += group_dict[channels] # Check for duplicated channels across groups if len(all_added_channels) != len(set(all_added_channels)): st.warning( "A channel can only be grouped once, and duplicate groupings are not permitted", icon="⚠️", ) st.stop() # Check if all filtered channels are present in group_dict if not set(filtered_channels) == set(all_added_channels): st.warning("Please group all media channels", icon="⚠️") st.stop() # Store final dataframe and bin dictionary into session state st.session_state["final_df"], st.session_state["bin_dict"] = ( final_df, category_dict, ) # Save the DataFrame and dictionary from the session state to the pickle file if st.button("Accept and Save", use_container_width=True): update_db("1_Data_Import.py") project_dct_path = os.path.join( st.session_state["project_path"], "project_dct.pkl" ) with open(project_dct_path, "wb") as f: pickle.dump(st.session_state["project_dct"], f) data_path = os.path.join( st.session_state["project_path"], "data_import.pkl" ) st.session_state["data_path"] = data_path save_to_pickle( data_path, st.session_state["final_df"], st.session_state["bin_dict"], ) ## ADD Exog vars to channels & save channels if len(category_dict["Exogenous"]) > 0: for exog_var in category_dict["Exogenous"]: group_dict[exog_var] = [exog_var] with open( os.path.join(st.session_state["project_path"], "channel_groups.pkl"), "wb", ) as f: pickle.dump(group_dict, f) st.session_state["project_dct"]["data_import"][ "edited_stats_df" ] = edited_stats_df st.session_state["project_dct"]["data_import"]["merged_df"] = merged_df st.session_state["project_dct"]["data_import"][ "missing_stats_df" ] = missing_stats_df st.session_state["project_dct"]["data_import"]["cat_dct"] = { col: cat for col, cat in zip( edited_stats_df["Column"], edited_stats_df["Category"] ) } st.session_state["project_dct"]["data_import"][ "numeric_columns" ] = numeric_columns st.session_state["project_dct"]["data_import"]["default_df"] = default_df st.session_state["project_dct"]["data_import"]["final_df"] = final_df st.session_state["project_dct"]["data_import"]["edited_df"] = edited_df st.toast("💾 Saved Successfully!")