import os import re import psycopg2 from psycopg2 import pool import requests import pandas as pd from datetime import datetime from bs4 import BeautifulSoup import gradio as gr from googlesearch-python import search import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError import openai import logging from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry # Configuration AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "AKIASO2XOMEGIVD422N7") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "Rl+rzgizFDZPnNgDUNk0N0gAkqlyaYqhx7O2ona9") REGION_NAME = "us-east-1" OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key") OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://127.0.0.1:11434/v1") OPENAI_MODEL = "gpt-3.5-turbo" DB_PARAMS = { "user": "postgres.whwiyccyyfltobvqxiib", "password": "SamiHalawa1996", "host": "aws-0-eu-central-1.pooler.supabase.com", "port": "6543", "dbname": "postgres", "sslmode": "require", "gssencmode": "disable" } # Initialize AWS SES client ses_client = boto3.client('ses', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=REGION_NAME) # Connection pool for PostgreSQL db_pool = pool.SimpleConnectionPool(1, 10, **DB_PARAMS) # HTTP session with retry strategy session = requests.Session() retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504]) adapter = HTTPAdapter(max_retries=retries) session.mount('https://', adapter) # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Initialize database connection def init_db(): try: conn = db_pool.getconn() conn.close() logger.info("Database connection established successfully.") except psycopg2.Error as e: logger.error(f"Failed to connect to the database: {e}") init_db() def is_valid_email(email): invalid_patterns = [ r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@', r'^prueba@', r'^\d+[a-z]*@' ] typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"] if not email or len(email) < 6 or len(email) > 254: return False for pattern in invalid_patterns: if re.search(pattern, email, re.IGNORECASE): return False domain = email.split('@')[-1] if domain in typo_domains or not re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain): return False return True def find_emails(html_text): email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b') all_emails = set(email_regex.findall(html_text)) valid_emails = {email.lower() for email in all_emails if is_valid_email(email)} return valid_emails def scrape_emails(search_query, num_results=10): results = [] search_params = {'q': search_query, 'num': num_results, 'start': 0} try: for _ in range(num_results // 10): # Adjust the loop to fetch num_results in batches of 10 response = session.get('https://www.google.com/search', params=search_params) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') emails = find_emails(soup.get_text()) for email in emails: results.append((search_query, email)) save_lead(search_query, email) search_params['start'] += 10 except requests.exceptions.RequestException as e: logger.error(f"Failed to scrape search results: {e}") except Exception as e: logger.error(f"Unexpected error: {e}") return pd.DataFrame(results, columns=["Search Query", "Email"]) def save_lead(search_query, email): try: conn = db_pool.getconn() with conn.cursor() as cursor: cursor.execute(""" INSERT INTO leads (search_query, email) VALUES (%s, %s) ON CONFLICT (email, search_query) DO NOTHING """, (search_query, email)) conn.commit() db_pool.putconn(conn) except psycopg2.Error as e: logger.error(f"Failed to save lead data to the database: {e}") def save_generated_email(search_term, email, generated_email, url, subject): try: conn = db_pool.getconn() with conn.cursor() as cursor: cursor.execute(""" INSERT INTO generated_emails (search_term, email, generated_email, url, subject) VALUES (%s, %s, %s, %s, %s) """, (search_term, email, generated_email, url, subject)) conn.commit() db_pool.putconn(conn) except psycopg2.Error as e: logger.error(f"Failed to save generated email to the database: {e}") def generate_ai_content(lead_info): prompt = f""" Generate a personalized email for a lead using the following information: {lead_info}. The email should include an engaging subject line, a warm greeting, a value proposition, key benefits, and a call-to-action. """ try: response = openai.Completion.create( model=mistral, prompt=prompt, max_tokens=500, n=1, stop=None ) content = response.choices[0].text.strip() if "\n\n" in content: subject, email_body = content.split("\n\n", 1) return subject, email_body else: logger.error("AI-generated content is missing subject or body.") return None, None except openai.error.APIError as e: logger.error(f"OpenAI API error: {e}") return None, None except Exception as e: logger.error(f"Unexpected error: {e}") return None, None def send_email_via_ses(subject, body_html, to_address, from_address, reply_to): try: response = ses_client.send_email( Destination={ 'ToAddresses': [to_address] }, Message={ 'Body': { 'Html': { 'Charset': 'UTF-8', 'Data': body_html } }, 'Subject': { 'Charset': 'UTF-8', 'Data': subject } }, Source=from_address, ReplyToAddresses=[reply_to] ) logger.info(f"Email sent successfully. Message ID: {response['MessageId']}") except NoCredentialsError: logger.error("AWS credentials not available.") except PartialCredentialsError: logger.error("Incomplete AWS credentials provided.") except Exception as e: logger.error(f"Failed to send email: {e}") def process_and_send_bulk(selected_terms, template_id, num_emails, from_email, reply_to, auto_send=False): total_processed = 0 try: for term_id in selected_terms: conn = db_pool.getconn() with conn.cursor() as cursor: cursor.execute('SELECT term FROM search_terms WHERE id=%s', (term_id,)) search_term = cursor.fetchone()[0] cursor.execute('UPDATE search_terms SET status=%s WHERE id=%s', ('processing', term_id)) conn.commit() db_pool.putconn(conn) emails_df = scrape_emails(search_term, num_results=num_emails) logger.info(f"Scraped {len(emails_df)} emails for search term '{search_term}'") if emails_df.empty: logger.warning(f"No emails found for search term: {search_term}") continue for _, email_data in emails_df.iterrows(): email = email_data['Email'] save_lead(search_term, email) if template_id is None: for _, email_data in emails_df.iterrows(): email = email_data['Email'] lead_info = {"name": "", "from_email": from_email, "reply_to": reply_to, "prompt": ""} subject, generated_email = generate_ai_content(lead_info) if generated_email: save_generated_email(search_term, email, generated_email, email_data.get('URL', ''), subject) if auto_send: send_email_via_ses(subject, generated_email, email, from_email, reply_to) logger.info(f"Email sent to {email}") else: subject, body_html = fetch_template(template_id) for _, email_data in emails_df.iterrows(): email = email_data['Email'] if subject and body_html: save_generated_email(search_term, email, body_html, email_data.get('URL', ''), subject) if auto_send: send_email_via_ses(subject, body_html, email, from_email, reply_to) logger.info(f"Email sent to {email}") total_processed += len(emails_df) logger.info(f"Processed {len(emails_df)} emails for search term '{search_term}'") return f"Processed and sent {total_processed} emails successfully." if auto_send else f"Processed {total_processed} emails successfully." except Exception as e: logger.error(f"Error during bulk process and send: {e}") return "An error occurred during processing." with gr.Blocks() as gradio_app: gr.Markdown("# Email Campaign Management System") with gr.Tab("Search Emails"): search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'") num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results") search_button = gr.Button("Search") results = gr.Dataframe(headers=["Search Query", "Email"]) search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[results]) with gr.Tab("Create Email Template"): template_name = gr.Textbox(label="Template Name", placeholder="e.g., 'Welcome Email'") subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'") body_html = gr.Textbox(label="Email Content (HTML)", placeholder="Enter your email content here...", lines=8) create_template_button = gr.Button("Create Template") template_status = gr.Textbox(label="Template Creation Status", interactive=False) def create_email_template(template_name, subject, body_html): try: conn = db_pool.getconn() with conn.cursor() as cursor: cursor.execute(""" INSERT INTO email_templates (template_name, subject, body_html) VALUES (%s, %s, %s) """, (template_name, subject, body_html)) conn.commit() db_pool.putconn(conn) template_status.update(value="Template created successfully.") except psycopg2.Error as e: template_status.update(value=f"Error creating template: {e}") logger.error(f"Failed to create template: {e}") create_template_button.click(create_email_template, inputs=[template_name, subject, body_html], outputs=[template_status]) with gr.Tab("Generate and Send Emails"): with gr.Row(): template_id = gr.Dropdown(choices=[], label="Select Email Template") use_ai_customizer = gr.Checkbox(label="AI Customizer", value=False) with gr.Row(): name = gr.Textbox(label="Your Name", placeholder="e.g., 'Daniel C.'") from_email = gr.Textbox(label="From Email", placeholder="e.g., 'your.email@example.com'") subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'") body_html = gr.HTML(label="Email Content (Dynamic Preview)", value="") reply_to = gr.Textbox(label="Reply To", placeholder="e.g., 'replyto@example.com'") def fetch_templates(): try: conn = db_pool.getconn() with conn.cursor() as cursor: cursor.execute("SELECT * FROM email_templates") templates = cursor.fetchall() db_pool.putconn(conn) return pd.DataFrame(templates, columns=["ID", "Template Name", "Subject", "Body HTML"]) except psycopg2.Error as e: logger.error(f"Failed to fetch templates: {e}") return pd.DataFrame() def fetch_template(template_id): templates = fetch_templates() if not templates.empty and template_id in templates['ID'].tolist(): selected_template = templates.loc[templates['ID'] == template_id] return selected_template['Subject'].item(), selected_template['Body HTML'].item() return None, None def generate_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id): if use_ai_customizer: lead_info = { "name": name, "from_email": from_email, "reply_to": reply_to, "prompt": "" } subject, email_body = generate_ai_content(lead_info) return subject, email_body else: subject, body_html = fetch_template(template_id) return subject, body_html def update_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id): new_subject, new_body = generate_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id) return new_subject, new_body for input_component in [name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id]: input_component.change(update_email_content, inputs=[name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id], outputs=[subject, body_html]) def generate_all_emails(template_id, name, from_email, reply_to, use_ai_customizer): data = fetch_search_terms() generated_data = [] for _, row in data.iterrows(): email_info = { 'email': row['email'], 'url': row['url'], 'search_query': row['search_query'] } subject, body_html = fetch_template(template_id) if template_id else (None, None) gen_subject, generated_email = generate_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id) if gen_subject and generated_email: save_generated_email(row['id'], gen_subject, generated_email, email_info['url'], subject) generated_data.append({ "ID": row['id'], "Search Query": row['search_query'], "Email": row['email'], "Generated Email": generated_email, "Email Sent": False }) else: logger.error(f"Failed to generate email for {row['email']}") return pd.DataFrame(generated_data) generate_button = gr.Button("Generate Emails") results = gr.Dataframe(headers=["ID", "Search Query", "Email", "Generated Email", "Email Sent"]) generate_button.click(generate_all_emails, inputs=[template_id, name, from_email, reply_to, use_ai_customizer], outputs=[results]) send_button = gr.Button("Bulk Send Emails") send_status = gr.Textbox(label="Send Status", interactive=False) def send_emails(from_email, reply_to): fixed_subject = "Your Subject Line Here" fixed_body_html = """
We are thrilled to have you on board!
""" process_and_send_bulk(from_email, reply_to, fixed_subject, fixed_body_html, auto_send=True) send_status.update(value="Emails sent successfully.") send_button.click(send_emails, inputs=[from_email, reply_to], outputs=[send_status]) with gr.Tab("Bulk Process and Send"): search_term_list = gr.Dataframe(fetch_search_terms(), headers=["ID", "Search Term", "Status", "Fetched Emails"]) selected_terms = gr.CheckboxGroup(label="Select Search Queries to Process", choices=fetch_search_terms()['ID'].tolist()) num_emails = gr.Slider(1, 100, value=10, step=1, label="Number of Emails per Search Term") auto_send = gr.Checkbox(label="Auto Send Emails After Processing", value=False) template_id = gr.Dropdown(choices=[], label="Select Email Template for Bulk Send") from_email = gr.Textbox(label="From Email", placeholder="Enter your email address") reply_to = gr.Textbox(label="Reply To", placeholder="Enter reply-to email address") process_send_button = gr.Button("Process and Send Selected Queries") process_status = gr.Textbox(label="Process Status", interactive=False) def bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to): return process_and_send_bulk(selected_terms, template_id, num_emails, from_email, reply_to, auto_send=auto_send) process_send_button.click(bulk_process_and_send, inputs=[selected_terms, template_id, num_emails, auto_send, from_email, reply_to], outputs=[process_status]) gradio_app.launch(share=True)