"""Email campaign management: lead scraping, SQLite persistence, SES sending."""

import json
import logging
import os
import re
import sqlite3
from contextlib import contextmanager
from datetime import datetime

import boto3
import gradio as gr
import openai
import pandas as pd
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
from bs4 import BeautifulSoup
from googlesearch import search
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Setup logging first so that every later start-up failure lands in app.log.
try:
    logging.basicConfig(
        level=logging.INFO,
        filename='app.log',
        filemode='a',
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
except IOError as e:
    print(f"Error setting up logging: {e}")
    raise

# Configuration
# SECURITY: AWS credentials are read from the environment only. Hard-coded
# fallback keys were removed from this file; any key that was ever committed
# here must be considered leaked and rotated. When the variables are unset,
# boto3 falls back to its default credential chain.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
REGION_NAME = "us-east-1"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-1234")
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://openai-proxy-kl3l.onrender.com")
OPENAI_MODEL = "gpt-3.5-turbo"

# SQLite configuration
sqlite_db_path = "autoclient.db"

# Ensure the database file exists
try:
    if not os.path.exists(sqlite_db_path):
        with open(sqlite_db_path, 'w'):
            pass
except IOError as e:
    logging.error("Failed to create database file: %s", e)
    raise

# Initialize AWS SES client
try:
    ses_client = boto3.client(
        'ses',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=REGION_NAME,
    )
except (NoCredentialsError, PartialCredentialsError) as e:
    logging.error("AWS SES client initialization failed: %s", e)
    raise


# SQLite connection
def get_db_connection():
    """Open a new connection to the SQLite database at ``sqlite_db_path``.

    Raises:
        sqlite3.Error: if the connection cannot be opened (logged first).
    """
    try:
        return sqlite3.connect(sqlite_db_path)
    except sqlite3.Error as e:
        logging.error("Database connection failed: %s", e)
        raise


@contextmanager
def _db_cursor():
    """Yield a cursor on a fresh connection.

    Commits when the ``with`` body succeeds, rolls back when it raises, and
    always closes the connection so a failing statement cannot leak it.
    """
    conn = get_db_connection()
    try:
        with conn:  # sqlite3 connection context manager: commit or rollback
            yield conn.cursor()
    finally:
        conn.close()


# HTTP session with retry strategy
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
# Search results may be plain-http pages; give them the same retry policy.
session.mount('http://', HTTPAdapter(max_retries=retries))

# Loose "something@something.tld" check used by the validators below.
_EMAIL_ADDRESS_RE = re.compile(r"[^@]+@[^@]+\.[^@]+")


# Input validation functions
def validate_name(name):
    """Return ``name`` stripped of surrounding whitespace.

    Raises:
        ValueError: if ``name`` is empty/blank or longer than 100 characters
            (length is measured before stripping).
    """
    if not name or not name.strip():
        raise ValueError("Name cannot be empty or just whitespace")
    if len(name) > 100:
        raise ValueError("Name is too long (max 100 characters)")
    return name.strip()


def validate_email(email):
    """Return ``email`` unchanged if it looks like an email address.

    The whole string must match, so values with a second ``@`` are rejected.

    Raises:
        ValueError: if ``email`` is not a string or does not match.
    """
    if not isinstance(email, str) or not _EMAIL_ADDRESS_RE.fullmatch(email):
        raise ValueError("Invalid email address")
    return email


def validate_campaign_type(campaign_type):
    """Return ``campaign_type`` if it is "Email" or "SMS", else raise ValueError."""
    valid_types = ["Email", "SMS"]
    if campaign_type not in valid_types:
        raise ValueError(f"Invalid campaign type. Must be one of {valid_types}")
    return campaign_type


def validate_id(id_value, id_type):
    """Parse a positive integer ID.

    Accepts an int, a numeric string, or a dropdown label of the form
    ``"<id>: <name>"`` (the format produced by the ``fetch_*`` helpers).

    Args:
        id_value: the raw value to parse.
        id_type: human-readable entity name used in the error message.

    Raises:
        ValueError: if the value is missing, non-numeric or not positive.
    """
    try:
        raw = id_value.split(':')[0] if isinstance(id_value, str) else id_value
        id_int = int(raw)
    except (ValueError, TypeError, AttributeError):
        # TypeError covers None (nothing selected in a dropdown).
        raise ValueError(f"Invalid {id_type} ID") from None
    if id_int <= 0:
        raise ValueError(f"Invalid {id_type} ID")
    return id_int


def validate_status(status, valid_statuses):
    """Return ``status`` if it is one of ``valid_statuses``, else raise ValueError."""
    if status not in valid_statuses:
        raise ValueError(f"Invalid status. Must be one of {valid_statuses}")
    return status


def validate_num_results(num_results):
    """Return ``num_results`` as a positive int.

    Integral floats (e.g. ``120.0`` from a UI slider) are accepted and
    converted; everything else that is not a positive int is rejected.

    Raises:
        ValueError: if the value is not a positive whole number.
    """
    if isinstance(num_results, float) and num_results.is_integer():
        num_results = int(num_results)
    if not isinstance(num_results, int) or num_results <= 0:
        raise ValueError("Invalid number of results")
    return num_results


# Initialize database
def init_db():
    """Create all application tables if they do not exist yet."""
    with _db_cursor() as cursor:
        cursor.executescript('''
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                project_name TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS campaigns (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_name TEXT NOT NULL,
                project_id INTEGER,
                campaign_type TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (project_id) REFERENCES projects (id)
            );

            CREATE TABLE IF NOT EXISTS message_templates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                template_name TEXT NOT NULL,
                subject TEXT,
                body_content TEXT NOT NULL,
                campaign_id INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
            );

            CREATE TABLE IF NOT EXISTS leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT,
                phone TEXT,
                first_name TEXT,
                last_name TEXT,
                company TEXT,
                job_title TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS lead_sources (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                lead_id INTEGER,
                search_query TEXT,
                url TEXT,
                page_title TEXT,
                meta_description TEXT,
                http_status INTEGER,
                scrape_duration TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (lead_id) REFERENCES leads (id)
            );

            CREATE TABLE IF NOT EXISTS campaign_leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_id INTEGER,
                lead_id INTEGER,
                status TEXT DEFAULT 'active',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
                FOREIGN KEY (lead_id) REFERENCES leads (id)
            );

            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                campaign_id INTEGER,
                lead_id INTEGER,
                template_id INTEGER,
                customized_subject TEXT,
                customized_content TEXT,
                sent_at TIMESTAMP,
                status TEXT DEFAULT 'pending',
                engagement_data TEXT,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
                FOREIGN KEY (lead_id) REFERENCES leads (id),
                FOREIGN KEY (template_id) REFERENCES message_templates (id)
            );

            CREATE TABLE IF NOT EXISTS search_terms (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                term TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                processed_leads INTEGER DEFAULT 0,
                last_processed_at TIMESTAMP,
                campaign_id INTEGER,
                FOREIGN KEY (campaign_id) REFERENCES campaigns (id)
            );
        ''')
    logging.info("Database initialized successfully!")


# Call this at the start of your script
init_db()


# Function to create a new project
def create_project(project_name):
    """Insert a project row and return its new ID.

    Raises:
        ValueError: if ``project_name`` fails ``validate_name``.
    """
    project_name = validate_name(project_name)
    with _db_cursor() as cursor:
        cursor.execute("INSERT INTO projects (project_name) VALUES (?)", (project_name,))
        return cursor.lastrowid


# Function to create a new campaign
def create_campaign(campaign_name, project_id, campaign_type):
    """Insert a campaign row under ``project_id`` and return its new ID.

    Raises:
        ValueError: if any argument fails validation.
    """
    campaign_name = validate_name(campaign_name)
    project_id = validate_id(project_id, "project")
    campaign_type = validate_campaign_type(campaign_type)
    with _db_cursor() as cursor:
        cursor.execute(
            "INSERT INTO campaigns (campaign_name, project_id, campaign_type) VALUES (?, ?, ?)",
            (campaign_name, project_id, campaign_type),
        )
        return cursor.lastrowid


# Function to create a new message template
def create_message_template(template_name, subject, body_content, campaign_id):
    """Insert a message template and return its new ID.

    The body is passed through ``sanitize_html`` (tags stripped) before it
    is stored.

    Raises:
        ValueError: if the name, subject or campaign ID fails validation.
    """
    template_name = validate_name(template_name)
    subject = validate_name(subject)
    body_content = sanitize_html(body_content)
    campaign_id = validate_id(campaign_id, "campaign")
    with _db_cursor() as cursor:
        cursor.execute("""
            INSERT INTO message_templates (template_name, subject, body_content, campaign_id)
            VALUES (?, ?, ?, ?)
        """, (template_name, subject, body_content, campaign_id))
        return cursor.lastrowid


# Function to add a new search term
def add_search_term(term, campaign_id):
    """Insert a search term for ``campaign_id`` and return its new ID."""
    term = validate_name(term)
    campaign_id = validate_id(campaign_id, "campaign")
    with _db_cursor() as cursor:
        cursor.execute(
            "INSERT INTO search_terms (term, campaign_id) VALUES (?, ?)",
            (term, campaign_id),
        )
        return cursor.lastrowid


# Function to fetch search terms
def fetch_search_terms(campaign_id=None):
    """Return search terms as a DataFrame, optionally filtered by campaign.

    Columns: "ID", "Search Term", "Leads Fetched", "Status".
    """
    with _db_cursor() as cursor:
        if campaign_id:
            campaign_id = validate_id(campaign_id, "campaign")
            cursor.execute(
                'SELECT id, term, processed_leads, status FROM search_terms WHERE campaign_id = ?',
                (campaign_id,),
            )
        else:
            cursor.execute('SELECT id, term, processed_leads, status FROM search_terms')
        rows = cursor.fetchall()
    return pd.DataFrame(rows, columns=["ID", "Search Term", "Leads Fetched", "Status"])


# Function to update search term status
def update_search_term_status(term_id, new_status, processed_leads):
    """Set a search term's status and processed-lead count.

    ``processed_leads`` is a running total and may legitimately be 0 (a term
    that produced no leads), so it is checked for non-negativity here rather
    than with ``validate_num_results``, which requires a positive value.

    Raises:
        ValueError: on an invalid ID, status, or negative/non-int count.
    """
    term_id = validate_id(term_id, "search term")
    new_status = validate_status(new_status, ["pending", "completed"])
    if not isinstance(processed_leads, int) or processed_leads < 0:
        raise ValueError("Invalid number of results")
    with _db_cursor() as cursor:
        cursor.execute("""
            UPDATE search_terms
            SET status = ?, processed_leads = ?, last_processed_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (new_status, processed_leads, term_id))


# Function to save a new lead
def save_lead(email, phone, first_name, last_name, company, job_title):
    """Insert a lead row and return its new ID.

    Only ``email`` is validated; the other fields are stored as given.
    """
    email = validate_email(email)
    with _db_cursor() as cursor:
        cursor.execute("""
            INSERT INTO leads (email, phone, first_name, last_name, company, job_title)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (email, phone, first_name, last_name, company, job_title))
        return cursor.lastrowid


# Function to save lead source
def save_lead_source(lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration):
    """Record where a lead was found (query, URL and scrape metadata)."""
    lead_id = validate_id(lead_id, "lead")
    with _db_cursor() as cursor:
        cursor.execute("""
            INSERT INTO lead_sources (lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (lead_id, search_query, url, page_title, meta_description, http_status, scrape_duration))


# Function to add a lead to a campaign
def add_lead_to_campaign(campaign_id, lead_id):
    """Link a lead to a campaign unless that link already exists.

    ``campaign_leads`` has no UNIQUE constraint on (campaign_id, lead_id), so
    ``INSERT OR IGNORE`` would never ignore anything; the existence check is
    therefore done explicitly in the statement.
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    with _db_cursor() as cursor:
        cursor.execute("""
            INSERT INTO campaign_leads (campaign_id, lead_id)
            SELECT ?, ?
            WHERE NOT EXISTS (
                SELECT 1 FROM campaign_leads WHERE campaign_id = ? AND lead_id = ?
            )
        """, (campaign_id, lead_id, campaign_id, lead_id))


# Function to create a new message
def create_message(campaign_id, lead_id, template_id, customized_subject, customized_content):
    """Insert a message for a lead and return its new ID.

    The content is passed through ``sanitize_html`` before it is stored; the
    row's status comes from the column default ('pending').
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    template_id = validate_id(template_id, "template")
    customized_subject = validate_name(customized_subject)
    customized_content = sanitize_html(customized_content)
    with _db_cursor() as cursor:
        cursor.execute("""
            INSERT INTO messages (campaign_id, lead_id, template_id, customized_subject, customized_content)
            VALUES (?, ?, ?, ?, ?)
        """, (campaign_id, lead_id, template_id, customized_subject, customized_content))
        return cursor.lastrowid


# Function to update message status
def update_message_status(message_id, status, sent_at=None):
    """Update a message's status and, when given, its ``sent_at`` timestamp.

    Args:
        message_id: ID (or "id: label") of the message.
        status: one of "pending", "sent", "failed".
        sent_at: optional timestamp; a ``datetime`` is stored as
            ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text.
    """
    message_id = validate_id(message_id, "message")
    status = validate_status(status, ["pending", "sent", "failed"])
    if isinstance(sent_at, datetime):
        # Serialize explicitly: sqlite3's implicit datetime adapter is
        # deprecated since Python 3.12. This is the same text it produced.
        sent_at = sent_at.isoformat(" ")
    with _db_cursor() as cursor:
        if sent_at:
            cursor.execute(
                "UPDATE messages SET status = ?, sent_at = ? WHERE id = ?",
                (status, sent_at, message_id),
            )
        else:
            cursor.execute("UPDATE messages SET status = ? WHERE id = ?", (status, message_id))


def _fetch_id_labels(query, params=()):
    """Run a two-column (id, name) query and return "id: name" labels."""
    with _db_cursor() as cursor:
        cursor.execute(query, params)
        rows = cursor.fetchall()
    return [f"{row[0]}: {row[1]}" for row in rows]


# Function to fetch message templates
def fetch_message_templates(campaign_id=None):
    """Return "id: template_name" labels, optionally filtered by campaign."""
    if campaign_id:
        campaign_id = validate_id(campaign_id, "campaign")
        return _fetch_id_labels(
            'SELECT id, template_name FROM message_templates WHERE campaign_id = ?',
            (campaign_id,),
        )
    return _fetch_id_labels('SELECT id, template_name FROM message_templates')


# Function to fetch projects
def fetch_projects():
    """Return "id: project_name" labels for every project."""
    return _fetch_id_labels('SELECT id, project_name FROM projects')


# Function to fetch campaigns
def fetch_campaigns():
    """Return "id: campaign_name" labels for every campaign."""
    return _fetch_id_labels('SELECT id, campaign_name FROM campaigns')


# Bulk search function
async def bulk_search(selected_terms, num_results, progress=gr.Progress()):
    """Search each selected term, scrape result pages for emails, save leads.

    This is an async generator: it yields human-readable progress strings.
    For each term it stops once the term's previously processed leads plus
    the newly found ones reach ``num_results``, then marks the term
    'completed'.

    Args:
        selected_terms: iterable of search-term IDs (ints or "id: label").
        num_results: positive number of results/leads per term.
        progress: Gradio progress tracker (not used by the body).

    Raises:
        ValueError: if no terms are selected or an argument is invalid.
    """
    if not selected_terms:
        raise ValueError("No search terms selected")
    num_results = validate_num_results(num_results)

    total_leads = 0
    for term_id in selected_terms:
        term_id = validate_id(term_id, "search term")
        with _db_cursor() as cursor:
            cursor.execute('SELECT term, processed_leads FROM search_terms WHERE id = ?', (term_id,))
            row = cursor.fetchone()
        if row is None:
            # The term may have been deleted since the list was rendered.
            logging.error("Search term with id %s not found; skipping", term_id)
            continue
        term = row[0]
        processed_leads = row[1] or 0

        leads_found = 0
        try:
            search_urls = list(search(term, num_results=num_results))
        except Exception as e:
            logging.error("Error performing Google search for term '%s': %s", term, e)
            continue

        for url in search_urls:
            if leads_found + processed_leads >= num_results:
                break
            try:
                response = session.get(url, timeout=10)
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                # Resolve the title before saving anything: a page without
                # <title> must not abort after the lead row was written.
                title_text = soup.title.string if soup.title else None
                page_title = str(title_text) if title_text is not None else None
                for email in find_emails(response.text):
                    lead_id = save_lead(email, None, None, None, None, None)
                    save_lead_source(lead_id, term, url, page_title, None,
                                     response.status_code, str(response.elapsed))
                    leads_found += 1
                    total_leads += 1
                    if leads_found + processed_leads >= num_results:
                        break
            except Exception as e:
                # Best effort: one bad page must not stop the whole run.
                logging.error("Error processing %s: %s", url, e)
            yield f"Processed {leads_found + processed_leads} leads for term '{term}'"

        update_search_term_status(term_id, 'completed', leads_found + processed_leads)
        yield f"Completed term '{term}': Found {leads_found} new leads, total {leads_found + processed_leads}"

    yield f"Bulk search completed. Total new leads found: {total_leads}"


# Bulk send function
async def bulk_send(template_id, from_email, reply_to, progress=gr.Progress()):
    """Send every pending message of a template through AWS SES.

    This is an async generator yielding one status string per message and a
    final summary. Each message is marked 'sent' or 'failed' in the database.

    Args:
        template_id: template ID as an int or an "id: label" dropdown value.
        from_email: sender address.
        reply_to: reply-to address.
        progress: Gradio progress tracker (not used by the body).

    Raises:
        ValueError: if the template ID or either address is invalid.
    """
    # The UI dropdown supplies "id: name" strings, so parse rather than
    # insist on an int.
    template_id = validate_id(template_id, "template")
    if not isinstance(from_email, str) or not _EMAIL_ADDRESS_RE.fullmatch(from_email):
        raise ValueError("Invalid from email address")
    if not isinstance(reply_to, str) or not _EMAIL_ADDRESS_RE.fullmatch(reply_to):
        raise ValueError("Invalid reply to email address")

    with _db_cursor() as cursor:
        cursor.execute('''
            SELECT m.id, l.email, m.customized_subject, m.customized_content
            FROM messages m
            JOIN leads l ON m.lead_id = l.id
            WHERE m.template_id = ? AND m.status = 'pending'
        ''', (template_id,))
        messages = cursor.fetchall()

    total_sent = 0
    for message_id, email, subject, content in messages:
        try:
            ses_client.send_email(
                Source=from_email,
                Destination={'ToAddresses': [email]},
                Message={
                    'Subject': {'Data': subject},
                    'Body': {'Html': {'Data': content}}
                },
                ReplyToAddresses=[reply_to]
            )
            update_message_status(message_id, 'sent', datetime.now())
            total_sent += 1
            yield f"Sent email to {email}"
        except Exception as e:
            logging.error("Failed to send email to %s: %s", email, e)
            update_message_status(message_id, 'failed')
            yield f"Failed to send email to {email}"

    yield f"Bulk send completed. Total emails sent: {total_sent}"


# Function to get email preview
def get_email_preview(template_id, from_email, reply_to):
    """Return a plain-text preview of a template, or "Template not found"."""
    template_id = validate_id(template_id, "template")
    from_email = validate_email(from_email)
    reply_to = validate_email(reply_to)
    with _db_cursor() as cursor:
        cursor.execute('SELECT subject, body_content FROM message_templates WHERE id = ?', (template_id,))
        template = cursor.fetchone()
    if not template:
        return "Template not found"
    subject, body_content = template
    return f"Subject: {subject}\n\nFrom: {from_email}\nReply-To: {reply_to}\n\nBody:\n{body_content}"


# Function to sanitize HTML content
def sanitize_html(content):
    """Return ``content`` with every ``<...>`` tag removed."""
    return re.sub('<[^<]+?>', '', content)


# The TLD class is [A-Za-z]; the previous [A-Z|a-z] also accepted a literal '|'.
_EMAIL_IN_TEXT_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b')


# Function to find valid emails in HTML text
def find_emails(html_text):
    """Return the set of valid emails in ``html_text``, at most one per domain.

    For each domain (compared case-insensitively) the first valid address in
    document order is kept, so the result is deterministic for a given page.
    """
    first_by_domain = {}
    for email in _EMAIL_IN_TEXT_RE.findall(html_text):
        if not is_valid_email(email):
            continue
        domain = email.split('@')[1].lower()
        first_by_domain.setdefault(domain, email)
    return set(first_by_domain.values())


# Patterns that mark an address as a false positive or not worth contacting
# (image file names, no-reply/test mailboxes, numeric local parts).
_INVALID_EMAIL_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp',
        r'^no-reply@', r'^prueba@', r'^\d+[a-z]*@',
    )
]
_TYPO_DOMAINS = frozenset(["gmil.com", "gmal.com", "gmaill.com", "gnail.com"])
_DOMAIN_RE = re.compile(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


# Function to validate email address
def is_valid_email(email):
    """Return True if ``email`` is a plausible, contactable address.

    Rejects addresses outside 6..254 characters, addresses without exactly
    one ``@``, anything matching the false-positive patterns, known typo
    domains, and malformed domains.
    """
    if len(email) < 6 or len(email) > 254:
        return False
    if email.count('@') != 1:
        return False
    if any(pattern.search(email) for pattern in _INVALID_EMAIL_PATTERNS):
        return False
    domain = email.split('@')[1]
    if domain.lower() in _TYPO_DOMAINS or not _DOMAIN_RE.match(domain):
        return False
    return True


# Function to refresh search terms
def refresh_search_terms(campaign_id):
    """Return the campaign's search terms as a list of rows."""
    return df_to_list(fetch_search_terms(campaign_id))


# Function to convert DataFrame to list of lists
def df_to_list(df):
    """Convert a DataFrame to a list of row lists."""
    return df.values.tolist()

def manual_search(term, num_results):
    """Search ``term`` and return up to ``num_results`` (email, url) pairs.

    Nothing is written to the database. Failures are logged and handled
    per URL, so one unreachable page does not discard the results gathered
    from the others.

    Args:
        term: the search query.
        num_results: maximum number of results; a whole-number float (as a
            UI slider may supply) is accepted.

    Returns:
        A list of ``(email, source_url)`` tuples, at most ``num_results`` long.
    """
    limit = int(num_results)  # a float would break the slice below
    results = []
    try:
        search_urls = list(search(term, num_results=limit))
    except Exception as e:
        logging.error(f"Error in manual search: {e}")
        return results

    for url in search_urls:
        if len(results) >= limit:
            break
        try:
            response = session.get(url, timeout=10)
            emails = find_emails(response.text)
        except Exception as e:
            # Best effort: skip this page and keep going.
            logging.error(f"Error in manual search: {e}")
            continue
        results.extend([(email, url) for email in emails])
    return results[:limit]


def _search_term_choices(campaign_id=None):
    """Return "id: term" labels for the search-term checkbox group."""
    rows = df_to_list(fetch_search_terms(campaign_id))
    return [f"{row[0]}: {row[1]}" for row in rows]


async def _run_bulk_search(selected_labels, num_results):
    """Adapt UI values for ``bulk_search`` and relay its progress messages.

    Checkbox labels ("id: term") are parsed to integer IDs and the slider
    value is converted to an int before the call.
    """
    term_ids = [validate_id(label, "search term") for label in (selected_labels or [])]
    async for message in bulk_search(term_ids, int(num_results)):
        yield message


async def _run_bulk_send(template_label, from_email, reply_to):
    """Adapt the template dropdown label for ``bulk_send`` and relay its output."""
    template_pk = validate_id(template_label, "template")
    async for message in bulk_send(template_pk, from_email, reply_to):
        yield message


# Gradio interface
with gr.Blocks() as gradio_app:
    gr.Markdown("# Email Campaign Management System")

    with gr.Tab("Projects and Campaigns"):
        with gr.Row():
            with gr.Column():
                project_name = gr.Textbox(label="Project Name")
                create_project_btn = gr.Button("Create Project")
                project_status = gr.Textbox(label="Project Status", interactive=False)
            with gr.Column():
                campaign_name = gr.Textbox(label="Campaign Name")
                project_id = gr.Dropdown(label="Project", choices=fetch_projects())
                campaign_type = gr.Radio(["Email", "SMS"], label="Campaign Type")
                create_campaign_btn = gr.Button("Create Campaign")
                campaign_status = gr.Textbox(label="Campaign Status", interactive=False)

    with gr.Tab("Message Templates"):
        with gr.Row():
            with gr.Column():
                template_name = gr.Textbox(label="Template Name")
                subject = gr.Textbox(label="Subject")
                body_content = gr.Code(language="html", label="Body Content")
                campaign_id_for_template = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
                create_template_btn = gr.Button("Create Template")
            with gr.Column():
                template_status = gr.Textbox(label="Template Status", interactive=False)
                template_preview = gr.HTML(label="Template Preview")

    with gr.Tab("Search Terms"):
        with gr.Row():
            with gr.Column():
                search_term = gr.Textbox(label="Search Term")
                campaign_id_for_search = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
                add_term_btn = gr.Button("Add Search Term")
            with gr.Column():
                search_term_status = gr.Textbox(label="Search Term Status", interactive=False)
                search_term_list = gr.Dataframe(
                    df_to_list(fetch_search_terms()),
                    headers=["ID", "Search Term", "Leads Fetched", "Status"],
                )

    with gr.Tab("Bulk Operations"):
        with gr.Row():
            campaign_id_for_bulk = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
            refresh_btn = gr.Button("Refresh Data")
        search_term_df = gr.Dataframe(headers=["ID", "Search Term", "Leads Fetched", "Status"])
        selected_terms = gr.CheckboxGroup(label="Select Search Terms", choices=[])
        num_results = gr.Slider(minimum=10, maximum=500, value=120, step=10, label="Results per term")
        with gr.Row():
            template_id = gr.Dropdown(choices=fetch_message_templates(), label="Select Message Template")
            # NOTE(review): this default has no address part and will fail
            # email validation as-is — confirm the intended sender value.
            from_email = gr.Textbox(label="From Email", value="Sami Halawa ")
            reply_to = gr.Textbox(label="Reply To", value="eugproduction@gmail.com")
        preview_button = gr.Button("Preview Email")
        email_preview = gr.HTML(label="Email Preview")
        with gr.Row():
            bulk_search_button = gr.Button("Bulk Search")
            bulk_send_button = gr.Button("Bulk Send")
            bulk_search_send_button = gr.Button("Bulk Search & Send")
        log_output = gr.TextArea(label="Process Logs", interactive=False)

    with gr.Tab("Manual Search"):
        with gr.Row():
            manual_search_term = gr.Textbox(label="Manual Search Term")
            manual_num_results = gr.Slider(minimum=1, maximum=50, value=10, step=1)
            manual_search_btn = gr.Button("Search")
        manual_search_results = gr.Dataframe(headers=["Email", "Source"])

    # Event listeners must be registered inside the Blocks context.
    # Populate data-backed components when the page loads.
    gradio_app.load(lambda: gr.update(value=df_to_list(fetch_search_terms())), outputs=search_term_df)
    # Refresh the dropdown's *choices*; assigning the list to `value` would
    # try to select a list instead of offering the templates.
    gradio_app.load(lambda: gr.update(choices=fetch_message_templates()), outputs=template_id)
    # Without choices the checkbox group can never have a selection, which
    # makes every bulk search fail with "No search terms selected".
    gradio_app.load(lambda: gr.update(choices=_search_term_choices()), outputs=selected_terms)

    # Define button actions
    create_project_btn.click(create_project, inputs=[project_name], outputs=[project_status])
    create_campaign_btn.click(create_campaign, inputs=[campaign_name, project_id, campaign_type], outputs=[campaign_status])
    create_template_btn.click(create_message_template, inputs=[template_name, subject, body_content, campaign_id_for_template], outputs=[template_status])
    add_term_btn.click(add_search_term, inputs=[search_term, campaign_id_for_search], outputs=[search_term_status])
    preview_button.click(get_email_preview, inputs=[template_id, from_email, reply_to], outputs=email_preview)

    # bulk_search yields a single log string per step, so it feeds only the
    # log box; the table is refreshed in a follow-up step.
    bulk_search_button.click(
        _run_bulk_search,
        inputs=[selected_terms, num_results],
        outputs=log_output
    ).then(
        lambda: gr.update(value=df_to_list(fetch_search_terms())),
        outputs=search_term_df
    )
    bulk_send_button.click(_run_bulk_send, inputs=[template_id, from_email, reply_to], outputs=log_output)

    bulk_search_send_button.click(
        lambda: gr.update(value="Starting bulk search..."),
        outputs=log_output
    ).then(
        _run_bulk_search,
        inputs=[selected_terms, num_results],
        outputs=log_output
    ).then(
        lambda: gr.update(value=df_to_list(fetch_search_terms())),
        outputs=search_term_df
    ).then(
        lambda: gr.update(value="Bulk search completed. Starting bulk send..."),
        outputs=log_output
    ).then(
        _run_bulk_send,
        inputs=[template_id, from_email, reply_to],
        outputs=log_output
    )

    manual_search_btn.click(manual_search, inputs=[manual_search_term, manual_num_results], outputs=manual_search_results)
    refresh_btn.click(
        refresh_search_terms,
        inputs=[campaign_id_for_bulk],
        outputs=[search_term_df]
    ).then(
        lambda campaign_id: gr.update(choices=_search_term_choices(campaign_id)),
        inputs=[campaign_id_for_bulk],
        outputs=selected_terms
    )

# Launch the app outside the Blocks context
gradio_app.launch()