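# Page header text that precedes the code in this file; commented out so the
# module parses as Python.
# Aldi / Scenario.py
# BlendMMM's picture
# Upload 74 files
# 201ff33 verified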
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import datetime
from utilities import set_header,initialize_data,load_local_css
from scipy.optimize import curve_fit
import statsmodels.api as sm
from plotly.subplots import make_subplots
# Page chrome: configure the Streamlit page, then apply the stylesheet and
# header helpers imported from the project's `utilities` module.
page_settings = dict(
    page_title="Data Validation",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.set_page_config(**page_settings)
load_local_css("styles.css")
set_header()
def format_numbers(x):
    """Format a number compactly with one decimal place and a K/M suffix.

    The unit is chosen from the value as it will be displayed: a magnitude
    that rounds up to 1,000.0 in one unit is promoted to the next unit, so
    999,950 renders as ``1.0M`` instead of ``1,000.0K`` and 999.96 renders as
    ``1.0K`` instead of ``1,000.0``.

    Args:
        x: A real number (int, float or NumPy scalar); may be negative.

    Returns:
        str: ``x`` in millions (``M``), thousands (``K``) or as-is, always
        with one decimal place and thousands separators.
    """
    magnitude = abs(float(x))
    # Compare the *rounded* value in the next-smaller unit against 1000 so
    # the suffix always matches the digits that end up on screen.
    if round(magnitude / 1e3, 1) >= 1e3:
        return f'{x / 1e6:,.1f}M'
    if round(magnitude, 1) >= 1e3:
        return f'{x / 1e3:,.1f}K'
    return f'{x:,.1f}'
def format_axis(x):
    """Format a number for an axis tick: no decimals, ``k``/``M`` suffix.

    The unit is chosen from the rounded value, so 999,600 renders as ``1M``
    instead of ``1000k`` and 999.7 renders as ``1k`` instead of ``1000``.

    Args:
        x: A real number, or a tuple whose first element is the number to
            format (the remaining elements are ignored).

    Returns:
        str: ``x`` in millions (``M``), thousands (``k``) or as-is, rounded
        to a whole number.
    """
    if isinstance(x, tuple):
        x = x[0]  # Extract the numeric value from the tuple
    magnitude = abs(float(x))
    # round(..., 0) keeps the result a float, so NaN/inf pass through to the
    # plain formatting branch instead of raising.
    if round(magnitude / 1e3, 0) >= 1e3:
        return f'{x / 1e6:.0f}M'
    if round(magnitude, 0) >= 1e3:
        return f'{x / 1e3:.0f}k'
    return f'{x:.0f}'
# --- Load the input data sets ------------------------------------------------
# The "attributred" spelling is kept because later sections of this script
# refer to these names.
attributred_app_installs = pd.read_csv("attributed_app_installs.csv")
attributred_app_installs_tactic = pd.read_excel('attributed_app_installs_tactic.xlsx')
data = pd.read_excel('Channel_wise_imp_click_spends.xlsx')
data['Date'] = pd.to_datetime(data['Date'])

st.header('Saturation Curves')
st.markdown('Data QC')
st.markdown('Channel wise summary')

# Monthly totals per column, labelled "Month YYYY".
# numeric_only=True keeps the datetime `Date` column (and any other
# non-numeric column) out of the sum; summing datetimes is not supported.
month_labels = data['Date'].dt.strftime('%B %Y')
summary_df = data.groupby(month_labels).sum(numeric_only=True)
# The labels are text, so sort them by the month they denote rather than
# alphabetically.
summary_df = summary_df.sort_index(key=lambda labels: pd.to_datetime(labels, format='%B %Y'))

# DataFrame.map is the replacement for the deprecated DataFrame.applymap;
# fall back to applymap on pandas versions that do not have it yet.
format_cells = summary_df.map if hasattr(pd.DataFrame, 'map') else summary_df.applymap
st.dataframe(format_cells(format_numbers))
def line_plot_target(df, target, title):
    """Plot ``target`` over time together with a fitted linear trend line.

    Args:
        df: DataFrame with a datetime64 ``Date`` column and a numeric
            ``target`` column. The frame is not modified.
        target: Name of the column to plot.
        title: Figure title.

    Returns:
        plotly.graph_objects.Figure: the series, its least-squares trend line
        and a dashed vertical marker at 1 January of every year after the
        first year present in ``df['Date']``.
    """
    dates = df['Date']
    values = df[target]

    # Fit against days elapsed since the first date instead of raw epoch
    # nanoseconds: the fitted line is the same, but the regressor stays small
    # and no helper column has to be written into the caller's frame.
    elapsed_days = (dates - dates.min()).dt.total_seconds() / 86400.0
    valid = values.notna() & elapsed_days.notna()  # polyfit cannot handle NaN
    slope, intercept = np.polyfit(elapsed_days[valid], values[valid], 1)
    trend_values = slope * elapsed_days + intercept

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dates, y=values, mode='lines', name=target,
                             line=dict(color='#11B6BD')))
    fig.add_trace(go.Scatter(x=dates, y=trend_values, mode='lines', name='Trendline',
                             line=dict(color='#739FAE')))
    fig.update_layout(
        title=title,
        xaxis=dict(type='date'),
    )

    # The first year needs no marker: the series starts inside it.
    for year in dates.dt.year.unique()[1:]:
        january_1 = pd.Timestamp(year=int(year), month=1, day=1)
        fig.add_shape(
            go.layout.Shape(
                type="line",
                x0=january_1,
                x1=january_1,
                y0=0,
                y1=1,
                xref="x",
                yref="paper",  # span the full plot height regardless of data range
                line=dict(color="grey", width=1.5, dash="dash"),
            )
        )
    return fig
# Only the first 28 columns are scanned for channel names.
channels_d = data.columns[:28]
# Strip the metric suffixes to get one entry per channel. dict.fromkeys
# de-duplicates while keeping the column order, so the select box lists the
# channels in the same order on every run (a set would not guarantee that).
channels = list(dict.fromkeys(
    name.replace('_impressions', '').replace('_clicks', '').replace('_spend', '')
    for name in channels_d
    if name.lower() != 'date'
))
channel = st.selectbox('Select Channel_name', channels)
# Offer every column that belongs to the chosen channel.
target_column = st.selectbox(
    'Select Channel)',
    [name for name in data.columns if name.startswith(channel)],
)
fig = line_plot_target(data, target=str(target_column), title=f'{str(target_column)} Over Time')
st.plotly_chart(fig, use_container_width=True)
st.header('Build saturation curve')

# Left column: standardise the selected metric (optionally capped).
# Right column: choose the response series.
col = st.columns(2)
with col[0]:
    cap_outliers = st.checkbox('Cap Outliers')
    # `x` keeps the frame's own index. Earlier code re-indexed it by date and
    # ran a seasonal decomposition whose residuals were never used; that step
    # could raise and has been dropped.
    x = data[target_column]
    x_mean = np.mean(x)
    x_std = np.std(x)
    # `not x_std > 0` is also true for NaN, which is what an empty or
    # all-missing column produces.
    if not x_std > 0:
        st.error(f'{target_column} has no variation, so it cannot be standardised.')
        st.stop()
    x_scaled = (x - x_mean) / x_std
    if cap_outliers:
        # Cap at two standard deviations either side of the mean.
        lower_threshold = -2.0
        upper_threshold = 2.0
        x_scaled = np.clip(x_scaled, lower_threshold, upper_threshold)

with col[1]:
    if st.checkbox('Attributed'):
        # An attributed-installs column belongs to the selected metric when
        # its name is contained in the metric's name.
        candidates = [
            name for name in attributred_app_installs.columns
            if str(name) in str(target_column)
        ]
        if not candidates:
            st.error(f'No attributed installs column matches {target_column}.')
            st.stop()
        # If several names match, take the longest one.
        # NOTE(review): assumed to be the most specific match - confirm.
        attributed_column = max(candidates, key=len)
        # Align to `data` by index (as a column assignment would) without
        # overwriting the un-attributed installs column in `data`.
        y = (
            attributred_app_installs[attributed_column]
            .reindex(data.index)
            .rename('app_installs_appsflyer')
        )
        title = 'Attributed-App_installs_appsflyer'
    else:
        y = data["app_installs_appsflyer"]
        title = 'App_installs_appsflyer'
# Curve fitting function
def sigmoid(x, K, a, x0):
    """Evaluate the logistic curve ``K / (1 + exp(-a * (x - x0)))``.

    Computed as ``K * exp(-logaddexp(0, -a * (x - x0)))``, which is
    algebraically identical but never evaluates ``exp`` on a large positive
    argument, so extreme inputs do not overflow.

    Args:
        x: Scalar or array-like of input values.
        K: Upper asymptote (amplitude) of the curve.
        a: Slope (growth rate) at the midpoint.
        x0: Midpoint, i.e. the ``x`` at which the curve equals ``K / 2``.

    Returns:
        The curve evaluated at ``x``, with the same shape as ``x``.
    """
    return K * np.exp(-np.logaddexp(0.0, -a * (x - x0)))
# Starting values for the fit; the user can adjust them in the inputs below.
initial_K = np.max(y)
initial_a = 1
initial_x0 = 0
columns = st.columns(3)
with columns[0]:
    K = st.number_input('K (Amplitude)', min_value=0.01, max_value=float(2.0 * np.max(y)), value=float(initial_K), step=5.0)
with columns[1]:
    a = st.number_input('a (Slope)', min_value=0.01, max_value=5.0, value=float(initial_a), step=0.5)
with columns[2]:
    x0 = st.number_input('x0 (Center)', min_value=float(min(x_scaled)), max_value=float(max(x_scaled)), value=float(initial_x0), step=2.0)

# Work on plain float arrays so nothing below depends on how the input
# series happen to be indexed.
z_values = np.asarray(x_scaled, dtype=float)
y_values = np.asarray(y, dtype=float)
try:
    # Fit only rows where both values are finite; curve_fit rejects NaN/inf.
    usable = np.isfinite(z_values) & np.isfinite(y_values)
    params, _ = curve_fit(sigmoid, z_values[usable], y_values[usable], p0=[K, a, x0], maxfev=20000)
except (RuntimeError, ValueError, TypeError) as fit_error:
    # Show a message instead of a traceback when the fit cannot be computed.
    st.error(f'Could not fit the saturation curve: {fit_error}')
    st.stop()

x_slider = st.slider('X Value', min_value=float(min(x)), max_value=float(max(x))+1, value=float(x_mean), step=1.)
# Calculate the corresponding value on the fitted curve
x_slider_scaled = (x_slider - x_mean) / x_std
y_slider_fit = sigmoid(x_slider_scaled, *params)
# Display the corresponding value
st.write(f'{target_column}: {format_numbers(x_slider)}')
st.write(f'Corresponding App_installs: {format_numbers(y_slider_fit)}')

# Scatter plot of the observations. Passing plain arrays makes the 'x'/'y'
# keys of `labels` refer to the plotted columns.
fig = px.scatter(x=z_values, y=y_values, labels={'x': f'{target_column}', 'y': 'App Installs'}, title=title)
# Add the fitted sigmoid curve to the plot
z_min = float(np.nanmin(z_values))
z_max = float(np.nanmax(z_values))
x_fit = np.linspace(z_min, z_max, 100)  # Generate x values for the curve
y_fit = sigmoid(x_fit, *params)
fig.add_trace(px.line(x=x_fit, y=y_fit).data[0])
fig.data[1].update(line=dict(color='orange'))
fig.add_vline(x=x_slider_scaled, line_dash='dash', line_color='red', annotation_text=f'{format_numbers(x_slider)}')

# The axis is in standardised units; place a few evenly spaced ticks and
# label each with the value it corresponds to in the metric's own units
# (z * std + mean inverts the standardisation).
num_ticks = 7
tick_positions = np.linspace(z_min, z_max, num_ticks)
tick_labels = [format_axis(z * x_std + x_mean) for z in tick_positions]
fig.update_xaxes(tickvals=tick_positions, ticktext=tick_labels)

# Show the plot using st.plotly_chart
fig.update_xaxes(showgrid=False)
fig.update_yaxes(showgrid=False)
fig.update_layout(
    width=600,  # Adjust the width as needed
    height=600,  # Adjust the height as needed
)
st.plotly_chart(fig)
st.markdown('Tactic level')
# Every channel other than paid_social uses the digital app display sheet.
if channel == 'paid_social':
    tactic_sheet = 'paid_social_impressions'
else:
    tactic_sheet = 'digital_app_display_impressions'
tactic_data = pd.read_excel("Tatcic_paid.xlsx", sheet_name=tactic_sheet)
# Same conversion as for the channel data; the trend plot needs datetimes.
tactic_data['Date'] = pd.to_datetime(tactic_data['Date'])

target_column = st.selectbox(
    'Select Channel)',
    [name for name in tactic_data.columns if name != 'Date' and name != 'app_installs_appsflyer'],
)
fig = line_plot_target(tactic_data, target=str(target_column), title=f'{str(target_column)} Over Time')
st.plotly_chart(fig, use_container_width=True)

cap_outliers = st.checkbox('Cap Outliers', key='tactic1')
x = tactic_data[target_column]
x_mean = np.mean(x)
x_std = np.std(x)
# `not x_std > 0` is also true for NaN, which is what an empty or
# all-missing column produces.
if not x_std > 0:
    st.error(f'{target_column} has no variation, so it cannot be standardised.')
    st.stop()
x_scaled = (x - x_mean) / x_std
if cap_outliers:
    # Cap at two standard deviations either side of the mean.
    lower_threshold = -2.0
    upper_threshold = 2.0
    x_scaled = np.clip(x_scaled, lower_threshold, upper_threshold)

if st.checkbox('Attributed', key='tactic2'):
    # An attributed-installs column belongs to the selected tactic when its
    # name is contained in the tactic's name.
    candidates = [
        name for name in attributred_app_installs_tactic.columns
        if str(name) in str(target_column)
    ]
    if not candidates:
        st.error(f'No attributed installs column matches {target_column}.')
        st.stop()
    # If several names match, take the longest one.
    # NOTE(review): assumed to be the most specific match - confirm.
    attributed_column = max(candidates, key=len)
    # Align to the tactic sheet by index, as a column assignment would.
    y = (
        attributred_app_installs_tactic[attributed_column]
        .reindex(tactic_data.index)
        .rename('app_installs_appsflyer')
    )
    title = 'Attributed-App_installs_appsflyer'
else:
    # Prefer the installs column of the tactic sheet so x and y come from the
    # same rows; fall back to the channel-level frame if the sheet lacks it.
    # NOTE(review): the sheet is presumed to carry this column because the
    # select box above filters it out - confirm.
    if 'app_installs_appsflyer' in tactic_data.columns:
        y = tactic_data['app_installs_appsflyer']
    else:
        y = data["app_installs_appsflyer"]
    title = 'App_installs_appsflyer'
# Curve fitting function
def sigmoid(x, K, a, x0):
    """Evaluate the logistic curve ``K / (1 + exp(-a * (x - x0)))``.

    This repeats the definition that appears earlier in the script. It is
    computed as ``K * exp(-logaddexp(0, -a * (x - x0)))``, which is
    algebraically identical but never evaluates ``exp`` on a large positive
    argument, so extreme inputs do not overflow.

    Args:
        x: Scalar or array-like of input values.
        K: Upper asymptote (amplitude) of the curve.
        a: Slope (growth rate) at the midpoint.
        x0: Midpoint, i.e. the ``x`` at which the curve equals ``K / 2``.

    Returns:
        The curve evaluated at ``x``, with the same shape as ``x``.
    """
    return K * np.exp(-np.logaddexp(0.0, -a * (x - x0)))
# Curve fitting
# Starting values for the fit; the user can adjust them in the inputs below.
initial_K = np.max(y)
initial_a = 1
initial_x0 = 0
K = st.number_input('K (Amplitude)', min_value=0.01, max_value=float(2.0 * np.max(y)), value=float(initial_K), step=5.0, key='tactic3')
a = st.number_input('a (Slope)', min_value=0.01, max_value=5.0, value=float(initial_a), step=2.0, key='tactic41')
x0 = st.number_input('x0 (Center)', min_value=float(min(x_scaled)), max_value=float(max(x_scaled)), value=float(initial_x0), step=2.0, key='tactic4')

# Work on plain float arrays so nothing below depends on how the input
# series happen to be indexed.
z_values = np.asarray(x_scaled, dtype=float)
y_values = np.asarray(y, dtype=float)
try:
    # Fit only rows where both values are finite; curve_fit rejects NaN/inf.
    usable = np.isfinite(z_values) & np.isfinite(y_values)
    params, _ = curve_fit(sigmoid, z_values[usable], y_values[usable], p0=[K, a, x0], maxfev=20000)
except (RuntimeError, ValueError, TypeError) as fit_error:
    # Show a message instead of a traceback when the fit cannot be computed.
    st.error(f'Could not fit the saturation curve: {fit_error}')
    st.stop()

# Slider to vary x
x_slider = st.slider('X Value', min_value=float(min(x)), max_value=float(max(x)), value=float(x_mean), step=1., key='tactic7')
# Calculate the corresponding value on the fitted curve
x_slider_scaled = (x_slider - x_mean) / x_std
y_slider_fit = sigmoid(x_slider_scaled, *params)
# Display the corresponding value
st.write(f'{target_column}: {format_axis(x_slider)}')
st.write(f'Corresponding App_installs: {format_axis(y_slider_fit)}')

# Scatter plot of the observations. No data_frame is passed: x and y are
# supplied directly as arrays, and the 'x'/'y' keys of `labels` then refer
# to the plotted columns.
fig = px.scatter(x=z_values, y=y_values, labels={'x': f'{target_column}', 'y': 'App Installs'}, title=title)
# Add the fitted sigmoid curve to the plot
z_min = float(np.nanmin(z_values))
z_max = float(np.nanmax(z_values))
x_fit = np.linspace(z_min, z_max, 100)  # Generate x values for the curve
y_fit = sigmoid(x_fit, *params)
fig.add_trace(px.line(x=x_fit, y=y_fit).data[0])
fig.data[1].update(line=dict(color='orange'))
fig.add_vline(x=x_slider_scaled, line_dash='dash', line_color='red', annotation_text=f'{format_numbers(x_slider)}')

# The axis is in standardised units; place a few evenly spaced ticks and
# label each with the value it corresponds to in the tactic's own units
# (z * std + mean inverts the standardisation).
num_ticks = 7
tick_positions = np.linspace(z_min, z_max, num_ticks)
tick_labels = [format_axis(z * x_std + x_mean) for z in tick_positions]
fig.update_xaxes(tickvals=tick_positions, ticktext=tick_labels)

# Zero decimal places on numeric ticks; ".0f" is the valid spelling of the
# format (".f" carries no precision digits).
fig.update_xaxes(tickformat=".0f")
fig.update_yaxes(tickformat=".0f")
# Show the plot using st.plotly_chart
fig.update_xaxes(showgrid=False)
fig.update_yaxes(showgrid=False)
fig.update_layout(
    width=600,  # Adjust the width as needed
    height=600,  # Adjust the height as needed
)
st.plotly_chart(fig)