# NOTE: "Spaces: Runtime error / Runtime error" — Hugging Face Spaces status
# banner captured along with the source paste; not part of the program.
# Standard library
import logging
import os
import re
from datetime import datetime

# Third-party
import boto3
import gradio as gr
import openai
import pandas as pd
import psycopg2
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from bs4 import BeautifulSoup
from googlesearch import search
from psycopg2 import pool
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configuration
# AWS / OpenAI settings are environment-driven with placeholder fallbacks.
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", "your-aws-access-key")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", "your-aws-secret-key")
region_name = os.getenv("AWS_REGION", "us-east-1")
openai.api_key = os.getenv("OPENAI_API_KEY", "your-openai-api-key")
openai.api_base = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1/")
openai_model = "text-davinci-003"

# Fix: database credentials were hard-coded literals while every other secret
# came from the environment. Read them from env vars too; the original
# placeholder strings are kept as defaults so unset environments behave the
# same as before.
db_params = {
    "user": os.getenv("POSTGRES_USER", "your-postgres-user"),
    "password": os.getenv("POSTGRES_PASSWORD", "your-postgres-password"),
    "host": os.getenv("POSTGRES_HOST", "your-postgres-host"),
    "port": os.getenv("POSTGRES_PORT", "your-postgres-port"),
    "dbname": os.getenv("POSTGRES_DB", "your-postgres-dbname"),
    "sslmode": "require",
}
# Initialize AWS SES client
ses_client = boto3.client(
    'ses',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)

# Connection pool for PostgreSQL (min 1 idle, max 10 concurrent connections)
db_pool = pool.SimpleConnectionPool(1, 10, **db_params)

# HTTP session with retry strategy for transient gateway errors.
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
# Fix: the retry adapter was mounted for https:// only, so plain http://
# URLs from the scraper got no retries. Mount it for both schemes.
session.mount('https://', HTTPAdapter(max_retries=retries))
session.mount('http://', HTTPAdapter(max_retries=retries))

