# Spaces:
# Runtime error
# Runtime error
import logging
import os
import re
from datetime import datetime

import boto3
import gradio as gr
import openai
import pandas as pd
import psycopg2
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from bs4 import BeautifulSoup
from googlesearch import search  # module installed by the `googlesearch-python` package
from psycopg2 import pool
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Configuration
# Secrets are read from the environment only. The AWS key pair and the
# database password that used to be hard-coded here were committed to source
# control and must be treated as compromised: rotate them.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
REGION_NAME = os.getenv("AWS_REGION", "us-east-1")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key")
# 11434 is Ollama's default port; Ollama serves plain HTTP, not HTTPS.
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "http://127.0.0.1:11434/v1")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
# Connection keyword arguments for psycopg2. A None password is dropped by
# psycopg2 when it builds the DSN, so libpq falls back to PGPASSWORD/.pgpass.
DB_PARAMS = {
    "user": os.getenv("DB_USER", "postgres.whwiyccyyfltobvqxiib"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "aws-0-eu-central-1.pooler.supabase.com"),
    "port": os.getenv("DB_PORT", "6543"),
    "dbname": os.getenv("DB_NAME", "postgres"),
    "sslmode": "require",
    "gssencmode": "disable",
}
# Setup logging first so that everything initialised below can log.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize AWS SES client. If the credential settings are None, boto3 uses
# its default credential chain (environment, shared config, instance role).
ses_client = boto3.client(
    'ses',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=REGION_NAME,
)
# Apply the OpenAI settings to the module-level client. Without this the
# OPENAI_* values above are never used. The file calls openai.Completion,
# which presumably means openai<1.0, where these attributes are honoured.
openai.api_key = OPENAI_API_KEY
openai.api_base = OPENAI_API_BASE
# Connection pool for PostgreSQL: 1 connection opened eagerly, at most 10.
db_pool = pool.SimpleConnectionPool(1, 10, **DB_PARAMS)
# HTTP session that retries transient 5xx responses with exponential backoff.
session = requests.Session()
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('https://', adapter)
session.mount('http://', adapter)
# Verify database connectivity once at import time.
def init_db():
    """Borrow a pooled connection, run ``SELECT 1`` and hand it back.

    Failures are logged rather than raised so the UI can still start.
    """
    try:
        conn = db_pool.getconn()
    except psycopg2.Error as e:
        logger.error(f"Failed to connect to the database: {e}")
        return
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
        logger.info("Database connection established successfully.")
    except psycopg2.Error as e:
        logger.error(f"Failed to connect to the database: {e}")
    finally:
        # Return the connection with putconn() rather than closing it. A
        # connection closed without putconn() stays counted as "in use" and
        # permanently takes up one of the pool's slots.
        db_pool.putconn(conn)


