autoclientgradio / IGNORE /corrected-gradio-final.py
Sami
m
4744c71
raw
history blame
19.3 kB
import os
import re
import psycopg2
from psycopg2 import pool
import requests
import pandas as pd
from datetime import datetime
from bs4 import BeautifulSoup
from googlesearch import search
import gradio as gr
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import openai
import logging
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Configuration
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "AKIASO2XOMEGIVD422N7")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "Rl+rzgizFDZPnNgDUNk0N0gAkqlyaYqhx7O2ona9")
REGION_NAME = "us-east-1"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key")
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "http://127.0.0.1:11434/v1")
OPENAI_MODEL = "mistral"
DB_PARAMS = {
"user": "postgres.whwiyccyyfltobvqxiib",
"password": "SamiHalawa1996",
"host": "aws-0-eu-central-1.pooler.supabase.com",
"port": "6543",
"dbname": "postgres",
"sslmode": "require",
"gssencmode": "disable"
}
# Initialize AWS SES client
ses_client = boto3.client('ses',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=REGION_NAME)
# Connection pool for PostgreSQL
db_pool = pool.SimpleConnectionPool(1, 10, **DB_PARAMS)
# HTTP session with retry strategy
session = requests.Session()
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('https://', adapter)
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Enhanced logging for debugging
def log_debug_info(step, info):
logger.debug(f"Step: {step}, Info: {info}")
# Initialize database connection
def init_db():
try:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
conn.close()
logger.info("Database connection established successfully.")
except psycopg2.Error as e:
logger.error(f"Failed to connect to the database: {e}")
init_db()
# Check if the email is valid
def is_valid_email(email):
invalid_patterns = [
r'\.png', r'\.jpg', r'\.jpeg', r'\.gif', r'\.bmp', r'^no-reply@',
r'^prueba@', r'^\d+[a-z]*@'
]
typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"]
MIN_EMAIL_LENGTH = 6
MAX_EMAIL_LENGTH = 254
if len(email) < MIN_EMAIL_LENGTH or len(email) > MAX_EMAIL_LENGTH:
return False
for pattern in invalid_patterns:
if re.search(pattern, email, re.IGNORECASE):
return False
domain = email.split('@')[1]
if domain in typo_domains or not re.match(r"^[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", domain):
return False
return True
# Function to find and validate unique emails in a text
def find_emails(html_text):
email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b')
all_emails = set(email_regex.findall(html_text))
valid_emails = {email for email in all_emails if is_valid_email(email)}
return valid_emails
# Function to save search results to PostgreSQL database
def save_to_db(search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date):
try:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO emails (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date))
cursor.execute("""
UPDATE search_terms SET last_processed_at = %s, fetched_emails = fetched_emails + 1
WHERE term = %s AND fetched_emails < 30
""", (scrape_date, search_query))
conn.commit()
db_pool.putconn(conn)
logger.info(f"Successfully saved data to the database for email: {email}")
except Exception as e:
logger.error(f"Failed to save data to the database: {e}")
# Function to scrape emails using Google Search
def scrape_emails(search_query, num_results=10):
results = []
search_params = {'q': search_query, 'num': num_results, 'start': 0}
for _ in range(num_results // 10):
try:
start_time = datetime.now()
response = session.get('https://www.google.com/search', params=search_params)
http_status = response.status_code
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
page_title = soup.title.string if soup.title else 'No Title Found'
meta_description = soup.find('meta', attrs={'name': 'description'})
meta_description = meta_description['content'] if meta_description else 'No Description Found'
scrape_duration = datetime.now() - start_time
log_debug_info("HTTP Response", f"Status Code: {response.status_code}, URL: {response.url}")
emails = find_emails(response.text)
log_debug_info("Email Extraction", f"Extracted Emails: {emails}")
for email in emails:
if is_valid_email(email):
results.append((search_query, email, page_title, response.url, meta_description, http_status, str(scrape_duration), str(datetime.now())))
save_to_db(search_query, email, page_title, response.url, meta_description, http_status, str(scrape_duration), str(datetime.now()))
search_params['start'] += 10
except Exception as e:
logger.error(f"Failed to scrape {response.url}: {e}")
return pd.DataFrame(results, columns=["Search Query", "Email", "Page Title", "URL", "Meta Description", "HTTP Status", "Scrape Duration", "Scrape Date"])
# Function to generate AI-based email content
def generate_ai_content(lead_info):
prompt = f"""
Generate a personalized email for a lead using the following information: {lead_info}.
The email should include an engaging subject line, a warm greeting, a value proposition, key benefits, and a call-to-action.
"""
try:
response = openai.Completion.create(
model=OPENAI_MODEL,
prompt=prompt,
max_tokens=500,
n=1,
stop=None
)
content = response.choices[0].text.strip()
if "\n\n" in content:
subject, email_body = content.split("\n\n", 1)
return subject, email_body
else:
logger.error("AI-generated content is missing subject or body.")
return None, None
except openai.error.APIError as e:
logger.error(f"OpenAI API error: {e}")
return None, None
except Exception as e:
logger.error(f"Unexpected error: {e}")
return None, None
# Function to send an email via AWS SES
def send_email_via_ses(subject, body_html, to_address, from_address, reply_to):
try:
response = ses_client.send_email(
Destination={
'ToAddresses': [to_address]
},
Message={
'Body': {
'Html': {
'Charset': 'UTF-8',
'Data': body_html
}
},
'Subject': {
'Charset': 'UTF-8',
'Data': subject
}
},
Source=from_address,
ReplyToAddresses=[reply_to]
)
logger.info(f"Email sent successfully to {to_address}. Message ID: {response['MessageId']}")
except NoCredentialsError:
logger.error("AWS credentials not available.")
except PartialCredentialsError:
logger.error("Incomplete AWS credentials provided.")
except Exception as e:
logger.error(f"Failed to send email to {to_address}: {e}")
# Function to fetch search terms from the database
def fetch_search_terms():
try:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
with conn.cursor() as cursor:
cursor.execute("SELECT id, term, status, fetched_emails FROM search_terms")
search_terms = cursor.fetchall()
db_pool.putconn(conn)
return pd.DataFrame(search_terms, columns=["ID", "Search Term", "Status", "Fetched Emails"])
except psycopg2.Error as e:
logger.error(f"Failed to fetch search terms: {e}")
return pd.DataFrame()
# Function to fetch email templates from the database
def fetch_templates():
try:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
with conn.cursor() as cursor:
cursor.execute("SELECT id, template_name, subject, body_html FROM email_templates")
templates = cursor.fetchall()
db_pool.putconn(conn)
return pd.DataFrame(templates, columns=["ID", "Template Name", "Subject", "Body HTML"])
except psycopg2.Error as e:
logger.error(f"Failed to fetch templates: {e}")
return pd.DataFrame()
# Function to fetch a specific template by ID
def fetch_template(template_id):
templates = fetch_templates()
if not templates.empty and template_id in templates['ID'].tolist():
selected_template = templates.loc[templates['ID'] == template_id]
return selected_template['Subject'].item(), selected_template['Body HTML'].item()
logger.error(f"Template ID {template_id} is invalid or has empty fields.")
return None, None
# Function to process and send emails in bulk with logging
def process_and_send_with_logging(template_id, name, from_email, reply_to, use_ai_customizer):
logger.info(f"Starting email campaign with template ID: {template_id}")
result_message = bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to)
logger.info(result_message)
return result_message
# Bulk processing and sending emails function
def bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to):
total_processed = 0
try:
for term_id in selected_terms:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
with conn.cursor() as cursor:
cursor.execute('SELECT term FROM search_terms WHERE id=%s', (term_id,))
search_term = cursor.fetchone()[0]
cursor.execute('UPDATE search_terms SET status=%s WHERE id=%s', ('processing', term_id))
conn.commit()
db_pool.putconn(conn)
emails_df = scrape_emails(search_term, num_results=num_emails)
logger.info(f"Scraped {len(emails_df)} emails for search term '{search_term}'")
if emails_df.empty:
logger.warning(f"No emails found for search term: {search_term}")
continue
for _, email_data in emails_df.iterrows():
email = email_data['Email']
save_lead(search_term, email)
if template_id is None:
for _, email_data in emails_df.iterrows():
email = email_data['Email']
lead_info = {"name": name, "from_email": from_email, "reply_to": reply_to, "prompt": ""}
subject, generated_email = generate_ai_content(lead_info)
if generated_email:
save_generated_email(search_term, email, generated_email, email_data.get('URL', ''), subject)
if auto_send:
send_email_via_ses(subject, generated_email, email, from_email, reply_to)
logger.info(f"Email sent to {email}")
else:
subject, body_html = fetch_template(template_id)
for _, email_data in emails_df.iterrows():
email = email_data['Email']
if subject and body_html:
save_generated_email(search_term, email, body_html, email_data.get('URL', ''), subject)
if auto_send:
send_email_via_ses(subject, body_html, email, from_email, reply_to)
logger.info(f"Email sent to {email}")
total_processed += len(emails_df)
logger.info(f"Processed {len(emails_df)} emails for search term '{search_term}'")
return f"Processed and sent {total_processed} emails successfully." if auto_send else f"Processed {total_processed} emails successfully."
except Exception as e:
logger.error(f"Error during bulk process and send: {e}")
return "An error occurred during processing."
# Populate the valid_templates list
valid_templates = fetch_templates()
with gr.Blocks() as gradio_app:
gr.Markdown("# Email Campaign Management System")
# Tab for Searching Emails
with gr.Tab("Search Emails"):
search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'")
num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results")
search_button = gr.Button("Search")
results = gr.Dataframe(headers=["Search Query", "Email"])
search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[results])
# Tab for Creating Email Templates
with gr.Tab("Create Email Template"):
template_name = gr.Textbox(label="Template Name", placeholder="e.g., 'Welcome Email'")
subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
body_html = gr.Textbox(label="Email Content (HTML)", placeholder="Enter your email content here...", lines=8)
create_template_button = gr.Button("Create Template")
template_status = gr.Textbox(label="Template Creation Status", interactive=False)
def create_email_template(template_name, subject, body_html):
try:
try:
conn = db_pool.getconn()
log_debug_info("Database Connection", "Successfully established a connection to the database")
except psycopg2.Error as e:
log_debug_info("Database Connection Failed", f"Error: {str(e)}")
raise
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO email_templates (template_name, subject, body_html)
VALUES (%s, %s, %s)
""", (template_name, subject, body_html))
conn.commit()
db_pool.putconn(conn)
return "Template created successfully."
except psycopg2.Error as e:
logger.error(f"Failed to create template: {e}")
return f"Error creating template: {e}"
create_template_button.click(create_email_template, inputs=[template_name, subject, body_html], outputs=[template_status])
# Tab for Generating and Sending Emails
with gr.Tab("Generate and Send Emails"):
with gr.Row():
template_id = gr.Dropdown(choices=[template[0] for template in valid_templates], label="Select Email Template")
use_ai_customizer = gr.Checkbox(label="AI Customizer", value=False)
with gr.Row():
name = gr.Textbox(label="Your Name", value="Sami Halawa | IA Prof", interactive=False)
from_email = gr.Textbox(label="From Email", value="hello@indosy.com", interactive=False)
reply_to = gr.Textbox(label="Reply To", value="hello@indosy.com", interactive=False)
with gr.Row():
subject = gr.Textbox(label="Email Subject", placeholder="e.g., 'Welcome to Our Service'")
body_html = gr.HTML(label="Email Content (Dynamic Preview)", value="")
preview_button = gr.Button("Preview Emails")
preview_results = gr.Dataframe(headers=["Sample Email 1", "Sample Email 2", "Sample Email 3"])
def generate_preview_emails(template_id, name, from_email, reply_to, use_ai_customizer):
emails = []
for i in range(3): # Generate 3 sample emails
_, email_body = generate_ai_content(name, from_email, subject, body_html, reply_to, use_ai_customizer, template_id)
emails.append(email_body)
return pd.DataFrame([emails], columns=["Sample Email 1", "Sample Email 2", "Sample Email 3"])
preview_button.click(generate_preview_emails,
inputs=[template_id, name, from_email, reply_to, use_ai_customizer],
outputs=[preview_results])
accept_button = gr.Button("Accept and Start")
accept_button.click(process_and_send_with_logging,
inputs=[template_id, name, from_email, reply_to, use_ai_customizer],
outputs=[gr.Textbox(label="Status", interactive=False)])
# Tab for Bulk Process and Send
with gr.Tab("Bulk Process and Send"):
search_term_list = gr.Dataframe(fetch_search_terms(), headers=["ID", "Search Term", "Status", "Fetched Emails"])
selected_terms = gr.CheckboxGroup(label="Select Search Queries to Process", choices=fetch_search_terms()['ID'].tolist())
num_emails = gr.Slider(1, 100, value=10, step=1, label="Number of Emails per Search Term")
auto_send = gr.Checkbox(label="Auto Send Emails After Processing", value=False)
template_id = gr.Dropdown(choices=[template[0] for template in valid_templates], label="Select Email Template")
from_email = gr.Textbox(label="From Email", value="hello@indosy.com", interactive=False)
reply_to = gr.Textbox(label="Reply To", value="hello@indosy.com", interactive=False)
process_send_button = gr.Button("Process and Send Selected Queries")
process_status = gr.Textbox(label="Process Status", interactive=False)
process_send_button.click(bulk_process_and_send,
inputs=[selected_terms, template_id, num_emails, auto_send, from_email, reply_to],
outputs=[process_status])
gradio_app.launch(share=True)