# Setup logging
logging.basicConfig(level=logging.INFO, filename='app.log', filemode='a',
                    format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize database
def init_db():
    """Verify at startup that a database connection can be borrowed from the pool.

    Logs success or failure; never raises. Fix: the original called
    conn.close() on the borrowed connection and then putconn() returned that
    *closed* connection to the pool, poisoning it for the next caller. The
    connection is now returned untouched.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        logging.info("Successfully connected to the database!")
    except Exception as e:
        logging.error(f"Failed to connect to the database: {e}")
    finally:
        if conn:
            db_pool.putconn(conn)
init_db()
# Fetch the most recent or frequently used template ID
def fetch_recent_template_id():
    """Return the id of the most recently used email template.

    Returns None when the table is empty or the query fails (errors are
    logged, never raised). Fix: fetchone() returns None on an empty table,
    so the original's row[0] raised TypeError and fell into the broad
    except; the empty case is now handled explicitly.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('SELECT id FROM email_templates ORDER BY last_used DESC LIMIT 1')
            row = cursor.fetchone()
        return row[0] if row else None
    except Exception as e:
        logging.error(f"Failed to fetch the most recent template: {e}")
        return None
    finally:
        if conn:
            db_pool.putconn(conn)
# Auto-save drafts every few seconds
def auto_save_drafts(draft_content):
    """Insert *draft_content* into the drafts table with the current timestamp.

    Returns the new draft id on success, or None on failure (errors are
    logged, never raised). Fix: the INSERT used RETURNING id but the original
    discarded the result; the id is now fetched and returned so callers can
    reference the saved draft.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('INSERT INTO drafts (content, saved_at) VALUES (%s, %s) RETURNING id',
                           (draft_content, datetime.now()))
            draft_id = cursor.fetchone()[0]
        conn.commit()
        logging.info("Draft saved successfully.")
        return draft_id
    except Exception as e:
        logging.error(f"Failed to save draft: {e}")
        return None
    finally:
        if conn:
            db_pool.putconn(conn)
# Auto-restore drafts when user returns
def auto_restore_drafts():
    """Return the content of the most recently saved draft.

    Returns "" when there is no draft or the query fails (errors are logged,
    never raised). Fix: an empty drafts table made fetchone()[0] raise
    TypeError into the broad except; the empty case is now explicit.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('SELECT content FROM drafts ORDER BY saved_at DESC LIMIT 1')
            row = cursor.fetchone()
        return row[0] if row else ""
    except Exception as e:
        logging.error(f"Failed to restore draft: {e}")
        return ""
    finally:
        if conn:
            db_pool.putconn(conn)
# Save each search query automatically
def save_search_query(query):
    """Persist a search query as a pending row in search_terms.

    Returns the new search_terms id, or None on failure (errors are logged,
    never raised). Fix: the *query* parameter was completely unused — the
    original inserted only status/counters, so the query text this function
    exists to save was silently dropped.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            # NOTE(review): assumes search_terms has a `term` column for the
            # query text — confirm against the actual schema.
            cursor.execute(
                'INSERT INTO search_terms (term, status, fetched_emails, last_processed_at) '
                'VALUES (%s, %s, %s, %s) RETURNING id',
                (query, 'pending', 0, None))
            search_term_id = cursor.fetchone()[0]
        conn.commit()
        return search_term_id
    except Exception as e:
        logging.error(f"Failed to save search query: {e}")
        return None
    finally:
        if conn:
            db_pool.putconn(conn)
# Fetch unsent emails and auto-sort them by priority or date
def fetch_unsent_emails():
    """Return every generated_emails row whose email_sent flag is 0.

    Rows come back ordered by email_id ascending. On any database error the
    error is logged and an empty list is returned; the connection is always
    handed back to the pool.
    """
    connection = None
    try:
        connection = db_pool.getconn()
        with connection.cursor() as cur:
            cur.execute('SELECT * FROM generated_emails WHERE email_sent=0 ORDER BY email_id')
            rows = cur.fetchall()
        return rows
    except Exception as exc:
        logging.error(f"Failed to fetch unsent emails: {exc}")
        return []
    finally:
        if connection:
            db_pool.putconn(connection)
# Enhanced function for tracking progress and sending emails
def track_progress_and_send(from_address, reply_to):
    """Send every unsent email via AWS SES, reporting progress after each one.

    Returns a summary string. Fixes: the original called
    fetch_unsent_emails() twice (racy and wasteful), raised
    ZeroDivisionError when the unsent list was empty, and never updated the
    row status — so every run re-sent the same emails.
    """
    pending = fetch_unsent_emails()
    total_emails = len(pending)
    if total_emails == 0:
        notify_user("No unsent emails.")
        return "All emails sent successfully."
    progress = 0
    for email in pending:
        # Row layout (matches bulk_send_emails_aws): [0]=id, [2]=to,
        # [3]=subject, [4]=html body.
        message_id = send_email_via_aws(email[2], email[3], email[4], from_address, reply_to)
        if message_id:
            update_email_status(email[0], 'sent')
        progress += 1
        update_progress_bar(progress / total_emails)
        notify_user(f"Sent {progress}/{total_emails} emails.")
    return "All emails sent successfully."
# Function to send emails via AWS SES
def send_email_via_aws(to_address, subject, body_html, from_address, reply_to):
    """Deliver one HTML email through AWS SES.

    Returns the SES MessageId string on success, or None when sending fails.
    Credential problems and all other errors are logged, never raised.
    """
    message = {
        'Subject': {'Data': subject},
        'Body': {'Html': {'Data': body_html}},
    }
    try:
        result = ses_client.send_email(
            Source=from_address,
            Destination={'ToAddresses': [to_address]},
            Message=message,
            ReplyToAddresses=[reply_to],
        )
    except (NoCredentialsError, PartialCredentialsError) as exc:
        logging.error(f"AWS credentials error: {exc}")
        return None
    except Exception as exc:
        logging.error(f"Failed to send email via AWS SES: {exc}")
        return None
    return result['MessageId']
# Function to update the progress bar
def update_progress_bar(progress):
    """Placeholder hook for reporting progress (a 0.0-1.0 fraction) to the UI.

    Currently a no-op that returns None; wire this to the Gradio front end.
    """
    return None
# Function to notify the user with a message
def notify_user(message):
    """Placeholder hook for surfacing *message* to the user.

    Currently a no-op that returns None; implement a real notification path.
    """
    return None
# Function to scrape emails from Google search results
def scrape_emails(search_query, num_results):
    """Google-search *search_query*, scrape each result page for emails, and
    save every (query, email, url) lead.

    Returns a DataFrame with columns "Search Query", "Email", "URL".
    Per-URL failures are logged and skipped. Fixes: the BeautifulSoup object
    was built and never used, response.encoding was forced to utf-8 (ignoring
    the server's declared encoding), and the same (email, url) pair could be
    recorded multiple times.
    """
    results = []
    seen = set()
    for url in search(search_query, num_results=num_results):
        try:
            response = session.get(url, timeout=10)
            # Skip error pages instead of mining them for addresses.
            response.raise_for_status()
            for email in find_emails(response.text):
                if (email, url) in seen:
                    continue
                seen.add((email, url))
                results.append((search_query, email, url))
                save_lead(search_query, email, url)
        except Exception as e:
            logging.error(f"Failed to scrape {url}: {e}")
    return pd.DataFrame(results, columns=["Search Query", "Email", "URL"])
# Function to find emails in HTML text
def find_emails(html_text):
    """Extract unique email addresses from raw HTML or plain text.

    Returns a list of email strings in first-seen order (possibly empty).
    Fixes over the original r'[\w\.-]+@[\w\.-]+' pattern: the domain must
    contain at least one dot (bare "user@host" fragments are skipped), a
    sentence-ending dot is no longer swallowed into the address, '+' tagging
    in the local part is accepted, and duplicates are removed.
    """
    email_pattern = re.compile(r'[\w+-]+(?:\.[\w+-]+)*@[\w-]+(?:\.[\w-]+)+')
    # dict.fromkeys de-duplicates while preserving order of first occurrence.
    return list(dict.fromkeys(email_pattern.findall(html_text)))
# Function to save a lead (search query, email, URL)
def save_lead(search_query, email, url):
    """Insert one scraped lead (query, email, source URL) into the leads table.

    Errors are logged and swallowed; the connection is always returned to
    the pool.
    """
    connection = None
    try:
        connection = db_pool.getconn()
        with connection.cursor() as cur:
            cur.execute(
                'INSERT INTO leads (search_query, email, url) VALUES (%s, %s, %s)',
                (search_query, email, url),
            )
        connection.commit()
        logging.info(f"Lead saved successfully: {search_query}, {email}, {url}")
    except Exception as exc:
        logging.error(f"Failed to save lead: {exc}")
    finally:
        if connection:
            db_pool.putconn(connection)
# Function to generate email content using OpenAI
def generate_ai_content(search_query, email, max_tokens=400):
    """Generate an email body about *search_query* addressed to *email*.

    Returns the stripped completion text. Fix: the original call omitted
    max_tokens, so the Completions API default of 16 tokens truncated every
    generated email after a few words. The limit is now a keyword parameter
    with a generous default, keeping the call backward-compatible.
    """
    prompt = f"Write an email to {email} about {search_query}:\n\n"
    response = openai.Completion.create(engine=openai_model, prompt=prompt,
                                        max_tokens=max_tokens)
    return response.choices[0].text.strip()
# Function to fetch an email template by ID
def fetch_template(template_id):
    """Return the content of the email template with id *template_id*.

    Returns "" when no such template exists or the query fails (errors are
    logged, never raised). Fix: a missing row made fetchone()[0] raise
    TypeError into the broad except; the missing case is now explicit.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('SELECT content FROM email_templates WHERE id=%s', (template_id,))
            row = cursor.fetchone()
        return row[0] if row else ""
    except Exception as e:
        logging.error(f"Failed to fetch template {template_id}: {e}")
        return ""
    finally:
        if conn:
            db_pool.putconn(conn)
# Function to update the last_used timestamp of a template
def update_template_last_used(template_id):
    """Stamp template *template_id* with the current time in last_used.

    Errors are logged and swallowed; the connection always goes back to
    the pool.
    """
    connection = None
    try:
        connection = db_pool.getconn()
        now = datetime.now()
        with connection.cursor() as cur:
            cur.execute('UPDATE email_templates SET last_used=%s WHERE id=%s',
                        (now, template_id))
        connection.commit()
        logging.info(f"Template {template_id} last_used timestamp updated.")
    except Exception as exc:
        logging.error(f"Failed to update template {template_id} last_used timestamp: {exc}")
    finally:
        if connection:
            db_pool.putconn(connection)
# Function to generate and save emails
def generate_and_save_emails(search_query, template_id, num_emails):
    """Generate up to *num_emails* draft emails for leads of *search_query*.

    Each draft substitutes AI-generated text into the template's
    "{{content}}" placeholder and is stored with status 'draft' and
    email_sent=0. Errors are logged, never raised. Fixes: the original
    called fetch_recent_lead() on every iteration — which always returns the
    single most recent lead, so all num_emails drafts went to the SAME
    address — and re-fetched the (loop-invariant) template each time.
    Distinct leads are now pulled in one query and the template once.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        template = fetch_template(template_id)  # loop-invariant: fetch once
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT DISTINCT email FROM leads WHERE search_query=%s LIMIT %s',
                (search_query, num_emails))
            leads = [row[0] for row in cursor.fetchall()]
            for email in leads:
                ai_content = generate_ai_content(search_query, email)
                full_content = template.replace("{{content}}", ai_content)
                cursor.execute(
                    'INSERT INTO generated_emails (search_query, email, content, status, email_sent) VALUES (%s, %s, %s, %s, %s)',
                    (search_query, email, full_content, 'draft', 0))
        conn.commit()
        logging.info(f"Generated and saved {len(leads)} emails successfully.")
    except Exception as e:
        logging.error(f"Failed to generate and save emails: {e}")
    finally:
        if conn:
            db_pool.putconn(conn)
# Function to fetch the most recent lead for a search query
def fetch_recent_lead(search_query):
    """Return the email of the most recently saved lead for *search_query*.

    Returns "" when there is no matching lead or the query fails (errors are
    logged, never raised). Fix: no matching row made fetchone()[0] raise
    TypeError into the broad except; the empty case is now explicit.
    """
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('SELECT email FROM leads WHERE search_query=%s ORDER BY saved_at DESC LIMIT 1', (search_query,))
            row = cursor.fetchone()
        return row[0] if row else ""
    except Exception as e:
        logging.error(f"Failed to fetch recent lead for {search_query}: {e}")
        return ""
    finally:
        if conn:
            db_pool.putconn(conn)
# Function to update the status of generated emails
def update_email_status(email_id, status):
    """Set the status column of one generated_emails row to *status*.

    Errors are logged and swallowed; the connection is always returned to
    the pool.
    """
    connection = None
    try:
        connection = db_pool.getconn()
        with connection.cursor() as cur:
            cur.execute('UPDATE generated_emails SET status=%s WHERE email_id=%s',
                        (status, email_id))
        connection.commit()
        logging.info(f"Email {email_id} status updated to {status}.")
    except Exception as exc:
        logging.error(f"Failed to update email {email_id} status: {exc}")
    finally:
        if connection:
            db_pool.putconn(connection)
# Function to bulk send emails via AWS SES
def bulk_send_emails_aws(from_address, reply_to):
    """Send every unsent generated email through AWS SES.

    Returns a summary string. Fixes: rows were marked 'sent' even when
    send_email_via_aws() failed (it returns None on failure), and the
    email_sent flag — which fetch_unsent_emails() filters on — was never
    set, so every run re-sent the same emails.
    """
    failures = 0
    for email in fetch_unsent_emails():
        # Row layout: [0]=id, [2]=to, [3]=subject, [4]=html body.
        message_id = send_email_via_aws(email[2], email[3], email[4], from_address, reply_to)
        if message_id is None:
            failures += 1
            continue
        update_email_status(email[0], 'sent')
        _mark_email_sent(email[0])
    if failures:
        return f"Finished with {failures} failed sends."
    return "All emails sent successfully."

def _mark_email_sent(email_id):
    """Set email_sent=1 so fetch_unsent_emails() stops returning this row."""
    conn = None
    try:
        conn = db_pool.getconn()
        with conn.cursor() as cursor:
            cursor.execute('UPDATE generated_emails SET email_sent=1 WHERE email_id=%s', (email_id,))
        conn.commit()
    except Exception as e:
        logging.error(f"Failed to flag email {email_id} as sent: {e}")
    finally:
        if conn:
            db_pool.putconn(conn)
# Function to bulk send emails via custom API
def bulk_send_emails_custom_api(from_address, reply_to):
    """Send every unsent generated email through the custom API backend.

    Mirrors bulk_send_emails_aws but delegates delivery and status updates
    to the custom-API helper stubs. Returns a fixed success string.
    """
    for row in fetch_unsent_emails():
        # Row layout: [0]=id, [2]=to, [3]=subject, [4]=html body.
        send_email_via_custom_api(row[2], row[3], row[4], from_address, reply_to)
        update_email_status_custom_db(row[0], 'sent')
    return "All emails sent successfully."
# Function to send emails via custom API
def send_email_via_custom_api(to_address, subject, body_html, from_address, reply_to):
    """Stub for a custom email-delivery backend; always reports success.

    All parameters are currently ignored. Returns a fixed success string.
    """
    # TODO: implement real delivery against the custom API.
    return "Email sent successfully via custom API."
# Function to update email status in a custom database
def update_email_status_custom_db(email_id, status):
    """Stub for updating email status in a custom datastore; always succeeds.

    Both parameters are currently ignored. Returns a fixed success string.
    """
    # TODO: implement the real custom-database update.
    return "Email status updated successfully in custom database."
# Gradio interface
# Fixes: gr.InputBox and gr.Textarea do not exist in the Gradio API
# (gr.Textbox covers both), `default=` is not a component kwarg (`value=`),
# and event-handler input components must be created in the layout, not
# inside .click(inputs=[...]).
with gr.Blocks() as gradio_app:
    gr.Markdown("# Email Campaign Management System")

    with gr.Tab("Search Emails"):
        search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'")
        num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results")
        search_button = gr.Button("Search")
        results = gr.Dataframe(headers=["Search Query", "Email", "URL"])
        # Pass the handler directly; wrapping it in a lambda added nothing.
        search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[results])

    with gr.Tab("Create Email Template"):
        template_content = gr.Textbox(
            label="Template Content", lines=10,
            value="Hi {{name}},\n\nThis is an email template with a placeholder for the name.\n\nRegards,\nYour Company")
        save_template_button = gr.Button("Save Template")
        template_id = gr.Label("")
        # NOTE(review): save_template is not defined anywhere in this file —
        # clicking this raises NameError; implement or import it.
        save_template_button.click(lambda content: save_template(content),
                                   inputs=[template_content], outputs=[template_id])

    with gr.Tab("Generate and Send Emails"):
        # Distinct variable names so the first tab's search_query component
        # is not shadowed.
        gen_search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'")
        gen_template_id = gr.Dropdown(label="Select Template", choices=["Select a template"])
        num_emails = gr.Slider(1, 1000, value=10, step=1, label="Number of Emails")
        from_address = gr.Textbox(label="From Address")
        reply_to = gr.Textbox(label="Reply-To Address")
        generate_button = gr.Button("Generate and Save")
        send_button = gr.Button("Send Emails via AWS SES")
        custom_send_button = gr.Button("Send Emails via Custom API")
        generate_button.click(generate_and_save_emails,
                              inputs=[gen_search_query, gen_template_id, num_emails])
        send_button.click(track_progress_and_send, inputs=[from_address, reply_to])
        custom_send_button.click(bulk_send_emails_custom_api, inputs=[from_address, reply_to])

    with gr.Tab("Manage Search Queries"):
        query_status = gr.Dropdown(label="Select Query Status", choices=["Select a status"])
        process_button = gr.Button("Process Queries")
        processed_queries = gr.Label("")
        # NOTE(review): process_queries is not defined in this file — confirm
        # where it is meant to come from before shipping.
        process_button.click(lambda status: process_queries(status=status),
                             inputs=[query_status], outputs=[processed_queries])

gradio_app.launch()