init_db()
# Rejection patterns, compiled once. They catch image file names that look
# like addresses (e.g. "logo@2x.png"), no-reply/test mailboxes, and local
# parts made of digits optionally followed by letters (e.g. "123abc@").
_INVALID_EMAIL_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@',
        r'^prueba@', r'^\d+[a-z]*@',
    )
)
# Misspellings of gmail.com that are treated as invalid (compared lower-case).
_TYPO_DOMAINS = frozenset({"gmil.com", "gmal.com", "gmaill.com", "gnail.com"})
# Domain must be labels of [A-Za-z0-9.-] ending in an alphabetic TLD of 2+ chars.
_DOMAIN_RE = re.compile(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


def is_valid_email(email):
    """Return True if *email* looks like a usable, deliverable address.

    Rejects falsy values, lengths outside 6..254, anything without exactly one
    "@" or with an empty local part, matches of _INVALID_EMAIL_PATTERNS, known
    typo domains (case-insensitive), and domains without an alphabetic TLD.
    """
    if not email or len(email) < 6 or len(email) > 254:
        return False
    # Exactly one "@" with something before it; the original accepted bare
    # domains such as "example.com".
    if email.count('@') != 1:
        return False
    local, _, domain = email.partition('@')
    if not local:
        return False
    if any(pattern.search(email) for pattern in _INVALID_EMAIL_PATTERNS):
        return False
    if domain.lower() in _TYPO_DOMAINS or not _DOMAIN_RE.match(domain):
        return False
    return True
# Candidate address pattern, compiled once. The TLD class is [A-Za-z]; the
# original "[A-Z|a-z]" also accepted a literal "|" character.
_EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b')


def find_emails(html_text):
    """Return the set of lower-cased addresses in *html_text* that pass is_valid_email().

    Candidates are lower-cased *before* validation, so every check in
    is_valid_email sees exactly the string that is returned. Empty or None
    input yields an empty set.
    """
    if not html_text:
        return set()
    candidates = {match.lower() for match in _EMAIL_RE.findall(html_text)}
    return {email for email in candidates if is_valid_email(email)}
def scrape_emails(search_query, num_results=10, timeout=10):
    """Scrape Google result pages for email addresses related to *search_query*.

    Requests ceil(num_results / 10) result pages of at most 10 results each.
    It extracts addresses from the visible page text with find_emails() and
    stores each newly seen address via save_lead().

    Args:
        search_query: text sent as Google's ``q`` parameter.
        num_results: number of search results to request. The UI slider may
            pass a float; it is coerced to int.
        timeout: per-request timeout in seconds, so a stalled request cannot
            hang the UI.

    Returns:
        DataFrame with columns "Search Query" and "Email", one row per distinct
        address. Request errors are logged, and whatever was collected before
        the error is returned.
    """
    results = []
    seen = set()
    try:
        remaining = int(num_results)
        start = 0
        # The original looped num_results // 10 times, which fetched nothing
        # for 1..9 results.
        while remaining > 0:
            page_size = min(10, remaining)
            response = session.get(
                'https://www.google.com/search',
                params={'q': search_query, 'num': page_size, 'start': start},
                timeout=timeout,
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Skip addresses already found on an earlier page; sort for a
            # stable row order.
            for email in sorted(find_emails(soup.get_text()) - seen):
                seen.add(email)
                results.append((search_query, email))
                save_lead(search_query, email)
            start += page_size
            remaining -= page_size
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to scrape search results: {e}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    return pd.DataFrame(results, columns=["Search Query", "Email"])
def save_lead(search_query, email):
    """Insert a (search_query, email) lead row, ignoring duplicates.

    ``ON CONFLICT (email, search_query)`` relies on a unique constraint over
    those columns in the ``leads`` table. Database errors, including an
    exhausted pool, are logged and not raised. The borrowed connection is
    always handed back to ``db_pool``, including when execute/commit fails;
    the original leaked it in that case.
    """
    try:
        conn = db_pool.getconn()
    except psycopg2.Error as e:
        logger.error(f"Failed to save lead data to the database: {e}")
        return
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO leads (search_query, email)
                VALUES (%s, %s)
                ON CONFLICT (email, search_query) DO NOTHING
            """, (search_query, email))
        conn.commit()
    except psycopg2.Error as e:
        logger.error(f"Failed to save lead data to the database: {e}")
    finally:
        # psycopg2's pool rolls back (or discards) a connection returned in a
        # failed transaction.
        db_pool.putconn(conn)
def save_generated_email(search_term, email, generated_email, url, subject):
    """Insert one row into ``generated_emails``.

    Database errors, including an exhausted pool, are logged and not raised.
    The borrowed connection is always handed back to ``db_pool``, including
    when execute/commit fails; the original leaked it in that case.
    """
    try:
        conn = db_pool.getconn()
    except psycopg2.Error as e:
        logger.error(f"Failed to save generated email to the database: {e}")
        return
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO generated_emails (search_term, email, generated_email, url, subject)
                VALUES (%s, %s, %s, %s, %s)
            """, (search_term, email, generated_email, url, subject))
        conn.commit()
    except psycopg2.Error as e:
        logger.error(f"Failed to save generated email to the database: {e}")
    finally:
        # psycopg2's pool rolls back (or discards) a connection returned in a
        # failed transaction.
        db_pool.putconn(conn)
def generate_ai_content(lead_info):
    """Ask the configured completion model for a personalized email.

    *lead_info* is interpolated into the prompt as-is. The model output is
    split at its first blank line into (subject, body). A leading "Subject:"
    label is removed from the subject, because the subject is used verbatim as
    the email's Subject header.

    Returns:
        (subject, body) on success, or (None, None) when the API call fails or
        the output has no blank-line separator.

    Uses the legacy ``openai.Completion`` API. Presumably openai<1.0 is
    installed; TODO confirm.
    """
    prompt = f"""
    Generate a personalized email for a lead using the following information: {lead_info}.
    The email should include an engaging subject line, a warm greeting, a value proposition, key benefits, and a call-to-action.
    """
    try:
        response = openai.Completion.create(
            # Was the undefined name `mistral`, which raised NameError on
            # every call.
            model=OPENAI_MODEL,
            prompt=prompt,
            max_tokens=500,
            n=1,
            stop=None,
        )
        content = response.choices[0].text.strip()
    except Exception as e:
        # openai<1.0 has openai.error.APIError; openai>=1.0 has openai.APIError.
        # Resolve it here so a missing attribute cannot raise inside an except
        # clause and mask the real error.
        api_error = (getattr(getattr(openai, "error", None), "APIError", None)
                     or getattr(openai, "APIError", None))
        if api_error is not None and isinstance(e, api_error):
            logger.error(f"OpenAI API error: {e}")
        else:
            logger.error(f"Unexpected error: {e}")
        return None, None

    if "\n\n" not in content:
        logger.error("AI-generated content is missing subject or body.")
        return None, None
    subject, email_body = content.split("\n\n", 1)
    subject = subject.strip()
    if subject.lower().startswith("subject:"):
        subject = subject[len("subject:"):].strip()
    return subject, email_body
def send_email_via_ses(subject, body_html, to_address, from_address, reply_to):
    """Send one UTF-8 HTML email to *to_address* through Amazon SES.

    An empty *reply_to* (e.g. a blank UI textbox) is omitted rather than sent
    as an empty Reply-To address.

    Returns:
        The SES ``MessageId`` on success, or None when sending failed. The
        failure is logged, never raised. Existing callers that ignore the
        return value are unaffected.
    """
    request = {
        'Destination': {'ToAddresses': [to_address]},
        'Message': {
            'Body': {'Html': {'Charset': 'UTF-8', 'Data': body_html}},
            'Subject': {'Charset': 'UTF-8', 'Data': subject},
        },
        'Source': from_address,
    }
    if reply_to:
        request['ReplyToAddresses'] = [reply_to]
    try:
        response = ses_client.send_email(**request)
    except NoCredentialsError:
        logger.error("AWS credentials not available.")
        return None
    except PartialCredentialsError:
        logger.error("Incomplete AWS credentials provided.")
        return None
    except Exception as e:
        logger.error(f"Failed to send email: {e}")
        return None
    message_id = response['MessageId']
    logger.info(f"Email sent successfully. Message ID: {message_id}")
    return message_id
def _claim_search_term(term_id):
    """Return the text of search term *term_id* and set its status to 'processing'.

    Returns None if no ``search_terms`` row has that id. Raises psycopg2.Error
    on database failure. The pooled connection is always returned.
    """
    conn = db_pool.getconn()
    try:
        with conn.cursor() as cursor:
            cursor.execute('SELECT term FROM search_terms WHERE id=%s', (term_id,))
            row = cursor.fetchone()
            if row is None:
                return None
            cursor.execute('UPDATE search_terms SET status=%s WHERE id=%s', ('processing', term_id))
        conn.commit()
        return row[0]
    finally:
        db_pool.putconn(conn)


def process_and_send_bulk(selected_terms, template_id, num_emails, from_email, reply_to, auto_send=False):
    """Scrape leads for each selected search term, then prepare and optionally send emails.

    For every id in *selected_terms* this:
    1. marks the term 'processing';
    2. scrapes up to *num_emails* results with scrape_emails(), which also
       stores the leads;
    3. builds content, either AI-generated per address when *template_id* is
       None or taken from the stored template;
    4. saves it with save_generated_email() and sends it when *auto_send*.

    Unknown term ids are skipped with a warning instead of aborting the batch.
    NOTE(review): the status is never moved past 'processing'; the valid
    status values are not visible here.

    Returns:
        A status string for the UI. Unexpected errors are logged and reported
        as "An error occurred during processing.".
    """
    total_processed = 0
    try:
        template_subject = template_body = None
        if template_id is not None:
            # The template is the same for every term, so fetch it once.
            template_subject, template_body = fetch_template(template_id)
            if not (template_subject and template_body):
                logger.warning(f"Template {template_id} not found or incomplete; no emails will be generated.")
        for term_id in selected_terms or []:
            search_term = _claim_search_term(term_id)
            if search_term is None:
                logger.warning(f"Search term id {term_id} not found; skipping.")
                continue
            # scrape_emails() already stores every address it finds via save_lead().
            emails_df = scrape_emails(search_term, num_results=num_emails)
            logger.info(f"Scraped {len(emails_df)} emails for search term '{search_term}'")
            if emails_df.empty:
                logger.warning(f"No emails found for search term: {search_term}")
                continue
            for email in emails_df['Email']:
                if template_id is None:
                    lead_info = {"name": "", "from_email": from_email, "reply_to": reply_to, "prompt": ""}
                    subject, body = generate_ai_content(lead_info)
                else:
                    subject, body = template_subject, template_body
                if not (subject and body):
                    continue
                # scrape_emails() results carry no URL column, so url is always empty.
                save_generated_email(search_term, email, body, '', subject)
                if auto_send:
                    send_email_via_ses(subject, body, email, from_email, reply_to)
                    logger.info(f"Email sent to {email}")
            total_processed += len(emails_df)
            logger.info(f"Processed {len(emails_df)} emails for search term '{search_term}'")
        return f"Processed and sent {total_processed} emails successfully." if auto_send else f"Processed {total_processed} emails successfully."
    except Exception as e:
        logger.error(f"Error during bulk process and send: {e}")
        return "An error occurred during processing."
def _run_query(sql, params=(), fetch=True):
    """Execute one statement on a pooled connection and commit.

    Returns ``cursor.fetchall()`` when *fetch* is true, else None.
    psycopg2.Error propagates to the caller. The connection is always returned
    to ``db_pool``.
    """
    conn = db_pool.getconn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall() if fetch else None
        conn.commit()
        return rows
    finally:
        db_pool.putconn(conn)


# Column layouts shared by the loaders below and the Dataframe components.
SEARCH_TERM_COLUMNS = ["ID", "Search Term", "Status", "Fetched Emails"]
TEMPLATE_COLUMNS = ["ID", "Template Name", "Subject", "Body HTML"]
GENERATED_EMAIL_COLUMNS = ["ID", "Search Query", "Email", "Generated Email", "Email Sent"]


def fetch_search_terms():
    """Return all search terms as a DataFrame with SEARCH_TERM_COLUMNS.

    This function was called by the UI but never defined, so the app raised
    NameError while building the Bulk tab. "Fetched Emails" is the number of
    stored leads whose ``search_query`` equals the term.
    NOTE(review): confirm that is the intended meaning of that column.
    On database errors an empty frame with the same columns is returned.
    """
    try:
        rows = _run_query("""
            SELECT st.id, st.term, st.status, COUNT(l.email)
            FROM search_terms st
            LEFT JOIN leads l ON l.search_query = st.term
            GROUP BY st.id, st.term, st.status
            ORDER BY st.id
        """)
    except psycopg2.Error as e:
        logger.error(f"Failed to fetch search terms: {e}")
        rows = []
    return pd.DataFrame(rows, columns=SEARCH_TERM_COLUMNS)


def fetch_leads():
    """Return stored leads as a DataFrame with columns ``search_query`` and ``email``."""
    try:
        rows = _run_query("SELECT search_query, email FROM leads")
    except psycopg2.Error as e:
        logger.error(f"Failed to fetch leads: {e}")
        rows = []
    return pd.DataFrame(rows, columns=["search_query", "email"])


def create_email_template(template_name, subject, body_html):
    """Insert a template row and return a status message for the UI textbox."""
    try:
        _run_query("""
            INSERT INTO email_templates (template_name, subject, body_html)
            VALUES (%s, %s, %s)
        """, (template_name, subject, body_html), fetch=False)
    except psycopg2.Error as e:
        logger.error(f"Failed to create template: {e}")
        return f"Error creating template: {e}"
    return "Template created successfully."


def fetch_templates():
    """Return all templates as a DataFrame with TEMPLATE_COLUMNS (empty on error).

    NOTE(review): ``SELECT *`` assumes email_templates has exactly these four
    columns, in this order.
    """
    try:
        rows = _run_query("SELECT * FROM email_templates")
        return pd.DataFrame(rows, columns=TEMPLATE_COLUMNS)
    except psycopg2.Error as e:
        logger.error(f"Failed to fetch templates: {e}")
    except ValueError as e:
        logger.error(f"Unexpected email_templates columns: {e}")
    return pd.DataFrame(columns=TEMPLATE_COLUMNS)


def fetch_template(template_id):
    """Return (subject, body_html) for *template_id*, or (None, None) if not found.

    Ids are compared as strings, because UI components may hand back "3"
    where the database holds 3.
    """
    if template_id is None or template_id == "":
        return None, None
    templates = fetch_templates()
    selected = templates[templates["ID"].astype(str) == str(template_id)]
    if len(selected) != 1:
        return None, None
    return selected["Subject"].item(), selected["Body HTML"].item()


def generate_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id):
    """Return (subject, body): AI-generated when *use_ai_customizer*, else from the template."""
    if use_ai_customizer:
        lead_info = {
            "name": name,
            "from_email": from_email,
            "reply_to": reply_to,
            "prompt": ""
        }
        return generate_ai_content(lead_info)
    return fetch_template(template_id)


