autoclientgradio / app00.py
luigi12345's picture
Create app00.py
64b08ce verified
raw
history blame
25.4 kB
import os
import re
import sqlite3
import requests
import pandas as pd
from datetime import datetime
from bs4 import BeautifulSoup
from googlesearch import search
import gradio as gr
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
import openai
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import json
# Configuration
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", "default_aws_access_key_id")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", "default_aws_secret_access_key")
region_name = "us-east-1"
openai.api_key = os.getenv("OPENAI_API_KEY", "default_openai_api_key")
openai.api_base = os.getenv("OPENAI_API_BASE", "http://127.0.0.1:11434/v1")
openai_model = "mistral"
# SQLite configuration
sqlite_db_path = "autoclient.db"
# Ensure the database file exists
try:
if not os.path.exists(sqlite_db_path):
open(sqlite_db_path, 'w').close()
except IOError as e:
logging.error(f"Failed to create database file: {e}")
raise
# Initialize AWS SES client
try:
ses_client = boto3.client('ses',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name)
except (NoCredentialsError, PartialCredentialsError) as e:
logging.error(f"AWS SES client initialization failed: {e}")
raise
# SQLite connection
def get_db_connection():
try:
return sqlite3.connect(sqlite_db_path)
except sqlite3.Error as e:
logging.error(f"Database connection failed: {e}")
raise
# HTTP session with retry strategy
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
# Setup logging
try:
logging.basicConfig(level=logging.INFO, filename='app.log', filemode='a',
format='%(asctime)s - %(levelname)s - %(message)s')
except IOError as e:
print(f"Error setting up logging: {e}")
raise
# Input validation functions
def validate_name(name):
if not name or not name.strip():
raise ValueError("Name cannot be empty or just whitespace")
if len(name) > 100:
raise ValueError("Name is too long (max 100 characters)")
return name.strip()
def validate_email(email):
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
raise ValueError("Invalid email address")
return email
def validate_campaign_type(campaign_type):
valid_types = ["Email", "SMS"]
if campaign_type not in valid_types:
raise ValueError(f"Invalid campaign type. Must be one of {valid_types}")
return campaign_type
def validate_id(id_value, id_type):
try:
id_int = int(id_value.split(':')[0] if ':' in str(id_value) else id_value)
if id_int <= 0:
raise ValueError
return id_int
except (ValueError, AttributeError):
raise ValueError(f"Invalid {id_type} ID")
def validate_status(status, valid_statuses):
if status not in valid_statuses:
raise ValueError(f"Invalid status. Must be one of {valid_statuses}")
return status
def validate_num_results(num_results):
if not isinstance(num_results, int) or num_results <= 0:
raise ValueError("Invalid number of results")
return num_results
# Initialize database
def init_db():
conn = get_db_connection()
cursor = conn.cursor()
cursor.executescript('''
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS campaigns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_name TEXT NOT NULL,
project_id INTEGER,
campaign_type TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects (id)
);
CREATE TABLE IF NOT EXISTS message_templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
template_name TEXT NOT NULL,
subject TEXT,
body_content TEXT NOT NULL,
campaign_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
);
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT,
phone TEXT,
first_name TEXT,
last_name TEXT,
company TEXT,
job_title TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS lead_sources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
lead_id INTEGER,
search_query TEXT,
url TEXT,
page_title TEXT,
meta_description TEXT,
http_status INTEGER,
scrape_duration TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (lead_id) REFERENCES leads (id)
);
CREATE TABLE IF NOT EXISTS campaign_leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER,
lead_id INTEGER,
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
FOREIGN KEY (lead_id) REFERENCES leads (id)
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER,
lead_id INTEGER,
template_id INTEGER,
customized_subject TEXT,
customized_content TEXT,
sent_at TIMESTAMP,
status TEXT DEFAULT 'pending',
engagement_data TEXT,
FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
FOREIGN KEY (lead_id) REFERENCES leads (id),
FOREIGN KEY (template_id) REFERENCES message_templates (id)
);
CREATE TABLE IF NOT EXISTS search_terms (
id INTEGER PRIMARY KEY AUTOINCREMENT,
term TEXT NOT NULL,
status TEXT DEFAULT 'pending',
processed_leads INTEGER DEFAULT 0,
last_processed_at TIMESTAMP,
campaign_id INTEGER,
FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
);
''')
conn.commit()
conn.close()
logging.info("Database initialized successfully!")
# Call this at the start of your script
init_db()
# Function to create a new project
def create_project(project_name):
project_name = validate_name(project_name)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("INSERT INTO projects (project_name) VALUES (?)", (project_name,))
project_id = cursor.lastrowid
conn.commit()
conn.close()
return project_id
# Function to create a new campaign
def create_campaign(campaign_name, project_id, campaign_type):
campaign_name = validate_name(campaign_name)
project_id = validate_id(project_id, "project")
campaign_type = validate_campaign_type(campaign_type)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("INSERT INTO campaigns (campaign_name, project_id, campaign_type) VALUES (?, ?, ?)",
(campaign_name, project_id, campaign_type))
campaign_id = cursor.lastrowid
conn.commit()
conn.close()
return campaign_id
# Function to create a new message template
def create_message_template(template_name, subject, body_content, campaign_id):
template_name = validate_name(template_name)
subject = validate_name(subject)
body_content = sanitize_html(body_content)
campaign_id = validate_id(campaign_id, "campaign")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO message_templates (template_name, subject, body_content, campaign_id)
VALUES (?, ?, ?, ?)
""", (template_name, subject, body_content, campaign_id))
template_id = cursor.lastrowid
conn.commit()
conn.close()
return template_id
# Function to add a new search term
def add_search_term(term, campaign_id):
term = validate_name(term)
campaign_id = validate_id(campaign_id, "campaign")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("INSERT INTO search_terms (term, campaign_id) VALUES (?, ?)", (term, campaign_id))
term_id = cursor.lastrowid
conn.commit()
conn.close()
return term_id
# Function to fetch search terms
def fetch_search_terms(campaign_id=None):
conn = get_db_connection()
cursor = conn.cursor()
if campaign_id:
campaign_id = validate_id(campaign_id, "campaign")
cursor.execute('SELECT id, term, processed_leads, status FROM search_terms WHERE campaign_id = ?', (campaign_id,))
else:
cursor.execute('SELECT id, term, processed_leads, status FROM search_terms')
rows = cursor.fetchall()
conn.close()
return pd.DataFrame(rows, columns=["ID", "Search Term", "Leads Fetched", "Status"])
# Function to update search term status
def update_search_term_status(term_id, new_status, processed_leads):
term_id = validate_id(term_id, "search term")
new_status = validate_status(new_status, ["pending", "completed"])
processed_leads = validate_num_results(processed_leads)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE search_terms
SET status = ?, processed_leads = ?, last_processed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (new_status, processed_leads, term_id))
conn.commit()
conn.close()
# Function to save a new lead
def save_lead(email, phone, first_name, last_name, company, job_title):
email = validate_email(email)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO leads (email, phone, first_name, last_name, company, job_title)
VALUES (?, ?, ?, ?, ?, ?)
""", (email, phone, first_name, last_name, company, job_title))
lead_id = cursor.lastrowid
conn.commit()
conn.close()
return lead_id
# Function to save lead source
def save_lead_source(lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration):
lead_id = validate_id(lead_id, "lead")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO lead_sources (lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration))
conn.commit()
conn.close()
# Function to add a lead to a campaign
def add_lead_to_campaign(campaign_id, lead_id):
campaign_id = validate_id(campaign_id, "campaign")
lead_id = validate_id(lead_id, "lead")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO campaign_leads (campaign_id, lead_id) VALUES (?, ?)",
(campaign_id, lead_id))
conn.commit()
conn.close()
# Function to create a new message
def create_message(campaign_id, lead_id, template_id, customized_subject, customized_content):
campaign_id = validate_id(campaign_id, "campaign")
lead_id = validate_id(lead_id, "lead")
template_id = validate_id(template_id, "template")
customized_subject = validate_name(customized_subject)
customized_content = sanitize_html(customized_content)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO messages (campaign_id, lead_id, template_id, customized_subject, customized_content)
VALUES (?, ?, ?, ?, ?)
""", (campaign_id, lead_id, template_id, customized_subject, customized_content))
message_id = cursor.lastrowid
conn.commit()
conn.close()
return message_id
# Function to update message status
def update_message_status(message_id, status, sent_at=None):
message_id = validate_id(message_id, "message")
status = validate_status(status, ["pending", "sent", "failed"])
conn = get_db_connection()
cursor = conn.cursor()
if sent_at:
cursor.execute("UPDATE messages SET status = ?, sent_at = ? WHERE id = ?",
(status, sent_at, message_id))
else:
cursor.execute("UPDATE messages SET status = ? WHERE id = ?",
(status, message_id))
conn.commit()
conn.close()
# Function to fetch message templates
def fetch_message_templates(campaign_id=None):
conn = get_db_connection()
cursor = conn.cursor()
if campaign_id:
campaign_id = validate_id(campaign_id, "campaign")
cursor.execute('SELECT id, template_name FROM message_templates WHERE campaign_id = ?', (campaign_id,))
else:
cursor.execute('SELECT id, template_name FROM message_templates')
rows = cursor.fetchall()
conn.close()
return [f"{row[0]}: {row[1]}" for row in rows]
# Function to fetch projects
def fetch_projects():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, project_name FROM projects')
rows = cursor.fetchall()
conn.close()
return [f"{row[0]}: {row[1]}" for row in rows]
# Function to fetch campaigns
def fetch_campaigns():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, campaign_name FROM campaigns')
campaigns = cursor.fetchall()
conn.close()
return [f"{campaign[0]}: {campaign[1]}" for campaign in campaigns]
# Bulk search function
async def bulk_search(selected_terms, num_results, progress=gr.Progress()):
if not selected_terms:
raise ValueError("No search terms selected")
num_results = validate_num_results(num_results)
total_leads = 0
for term_id in selected_terms:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT term, processed_leads FROM search_terms WHERE id = ?', (term_id,))
term, processed_leads = cursor.fetchone()
conn.close()
leads_found = 0
try:
search_urls = list(search(term, num_results=num_results))
except Exception as e:
logging.error(f"Error performing Google search for term '{term}': {e}")
continue
for url in search_urls:
if leads_found + processed_leads >= num_results:
break
try:
response = session.get(url, timeout=10)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
emails = find_emails(response.text)
for email in emails:
lead_id = save_lead(email, None, None, None, None, None)
save_lead_source(lead_id, term, url, soup.title.string, None, response.status_code, str(response.elapsed))
leads_found += 1
total_leads += 1
if leads_found + processed_leads >= num_results:
break
except Exception as e:
logging.error(f"Error processing {url}: {e}")
yield f"Processed {leads_found + processed_leads} leads for term '{term}'"
update_search_term_status(term_id, 'completed', leads_found + processed_leads)
yield f"Completed term '{term}': Found {leads_found} new leads, total {leads_found + processed_leads}"
yield f"Bulk search completed. Total new leads found: {total_leads}"
# Bulk send function
async def bulk_send(template_id, from_email, reply_to, progress=gr.Progress()):
if not isinstance(template_id, int):
raise ValueError("Invalid template ID")
if not re.match(r"[^@]+@[^@]+\.[^@]+", from_email):
raise ValueError("Invalid from email address")
if not re.match(r"[^@]+@[^@]+\.[^@]+", reply_to):
raise ValueError("Invalid reply to email address")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT m.id, l.email, m.customized_subject, m.customized_content
FROM messages m
JOIN leads l ON m.lead_id = l.id
WHERE m.template_id = ? AND m.status = 'pending'
''', (template_id,))
messages = cursor.fetchall()
conn.close()
total_sent = 0
for message_id, email, subject, content in messages:
try:
response = ses_client.send_email(
Source=from_email,
Destination={'ToAddresses': [email]},
Message={
'Subject': {'Data': subject},
'Body': {'Html': {'Data': content}}
},
ReplyToAddresses=[reply_to]
)
update_message_status(message_id, 'sent', datetime.now())
total_sent += 1
yield f"Sent email to {email}"
except Exception as e:
logging.error(f"Failed to send email to {email}: {e}")
update_message_status(message_id, 'failed')
yield f"Failed to send email to {email}"
yield f"Bulk send completed. Total emails sent: {total_sent}"
# Function to get email preview
def get_email_preview(template_id, from_email, reply_to):
template_id = validate_id(template_id, "template")
from_email = validate_email(from_email)
reply_to = validate_email(reply_to)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT subject, body_content FROM message_templates WHERE id = ?', (template_id,))
template = cursor.fetchone()
conn.close()
if template:
subject, body_content = template
preview = f"Subject: {subject}\n\nFrom: {from_email}\nReply-To: {reply_to}\n\nBody:\n{body_content}"
return preview
else:
return "Template not found"
# Function to sanitize HTML content
def sanitize_html(content):
return re.sub('<[^<]+?>', '', content)
# Function to find valid emails in HTML text
def find_emails(html_text):
email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b')
all_emails = set(email_regex.findall(html_text))
valid_emails = {email for email in all_emails if is_valid_email(email)}
unique_emails = {}
for email in valid_emails:
domain = email.split('@')[1]
if domain not in unique_emails:
unique_emails[domain] = email
return set(unique_emails.values())
# Function to validate email address
def is_valid_email(email):
invalid_patterns = [
r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@',
r'^prueba@', r'^\d+[a-z]*@'
]
typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"]
if len(email) < 6 or len(email) > 254:
return False
for pattern in invalid_patterns:
if re.search(pattern, email, re.IGNORECASE):
return False
domain = email.split('@')[1]
if domain in typo_domains or not re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain):
return False
return True
# Function to refresh search terms
def refresh_search_terms(campaign_id):
return df_to_list(fetch_search_terms(campaign_id))
# Function to convert DataFrame to list of lists
def df_to_list(df):
return df.values.tolist()
# Add this function before the Gradio interface definition
def manual_search(term, num_results):
results = []
try:
search_urls = list(search(term, num_results=num_results))
for url in search_urls:
response = session.get(url, timeout=10)
emails = find_emails(response.text)
results.extend([(email, url) for email in emails])
except Exception as e:
logging.error(f"Error in manual search: {e}")
return results[:num_results]
# Gradio interface
with gr.Blocks() as gradio_app:
gr.Markdown("# AUTOCLIENT")
with gr.Tab("Projects and Campaigns"):
with gr.Row():
with gr.Column():
project_name = gr.Textbox(label="Project Name")
create_project_btn = gr.Button("Create Project")
project_status = gr.Textbox(label="Project Status", interactive=False)
with gr.Column():
campaign_name = gr.Textbox(label="Campaign Name")
project_id = gr.Dropdown(label="Project", choices=fetch_projects())
campaign_type = gr.Radio(["Email", "SMS"], label="Campaign Type")
create_campaign_btn = gr.Button("Create Campaign")
campaign_status = gr.Textbox(label="Campaign Status", interactive=False)
with gr.Tab("Message Templates"):
with gr.Row():
with gr.Column():
template_name = gr.Textbox(label="Template Name")
subject = gr.Textbox(label="Subject")
body_content = gr.Code(language="html", label="Body Content")
campaign_id_for_template = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
create_template_btn = gr.Button("Create Template")
with gr.Column():
template_status = gr.Textbox(label="Template Status", interactive=False)
template_preview = gr.HTML(label="Template Preview")
with gr.Tab("Search Terms"):
with gr.Row():
with gr.Column():
search_term = gr.Textbox(label="Search Term")
campaign_id_for_search = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
add_term_btn = gr.Button("Add Search Term")
with gr.Column():
search_term_status = gr.Textbox(label="Search Term Status", interactive=False)
search_term_list = gr.Dataframe(df_to_list(fetch_search_terms()), headers=["ID", "Search Term", "Leads Fetched", "Status"])
with gr.Tab("Bulk Operations"):
with gr.Row():
campaign_id_for_bulk = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
refresh_btn = gr.Button("Refresh Data")
search_term_df = gr.Dataframe(headers=["ID", "Search Term", "Leads Fetched", "Status"])
selected_terms = gr.CheckboxGroup(label="Select Search Terms", choices=[])
num_results = gr.Slider(minimum=10, maximum=500, value=120, step=10, label="Results per term")
with gr.Row():
template_id = gr.Dropdown(choices=fetch_message_templates(), label="Select Message Template")
from_email = gr.Textbox(label="From Email", value="Sami Halawa <hello@indosy.com>")
reply_to = gr.Textbox(label="Reply To", value="eugproduction@gmail.com")
preview_button = gr.Button("Preview Email")
email_preview = gr.HTML(label="Email Preview")
with gr.Row():
bulk_search_button = gr.Button("Bulk Search")
bulk_send_button = gr.Button("Bulk Send")
bulk_search_send_button = gr.Button("Bulk Search & Send")
log_output = gr.TextArea(label="Process Logs", interactive=False)
with gr.Tab("Manual Search"):
with gr.Row():
manual_search_term = gr.Textbox(label="Manual Search Term")
manual_num_results = gr.Slider(minimum=1, maximum=50, value=10, step=1)
manual_search_btn = gr.Button("Search")
manual_search_results = gr.Dataframe(headers=["Email", "Source"])
# Move these lines inside the Blocks context
gradio_app.load(lambda: gr.update(value=df_to_list(fetch_search_terms())), outputs=search_term_df)
gradio_app.load(lambda: gr.update(value=fetch_message_templates()), outputs=template_id)
# Define button actions
create_project_btn.click(create_project, inputs=[project_name], outputs=[project_status])
create_campaign_btn.click(create_campaign, inputs=[campaign_name, project_id, campaign_type], outputs=[campaign_status])
create_template_btn.click(create_message_template, inputs=[template_name, subject, body_content, campaign_id_for_template], outputs=[template_status])
add_term_btn.click(add_search_term, inputs=[search_term, campaign_id_for_search], outputs=[search_term_status])
preview_button.click(get_email_preview, inputs=[template_id, from_email, reply_to], outputs=email_preview)
bulk_search_button.click(bulk_search, inputs=[selected_terms, num_results], outputs=[log_output, search_term_df])
bulk_send_button.click(bulk_send, inputs=[template_id, from_email, reply_to], outputs=log_output)
bulk_search_send_button.click(
lambda selected_terms, num_results, template_id, from_email, reply_to:
gr.update(value="Starting bulk search..."),
inputs=[selected_terms, num_results, template_id, from_email, reply_to],
outputs=log_output
).then(
bulk_search,
inputs=[selected_terms, num_results],
outputs=[log_output, search_term_df]
).then(
lambda: gr.update(value="Bulk search completed. Starting bulk send..."),
outputs=log_output
).then(
bulk_send,
inputs=[template_id, from_email, reply_to],
outputs=log_output
)
manual_search_btn.click(manual_search, inputs=[manual_search_term, manual_num_results], outputs=manual_search_results)
refresh_btn.click(refresh_search_terms, inputs=[campaign_id_for_bulk], outputs=[search_term_df])
# Launch the app outside the Blocks context
gradio_app.launch()