"""AUTOCLIENT: lead scraping, campaign management and bulk e-mailing behind a Gradio UI.

The module wires together four pieces:

* validation / e-mail extraction helpers (pure functions),
* SQLAlchemy persistence helpers built on the models imported from ``schema``,
* bulk operations (Google search + scraping, sending through AWS SES),
* the Gradio ``Blocks`` interface that exposes all of the above.

Importing this module has side effects: it configures logging, creates the
database engine and the SES client, builds the UI and launches it.
"""
import json
import logging
import os
import re
from contextlib import contextmanager
from datetime import datetime

import boto3
import gradio as gr
import openai
import pandas as pd
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
from bs4 import BeautifulSoup
from googlesearch import search
from requests.adapters import HTTPAdapter
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from urllib3.util.retry import Retry

from schema import (
    Base,
    Project,
    Campaign,
    Lead,
    LeadSource,
    CampaignLead,
    EmailCampaign,
    SearchTerm,
    KnowledgeBase,
    EmailTemplate,
    OptimizedSearchTerm,
    SearchTermEffectiveness,
    SearchTermGroup,
    AIRequestLog,
    AutomationLog,
)


# ---------------------------------------------------------------------------
# Validation functions
# ---------------------------------------------------------------------------

# Loose "something@something.something" shape check used for user-entered addresses.
_EMAIL_SHAPE = re.compile(r"[^@]+@[^@]+\.[^@]+")

# Pattern used to pull addresses out of scraped text. The domain part is
# restricted to host-name characters so a match cannot run across whitespace
# or markup into unrelated text.
_EMAIL_IN_TEXT = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b')


def validate_name(name):
    """Return ``name`` stripped of surrounding whitespace.

    Raises:
        ValueError: if ``name`` is empty/whitespace-only or longer than 100
            characters (length is measured before stripping).
    """
    if not name or not name.strip():
        raise ValueError("Name cannot be empty or just whitespace")
    if len(name) > 100:
        raise ValueError("Name is too long (max 100 characters)")
    return name.strip()


def validate_email(email):
    """Return ``email`` unchanged if it looks like an address.

    Raises:
        ValueError: if ``email`` is not a string or does not start with a
            ``local@domain.tld`` shaped prefix.
    """
    if not isinstance(email, str) or not _EMAIL_SHAPE.match(email):
        raise ValueError("Invalid email address")
    return email


def validate_campaign_type(campaign_type):
    """Return ``campaign_type`` if it is one of the supported types.

    Raises:
        ValueError: if the type is not ``"Email"`` or ``"SMS"``.
    """
    valid_types = ["Email", "SMS"]
    if campaign_type not in valid_types:
        raise ValueError(f"Invalid campaign type. Must be one of {valid_types}")
    return campaign_type


def validate_id(id_value, id_type):
    """Convert ``id_value`` to a positive integer ID.

    Accepts plain numbers, numeric strings, and the ``"<id>: <label>"`` strings
    produced by the ``fetch_*`` helpers for dropdown choices (the part before
    the first colon is used).

    Args:
        id_value: the raw value to convert.
        id_type: human-readable entity name used in the error message.

    Raises:
        ValueError: if the value is missing, not numeric, or not positive.
    """
    try:
        text = str(id_value)
        id_int = int(text.split(':')[0] if ':' in text else id_value)
    except (TypeError, ValueError):
        # TypeError covers None (e.g. a dropdown with nothing selected).
        raise ValueError(f"Invalid {id_type} ID") from None
    if id_int <= 0:
        raise ValueError(f"Invalid {id_type} ID")
    return id_int


def validate_status(status, valid_statuses):
    """Return ``status`` if it is contained in ``valid_statuses``.

    Raises:
        ValueError: otherwise.
    """
    if status not in valid_statuses:
        raise ValueError(f"Invalid status. Must be one of {valid_statuses}")
    return status


def validate_num_results(num_results):
    """Return ``num_results`` as a positive ``int``.

    Integral floats (``120.0``) are accepted and converted, since numeric UI
    widgets may deliver floats.

    Raises:
        ValueError: if the value is not a positive whole number.
    """
    if isinstance(num_results, float) and num_results.is_integer():
        num_results = int(num_results)
    if not isinstance(num_results, int) or num_results <= 0:
        raise ValueError("Invalid number of results")
    return num_results


