# Importing necessary libraries
import streamlit as st
st.set_page_config(
page_title="Data Import",
page_icon=":shark:",
layout="wide",
initial_sidebar_state="collapsed",
)
import os
import re
import pickle
import sqlite3
import pandas as pd
from utilities import set_header, load_local_css, update_db, project_selection
load_local_css("styles.css")
set_header()
if "project_name" not in st.session_state:
st.session_state["project_name"] = None
if "project_dct" not in st.session_state:
project_selection()
st.stop()
if "username" in st.session_state and st.session_state["username"] is not None:
cols1 = st.columns([2, 1])
with cols1[0]:
st.markdown(f"**Welcome {st.session_state['username']}**")
with cols1[1]:
st.markdown(f"**Current Project: {st.session_state['project_name']}**")
# Function to validate date column in dataframe
def validate_date_column(df):
try:
# Attempt to convert the 'date' column to datetime in DD-MM-YYYY format
df["date"] = pd.to_datetime(df["date"], format="%d-%m-%Y")
return True
except Exception:
return False
# Function to determine data interval
def determine_data_interval(common_freq):
if common_freq == 1:
return "daily"
elif common_freq == 7:
return "weekly"
elif 28 <= common_freq <= 31:
return "monthly"
else:
return "irregular"
# Function to read each uploaded Excel file into a pandas DataFrame and store them in a dictionary
@st.cache_resource(show_spinner=False)
def files_to_dataframes(uploaded_files):
df_dict = {}
for uploaded_file in uploaded_files:
# Extract file name without extension
file_name = uploaded_file.name.rsplit(".", 1)[0]
# Check for duplicate file names
if file_name in df_dict:
st.warning(
f"Duplicate File: {file_name}. This file will be skipped.",
icon="⚠️",
)
continue
# Read the file into a DataFrame
df = pd.read_excel(uploaded_file)
# Convert all column names to lowercase
df.columns = df.columns.str.lower().str.strip()
# Separate numeric and non-numeric columns
numeric_cols = list(df.select_dtypes(include=["number"]).columns)
non_numeric_cols = [
col
for col in df.select_dtypes(exclude=["number"]).columns
if col.lower() != "date"
]
# Check for 'Date' column
if not (validate_date_column(df) and len(numeric_cols) > 0):
st.warning(
f"File Name: {file_name} ➜ Please upload data with Date column in 'DD-MM-YYYY' format and at least one media/exogenous column. This file will be skipped.",
icon="⚠️",
)
continue
# Check for interval
common_freq = (
pd.Series(df["date"].unique()).diff().dt.days.dropna().mode()[0]
)
# Calculate the data interval (daily, weekly, monthly or irregular)
interval = determine_data_interval(common_freq)
if interval == "irregular":
st.warning(
f"File Name: {file_name} ➜ Please upload data in daily, weekly or monthly interval. This file will be skipped.",
icon="⚠️",
)
continue
# Store both DataFrames in the dictionary under their respective keys
df_dict[file_name] = {
"numeric": numeric_cols,
"non_numeric": non_numeric_cols,
"interval": interval,
"df": df,
}
return df_dict
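# The returned df_dict maps each file name (without extension) to an entry of the form
# (illustrative):
#   {"numeric": [<numeric column names>], "non_numeric": [<panel column names>],
#    "interval": "daily" | "weekly" | "monthly", "df": <DataFrame>}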
# Function to adjust dataframe granularity
def adjust_dataframe_granularity(df, current_granularity, target_granularity):
# Set index
df.set_index("date", inplace=True)
# Define aggregation rules for resampling
aggregation_rules = {
col: "sum" if pd.api.types.is_numeric_dtype(df[col]) else "first"
for col in df.columns
}
# Initialize resampled_df
resampled_df = df
if current_granularity == "daily" and target_granularity == "weekly":
resampled_df = df.resample("W-MON", closed="left", label="left").agg(
aggregation_rules
)
elif current_granularity == "daily" and target_granularity == "monthly":
resampled_df = df.resample("MS", closed="left", label="left").agg(
aggregation_rules
)
elif current_granularity == "daily" and target_granularity == "daily":
resampled_df = df.resample("D").agg(aggregation_rules)
elif (
current_granularity in ["weekly", "monthly"]
and target_granularity == "daily"
):
# For higher to lower granularity, distribute numeric and replicate non-numeric values equally across the new period
expanded_data = []
for _, row in df.iterrows():
if current_granularity == "weekly":
period_range = pd.date_range(start=row.name, periods=7)
elif current_granularity == "monthly":
period_range = pd.date_range(
start=row.name, periods=row.name.days_in_month
)
for date in period_range:
new_row = {}
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
if current_granularity == "weekly":
new_row[col] = row[col] / 7
elif current_granularity == "monthly":
new_row[col] = row[col] / row.name.days_in_month
else:
new_row[col] = row[col]
expanded_data.append((date, new_row))
resampled_df = pd.DataFrame(
[data for _, data in expanded_data],
index=[date for date, _ in expanded_data],
)
# Reset index
resampled_df = resampled_df.reset_index().rename(columns={"index": "date"})
return resampled_df
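# Example (illustrative): a weekly row dated 2023-01-02 with spend=70, converted to daily,
# expands into seven rows (2023-01-02 .. 2023-01-08) each carrying spend=10, while
# non-numeric values are replicated unchanged across the expanded period.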
# Function to clean and extract unique values of Panel_1 and Panel_2
@st.cache_resource(show_spinner=False)
def clean_and_extract_unique_values(files_dict, selections):
all_panel1_values = set()
all_panel2_values = set()
for file_name, file_data in files_dict.items():
df = file_data["df"]
# 'Panel_1' and 'Panel_2' selections
selected_panel1 = selections[file_name].get("Panel_1")
selected_panel2 = selections[file_name].get("Panel_2")
# Clean and standardize Panel_1 column if it exists and is selected
if (
selected_panel1
and selected_panel1 != "N/A"
and selected_panel1 in df.columns
):
df[selected_panel1] = (
df[selected_panel1].str.lower().str.strip().str.replace("_", " ")
)
all_panel1_values.update(df[selected_panel1].dropna().unique())
# Clean and standardize Panel_2 column if it exists and is selected
if (
selected_panel2
and selected_panel2 != "N/A"
and selected_panel2 in df.columns
):
df[selected_panel2] = (
df[selected_panel2].str.lower().str.strip().str.replace("_", " ")
)
all_panel2_values.update(df[selected_panel2].dropna().unique())
# Update the processed DataFrame back in the dictionary
files_dict[file_name]["df"] = df
return all_panel1_values, all_panel2_values
# Function to format values for display
@st.cache_resource(show_spinner=False)
def format_values_for_display(values_list):
# Capitalize the first letter of each word and replace underscores with spaces
formatted_list = [value.replace("_", " ").title() for value in values_list]
# Join values with commas and 'and' before the last value
if len(formatted_list) > 1:
return ", ".join(formatted_list[:-1]) + ", and " + formatted_list[-1]
elif formatted_list:
return formatted_list[0]
return "No values available"
# Function to normalize all data within files_dict to a daily granularity
@st.cache_resource(show_spinner=False)
def standardize_data_to_daily(files_dict, selections):
# Normalize all data to a daily granularity using a provided function
files_dict = apply_granularity_to_all(files_dict, "daily", selections)
# Update the "interval" attribute for each dataset to indicate the new granularity
for files_name, files_data in files_dict.items():
files_data["interval"] = "daily"
return files_dict
# Function to apply granularity transformation to all DataFrames in files_dict
@st.cache_resource(show_spinner=False)
def apply_granularity_to_all(files_dict, granularity_selection, selections):
for file_name, file_data in files_dict.items():
df = file_data["df"].copy()
# Handling when Panel_1 or Panel_2 might be 'N/A'
selected_panel1 = selections[file_name].get("Panel_1")
selected_panel2 = selections[file_name].get("Panel_2")
# Correcting the segment selection logic & handling 'N/A'
if selected_panel1 != "N/A" and selected_panel2 != "N/A":
unique_combinations = df[
[selected_panel1, selected_panel2]
].drop_duplicates()
elif selected_panel1 != "N/A":
unique_combinations = df[[selected_panel1]].drop_duplicates()
selected_panel2 = None # Ensure Panel_2 is ignored if N/A
elif selected_panel2 != "N/A":
unique_combinations = df[[selected_panel2]].drop_duplicates()
selected_panel1 = None # Ensure Panel_1 is ignored if N/A
else:
# If both are 'N/A', process the entire dataframe as is
df = adjust_dataframe_granularity(
df, file_data["interval"], granularity_selection
)
files_dict[file_name]["df"] = df
continue # Skip to the next file
transformed_segments = []
for _, combo in unique_combinations.iterrows():
if selected_panel1 and selected_panel2:
segment = df[
(df[selected_panel1] == combo[selected_panel1])
& (df[selected_panel2] == combo[selected_panel2])
]
elif selected_panel1:
segment = df[df[selected_panel1] == combo[selected_panel1]]
elif selected_panel2:
segment = df[df[selected_panel2] == combo[selected_panel2]]
# Adjust granularity of the segment
transformed_segment = adjust_dataframe_granularity(
segment, file_data["interval"], granularity_selection
)
transformed_segments.append(transformed_segment)
# Combine all transformed segments into a single DataFrame for this file
transformed_df = pd.concat(transformed_segments, ignore_index=True)
files_dict[file_name]["df"] = transformed_df
return files_dict
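# Note: granularity is adjusted per Panel_1/Panel_2 segment so resampling never mixes rows
# from different panels; the transformed segments are then concatenated back into one DataFrame.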
# Function to create main dataframe structure
@st.cache_resource(show_spinner=False)
def create_main_dataframe(
files_dict, all_panel1_values, all_panel2_values, granularity_selection
):
# Determine the global start and end dates across all DataFrames
global_start = min(df["df"]["date"].min() for df in files_dict.values())
global_end = max(df["df"]["date"].max() for df in files_dict.values())
# Adjust the date_range generation based on the granularity_selection
if granularity_selection == "weekly":
# Generate a weekly range, with weeks starting on Monday
date_range = pd.date_range(start=global_start, end=global_end, freq="W-MON")
elif granularity_selection == "monthly":
# Generate a monthly range, starting from the first day of each month
date_range = pd.date_range(start=global_start, end=global_end, freq="MS")
else: # Default to daily if not weekly or monthly
date_range = pd.date_range(start=global_start, end=global_end, freq="D")
# Collect all unique Panel_1 and Panel_2 values, excluding 'N/A'
all_panel1s = all_panel1_values
all_panel2s = all_panel2_values
# Dynamically build the list of dimensions (Panel_1, Panel_2) to include in the main DataFrame based on availability
dimensions, merge_keys = [], []
if all_panel1s:
dimensions.append(all_panel1s)
merge_keys.append("Panel_1")
if all_panel2s:
dimensions.append(all_panel2s)
merge_keys.append("Panel_2")
dimensions.append(date_range) # Date range is always included
merge_keys.append("date") # Date range is always included
# Create a main DataFrame template with the dimensions
main_df = pd.MultiIndex.from_product(
dimensions,
names=[name for name, _ in zip(merge_keys, dimensions)],
).to_frame(index=False)
return main_df.reset_index(drop=True)
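# The template is the cartesian product of the included dimensions. Illustrative shape:
# with 2 Panel_1 values, 3 Panel_2 values and a 10-date range, main_df has 2 * 3 * 10 = 60 rows
# and the columns ["Panel_1", "Panel_2", "date"].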
# Function to prepare and merge DataFrames
@st.cache_resource(show_spinner=False)
def merge_into_main_df(main_df, files_dict, selections):
for file_name, file_data in files_dict.items():
df = file_data["df"].copy()
# Rename selected Panel_1 and Panel_2 columns if not 'N/A'
selected_panel1 = selections[file_name].get("Panel_1", "N/A")
selected_panel2 = selections[file_name].get("Panel_2", "N/A")
if selected_panel1 != "N/A":
df.rename(columns={selected_panel1: "Panel_1"}, inplace=True)
if selected_panel2 != "N/A":
df.rename(columns={selected_panel2: "Panel_2"}, inplace=True)
# Merge current DataFrame into main_df based on 'date', and where applicable, 'Panel_1' and 'Panel_2'
merge_keys = ["date"]
if "Panel_1" in df.columns:
merge_keys.append("Panel_1")
if "Panel_2" in df.columns:
merge_keys.append("Panel_2")
main_df = pd.merge(main_df, df, on=merge_keys, how="left")
# After all merges, sort by 'date' and reset index for cleanliness
sort_by = ["date"]
if "Panel_1" in main_df.columns:
sort_by.append("Panel_1")
if "Panel_2" in main_df.columns:
sort_by.append("Panel_2")
main_df.sort_values(by=sort_by, inplace=True)
main_df.reset_index(drop=True, inplace=True)
return main_df
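# Each file is left-joined onto the template, so date/panel combinations missing from a file
# show up as NaN in that file's columns and are handled by the imputation step further below.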
# Function to categorize column
def categorize_column(column_name):
# Define keywords for each category
internal_keywords = [
"Internal",
"Price",
"Discount",
"product_price",
"cost",
"margin",
"inventory",
"sales",
"revenue",
"turnover",
"expense",
]
exogenous_keywords = [
"Exogenous",
"GDP",
"Tax",
"Inflation",
"interest_rate",
"employment_rate",
"exchange_rate",
"consumer_spending",
"retail_sales",
"oil_prices",
"weather",
]
# Check if the column name matches any of the keywords for Internal or Exogenous categories
if (
column_name
in st.session_state["project_dct"]["data_import"]["cat_dct"].keys()
and st.session_state["project_dct"]["data_import"]["cat_dct"][column_name]
is not None
):
return st.session_state["project_dct"]["data_import"]["cat_dct"][
column_name
]
else:
for keyword in ["Response", "Metric"]:
if keyword.lower() in column_name.lower():
return "Response Metrics"
for keyword in ["Spend", "Cost"]:
if keyword.lower() in column_name.lower():
return "Spends"
for keyword in internal_keywords:
if keyword.lower() in column_name.lower():
return "Internal"
for keyword in exogenous_keywords:
if keyword.lower() in column_name.lower():
return "Exogenous"
# Default to Media if no match found
return "Media"
# Function to calculate missing stats and prepare for editable DataFrame
@st.cache_resource(show_spinner=False)
def prepare_missing_stats_df(df):
missing_stats = []
for column in df.columns:
if (
column == "date" or column == "Panel_2" or column == "Panel_1"
): # Skip Date, Panel_1 and Panel_2 column
continue
missing = df[column].isnull().sum()
pct_missing = round((missing / len(df)) * 100, 2)
# Dynamically assign category based on column name
category = categorize_column(column)
# category = "Media" # Keep default bin as Media
missing_stats.append(
{
"Column": column,
"Missing Values": missing,
"Missing Percentage": pct_missing,
"Impute Method": "Fill with 0", # Default value
"Category": category,
}
)
stats_df = pd.DataFrame(missing_stats)
return stats_df
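# Each row of stats_df describes one data column, e.g. (illustrative values):
#   {"Column": "tv_spend", "Missing Values": 4, "Missing Percentage": 1.25,
#    "Impute Method": "Fill with 0", "Category": "Spends"}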
# Function to add API DataFrame details to the files dictionary
@st.cache_resource(show_spinner=False)
def add_api_dataframe_to_dict(main_df, files_dict):
files_dict["API"] = {
"numeric": list(main_df.select_dtypes(include=["number"]).columns),
"non_numeric": [
col
for col in main_df.select_dtypes(exclude=["number"]).columns
if col.lower() != "date"
],
"interval": determine_data_interval(
pd.Series(main_df["date"].unique()).diff().dt.days.dropna().mode()[0]
),
"df": main_df,
}
return files_dict
# Function to read the API data file into a DataFrame, parsing the 'Date' column as datetime
# @st.cache_resource(show_spinner=False)
def read_API_data(project_folder_path, file_path, file_name):
# Paths using os.path
file_path_os = os.path.join(os.getcwd(), "API_data", file_name)
project_folder_path_os = os.path.normpath(project_folder_path)
# Construct the full path of the file in the project folder
project_file_path = os.path.join(project_folder_path_os, file_name)
# Check if the file with the same name exists in the project path
if os.path.exists(project_file_path):
# If the file exists, load and return the existing file
return pd.read_excel(project_file_path, parse_dates=["Date"])
else:
# If the file does not exist, read the new file
data = pd.read_excel(file_path_os, parse_dates=["Date"])
# Save the new file to the project folder
data.to_excel(project_file_path, index=False)
# Return the data
return data
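# Note: the file_path argument is currently unused; the source path is rebuilt from the working
# directory as "API_data/<file_name>", and a copy is stored in the project folder so later runs
# read the project-local file instead of the shared API_data folder.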
# Function to set the 'Panel_1_Panel_2_Selected' session state variable to False
def set_Panel_1_Panel_2_Selected_false():
st.session_state["Panel_1_Panel_2_Selected"] = False
# Reset project_dct values to their defaults whenever the user modifies any upstream widget
st.session_state["project_dct"]["data_import"]["edited_stats_df"] = None
st.session_state["project_dct"]["data_import"]["merged_df"] = None
st.session_state["project_dct"]["data_import"]["missing_stats_df"] = None
st.session_state["project_dct"]["data_import"]["cat_dct"] = {}
st.session_state["project_dct"]["data_import"]["numeric_columns"] = None
st.session_state["project_dct"]["data_import"]["default_df"] = None
st.session_state["project_dct"]["data_import"]["final_df"] = None
st.session_state["project_dct"]["data_import"]["edited_df"] = None
# Function to serialize and save the objects into a pickle file
@st.cache_resource(show_spinner=False)
def save_to_pickle(file_path, final_df, bin_dict):
# Open the file in write-binary mode and dump the objects
with open(file_path, "wb") as f:
pickle.dump(
{"final_df": final_df, "bin_dict": bin_dict}, f
) # Data is now saved to file
# Function to process the merged_df DataFrame based on operations defined in edited_df
@st.cache_resource(show_spinner=False)
def process_dataframes(merged_df, edited_df, edited_stats_df):
# Ensure there are operations defined by the user
if edited_df.empty:
return merged_df, edited_stats_df # No operations to apply
# Perform operations as defined by the user
else:
for index, row in edited_df.iterrows():
result_column_name = (
f"{row['Column 1']}{row['Operator']}{row['Column 2']}"
)
col1 = row["Column 1"]
col2 = row["Column 2"]
op = row["Operator"]
# Apply the specified operation
if op == "+":
merged_df[result_column_name] = merged_df[col1] + merged_df[col2]
elif op == "-":
merged_df[result_column_name] = merged_df[col1] - merged_df[col2]
elif op == "*":
merged_df[result_column_name] = merged_df[col1] * merged_df[col2]
elif op == "/":
merged_df[result_column_name] = merged_df[col1] / merged_df[
col2
].replace(0, 1e-9)
# Add summary of operation to edited_stats_df
new_row = {
"Column": result_column_name,
"Missing Values": None,
"Missing Percentage": None,
"Impute Method": None,
"Category": row["Category"],
}
new_row_df = pd.DataFrame([new_row])
# Use pd.concat to add the new_row_df to edited_stats_df
edited_stats_df = pd.concat(
[edited_stats_df, new_row_df], ignore_index=True, axis=0
)
# Combine column names from edited_df for cleanup
combined_columns = set(edited_df["Column 1"]).union(
set(edited_df["Column 2"])
)
# Filter out rows in edited_stats_df and drop columns from merged_df
edited_stats_df = edited_stats_df[
~edited_stats_df["Column"].isin(combined_columns)
]
merged_df.drop(
columns=list(combined_columns), errors="ignore", inplace=True
)
return merged_df, edited_stats_df
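# Example (illustrative): an edited_df row {"Column 1": "tv_spend", "Operator": "+",
# "Column 2": "radio_spend", "Category": "Spends"} adds a new column "tv_spend+radio_spend"
# to merged_df, appends its summary row to edited_stats_df, and drops both source columns.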
# Function to prepare a list of numeric column names and initialize an empty DataFrame with predefined structure
@st.cache_resource(show_spinner=False)
def prepare_numeric_columns_and_default_df(merged_df, edited_stats_df):
# Get columns categorized as 'Response Metrics'
columns_response_metrics = edited_stats_df[
edited_stats_df["Category"] == "Response Metrics"
]["Column"].tolist()
# Filter numeric columns, excluding those categorized as 'Response Metrics'
numeric_columns = [
col
for col in merged_df.select_dtypes(include=["number"]).columns
if col not in columns_response_metrics
]
# Define the structure of the empty DataFrame
data = {
"Column 1": pd.Series([], dtype="str"),
"Operator": pd.Series([], dtype="str"),
"Column 2": pd.Series([], dtype="str"),
"Category": pd.Series([], dtype="str"),
}
default_df = pd.DataFrame(data)
return numeric_columns, default_df
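# numeric_columns feeds the selectboxes of the feature-engineering editor below, while
# default_df is an empty, typed template with columns ["Column 1", "Operator", "Column 2", "Category"].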
# Initialize 'final_df' in session state
if "final_df" not in st.session_state:
st.session_state["final_df"] = pd.DataFrame()
# Initialize 'bin_dict' in session state
if "bin_dict" not in st.session_state:
st.session_state["bin_dict"] = {}
# Initialize 'Panel_1_Panel_2_Selected' in session state
if "Panel_1_Panel_2_Selected" not in st.session_state:
st.session_state["Panel_1_Panel_2_Selected"] = False
# Page Title
st.write("") # Top padding
st.title("Data Import")
conn = sqlite3.connect(
os.path.join("DB", "User.db"), check_same_thread=False
) # Connection to the SQLite user database
c = conn.cursor()
#########################################################################################################################################################
# Create a dictionary to hold all DataFrames and collect user input to specify "Panel_2" and "Panel_1" columns for each file
#########################################################################################################################################################
# Read the Excel file, parsing 'Date' column as datetime
main_df = read_API_data(
project_folder_path=st.session_state["project_path"],
file_path=st.session_state["project_dct"]["data_import"]["api_path"],
file_name=st.session_state["project_dct"]["data_import"]["api_name"] + ".xlsx",
)
# Convert all column names to lowercase
main_df.columns = main_df.columns.str.lower().str.strip()
# File uploader
uploaded_files = st.file_uploader(
"Upload additional data",
type=["xlsx"],
accept_multiple_files=True,
on_change=set_Panel_1_Panel_2_Selected_false,
)
# Custom HTML for upload instructions
recommendation_html = """
Recommendation: For optimal processing, please ensure that all uploaded datasets (panel, media, internal, and exogenous data) include a Date column formatted as DD-MM-YYYY and are free of missing values.
"""
st.markdown(recommendation_html, unsafe_allow_html=True)
# RAW API DATA
st.markdown("#### API Data")
with st.expander("API Data", expanded=False):
st.dataframe(main_df, hide_index=True)
# Choose Desired Granularity
st.markdown("#### Choose Desired Granularity")
# Granularity Selection
granularity_selection = st.selectbox(
"Choose Date Granularity",
["Daily", "Weekly", "Monthly"],
label_visibility="collapsed",
on_change=set_Panel_1_Panel_2_Selected_false,
index=st.session_state["project_dct"]["data_import"][
"granularity_selection"
], # resume
)
# st.write(st.session_state['project_dct']['data_import']['granularity_selection'])
st.session_state["project_dct"]["data_import"]["granularity_selection"] = [
"Daily",
"Weekly",
"Monthly",
].index(granularity_selection)
# st.write(st.session_state['project_dct']['data_import']['granularity_selection'])
granularity_selection = str(granularity_selection).lower()
# Convert files to dataframes
files_dict = files_to_dataframes(uploaded_files)
# Add API Dataframe
if main_df is not None:
files_dict = add_api_dataframe_to_dict(main_df, files_dict)
# Display a warning message if no files have been uploaded and halt further execution
if not files_dict:
st.warning(
"Please upload at least one file to proceed.",
icon="⚠️",
)
st.stop() # Halts further execution until file is uploaded
# Select Panel_1 and Panel_2 columns
st.markdown("#### Select Panel columns")
selections = {}
with st.expander("Select Panel columns", expanded=False):
count = 0 # Initialize counter to manage the visibility of labels and keys
unique_numeric_columns = (
set()
) # Initialize a set to keep track of unique numeric column names
for file_name, file_data in files_dict.items():
# Extract the numeric column names from the current file
numeric_columns = file_data["numeric"]
# Regular expression pattern to match valid column names (letters, numbers, and underscores)
valid_column_pattern = re.compile(r"^[A-Za-z0-9_]+$")
# Check for duplicates
for column in numeric_columns:
if column in unique_numeric_columns:
# If a duplicate is found, display a warning and halt execution
st.warning(
f"Duplicate column name '{column}' found in file '{file_name}'. Each column name must be unique across all files.",
icon="⚠️",
)
st.stop()
# Add the column to the set if it's not already there
unique_numeric_columns.add(column)
# Check if the column name is valid
if not valid_column_pattern.match(column):
st.warning(
f"Column name '{column}' in file '{file_name}' contains invalid characters. "
f"Column names should only contain letters (A-Z, a-z), numbers (0-9), and underscores (_).",
icon="⚠️",
)
st.stop()
# Generating project_dct keys dynamically
if (
f"Panel_1_selectbox{file_name}"
not in st.session_state["project_dct"]["data_import"].keys()
):
st.session_state["project_dct"]["data_import"][
f"Panel_1_selectbox{file_name}"
] = 0
if (
f"Panel_2_selectbox{file_name}"
not in st.session_state["project_dct"]["data_import"].keys()
):
st.session_state["project_dct"]["data_import"][
f"Panel_2_selectbox{file_name}"
] = 0
# Determine visibility of the label based on the count
if count == 0:
label_visibility = "visible"
else:
label_visibility = "collapsed"
# Extract non-numeric columns
non_numeric_cols = file_data["non_numeric"]
# Prepare Panel_1 and Panel_2 values for dropdown, adding "N/A" as an option
panel1_values = non_numeric_cols + ["N/A"]
panel2_values = non_numeric_cols + ["N/A"]
# Skip if only one option is available
if len(panel1_values) == 1 and len(panel2_values) == 1:
selected_panel1, selected_panel2 = "N/A", "N/A"
# Update the selections for Panel_1 and Panel_2 for the current file
selections[file_name] = {
"Panel_1": selected_panel1,
"Panel_2": selected_panel2,
}
continue
# Create layout columns for File Name, Panel_2, and Panel_1 selections
file_name_col, Panel_1_col, Panel_2_col = st.columns([2, 4, 4])
with file_name_col:
# Display "File Name" label only for the first file
if count == 0:
st.write("File Name")
else:
st.write("")
st.write(file_name) # Display the file name
with Panel_1_col:
# Display a selectbox for Panel_1 values
selected_panel1 = st.selectbox(
"Select Panel Level 1",
panel1_values,
on_change=set_Panel_1_Panel_2_Selected_false,
label_visibility=label_visibility, # Control visibility of the label
key=f"Panel_1_selectbox{count}", # Ensure unique key for each selectbox
index=st.session_state["project_dct"]["data_import"][
f"Panel_1_selectbox{file_name}"
],
)
st.session_state["project_dct"]["data_import"][
f"Panel_1_selectbox{file_name}"
] = panel1_values.index(selected_panel1)
with Panel_2_col:
# Display a selectbox for Panel_2 values
selected_panel2 = st.selectbox(
"Select Panel Level 2",
panel2_values,
on_change=set_Panel_1_Panel_2_Selected_false,
label_visibility=label_visibility, # Control visibility of the label
key=f"Panel_2_selectbox{count}", # Ensure unique key for each selectbox
index=st.session_state["project_dct"]["data_import"][
f"Panel_2_selectbox{file_name}"
],
)
st.session_state["project_dct"]["data_import"][
f"Panel_2_selectbox{file_name}"
] = panel2_values.index(selected_panel2)
# Check for potential data integrity issues
if selected_panel2 == selected_panel1 and not (
selected_panel2 == "N/A" and selected_panel1 == "N/A"
):
st.warning(
f"File: {file_name} → The same column cannot serve as both Panel_1 and Panel_2. Please adjust your selections.",
)
selected_panel1, selected_panel2 = "N/A", "N/A"
st.stop()
# Check for potential data integrity issues
if len(non_numeric_cols) > 2:
st.warning(
f"File: {file_name} → The input file contains more than two non-numeric/panel columns. Please verify the file's contents.",
)
st.stop()
# Total selected panel level
selected_panels_count = (1 if selected_panel1 != "N/A" else 0) + (
1 if selected_panel2 != "N/A" else 0
)
# Check for potential data integrity issues
if len(non_numeric_cols) != selected_panels_count:
st.warning(
f"File: {file_name} → The number of non-numeric columns selected does not match the expected panel count. Please ensure all required columns are selected.",
)
st.stop()
# Update the selections for Panel_1 and Panel_2 for the current file
selections[file_name] = {
"Panel_1": selected_panel1,
"Panel_2": selected_panel2,
}
count += 1 # Increment the counter after processing each file
st.write("")
# Accept Panel_1 and Panel_2 selection
accept = st.button("Accept and Process", use_container_width=True)
if (
not accept
and st.session_state["project_dct"]["data_import"]["edited_stats_df"]
is not None
):
# st.write(st.session_state['project_dct'])
st.markdown("#### Unique Panel values")
# Display Panel_1 and Panel_2 values
with st.expander("Unique Panel values"):
st.write("")
st.markdown(
f"""
Panel Level 1 Values: {st.session_state['project_dct']['data_import']['formatted_panel1_values']}
Panel Level 2 Values: {st.session_state['project_dct']['data_import']['formatted_panel2_values']}
""",
unsafe_allow_html=True,
)
# Display total Panel_1 and Panel_2
st.write("")
st.markdown(
f"""
Number of Level 1 Panels detected: {len(st.session_state['project_dct']['data_import']['formatted_panel1_values'])}
Number of Level 2 Panels detected: {len(st.session_state['project_dct']['data_import']['formatted_panel2_values'])}
""",
unsafe_allow_html=True,
)
st.write("")
# Create an editable DataFrame in Streamlit
st.markdown("#### Select Variables Category & Impute Missing Values")
merged_df = st.session_state["project_dct"]["data_import"]["merged_df"].copy()
missing_stats_df = st.session_state["project_dct"]["data_import"][
"missing_stats_df"
]
editable_df = st.session_state["project_dct"]["data_import"]["edited_stats_df"]
sorted_editable_df = editable_df.sort_values(
by="Missing Values", ascending=False, na_position="first"
)
edited_stats_df = st.data_editor(
sorted_editable_df,
column_config={
"Impute Method": st.column_config.SelectboxColumn(
options=[
"Drop Column",
"Fill with Mean",
"Fill with Median",
"Fill with 0",
],
required=True,
default="Fill with 0",
),
"Category": st.column_config.SelectboxColumn(
options=[
"Spends",
"Media",
"Exogenous",
"Internal",
"Response Metrics",
],
required=True,
default="Media",
),
},
disabled=["Column", "Missing Values", "Missing Percentage"],
hide_index=True,
use_container_width=True,
key="data-editor-1",
)
st.session_state["project_dct"]["data_import"]["cat_dct"] = {
col: cat
for col, cat in zip(edited_stats_df["Column"], edited_stats_df["Category"])
if col in merged_df.columns
}
for i, row in edited_stats_df.iterrows():
column = row["Column"]
if (
column
not in st.session_state["project_dct"]["data_import"]["cat_dct"].keys()
):
continue
if row["Impute Method"] == "Drop Column":
merged_df.drop(columns=[column], inplace=True)
elif row["Impute Method"] == "Fill with Mean":
merged_df[column].fillna(
st.session_state["project_dct"]["data_import"]["merged_df"][
column
].mean(),
inplace=True,
)
elif row["Impute Method"] == "Fill with Median":
merged_df[column].fillna(
st.session_state["project_dct"]["data_import"]["merged_df"][
column
].median(),
inplace=True,
)
elif row["Impute Method"] == "Fill with 0":
merged_df[column].fillna(0, inplace=True)
#########################################################################################################################################################
# Group columns
#########################################################################################################################################################
# Display Group columns header
numeric_columns = st.session_state["project_dct"]["data_import"][
"numeric_columns"
]
default_df = st.session_state["project_dct"]["data_import"]["default_df"]
st.markdown("#### Feature engineering")
edited_df = st.data_editor(
st.session_state["project_dct"]["data_import"]["edited_df"],
column_config={
"Column 1": st.column_config.SelectboxColumn(
options=numeric_columns,
required=True,
width=400,
),
"Operator": st.column_config.SelectboxColumn(
options=["+", "-", "*", "/"],
required=True,
default="+",
width=100,
),
"Column 2": st.column_config.SelectboxColumn(
options=numeric_columns,
required=True,
default=numeric_columns[0],
width=400,
),
"Category": st.column_config.SelectboxColumn(
options=[
"Media",
"Exogenous",
"Internal",
"Response Metrics",
],
required=True,
default="Media",
width=200,
),
},
num_rows="dynamic",
key="data-editor-4",
)
final_df, edited_stats_df = process_dataframes(
merged_df, edited_df, edited_stats_df
)
st.markdown("#### Final DataFrame")
sort_col = []
for col in final_df.columns:
if col in ["Panel_1", "Panel_2", "date"]:
sort_col.append(col)
sorted_final_df = final_df.sort_values(
by=sort_col, ascending=True, na_position="first"
)
st.dataframe(sorted_final_df, hide_index=True)
# Initialize an empty dictionary to hold categories and their variables
category_dict = {
"Spends": [],
"Media": [],
"Exogenous": [],
"Internal": [],
"Response Metrics": [],
}
# Iterate over each row in the edited DataFrame to populate the dictionary
for i, row in edited_stats_df.iterrows():
column = row["Column"]
category = row[
"Category"
] # The category chosen by the user for this variable
if column not in list(final_df.columns): # Skip columns that are dropped
continue
# Check if the category already exists in the dictionary
if category not in category_dict:
# If not, initialize it with the current column as its first element
category_dict[category] = [column]
else:
# If it exists, append the current column to the list of variables under this category
category_dict[category].append(column)
# Add Date, Panel_1 and Panel_2 to the category dictionary
category_dict.update({"Date": ["date"]})
if "Panel_1" in final_df.columns:
category_dict["Panel Level 1"] = ["Panel_1"]
if "Panel_2" in final_df.columns:
category_dict["Panel Level 2"] = ["Panel_2"]
###################################### Group Media Channels ######################################
with st.expander("Group Media Channels"):
media_channels = category_dict["Media"]
spends_channels = category_dict["Spends"]
allowed_channels_bin = media_channels + spends_channels
group_selection_placeholder = st.container()
total_groups = st.number_input("Total Groups", value=0)
try:
total_groups = int(total_groups)
except (TypeError, ValueError):
total_groups = 0
group_dict = {}
channels_added = set()
with group_selection_placeholder:
for i in range(total_groups):
group_name_inp_col, group_col = st.columns([1, 4])
group_name = group_name_inp_col.text_input(
"Group name", key=f"group_name_{i}"
)
# Filter the allowed channels by removing those already added, then sort the list
allowed_channels = sorted(
[
channel
for channel in allowed_channels_bin
if channel not in channels_added
],
key=lambda x: x.split("_")[
0
], # Split each string by '_' and sort by the first part
)
selected_channels = group_col.multiselect(
"Select channels to group",
options=allowed_channels,
key=f"selected_channels_key_{i}",
)
if ((group_name is not None) and (group_name != "")) and (
len(selected_channels) > 0
):
group_dict[group_name] = selected_channels
channels_added.update(selected_channels)
###################################### Group Media Channels ######################################
# Display the dictionary
st.markdown("#### Variable Category")
for category, variables in category_dict.items():
# Check if there are multiple variables to handle "and" insertion correctly
if len(variables) > 1:
# Join all but the last variable with ", ", then add " and " before the last variable
variables_str = ", ".join(variables[:-1]) + " and " + variables[-1]
else:
# Skip empty category
if len(variables) == 0:
variables_str = ""
else:
# If there's only one variable, no need for "and"
variables_str = variables[0]
# Display the category and its variables in the desired format
st.markdown(
f"{category}: {variables_str}",
unsafe_allow_html=True,
)
# Check that each required category has at least one column selected
st.write("")
# Define the required column categories to check.
required_categories = ["Response Metrics", "Spends", "Media"]
# Iterate over the required categories to check for missing columns.
for category in required_categories:
category_columns = category_dict.get(category, [])
if len(category_columns) == 0:
st.warning(
f"Please select at least one column for the {category} category",
icon="⚠️",
)
st.stop()
# Filter channels that are in category_dict["Media"] / category_dict["Spends"] and in final_df.columns
filtered_channels = [
channel
for channel in category_dict["Media"] + category_dict["Spends"]
if channel in final_df.columns
]
# Combine all grouped channels into a single list
all_added_channels = []
for channels in group_dict:
all_added_channels += group_dict[channels]
# Check for duplicated channels across groups
if len(all_added_channels) != len(set(all_added_channels)):
st.warning(
"A channel can only be grouped once, and duplicate groupings are not permitted",
icon="⚠️",
)
st.stop()
# Check if all filtered channels are present in group_dict
if not set(filtered_channels) == set(all_added_channels):
st.warning("Please group all media channels", icon="⚠️")
st.stop()
# Store final dataframe and bin dictionary into session state
st.session_state["final_df"], st.session_state["bin_dict"] = (
final_df,
category_dict,
)
# Save the DataFrame and dictionary from the session state to the pickle file
if st.button(
"Accept and Save",
use_container_width=True,
key="data-editor-button",
):
update_db("1_Data_Import.py")
final_df = final_df.loc[:, ~final_df.columns.duplicated()]
project_dct_path = os.path.join(
st.session_state["project_path"], "project_dct.pkl"
)
with open(project_dct_path, "wb") as f:
pickle.dump(st.session_state["project_dct"], f)
data_path = os.path.join(
st.session_state["project_path"], "data_import.pkl"
)
st.session_state["data_path"] = data_path
save_to_pickle(
data_path,
st.session_state["final_df"],
st.session_state["bin_dict"],
)
st.session_state["project_dct"]["data_import"][
"edited_stats_df"
] = edited_stats_df
st.session_state["project_dct"]["data_import"]["merged_df"] = merged_df
st.session_state["project_dct"]["data_import"][
"missing_stats_df"
] = missing_stats_df
st.session_state["project_dct"]["data_import"]["cat_dct"] = {
col: cat
for col, cat in zip(
edited_stats_df["Column"], edited_stats_df["Category"]
)
}
st.session_state["project_dct"]["data_import"][
"numeric_columns"
] = numeric_columns
st.session_state["project_dct"]["data_import"]["default_df"] = default_df
st.session_state["project_dct"]["data_import"]["final_df"] = final_df
st.session_state["project_dct"]["data_import"]["edited_df"] = edited_df
st.toast("💾 Saved Successfully!")
if accept:
# Normalize all data to a daily granularity. This initial standardization simplifies subsequent conversions to other levels of granularity
with st.spinner("Processing..."):
files_dict = standardize_data_to_daily(files_dict, selections)
# Convert all data to daily level granularity
files_dict = apply_granularity_to_all(
files_dict, granularity_selection, selections
)
# Update the 'files_dict' in the session state
st.session_state["files_dict"] = files_dict
# Set a flag in the session state to indicate that selection has been made
st.session_state["Panel_1_Panel_2_Selected"] = True
#########################################################################################################################################################
# Display unique Panel_1 and Panel_2 values
#########################################################################################################################################################
# Halts further execution until Panel_1 and Panel_2 columns are selected
if st.session_state["project_dct"]["data_import"]["edited_stats_df"] is None:
if (
"files_dict" in st.session_state
and st.session_state["Panel_1_Panel_2_Selected"]
):
files_dict = st.session_state["files_dict"]
st.session_state["project_dct"]["data_import"][
"files_dict"
] = files_dict # resume
else:
st.stop()
# Set to store unique values of Panel_1 and Panel_2
with st.spinner("Fetching Panel values..."):
all_panel1_values, all_panel2_values = clean_and_extract_unique_values(
files_dict, selections
)
# List of Panel_1 and Panel_2 columns unique values
list_of_all_panel1_values = list(all_panel1_values)
list_of_all_panel2_values = list(all_panel2_values)
# Format Panel_1 and Panel_2 values for display
formatted_panel1_values = format_values_for_display(
list_of_all_panel1_values
)
formatted_panel2_values = format_values_for_display(
list_of_all_panel2_values
)
st.session_state["project_dct"]["data_import"][
"formatted_panel1_values"
] = formatted_panel1_values
st.session_state["project_dct"]["data_import"][
"formatted_panel2_values"
] = formatted_panel2_values
# Unique Panel_1 and Panel_2 values
st.markdown("#### Unique Panel values")
# Display Panel_1 and Panel_2 values
with st.expander("Unique Panel values"):
st.write("")
st.markdown(
f"""
Panel Level 1 Values: {formatted_panel1_values}
Panel Level 2 Values: {formatted_panel2_values}
""",
unsafe_allow_html=True,
)
# Display total Panel_1 and Panel_2
st.write("")
st.markdown(
f"""
Number of Level 1 Panels detected: {len(list_of_all_panel1_values)}
Number of Level 2 Panels detected: {len(list_of_all_panel2_values)}
""",
unsafe_allow_html=True,
)
st.write("")
#########################################################################################################################################################
# Merge all DataFrames
#########################################################################################################################################################
# Merge all DataFrames selected
main_df = create_main_dataframe(
files_dict,
all_panel1_values,
all_panel2_values,
granularity_selection,
)
merged_df = merge_into_main_df(main_df, files_dict, selections)
#########################################################################################################################################################
# Categorize Variables and Impute Missing Values
#########################################################################################################################################################
# Create an editable DataFrame in Streamlit
st.markdown("#### Select Variables Category & Impute Missing Values")
# Prepare missing stats DataFrame for editing
missing_stats_df = prepare_missing_stats_df(merged_df)
sorted_missing_stats_df = missing_stats_df.sort_values(
by="Missing Values", ascending=False, na_position="first"
)
edited_stats_df = st.data_editor(
sorted_missing_stats_df,
column_config={
"Impute Method": st.column_config.SelectboxColumn(
options=[
"Drop Column",
"Fill with Mean",
"Fill with Median",
"Fill with 0",
],
required=True,
default="Fill with 0",
),
"Category": st.column_config.SelectboxColumn(
options=[
"Spends",
"Media",
"Exogenous",
"Internal",
"Response Metrics",
],
required=True,
default="Media",
),
},
disabled=["Column", "Missing Values", "Missing Percentage"],
hide_index=True,
use_container_width=True,
key="data-editor-2",
)
# Apply changes based on edited DataFrame
for i, row in edited_stats_df.iterrows():
column = row["Column"]
if row["Impute Method"] == "Drop Column":
merged_df.drop(columns=[column], inplace=True)
elif row["Impute Method"] == "Fill with Mean":
merged_df[column].fillna(merged_df[column].mean(), inplace=True)
elif row["Impute Method"] == "Fill with Median":
merged_df[column].fillna(merged_df[column].median(), inplace=True)
elif row["Impute Method"] == "Fill with 0":
merged_df[column].fillna(0, inplace=True)
#########################################################################################################################################################
# Group columns
#########################################################################################################################################################
# Display Group columns header
st.markdown("#### Feature engineering")
# Prepare the numeric columns and an empty DataFrame for user input
numeric_columns, default_df = prepare_numeric_columns_and_default_df(
merged_df, edited_stats_df
)
# Display editable Dataframe
edited_df = st.data_editor(
default_df,
column_config={
"Column 1": st.column_config.SelectboxColumn(
options=numeric_columns,
required=True,
width=400,
),
"Operator": st.column_config.SelectboxColumn(
options=["+", "-", "*", "/"],
required=True,
default="+",
width=100,
),
"Column 2": st.column_config.SelectboxColumn(
options=numeric_columns,
required=True,
default=numeric_columns[0],
width=400,
),
"Category": st.column_config.SelectboxColumn(
options=[
"Media",
"Exogenous",
"Internal",
"Response Metrics",
],
required=True,
default="Media",
width=200,
),
},
num_rows="dynamic",
key="data-editor-3",
)
# Process the DataFrame based on user inputs and operations specified in edited_df
final_df, edited_stats_df = process_dataframes(
merged_df, edited_df, edited_stats_df
)
#########################################################################################################################################################
# Display the Final DataFrame and variables
#########################################################################################################################################################
# Display the Final DataFrame and variables
st.markdown("#### Final DataFrame")
sort_col = []
for col in final_df.columns:
if col in ["Panel_1", "Panel_2", "date"]:
sort_col.append(col)
sorted_final_df = final_df.sort_values(
by=sort_col, ascending=True, na_position="first"
)
st.dataframe(sorted_final_df, hide_index=True)
# Initialize an empty dictionary to hold categories and their variables
category_dict = {
"Spends": [],
"Media": [],
"Exogenous": [],
"Internal": [],
"Response Metrics": [],
}
# Iterate over each row in the edited DataFrame to populate the dictionary
for i, row in edited_stats_df.iterrows():
column = row["Column"]
category = row[
"Category"
] # The category chosen by the user for this variable
if column not in list(final_df.columns): # Skip columns that are dropped
continue
# Check if the category already exists in the dictionary
if category not in category_dict:
# If not, initialize it with the current column as its first element
category_dict[category] = [column]
else:
# If it exists, append the current column to the list of variables under this category
category_dict[category].append(column)
# Add Date, Panel_1 and Panel_2 to the category dictionary
category_dict.update({"Date": ["date"]})
if "Panel_1" in final_df.columns:
category_dict["Panel Level 1"] = ["Panel_1"]
if "Panel_2" in final_df.columns:
category_dict["Panel Level 2"] = ["Panel_2"]
###################################### Group Media Channels ######################################
with st.expander("Group Media Channels"):
media_channels = category_dict["Media"]
spends_channels = category_dict["Spends"]
allowed_channels_bin = media_channels + spends_channels
group_selection_placeholder = st.container()
total_groups = st.number_input("Total Groups", value=0)
try:
total_groups = int(total_groups)
except (TypeError, ValueError):
total_groups = 0
group_dict = {}
channels_added = set()
with group_selection_placeholder:
for i in range(total_groups):
group_name_inp_col, group_col = st.columns([1, 4])
group_name = group_name_inp_col.text_input(
"Group name", key=f"group_name_{i}"
)
# Filter the allowed channels by removing those already added, then sort the list
allowed_channels = sorted(
[
channel
for channel in allowed_channels_bin
if channel not in channels_added
],
key=lambda x: x.split("_")[
0
], # Split each string by '_' and sort by the first part
)
selected_channels = group_col.multiselect(
"Select channels to group",
options=allowed_channels,
key=f"selected_channels_key_{i}",
)
if ((group_name is not None) and (group_name != "")) and (
len(selected_channels) > 0
):
group_dict[group_name] = selected_channels
channels_added.update(selected_channels)
###################################### Group Media Channels ######################################
# Display the dictionary
st.markdown("#### Variable Category")
for category, variables in category_dict.items():
# Check if there are multiple variables to handle "and" insertion correctly
if len(variables) > 1:
# Join all but the last variable with ", ", then add " and " before the last variable
variables_str = ", ".join(variables[:-1]) + " and " + variables[-1]
else:
# Skip empty category
if len(variables) == 0:
variables_str = ""
else:
# If there's only one variable, no need for "and"
variables_str = variables[0]
# Display the category and its variables in the desired format
st.markdown(
f"{category}: {variables_str}",
unsafe_allow_html=True,
)
# Check that each required category has at least one column selected
st.write("")
# Define the required column categories to check.
required_categories = ["Response Metrics", "Spends", "Media"]
# Iterate over the required categories to check for missing columns.
for category in required_categories:
category_columns = category_dict.get(category, [])
if len(category_columns) == 0:
st.warning(
f"Please select at least one column for the {category} category",
icon="⚠️",
)
st.stop()
# Filter channels that are in category_dict["Media"] / category_dict["Spends"] and in final_df.columns
filtered_channels = [
channel
for channel in category_dict["Media"] + category_dict["Spends"]
if channel in final_df.columns
]
# Combine all grouped channels into a single list
all_added_channels = []
for channels in group_dict:
all_added_channels += group_dict[channels]
# Check for duplicated channels across groups
if len(all_added_channels) != len(set(all_added_channels)):
st.warning(
"A channel can only be grouped once, and duplicate groupings are not permitted",
icon="⚠️",
)
st.stop()
# Check if all filtered channels are present in group_dict
if not set(filtered_channels) == set(all_added_channels):
st.warning("Please group all media channels", icon="⚠️")
st.stop()
# Store final dataframe and bin dictionary into session state
st.session_state["final_df"], st.session_state["bin_dict"] = (
final_df,
category_dict,
)
# Save the DataFrame and dictionary from the session state to the pickle file
if st.button("Accept and Save", use_container_width=True):
update_db("1_Data_Import.py")
project_dct_path = os.path.join(
st.session_state["project_path"], "project_dct.pkl"
)
with open(project_dct_path, "wb") as f:
pickle.dump(st.session_state["project_dct"], f)
data_path = os.path.join(
st.session_state["project_path"], "data_import.pkl"
)
st.session_state["data_path"] = data_path
save_to_pickle(
data_path,
st.session_state["final_df"],
st.session_state["bin_dict"],
)
# Add exogenous variables as their own channel groups before saving channel groups
if len(category_dict["Exogenous"]) > 0:
for exog_var in category_dict["Exogenous"]:
group_dict[exog_var] = [exog_var]
with open(
os.path.join(st.session_state["project_path"], "channel_groups.pkl"),
"wb",
) as f:
pickle.dump(group_dict, f)
st.session_state["project_dct"]["data_import"][
"edited_stats_df"
] = edited_stats_df
st.session_state["project_dct"]["data_import"]["merged_df"] = merged_df
st.session_state["project_dct"]["data_import"][
"missing_stats_df"
] = missing_stats_df
st.session_state["project_dct"]["data_import"]["cat_dct"] = {
col: cat
for col, cat in zip(
edited_stats_df["Column"], edited_stats_df["Category"]
)
}
st.session_state["project_dct"]["data_import"][
"numeric_columns"
] = numeric_columns
st.session_state["project_dct"]["data_import"]["default_df"] = default_df
st.session_state["project_dct"]["data_import"]["final_df"] = final_df
st.session_state["project_dct"]["data_import"]["edited_df"] = edited_df
st.toast("💾 Saved Successfully!")