def update_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id):
    """Refresh the subject/preview pair shown in the Generate tab."""
    return generate_email_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id)


def generate_all_emails(template_id, name, from_email, reply_to, use_ai_customizer):
    """Generate and store one email per stored lead.

    Returns a DataFrame with GENERATED_EMAIL_COLUMNS. "ID" is the 1-based row
    position, because no lead id column is visible in this file. "Email Sent"
    is always False, since nothing is sent here.
    """
    leads = fetch_leads()
    template_content = None
    if not use_ai_customizer:
        # Template content does not depend on the lead, so fetch it once.
        template_content = fetch_template(template_id)
    generated_rows = []
    for position, (search_query, email) in enumerate(leads.itertuples(index=False, name=None), start=1):
        if use_ai_customizer:
            gen_subject, generated_email = generate_email_content(
                name, from_email, None, None, reply_to, use_ai_customizer, template_id)
        else:
            gen_subject, generated_email = template_content
        if not (gen_subject and generated_email):
            logger.error(f"Failed to generate email for {email}")
            continue
        # Argument order follows save_generated_email(search_term, email, generated_email, url, subject).
        save_generated_email(search_query, email, generated_email, "", gen_subject)
        generated_rows.append([position, search_query, email, generated_email, False])
    return pd.DataFrame(generated_rows, columns=GENERATED_EMAIL_COLUMNS)