def sanitize_html(content):
    """Return ``content`` with everything that looks like an HTML tag removed."""
    return re.sub('<[^<]+?>', '', content)


def find_emails(html_text):
    """Extract plausible e-mail addresses from ``html_text``.

    Candidates are filtered through :func:`is_valid_email` and de-duplicated
    so that at most one address per (case-insensitive) domain is returned: the
    first valid one encountered in the text.

    Returns:
        set[str]: the selected addresses.
    """
    first_per_domain = {}
    for candidate in _EMAIL_IN_TEXT.findall(html_text):
        if not is_valid_email(candidate):
            continue
        domain = candidate.split('@')[1].lower()
        # setdefault keeps the first address seen for each domain.
        first_per_domain.setdefault(domain, candidate)
    return set(first_per_domain.values())


def is_valid_email(email):
    """Return ``True`` if ``email`` passes the scraper's plausibility filters.

    Rejected are: addresses shorter than 6 or longer than 254 characters,
    addresses without exactly one ``@``, anything containing an image file
    extension, ``no-reply@`` / ``prueba@`` prefixes, local parts made of digits
    optionally followed by letters, known Gmail typo domains, and domains that
    are not of the form ``host.tld``.
    """
    invalid_patterns = [
        r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp',
        r'^no-reply@', r'^prueba@', r'^\d+[a-z]*@'
    ]
    typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"]

    if len(email) < 6 or len(email) > 254:
        return False
    if email.count('@') != 1:
        return False
    for pattern in invalid_patterns:
        if re.search(pattern, email, re.IGNORECASE):
            return False
    domain = email.split('@')[1]
    if domain.lower() in typo_domains:
        return False
    if not re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain):
        return False
    return True


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Setup logging first so that any failure below is written to app.log.
try:
    logging.basicConfig(level=logging.INFO, filename='app.log', filemode='a',
                        format='%(asctime)s - %(levelname)s - %(message)s')
except IOError as e:
    print(f"Error setting up logging: {e}")
    raise

aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", "default_aws_access_key_id")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", "default_aws_secret_access_key")
region_name = "us-east-1"

openai.api_key = os.getenv("OPENAI_API_KEY", "default_openai_api_key")
openai.api_base = os.getenv("OPENAI_API_BASE", "http://127.0.0.1:11434/v1")
openai_model = "mistral"

# Database configuration (all parts are read from SUPABASE_DB_* environment variables)
DATABASE_URL = f"postgresql://{os.getenv('SUPABASE_DB_USER')}:{os.getenv('SUPABASE_DB_PASSWORD')}@{os.getenv('SUPABASE_DB_HOST')}:{os.getenv('SUPABASE_DB_PORT')}/{os.getenv('SUPABASE_DB_NAME')}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Initialize AWS SES client
try:
    ses_client = boto3.client('ses', aws_access_key_id=aws_access_key_id,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name=region_name)
except (NoCredentialsError, PartialCredentialsError) as e:
    logging.error(f"AWS SES client initialization failed: {e}")
    raise

# HTTP session with retry strategy (applied to both schemes, since search
# results may point at plain-http pages as well).
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
session.mount('http://', HTTPAdapter(max_retries=retries))


@contextmanager
def _db_session():
    """Yield a database session that is always closed.

    If the managed block raises, the session is rolled back before the
    exception propagates. Committing is left to the caller.
    """
    db = SessionLocal()
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()


# ---------------------------------------------------------------------------
# Persistence helpers
# ---------------------------------------------------------------------------

# Function to create a new project
def create_project(project_name):
    """Insert a project and return its new ID.

    Raises:
        ValueError: if ``project_name`` fails :func:`validate_name`.
    """
    project_name = validate_name(project_name)
    with _db_session() as db:
        new_project = Project(project_name=project_name)
        db.add(new_project)
        db.commit()
        return new_project.id


# Function to create a new campaign
def create_campaign(campaign_name, project_id, campaign_type):
    """Insert a campaign under ``project_id`` and return its new ID.

    Raises:
        ValueError: if any argument fails validation.
    """
    campaign_name = validate_name(campaign_name)
    project_id = validate_id(project_id, "project")
    campaign_type = validate_campaign_type(campaign_type)
    with _db_session() as db:
        new_campaign = Campaign(campaign_name=campaign_name, project_id=project_id,
                                campaign_type=campaign_type)
        db.add(new_campaign)
        db.commit()
        return new_campaign.id


