# Source: Hugging Face Space (page status when captured: "Runtime error").
import logging
import math
import os
import re
from datetime import datetime

import boto3
import gradio as gr
import openai
import pandas as pd
import psycopg2
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from bs4 import BeautifulSoup
from googlesearch import search
from psycopg2 import pool
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Configuration
#
# Secrets (AWS keys, database user/password) are read from the environment
# only; no credential is embedded in the source. Any key or password that was
# ever committed as a literal default must be treated as leaked and rotated.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
REGION_NAME = "us-east-1"
# "sk-your-key" is a placeholder, not a real key.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key")
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "http://127.0.0.1:11434/v1")
OPENAI_MODEL = "mistral"
# Keyword arguments forwarded to the psycopg2 connection pool.
# DB_USER and DB_PASSWORD have no default and must be set in the environment;
# the non-secret connection settings keep their previous values as defaults.
DB_PARAMS = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "aws-0-eu-central-1.pooler.supabase.com"),
    "port": os.getenv("DB_PORT", "6543"),
    "dbname": os.getenv("DB_NAME", "postgres"),
    "sslmode": "require",
    "gssencmode": "disable",
}
# Initialize AWS SES client
ses_client = boto3.client(
    'ses',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=REGION_NAME,
)
# Connection pool for PostgreSQL (minimum 1, maximum 10 connections).
# NOTE(review): the pool presumably opens its minimum connection right here,
# so an unreachable database would fail at import time -- confirm.
db_pool = pool.SimpleConnectionPool(1, 10, **DB_PARAMS)
# HTTP session with retry strategy: up to 5 retries with backoff on 5xx
# responses, applied to https:// URLs only.
session = requests.Session()
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('https://', adapter)
# Setup logging
# The root level is INFO, so logger.debug(...) records are not emitted.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Apply the OpenAI settings. They were previously defined but never handed to
# the openai module, so completion requests ignored the configured endpoint
# and key. These module attributes are the pre-1.0 openai-python interface,
# which the Completion call in this file already relies on.
openai.api_key = OPENAI_API_KEY
openai.api_base = OPENAI_API_BASE
# Enhanced logging for debugging
def log_debug_info(step, info):
    """Write one DEBUG-level record naming a processing step and its details.

    Args:
        step: Short label for the step being reported.
        info: Free-form detail to attach to the record.
    """
    message = f"Step: {step}, Info: {info}"
    logger.debug(message)
# Initialize database connection
def init_db():
    """Check at startup that the pool can hand out a database connection.

    Connection failures are logged and swallowed; nothing is raised to the
    caller and nothing is returned.
    """
    try:
        conn = db_pool.getconn()
    except psycopg2.Error as e:
        log_debug_info("Database Connection Failed", f"Error: {str(e)}")
        logger.error(f"Failed to connect to the database: {e}")
        return
    log_debug_info("Database Connection", "Successfully established a connection to the database")
    # Hand the connection back to the pool. Closing it directly (as was done
    # before) leaves the pool believing the slot is still checked out.
    db_pool.putconn(conn)
    logger.info("Database connection established successfully.")


init_db()
# Check if the email is valid
def is_valid_email(email):
    """Return True when *email* looks like a usable contact address.

    An address is rejected when it:
      * is shorter than 6 or longer than 254 characters,
      * matches a junk pattern (image file names, ``no-reply@``, ``prueba@``,
        local parts that start with digits),
      * does not contain exactly one ``@``,
      * uses a known misspelling of a mail domain, or
      * has a domain that is not ``label(.label)*.tld`` with an alphabetic
        TLD of at least two letters.

    Args:
        email: Candidate address as a string.

    Returns:
        bool: True if the address passes every check, otherwise False.
    """
    invalid_patterns = [
        r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@',
        r'^prueba@', r'^\d+[a-z]*@'
    ]
    typo_domains = {"gmil.com", "gmal.com", "gmaill.com", "gnail.com"}
    MIN_EMAIL_LENGTH = 6
    MAX_EMAIL_LENGTH = 254

    if not MIN_EMAIL_LENGTH <= len(email) <= MAX_EMAIL_LENGTH:
        return False
    if any(re.search(pattern, email, re.IGNORECASE) for pattern in invalid_patterns):
        return False
    # Without this guard a string with no '@' raised IndexError below, and a
    # string with several '@' was judged on its middle segment only.
    if email.count('@') != 1:
        return False
    domain = email.split('@')[1]
    # Domains are case-insensitive, so compare the typo list in lower case.
    if domain.lower() in typo_domains:
        return False
    return re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain) is not None