def send_emails(from_email, reply_to, generated):
    """Send the fixed placeholder email to every address in the generated-emails table.

    The original called process_and_send_bulk() with arguments that do not
    match its signature, which raised TypeError.
    NOTE(review): this keeps the original's fixed placeholder content; confirm
    whether each row's "Generated Email" should be sent instead.
    Returns a status message for the UI.
    """
    fixed_subject = "Your Subject Line Here"
    fixed_body_html = """
    <html>
    <body> <h1>Welcome to Our Service</h1> <p>We are thrilled to have you on board!</p>
    </body>
    </html>
    """
    if generated is None or "Email" not in generated:
        return "No generated emails to send."
    recipients = [e.strip() for e in generated["Email"].tolist() if isinstance(e, str) and e.strip()]
    if not recipients:
        return "No generated emails to send."
    for recipient in recipients:
        send_email_via_ses(fixed_subject, fixed_body_html, recipient, from_email, reply_to)
    return f"Attempted to send {len(recipients)} emails; see logs for per-message results."


def bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to):
    """Adapt the Bulk tab's input order to process_and_send_bulk()."""
    return process_and_send_bulk(selected_terms, template_id, num_emails, from_email, reply_to, auto_send=auto_send)


# Loaded once at build time to populate selectors; reload the app to refresh them.
search_terms_df = fetch_search_terms()
template_ids = fetch_templates()["ID"].tolist()