# Function to create a new message template
def create_message_template(template_name, subject, body_content, campaign_id):
    """Insert an e-mail template and return its new ID.

    ``body_content`` is stored with HTML tags stripped (see :func:`sanitize_html`).

    Raises:
        ValueError: if the name, subject or campaign ID fails validation.
    """
    template_name = validate_name(template_name)
    subject = validate_name(subject)
    body_content = sanitize_html(body_content)
    campaign_id = validate_id(campaign_id, "campaign")
    with _db_session() as db:
        new_template = EmailTemplate(template_name=template_name, subject=subject,
                                     body_content=body_content, campaign_id=campaign_id)
        db.add(new_template)
        db.commit()
        return new_template.id


# Function to add a new search term
def add_search_term(term, campaign_id):
    """Insert a search term for ``campaign_id`` and return its new ID.

    Raises:
        ValueError: if the term or campaign ID fails validation.
    """
    term = validate_name(term)
    campaign_id = validate_id(campaign_id, "campaign")
    with _db_session() as db:
        new_term = SearchTerm(term=term, campaign_id=campaign_id)
        db.add(new_term)
        db.commit()
        return new_term.id


# Function to fetch search terms
def fetch_search_terms(campaign_id=None):
    """Return search terms with their lead-source counts as a DataFrame.

    Args:
        campaign_id: optional campaign filter; falsy means "all campaigns".

    Returns:
        pandas.DataFrame with columns ``ID``, ``Search Term``,
        ``Leads Fetched`` and ``Status`` (``Status`` is the term's
        ``category`` column).
    """
    with _db_session() as db:
        query = db.query(SearchTerm.id, SearchTerm.term,
                         func.count(LeadSource.id).label('processed_leads'),
                         SearchTerm.category.label('status'))
        query = query.outerjoin(LeadSource)
        if campaign_id:
            campaign_id = validate_id(campaign_id, "campaign")
            query = query.filter(SearchTerm.campaign_id == campaign_id)
        rows = query.group_by(SearchTerm.id).all()
    return pd.DataFrame(
        [(row.id, row.term, row.processed_leads, row.status) for row in rows],
        columns=["ID", "Search Term", "Leads Fetched", "Status"])


# Function to update search term status
def update_search_term_status(term_id, new_status, processed_leads):
    """Set the ``category`` of a search term to ``new_status``.

    Args:
        term_id: ID of the search term.
        new_status: ``"pending"`` or ``"completed"``.
        processed_leads: non-negative lead count; validated but not stored.

    Raises:
        ValueError: if any argument fails validation.
    """
    term_id = validate_id(term_id, "search term")
    new_status = validate_status(new_status, ["pending", "completed"])
    # Zero is a legitimate count (a term may yield no leads), so the stricter
    # validate_num_results() is deliberately not used here.
    if not isinstance(processed_leads, int) or processed_leads < 0:
        raise ValueError("Invalid number of results")
    with _db_session() as db:
        term = db.query(SearchTerm).filter(SearchTerm.id == term_id).first()
        if term:
            term.category = new_status
            db.commit()


# Function to save a new lead
def save_lead(email, phone, first_name, last_name, company, job_title):
    """Insert a lead and return its new ID.

    Raises:
        ValueError: if ``email`` fails :func:`validate_email`.
    """
    email = validate_email(email)
    with _db_session() as db:
        new_lead = Lead(email=email, phone=phone, first_name=first_name,
                        last_name=last_name, company=company, job_title=job_title)
        db.add(new_lead)
        db.commit()
        return new_lead.id


# Function to save lead source
def save_lead_source(lead_id, search_query, url, page_title, meta_description,
                     http_status, scrape_duration):
    """Record where a lead was found.

    NOTE(review): ``search_query`` is accepted but not persisted here, so the
    source row is not linked to a search term by this function - confirm
    against the ``LeadSource`` model whether it should be.

    Raises:
        ValueError: if ``lead_id`` fails validation.
    """
    lead_id = validate_id(lead_id, "lead")
    with _db_session() as db:
        new_lead_source = LeadSource(lead_id=lead_id, url=url, page_title=page_title,
                                     meta_description=meta_description,
                                     http_status=http_status,
                                     scrape_duration=scrape_duration)
        db.add(new_lead_source)
        db.commit()