# Function to find and validate unique emails in a text
def find_emails(html_text):
    """Extract the set of distinct, valid email addresses found in *html_text*.

    Args:
        html_text: Arbitrary text (typically raw HTML) to scan.

    Returns:
        set[str]: Every address that matches the extraction pattern and also
        passes ``is_valid_email``.
    """
    # The TLD class used to be written "[A-Z|a-z]", which admits a literal
    # '|' character. An address followed by '|' then matched with the pipe
    # included and was discarded by validation instead of being captured.
    email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b')
    all_emails = set(email_regex.findall(html_text))
    return {email for email in all_emails if is_valid_email(email)}
# Function to save search results to PostgreSQL database
def save_to_db(search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date):
    """Store one scraped email and bump the counter of its search term.

    Inserts a row into ``emails`` and, for the matching ``search_terms`` row
    whose ``fetched_emails`` is still below 30, updates ``last_processed_at``
    and increments ``fetched_emails``. Both statements are committed together.

    All errors are logged and swallowed; the function always returns None.
    """
    conn = None
    try:
        try:
            conn = db_pool.getconn()
            log_debug_info("Database Connection", "Successfully established a connection to the database")
        except psycopg2.Error as e:
            log_debug_info("Database Connection Failed", f"Error: {str(e)}")
            raise
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO emails (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """, (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date))
            cursor.execute("""
                UPDATE search_terms SET last_processed_at = %s, fetched_emails = fetched_emails + 1
                WHERE term = %s AND fetched_emails < 30
            """, (scrape_date, search_query))
        conn.commit()
        logger.info(f"Successfully saved data to the database for email: {email}")
    except Exception as e:
        logger.error(f"Failed to save data to the database: {e}")
        # Discard the half-finished transaction so it is not carried over to
        # the next user of this pooled connection.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except psycopg2.Error as rollback_error:
                logger.error(f"Rollback failed: {rollback_error}")
    finally:
        # Always give the connection back; previously a failed statement left
        # it checked out, eventually exhausting the pool.
        if conn is not None:
            db_pool.putconn(conn)
# Function to scrape emails using Google Search
def scrape_emails(search_query, num_results=10):
    """Fetch Google result pages for a query and collect emails found in them.

    Addresses are extracted from the HTML of the Google results page itself
    (the linked sites are not visited). Each address is saved via
    ``save_to_db`` and also added to the returned frame.

    Args:
        search_query: Text to search for.
        num_results: Requested number of results; pages are fetched in steps
            of 10, rounding up, so any value from 1 to 10 fetches one page.

    Returns:
        pandas.DataFrame: One row per email with the columns "Search Query",
        "Email", "Page Title", "URL", "Meta Description", "HTTP Status",
        "Scrape Duration" and "Scrape Date". Empty when nothing was found.
        Request or parsing errors are logged and that page is skipped.
    """
    columns = ["Search Query", "Email", "Page Title", "URL", "Meta Description",
               "HTTP Status", "Scrape Duration", "Scrape Date"]
    search_url = 'https://www.google.com/search'
    search_params = {'q': search_query, 'num': num_results, 'start': 0}
    results = []

    # Round up: floor division produced zero pages for any request below 10.
    page_count = math.ceil(num_results / 10)
    for _ in range(page_count):
        try:
            start_time = datetime.now()
            # A timeout keeps a stalled connection from hanging the UI forever.
            response = session.get(search_url, params=search_params, timeout=30)
            http_status = response.status_code
            response.encoding = 'utf-8'
            soup = BeautifulSoup(response.text, 'html.parser')

            page_title = 'No Title Found'
            if soup.title and soup.title.string:
                page_title = soup.title.string
            meta_tag = soup.find('meta', attrs={'name': 'description'})
            # .get() covers a description tag that has no content attribute.
            meta_description = meta_tag.get('content', 'No Description Found') if meta_tag else 'No Description Found'

            scrape_duration = datetime.now() - start_time
            log_debug_info("HTTP Response", f"Status Code: {response.status_code}, URL: {response.url}")
            # find_emails() already returns validated addresses only.
            emails = find_emails(response.text)
            log_debug_info("Email Extraction", f"Extracted Emails: {emails}")
            for email in emails:
                scrape_date = str(datetime.now())
                row = (search_query, email, page_title, response.url, meta_description,
                       http_status, str(scrape_duration), scrape_date)
                results.append(row)
                save_to_db(*row)
            # As before, the offset only advances after a successful page.
            search_params['start'] += 10
        except Exception as e:
            # Do not touch `response` here: it is unbound when the request
            # itself failed on the first iteration.
            logger.error(f"Failed to scrape {search_url} (start={search_params['start']}): {e}")
    return pd.DataFrame(results, columns=columns)
# Function to generate AI-based email content
def generate_ai_content(lead_info):
    """Ask the configured completion model to draft an email for a lead.

    The model output is expected to be a subject, a blank line, then the
    body; it is split on the first blank line.

    Args:
        lead_info: Lead details interpolated verbatim into the prompt.

    Returns:
        tuple: ``(subject, email_body)`` on success, or ``(None, None)`` when
        the output has no blank-line separator or any error occurs (errors
        are logged, not raised).
    """
    prompt = f"""
Generate a personalized email for a lead using the following information: {lead_info}.
The email should include an engaging subject line, a warm greeting, a value proposition, key benefits, and a call-to-action.
"""
    try:
        completion = openai.Completion.create(
            model=OPENAI_MODEL,
            prompt=prompt,
            max_tokens=500,
            n=1,
            stop=None,
        )
        text = completion.choices[0].text.strip()
        subject_line, separator, body = text.partition("\n\n")
        if separator:
            return subject_line, body
        logger.error("AI-generated content is missing subject or body.")
    except openai.error.APIError as api_error:
        logger.error(f"OpenAI API error: {api_error}")
    except Exception as unexpected_error:
        logger.error(f"Unexpected error: {unexpected_error}")
    return None, None
# Function to send an email via AWS SES
def send_email_via_ses(subject, body_html, to_address, from_address, reply_to):
    """Send a single HTML email through the module-level SES client.

    Args:
        subject: Subject line (sent as UTF-8).
        body_html: HTML body (sent as UTF-8).
        to_address: Recipient address.
        from_address: Address used as the SES ``Source``.
        reply_to: Address placed in ``ReplyToAddresses``.

    Returns:
        None. Success and every failure are reported through the logger;
        no exception is propagated.
    """
    destination = {'ToAddresses': [to_address]}
    message = {
        'Body': {'Html': {'Charset': 'UTF-8', 'Data': body_html}},
        'Subject': {'Charset': 'UTF-8', 'Data': subject},
    }
    try:
        result = ses_client.send_email(
            Destination=destination,
            Message=message,
            Source=from_address,
            ReplyToAddresses=[reply_to],
        )
        logger.info(f"Email sent successfully to {to_address}. Message ID: {result['MessageId']}")
    except NoCredentialsError:
        logger.error("AWS credentials not available.")
    except PartialCredentialsError:
        logger.error("Incomplete AWS credentials provided.")
    except Exception as send_error:
        logger.error(f"Failed to send email to {to_address}: {send_error}")
# Function to fetch search terms from the database
def fetch_search_terms():
    """Load every row of ``search_terms`` as a DataFrame.

    Returns:
        pandas.DataFrame: Columns "ID", "Search Term", "Status" and
        "Fetched Emails". On a database error the error is logged and an
        empty frame with the same columns is returned, so callers can index
        a column without first checking for failure.
    """
    columns = ["ID", "Search Term", "Status", "Fetched Emails"]
    conn = None
    try:
        try:
            conn = db_pool.getconn()
            log_debug_info("Database Connection", "Successfully established a connection to the database")
        except psycopg2.Error as e:
            log_debug_info("Database Connection Failed", f"Error: {str(e)}")
            raise
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, term, status, fetched_emails FROM search_terms")
            search_terms = cursor.fetchall()
        return pd.DataFrame(search_terms, columns=columns)
    except psycopg2.Error as e:
        logger.error(f"Failed to fetch search terms: {e}")
        return pd.DataFrame(columns=columns)
    finally:
        # Return the connection even when the query fails; it used to stay
        # checked out on any error.
        if conn is not None:
            db_pool.putconn(conn)
# Function to fetch email templates from the database
def fetch_templates():
    """Load every row of ``email_templates`` as a DataFrame.

    Returns:
        pandas.DataFrame: Columns "ID", "Template Name", "Subject" and
        "Body HTML". On a database error the error is logged and an empty
        frame with the same columns is returned.
    """
    columns = ["ID", "Template Name", "Subject", "Body HTML"]
    conn = None
    try:
        try:
            conn = db_pool.getconn()
            log_debug_info("Database Connection", "Successfully established a connection to the database")
        except psycopg2.Error as e:
            log_debug_info("Database Connection Failed", f"Error: {str(e)}")
            raise
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, template_name, subject, body_html FROM email_templates")
            templates = cursor.fetchall()
        return pd.DataFrame(templates, columns=columns)
    except psycopg2.Error as e:
        logger.error(f"Failed to fetch templates: {e}")
        return pd.DataFrame(columns=columns)
    finally:
        # Return the connection even when the query fails; it used to stay
        # checked out on any error.
        if conn is not None:
            db_pool.putconn(conn)
# Function to fetch a specific template by ID
def fetch_template(template_id):
    """Look up one email template by its ID.

    Args:
        template_id: Value to match against the "ID" column returned by
            ``fetch_templates``.

    Returns:
        tuple: ``(subject, body_html)`` of the matching template, or
        ``(None, None)`` (after logging an error) when no templates could be
        loaded or the ID is not among them.
    """
    all_templates = fetch_templates()
    is_known = (not all_templates.empty) and template_id in all_templates['ID'].tolist()
    if not is_known:
        logger.error(f"Template ID {template_id} is invalid or has empty fields.")
        return None, None
    match = all_templates.loc[all_templates['ID'] == template_id]
    subject_text = match['Subject'].item()
    body_text = match['Body HTML'].item()
    return subject_text, body_text
# Function to process and send emails in bulk with logging
def process_and_send_with_logging(template_id, name, from_email, reply_to, use_ai_customizer,
                                  selected_terms=None, num_emails=10, auto_send=False):
    """Run a bulk campaign and log its start and outcome.

    The body used to read ``selected_terms``, ``num_emails`` and
    ``auto_send`` without receiving them. Those names resolved to module
    globals (the Gradio widgets of the same name), not to user-entered
    values, so they are now explicit optional parameters.

    Args:
        template_id: ID of the template to use, forwarded to
            ``bulk_process_and_send``.
        name: Sender display name. Accepted for interface compatibility;
            not used here.
        from_email: Sender address.
        reply_to: Reply-to address.
        use_ai_customizer: Accepted for interface compatibility; not used
            here.
        selected_terms: IDs of the search terms to process. When empty or
            omitted nothing is processed.
        num_emails: Results requested per search term.
        auto_send: Whether emails are sent right after processing.

    Returns:
        str: Status message from ``bulk_process_and_send``, or a notice that
        no search terms were supplied.
    """
    logger.info(f"Starting email campaign with template ID: {template_id}")
    if not selected_terms:
        result_message = "No search terms selected."
        logger.warning(result_message)
        return result_message
    result_message = bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to)
    logger.info(result_message)
    return result_message
def _mark_term_processing(term_id):
    """Fetch a search term's text by ID and set its status to 'processing'.

    Returns the term text, or None when no row has that ID. The pooled
    connection is returned whether or not the statements succeed.
    """
    try:
        conn = db_pool.getconn()
        log_debug_info("Database Connection", "Successfully established a connection to the database")
    except psycopg2.Error as e:
        log_debug_info("Database Connection Failed", f"Error: {str(e)}")
        raise
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT term FROM search_terms WHERE id=%s', (term_id,))
            row = cursor.fetchone()
            if row is None:
                return None
            cursor.execute('UPDATE search_terms SET status=%s WHERE id=%s', ('processing', term_id))
        conn.commit()
        return row[0]
    finally:
        db_pool.putconn(conn)


# Bulk processing and sending emails function
def bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to, name=""):
    """Scrape, store and optionally send emails for each selected search term.

    For every term ID: mark the term as 'processing', scrape addresses for
    it, record each as a lead, then prepare one message per address. The
    message is AI-generated when ``template_id`` is None, otherwise taken
    from the stored template. Messages are sent only when ``auto_send`` is
    true.

    Args:
        selected_terms: Iterable of ``search_terms.id`` values.
        template_id: Template to use, or None to generate content with AI.
        num_emails: Results requested per search term.
        auto_send: Send each prepared email through SES when true.
        from_email: Sender address.
        reply_to: Reply-to address.
        name: Sender name placed in the AI prompt. The body used to read an
            undefined local ``name``; it is now an optional parameter.

    Returns:
        str: A summary with the number of processed emails, or
        "An error occurred during processing." if any step raised.
    """
    total_processed = 0
    try:
        for term_id in selected_terms:
            search_term = _mark_term_processing(term_id)
            if search_term is None:
                # Previously fetchone()[0] raised TypeError and aborted the run.
                logger.warning(f"Search term ID {term_id} not found; skipping.")
                continue

            emails_df = scrape_emails(search_term, num_results=num_emails)
            logger.info(f"Scraped {len(emails_df)} emails for search term '{search_term}'")
            if emails_df.empty:
                logger.warning(f"No emails found for search term: {search_term}")
                continue

            # NOTE(review): save_lead and save_generated_email are not defined
            # in the visible part of this file -- confirm they exist elsewhere.
            for _, email_data in emails_df.iterrows():
                save_lead(search_term, email_data['Email'])

            if template_id is None:
                for _, email_data in emails_df.iterrows():
                    email = email_data['Email']
                    lead_info = {"name": name, "from_email": from_email, "reply_to": reply_to, "prompt": ""}
                    subject, generated_email = generate_ai_content(lead_info)
                    if generated_email:
                        save_generated_email(search_term, email, generated_email, email_data.get('URL', ''), subject)
                        if auto_send:
                            send_email_via_ses(subject, generated_email, email, from_email, reply_to)
                            logger.info(f"Email sent to {email}")
            else:
                subject, body_html = fetch_template(template_id)
                for _, email_data in emails_df.iterrows():
                    email = email_data['Email']
                    if subject and body_html:
                        save_generated_email(search_term, email, body_html, email_data.get('URL', ''), subject)
                        if auto_send:
                            send_email_via_ses(subject, body_html, email, from_email, reply_to)
                            logger.info(f"Email sent to {email}")

            total_processed += len(emails_df)
            logger.info(f"Processed {len(emails_df)} emails for search term '{search_term}'")
        if auto_send:
            return f"Processed and sent {total_processed} emails successfully."
        return f"Processed {total_processed} emails successfully."
    except Exception as e:
        logger.error(f"Error during bulk process and send: {e}")
        return "An error occurred during processing."
# Populate the valid_templates list
valid_templates = fetch_templates()
# Iterating a DataFrame yields its column labels, not its rows, so the
# dropdown choices are read from the "ID" column explicitly. The column can be
# missing when the template query failed and a bare empty frame came back.
template_choices = valid_templates['ID'].tolist() if 'ID' in valid_templates.columns else []

with gr.Blocks() as gradio_app:
    gr.Markdown("# Email Campaign Management System")

    # Tab for Searching Emails
    with gr.Tab("Search Emails"):
        search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'")
        num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results")
        search_button = gr.Button("Search")
        results = gr.Dataframe(headers=["Search Query", "Email"])
        search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[results])

    # Tab for Creating Email Templates
    with gr.Tab("Create Email Template"):
        template_name = gr.Textbox(label="Template Name", placeholder="e.g., 'Welcome Email'")
        subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
        body_html = gr.Textbox(label="Email Content (HTML)", placeholder="Enter your email content here...", lines=8)
        create_template_button = gr.Button("Create Template")
        template_status = gr.Textbox(label="Template Creation Status", interactive=False)

        def create_email_template(template_name, subject, body_html):
            """Insert a new row into ``email_templates`` and report the outcome.

            Returns a success message, or an error message containing the
            database error text.
            """
            conn = None
            try:
                try:
                    conn = db_pool.getconn()
                    log_debug_info("Database Connection", "Successfully established a connection to the database")
                except psycopg2.Error as e:
                    log_debug_info("Database Connection Failed", f"Error: {str(e)}")
                    raise
                with conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO email_templates (template_name, subject, body_html)
                        VALUES (%s, %s, %s)
                    """, (template_name, subject, body_html))
                conn.commit()
                return "Template created successfully."
            except psycopg2.Error as e:
                logger.error(f"Failed to create template: {e}")
                return f"Error creating template: {e}"
            finally:
                # Return the connection on every path; it used to stay
                # checked out when the insert failed.
                if conn is not None:
                    db_pool.putconn(conn)

        create_template_button.click(create_email_template, inputs=[template_name, subject, body_html], outputs=[template_status])

    # Tab for Generating and Sending Emails
    with gr.Tab("Generate and Send Emails"):
        with gr.Row():
            template_id = gr.Dropdown(choices=template_choices, label="Select Email Template")
            use_ai_customizer = gr.Checkbox(label="AI Customizer", value=False)
        with gr.Row():
            name = gr.Textbox(label="Your Name", value="Sami Halawa | IA Prof", interactive=False)
            from_email = gr.Textbox(label="From Email", value="hello@indosy.com", interactive=False)
            reply_to = gr.Textbox(label="Reply To", value="hello@indosy.com", interactive=False)
        with gr.Row():
            subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
            body_html = gr.HTML(label="Email Content (Dynamic Preview)", value="")
        preview_button = gr.Button("Preview Emails")
        preview_results = gr.Dataframe(headers=["Sample Email 1", "Sample Email 2", "Sample Email 3"])

        def generate_preview_emails(template_id, name, from_email, reply_to, use_ai_customizer):
            """Generate three AI sample email bodies for preview.

            ``generate_ai_content`` takes a single ``lead_info`` argument;
            the earlier seven-argument call raised TypeError on every click.
            ``template_id`` and ``use_ai_customizer`` are accepted because
            the button passes them, but they are not used.

            Returns a one-row DataFrame with the three bodies (None where
            generation failed).
            """
            lead_info = {"name": name, "from_email": from_email, "reply_to": reply_to, "prompt": ""}
            emails = []
            for _ in range(3):  # Generate 3 sample emails
                _, email_body = generate_ai_content(lead_info)
                emails.append(email_body)
            return pd.DataFrame([emails], columns=["Sample Email 1", "Sample Email 2", "Sample Email 3"])

        preview_button.click(generate_preview_emails,
                             inputs=[template_id, name, from_email, reply_to, use_ai_customizer],
                             outputs=[preview_results])
        accept_button = gr.Button("Accept and Start")
        accept_button.click(process_and_send_with_logging,
                            inputs=[template_id, name, from_email, reply_to, use_ai_customizer],
                            outputs=[gr.Textbox(label="Status", interactive=False)])

    # Tab for Bulk Process and Send
    with gr.Tab("Bulk Process and Send"):
        # Query once and reuse the frame for both widgets. The "ID" column is
        # absent when the query failed, so guard the lookup instead of
        # raising KeyError while the UI is being built.
        search_terms_df = fetch_search_terms()
        search_term_ids = search_terms_df['ID'].tolist() if 'ID' in search_terms_df.columns else []
        search_term_list = gr.Dataframe(search_terms_df, headers=["ID", "Search Term", "Status", "Fetched Emails"])
        selected_terms = gr.CheckboxGroup(label="Select Search Queries to Process", choices=search_term_ids)
        num_emails = gr.Slider(1, 100, value=10, step=1, label="Number of Emails per Search Term")
        auto_send = gr.Checkbox(label="Auto Send Emails After Processing", value=False)
        template_id = gr.Dropdown(choices=template_choices, label="Select Email Template")
        from_email = gr.Textbox(label="From Email", value="hello@indosy.com", interactive=False)
        reply_to = gr.Textbox(label="Reply To", value="hello@indosy.com", interactive=False)
        process_send_button = gr.Button("Process and Send Selected Queries")
        process_status = gr.Textbox(label="Process Status", interactive=False)
        process_send_button.click(bulk_process_and_send,
                                  inputs=[selected_terms, template_id, num_emails, auto_send, from_email, reply_to],
                                  outputs=[process_status])

gradio_app.launch(share=True)