with gr.Blocks() as gradio_app:
    gr.Markdown("# Email Campaign Management System")
    with gr.Tab("Search Emails"):
        search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'")
        num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results")
        search_button = gr.Button("Search")
        search_results = gr.Dataframe(headers=["Search Query", "Email"])
        search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[search_results])
    with gr.Tab("Create Email Template"):
        template_name = gr.Textbox(label="Template Name", placeholder="e.g., 'Welcome Email'")
        subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
        body_html = gr.Textbox(label="Email Content (HTML)", placeholder="Enter your email content here...", lines=8)
        create_template_button = gr.Button("Create Template")
        template_status = gr.Textbox(label="Template Creation Status", interactive=False)
        create_template_button.click(create_email_template, inputs=[template_name, subject, body_html], outputs=[template_status])
    with gr.Tab("Generate and Send Emails"):
        with gr.Row():
            template_id = gr.Dropdown(choices=template_ids, label="Select Email Template")
            use_ai_customizer = gr.Checkbox(label="AI Customizer", value=False)
        with gr.Row():
            name = gr.Textbox(label="Your Name", placeholder="e.g., 'Daniel C.'")
            from_email = gr.Textbox(label="From Email", placeholder="e.g., 'your.email@example.com'")
        subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
        body_html = gr.HTML(label="Email Content (Dynamic Preview)", value="")
        reply_to = gr.Textbox(label="Reply To", placeholder="e.g., 'replyto@example.com'")
        preview_inputs = [name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id]
        # subject/body_html are the handler's outputs. .change also fires on
        # programmatic updates, so listening to them re-triggered the handler.
        for trigger in (name, from_email, reply_to, use_ai_customizer, template_id):
            trigger.change(update_email_content, inputs=preview_inputs, outputs=[subject, body_html])
        generate_button = gr.Button("Generate Emails")
        generated_results = gr.Dataframe(headers=GENERATED_EMAIL_COLUMNS)
        generate_button.click(generate_all_emails,
                              inputs=[template_id, name, from_email, reply_to, use_ai_customizer],
                              outputs=[generated_results])
        send_button = gr.Button("Bulk Send Emails")
        send_status = gr.Textbox(label="Send Status", interactive=False)
        send_button.click(send_emails, inputs=[from_email, reply_to, generated_results], outputs=[send_status])
    with gr.Tab("Bulk Process and Send"):
        search_term_list = gr.Dataframe(search_terms_df, headers=SEARCH_TERM_COLUMNS)
        selected_terms = gr.CheckboxGroup(label="Select Search Queries to Process", choices=search_terms_df["ID"].tolist())
        num_emails = gr.Slider(1, 100, value=10, step=1, label="Number of Emails per Search Term")
        auto_send = gr.Checkbox(label="Auto Send Emails After Processing", value=False)
        template_id = gr.Dropdown(choices=template_ids, label="Select Email Template for Bulk Send")
        from_email = gr.Textbox(label="From Email", placeholder="Enter your email address")
        reply_to = gr.Textbox(label="Reply To", placeholder="Enter reply-to email address")
        process_send_button = gr.Button("Process and Send Selected Queries")
        process_status = gr.Textbox(label="Process Status", interactive=False)
        process_send_button.click(bulk_process_and_send,
                                  inputs=[selected_terms, template_id, num_emails, auto_send, from_email, reply_to],
                                  outputs=[process_status])

gradio_app.launch(share=True)