# Function to add a lead to a campaign
def add_lead_to_campaign(campaign_id, lead_id):
    """Link a lead to a campaign with status ``'active'``.

    Raises:
        ValueError: if either ID fails validation.
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    with _db_session() as db:
        new_campaign_lead = CampaignLead(campaign_id=campaign_id, lead_id=lead_id,
                                         status='active')
        db.add(new_campaign_lead)
        db.commit()


# Function to create a new message
def create_message(campaign_id, lead_id, template_id, customized_subject, customized_content):
    """Insert a per-lead e-mail message and return its new ID.

    ``customized_content`` is stored with HTML tags stripped.

    Raises:
        ValueError: if any ID or the subject fails validation.
    """
    campaign_id = validate_id(campaign_id, "campaign")
    lead_id = validate_id(lead_id, "lead")
    template_id = validate_id(template_id, "template")
    customized_subject = validate_name(customized_subject)
    customized_content = sanitize_html(customized_content)
    with _db_session() as db:
        new_message = EmailCampaign(campaign_id=campaign_id, lead_id=lead_id,
                                    template_id=template_id,
                                    customized_subject=customized_subject,
                                    customized_content=customized_content)
        db.add(new_message)
        db.commit()
        return new_message.id


# Function to update message status
def update_message_status(message_id, status, sent_at=None):
    """Update the status (and optionally the send time) of a message.

    Args:
        message_id: ID of the ``EmailCampaign`` row.
        status: ``"pending"``, ``"sent"`` or ``"failed"``.
        sent_at: optional timestamp stored when truthy.

    Raises:
        ValueError: if the ID or status fails validation.
    """
    message_id = validate_id(message_id, "message")
    status = validate_status(status, ["pending", "sent", "failed"])
    with _db_session() as db:
        message = db.query(EmailCampaign).filter(EmailCampaign.id == message_id).first()
        if message:
            message.status = status
            if sent_at:
                message.sent_at = sent_at
            db.commit()


# Function to fetch message templates
def fetch_message_templates(campaign_id=None):
    """Return templates as ``"<id>: <name>"`` strings, optionally per campaign."""
    with _db_session() as db:
        query = db.query(EmailTemplate.id, EmailTemplate.template_name)
        if campaign_id:
            campaign_id = validate_id(campaign_id, "campaign")
            query = query.filter(EmailTemplate.campaign_id == campaign_id)
        templates = query.all()
    return [f"{template.id}: {template.template_name}" for template in templates]


# Function to fetch projects
def fetch_projects():
    """Return all projects as ``"<id>: <name>"`` strings."""
    with _db_session() as db:
        projects = db.query(Project.id, Project.project_name).all()
    return [f"{project.id}: {project.project_name}" for project in projects]


# Function to fetch campaigns
def fetch_campaigns():
    """Return all campaigns as ``"<id>: <name>"`` strings."""
    with _db_session() as db:
        campaigns = db.query(Campaign.id, Campaign.campaign_name).all()
    return [f"{campaign.id}: {campaign.campaign_name}" for campaign in campaigns]


# ---------------------------------------------------------------------------
# Bulk operations
# ---------------------------------------------------------------------------

# Bulk search function
async def bulk_search(selected_terms, num_results, progress=gr.Progress()):
    """Search each selected term, scrape the result pages and store found leads.

    This is an async generator: it yields one progress message per processed
    URL, one per completed term, and a final summary.

    Args:
        selected_terms: iterable of search-term IDs (numbers or
            ``"<id>: <label>"`` strings).
        num_results: positive number used both as the number of search results
            requested and as the per-term lead cap.
        progress: Gradio progress tracker (not used directly).

    Raises:
        ValueError: if no terms are selected or an argument fails validation.
    """
    if not selected_terms:
        raise ValueError("No search terms selected")
    num_results = validate_num_results(num_results)
    # Normalise every ID up front so a bad entry fails before any work is done.
    term_ids = [validate_id(raw_id, "search term") for raw_id in selected_terms]

    total_leads = 0
    for term_id in term_ids:
        with _db_session() as db:
            row = (
                db.query(SearchTerm.term,
                         func.count(LeadSource.id).label('processed_leads'))
                .outerjoin(LeadSource)
                .filter(SearchTerm.id == term_id)
                # Required because an aggregate is selected next to a plain column.
                .group_by(SearchTerm.id, SearchTerm.term)
                .first()
            )
        if not row:
            continue
        term, processed_leads = row.term, row.processed_leads

        leads_found = 0
        try:
            search_urls = list(search(term, num_results=num_results))
        except Exception as e:
            logging.error(f"Error performing Google search for term '{term}': {e}")
            continue

        for url in search_urls:
            if leads_found + processed_leads >= num_results:
                break
            try:
                response = session.get(url, timeout=10)
                response.encoding = 'utf-8'
                soup = BeautifulSoup(response.text, 'html.parser')
                # Pages without a <title> must not abort lead storage.
                page_title = soup.title.string if soup.title else None
                emails = find_emails(response.text)
                for email in emails:
                    lead_id = save_lead(email, None, None, None, None, None)
                    save_lead_source(lead_id, term, url, page_title, None,
                                     response.status_code, str(response.elapsed))
                    leads_found += 1
                    total_leads += 1
                    if leads_found + processed_leads >= num_results:
                        break
            except Exception as e:
                # Best effort: a failing page is logged and skipped.
                logging.error(f"Error processing {url}: {e}")
            yield f"Processed {leads_found + processed_leads} leads for term '{term}'"

        update_search_term_status(term_id, 'completed', leads_found + processed_leads)
        yield f"Completed term '{term}': Found {leads_found} new leads, total {leads_found + processed_leads}"

    yield f"Bulk search completed. Total new leads found: {total_leads}"


# Bulk send function
async def bulk_send(template_id, from_email, reply_to, progress=gr.Progress()):
    """Send every pending message of a template through AWS SES.

    This is an async generator: it yields one message per attempted e-mail
    and a final summary. Each message row is marked ``'sent'`` or ``'failed'``.

    Args:
        template_id: template ID (number or ``"<id>: <label>"`` string).
        from_email: sender address.
        reply_to: reply-to address.
        progress: Gradio progress tracker (not used directly).

    Raises:
        ValueError: if the template ID or either address is invalid.
    """
    # Dropdown values arrive as "<id>: <name>" strings, so parse rather than
    # insist on an int.
    template_id = validate_id(template_id, "template")
    if not from_email or not re.match(r"[^@]+@[^@]+\.[^@]+", from_email):
        raise ValueError("Invalid from email address")
    if not reply_to or not re.match(r"[^@]+@[^@]+\.[^@]+", reply_to):
        raise ValueError("Invalid reply to email address")

    with _db_session() as db:
        messages = (
            db.query(EmailCampaign.id, Lead.email, EmailCampaign.customized_subject,
                     EmailCampaign.customized_content)
            .join(Lead)
            .filter(EmailCampaign.template_id == template_id,
                    EmailCampaign.status == 'pending')
            .all()
        )

    total_sent = 0
    for message_id, email, subject, content in messages:
        try:
            ses_client.send_email(
                Source=from_email,
                Destination={'ToAddresses': [email]},
                Message={
                    'Subject': {'Data': subject},
                    'Body': {'Html': {'Data': content}}
                },
                ReplyToAddresses=[reply_to]
            )
            update_message_status(message_id, 'sent', datetime.now())
            total_sent += 1
            yield f"Sent email to {email}"
        except Exception as e:
            logging.error(f"Failed to send email to {email}: {e}")
            update_message_status(message_id, 'failed')
            yield f"Failed to send email to {email}"

    yield f"Bulk send completed. Total emails sent: {total_sent}"


# Function to get email preview
def get_email_preview(template_id, from_email, reply_to):
    """Return a plain-text preview of a template, or ``"Template not found"``.

    Raises:
        ValueError: if the template ID or either address is invalid.
    """
    template_id = validate_id(template_id, "template")
    from_email = validate_email(from_email)
    reply_to = validate_email(reply_to)
    with _db_session() as db:
        template = (
            db.query(EmailTemplate.subject, EmailTemplate.body_content)
            .filter(EmailTemplate.id == template_id)
            .first()
        )
    if not template:
        return "Template not found"
    subject, body_content = template
    return f"Subject: {subject}\n\nFrom: {from_email}\nReply-To: {reply_to}\n\nBody:\n{body_content}"


# Function to refresh search terms
def refresh_search_terms(campaign_id):
    """Return the search terms of ``campaign_id`` as a list of rows."""
    return df_to_list(fetch_search_terms(campaign_id))


# Function to convert DataFrame to list of lists
def df_to_list(df):
    """Return the rows of ``df`` as a list of lists."""
    return df.values.tolist()


def manual_search(term, num_results):
    """Search ``term`` and return up to ``num_results`` ``(email, url)`` pairs.

    Nothing is stored. A failing search or page is logged and skipped, so the
    function returns whatever could be collected.

    Raises:
        ValueError: if ``num_results`` is not a positive whole number.
    """
    num_results = validate_num_results(num_results)
    results = []
    try:
        search_urls = list(search(term, num_results=num_results))
    except Exception as e:
        logging.error(f"Error in manual search: {e}")
        return results
    for url in search_urls:
        try:
            response = session.get(url, timeout=10)
        except Exception as e:
            # One unreachable page must not discard the remaining results.
            logging.error(f"Error in manual search: {e}")
            continue
        results.extend((email, url) for email in find_emails(response.text))
    return results[:num_results]


def fetch_leads():
    """Return all leads as a DataFrame."""
    with _db_session() as db:
        leads = db.query(Lead.id, Lead.email, Lead.first_name, Lead.last_name,
                         Lead.company, Lead.job_title).all()
    return pd.DataFrame(
        [(lead.id, lead.email, lead.first_name, lead.last_name, lead.company, lead.job_title)
         for lead in leads],
        columns=["ID", "Email", "First Name", "Last Name", "Company", "Job Title"])


def fetch_campaigns_with_leads():
    """Return every campaign with the number of leads linked to it."""
    with _db_session() as db:
        campaigns = (
            db.query(Campaign.id, Campaign.campaign_name,
                     func.count(CampaignLead.id).label('leads_count'))
            .outerjoin(CampaignLead)
            .group_by(Campaign.id)
            .all()
        )
    return pd.DataFrame(
        [(campaign.id, campaign.campaign_name, campaign.leads_count) for campaign in campaigns],
        columns=["ID", "Campaign Name", "Leads Count"])


def fetch_leads_for_campaign(campaign_id):
    """Return the leads linked to ``campaign_id`` as a DataFrame.

    Raises:
        ValueError: if ``campaign_id`` fails validation.
    """
    campaign_id = validate_id(campaign_id, "campaign")
    with _db_session() as db:
        leads = (
            db.query(Lead.id, Lead.email, Lead.first_name, Lead.last_name,
                     Lead.company, Lead.job_title)
            .join(CampaignLead)
            .filter(CampaignLead.campaign_id == campaign_id)
            .all()
        )
    return pd.DataFrame(
        [(lead.id, lead.email, lead.first_name, lead.last_name, lead.company, lead.job_title)
         for lead in leads],
        columns=["ID", "Email", "First Name", "Last Name", "Company", "Job Title"])


# ---------------------------------------------------------------------------
# Gradio interface
# ---------------------------------------------------------------------------
# NOTE(review): the original indentation of this section was lost; which
# components sit inside each Row/Column is reconstructed and only affects the
# visual layout - confirm against the intended design.
with gr.Blocks() as gradio_app:
    gr.Markdown("# AUTOCLIENT")

    with gr.Tab("Projects and Campaigns"):
        with gr.Row():
            with gr.Column():
                project_name = gr.Textbox(label="Project Name")
                create_project_btn = gr.Button("Create Project")
                project_status = gr.Textbox(label="Project Status", interactive=False)
            with gr.Column():
                campaign_name = gr.Textbox(label="Campaign Name")
                project_id = gr.Dropdown(label="Project", choices=fetch_projects())
                campaign_type = gr.Radio(["Email", "SMS"], label="Campaign Type")
                create_campaign_btn = gr.Button("Create Campaign")
                campaign_status = gr.Textbox(label="Campaign Status", interactive=False)
        campaign_list = gr.Dataframe(fetch_campaigns_with_leads(),
                                     headers=["ID", "Campaign Name", "Leads Count"])

    with gr.Tab("Email Templates"):
        with gr.Row():
            template_name = gr.Textbox(label="Template Name")
            subject = gr.Textbox(label="Subject")
        body_content = gr.Code(language="html", label="Body Content", lines=10)
        campaign_id_for_template = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
        create_template_btn = gr.Button("Create Template")
        template_status = gr.Textbox(label="Template Status", interactive=False)
        template_preview = gr.HTML(label="Template Preview")

    with gr.Tab("Search Terms"):
        with gr.Row():
            search_term = gr.Textbox(label="Search Term")
            campaign_id_for_search = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
        add_term_btn = gr.Button("Add Search Term")
        search_term_status = gr.Textbox(label="Search Term Status", interactive=False)
        search_term_list = gr.Dataframe(df_to_list(fetch_search_terms()),
                                        headers=["ID", "Search Term", "Leads Fetched", "Status"])
        with gr.Row():
            edit_term_id = gr.Number(label="Term ID to Edit")
            new_term = gr.Textbox(label="New Term")
            edit_term_btn = gr.Button("Edit Term")
            delete_term_btn = gr.Button("Delete Term")

    with gr.Tab("Bulk Operations"):
        with gr.Row():
            campaign_id_for_bulk = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
            refresh_btn = gr.Button("Refresh Data")
        search_term_df = gr.Dataframe(headers=["ID", "Search Term", "Leads Fetched", "Status"])
        selected_terms = gr.CheckboxGroup(label="Select Search Terms", choices=[])
        num_results = gr.Slider(minimum=10, maximum=500, value=120, step=10,
                                label="Results per term")
        with gr.Row():
            template_id = gr.Dropdown(choices=fetch_message_templates(),
                                      label="Select Message Template")
            # NOTE(review): this default contains no address and will fail
            # e-mail validation as-is; it looks truncated - confirm the
            # intended value.
            from_email = gr.Textbox(label="From Email", value="Sami Halawa ")
            reply_to = gr.Textbox(label="Reply To", value="eugproduction@gmail.com")
        preview_button = gr.Button("Preview Email")
        email_preview = gr.HTML(label="Email Preview")
        with gr.Row():
            bulk_search_button = gr.Button("Bulk Search")
            bulk_send_button = gr.Button("Bulk Send")
            bulk_search_send_button = gr.Button("Bulk Search & Send")
        send_progress = gr.Progress()
        log_output = gr.TextArea(label="Process Logs", interactive=False)

    with gr.Tab("Manual Search"):
        with gr.Row():
            manual_search_term = gr.Textbox(label="Manual Search Term")
            manual_num_results = gr.Slider(minimum=1, maximum=50, value=10, step=1,
                                           label="Number of Results")
        manual_search_btn = gr.Button("Search")
        manual_search_results = gr.Dataframe(
            headers=["Email", "Source", "First Name", "Last Name", "Company", "Job Title"])
        add_to_campaign_btn = gr.Button("Add Selected to Campaign")
        add_status = gr.Textbox(label="Add Status", interactive=False)

    with gr.Tab("Leads"):
        leads_df = gr.Dataframe(
            fetch_leads(),
            headers=["ID", "Email", "First Name", "Last Name", "Company", "Job Title"])
        campaign_id_for_leads = gr.Dropdown(label="Campaign", choices=fetch_campaigns())
        fetch_leads_btn = gr.Button("Fetch Leads for Campaign")
        leads_for_campaign_df = gr.Dataframe(
            headers=["ID", "Email", "First Name", "Last Name", "Company", "Job Title"])
        with gr.Row():
            lead_email = gr.Textbox(label="Email")
            lead_first_name = gr.Textbox(label="First Name")
            lead_last_name = gr.Textbox(label="Last Name")
            lead_company = gr.Textbox(label="Company")
            lead_job_title = gr.Textbox(label="Job Title")
        add_lead_btn = gr.Button("Add Lead")
        lead_status = gr.Textbox(label="Lead Status", interactive=False)

    with gr.Tab("Campaign Analytics"):
        campaign_select = gr.Dropdown(label="Select Campaign", choices=fetch_campaigns())
        refresh_analytics_btn = gr.Button("Refresh Analytics")
        with gr.Row():
            total_leads = gr.Number(label="Total Leads")
            emails_sent = gr.Number(label="Emails Sent")
            open_rate = gr.Number(label="Open Rate")
            click_rate = gr.Number(label="Click Rate")
        performance_chart = gr.Plot(label="Campaign Performance")

    with gr.Tab("Lead Import/Export"):
        with gr.Row():
            import_file = gr.File(label="Import CSV")
            import_btn = gr.Button("Import Leads")
        import_status = gr.Textbox(label="Import Status", interactive=False)
        export_campaign = gr.Dropdown(label="Select Campaign for Export",
                                      choices=fetch_campaigns())
        export_btn = gr.Button("Export Leads")
        export_status = gr.Textbox(label="Export Status", interactive=False)

    with gr.Tab("Email Scheduling"):
        schedule_campaign = gr.Dropdown(label="Select Campaign", choices=fetch_campaigns())
        schedule_template = gr.Dropdown(label="Select Template",
                                        choices=fetch_message_templates())
        # The Gradio component is spelled DateTime (capital T).
        schedule_date = gr.DateTime(label="Schedule Date and Time")
        schedule_btn = gr.Button("Schedule Campaign")
        schedule_status = gr.Textbox(label="Schedule Status", interactive=False)
        scheduled_campaigns = gr.Dataframe(
            headers=["ID", "Campaign", "Template", "Scheduled Time", "Status"])

    with gr.Tab("AI Content Assistant"):
        content_type = gr.Radio(["Email Subject", "Email Body", "Search Term"],
                                label="Content Type")
        campaign_context = gr.Dropdown(label="Select Campaign for Context",
                                       choices=fetch_campaigns())
        prompt = gr.Textbox(label="Prompt for AI", lines=3)
        generate_btn = gr.Button("Generate Content")
        generated_content = gr.Textbox(label="Generated Content", lines=10)
        use_content_btn = gr.Button("Use Generated Content")

    with gr.Tab("User Management"):
        with gr.Row():
            username = gr.Textbox(label="Username")
            email = gr.Textbox(label="Email")
            password = gr.Textbox(label="Password", type="password")
            role = gr.Dropdown(label="Role", choices=["Admin", "User", "Viewer"])
        add_user_btn = gr.Button("Add User")
        user_status = gr.Textbox(label="User Status", interactive=False)
        user_list = gr.Dataframe(headers=["ID", "Username", "Email", "Role", "Last Login"])
        with gr.Row():
            edit_user_id = gr.Number(label="User ID to Edit")
            new_role = gr.Dropdown(label="New Role", choices=["Admin", "User", "Viewer"])
            edit_user_btn = gr.Button("Edit User Role")
            delete_user_btn = gr.Button("Delete User")

    # Event listeners must be registered inside the Blocks context.
    gradio_app.load(lambda: gr.update(value=df_to_list(fetch_search_terms())),
                    outputs=search_term_df)
    # Refresh the available templates (choices), not the selected value.
    gradio_app.load(lambda: gr.update(choices=fetch_message_templates()),
                    outputs=template_id)

    # Define button actions
    create_project_btn.click(create_project, inputs=[project_name], outputs=[project_status])
    create_campaign_btn.click(create_campaign,
                              inputs=[campaign_name, project_id, campaign_type],
                              outputs=[campaign_status])
    create_template_btn.click(create_message_template,
                              inputs=[template_name, subject, body_content,
                                      campaign_id_for_template],
                              outputs=[template_status])
    add_term_btn.click(add_search_term, inputs=[search_term, campaign_id_for_search],
                       outputs=[search_term_status])
    preview_button.click(get_email_preview, inputs=[template_id, from_email, reply_to],
                         outputs=email_preview)

    # bulk_search yields a single log string per step, so it feeds log_output
    # only; the term table is refreshed in a follow-up step.
    bulk_search_button.click(
        bulk_search, inputs=[selected_terms, num_results], outputs=log_output
    ).then(
        refresh_search_terms, inputs=[campaign_id_for_bulk], outputs=[search_term_df]
    )
    bulk_send_button.click(bulk_send, inputs=[template_id, from_email, reply_to],
                           outputs=log_output)
    bulk_search_send_button.click(
        lambda selected_terms, num_results, template_id, from_email, reply_to:
            gr.update(value="Starting bulk search..."),
        inputs=[selected_terms, num_results, template_id, from_email, reply_to],
        outputs=log_output
    ).then(
        bulk_search, inputs=[selected_terms, num_results], outputs=log_output
    ).then(
        refresh_search_terms, inputs=[campaign_id_for_bulk], outputs=[search_term_df]
    ).then(
        lambda: gr.update(value="Bulk search completed. Starting bulk send..."),
        outputs=log_output
    ).then(
        bulk_send, inputs=[template_id, from_email, reply_to], outputs=log_output
    )

    manual_search_btn.click(manual_search, inputs=[manual_search_term, manual_num_results],
                            outputs=manual_search_results)
    refresh_btn.click(refresh_search_terms, inputs=[campaign_id_for_bulk],
                      outputs=[search_term_df])
    fetch_leads_btn.click(fetch_leads_for_campaign, inputs=[campaign_id_for_leads],
                          outputs=[leads_for_campaign_df])

# Launch the app outside the Blocks context
gradio_app.launch()