{ "cells": [ { "cell_type": "code", "execution_count": 5, "id": "c53645c0-56ea-424b-872c-38355f1a74d1", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2024-09-04 20:27:54,997 - INFO - Database connection established successfully.\n", "2024-09-04 20:27:56,007 - INFO - HTTP Request: GET http://127.0.0.1:7863/startup-events \"HTTP/1.1 200 OK\"\n" ] },
{ "name": "stdout", "output_type": "stream", "text": [ "Running on local URL: http://127.0.0.1:7863\n", "\n", "Thanks for being a Gradio user! If you have questions or feedback, please join our Discord server and chat with us: https://discord.gg/feTf9x3ZSB\n" ] },
{ "name": "stderr", "output_type": "stream", "text": [ "2024-09-04 20:27:56,514 - INFO - HTTP Request: HEAD http://127.0.0.1:7863/ \"HTTP/1.1 200 OK\"\n", "2024-09-04 20:27:56,681 - INFO - HTTP Request: GET https://api.gradio.app/pkg-version \"HTTP/1.1 200 OK\"\n", "2024-09-04 20:27:57,537 - INFO - HTTP Request: GET https://api.gradio.app/v2/tunnel-request \"HTTP/1.1 200 OK\"\n" ] },
{ "name": "stdout", "output_type": "stream", "text": [ "Running on public URL: https://d31e53152c73d1a0fe.gradio.live\n", "\n", "This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)\n" ] },
{ "name": "stderr", "output_type": "stream", "text": [ "2024-09-04 20:27:59,633 - INFO - HTTP Request: HEAD https://d31e53152c73d1a0fe.gradio.live \"HTTP/1.1 200 OK\"\n" ] },
{ "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [
"import os\n",
"import re\n",
"import psycopg2\n",
"from psycopg2 import pool\n",
"import requests\n",
"import pandas as pd\n",
"from datetime import datetime\n",
"from bs4 import BeautifulSoup\n",
"from googlesearch import search\n",
"import gradio as gr\n",
"import boto3\n",
"from botocore.exceptions import NoCredentialsError, PartialCredentialsError\n",
"import openai\n",
"import logging\n",
"from requests.adapters import HTTPAdapter\n",
"from urllib3.util.retry import Retry\n",
"\n",
"# Configuration -- secrets are read from environment variables, never hard-coded in source\n",
"AWS_ACCESS_KEY_ID = os.getenv(\"AWS_ACCESS_KEY_ID\", \"\")\n",
"AWS_SECRET_ACCESS_KEY = os.getenv(\"AWS_SECRET_ACCESS_KEY\", \"\")\n",
"REGION_NAME = \"us-east-1\"\n",
"\n",
"OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\", \"sk-your-key\")\n",
"OPENAI_API_BASE = os.getenv(\"OPENAI_API_BASE\", \"http://127.0.0.1:11434/v1\")\n",
"OPENAI_MODEL = \"mistral\"\n",
"\n",
"# Point the openai client at the configured endpoint (pre-1.0 openai API style)\n",
"openai.api_key = OPENAI_API_KEY\n",
"openai.api_base = OPENAI_API_BASE\n",
"\n",
"DB_PARAMS = {\n",
"    \"user\": \"postgres.whwiyccyyfltobvqxiib\",\n",
"    \"password\": os.getenv(\"DB_PASSWORD\", \"\"),  # password removed from source; env var name is a suggestion\n",
"    \"host\": \"aws-0-eu-central-1.pooler.supabase.com\",\n",
"    \"port\": \"6543\",\n",
"    \"dbname\": \"postgres\",\n",
"    \"sslmode\": \"require\",\n",
"    \"gssencmode\": \"disable\"\n",
"}\n",
"\n",
"# Initialize AWS SES client\n",
"ses_client = boto3.client('ses',\n",
"                          aws_access_key_id=AWS_ACCESS_KEY_ID,\n",
"                          aws_secret_access_key=AWS_SECRET_ACCESS_KEY,\n",
"                          region_name=REGION_NAME)\n",
"\n",
"# Connection pool for PostgreSQL\n",
"db_pool = pool.SimpleConnectionPool(1, 10, **DB_PARAMS)\n",
"\n",
"# HTTP session with retry strategy\n",
"session = requests.Session()\n",
"retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])\n",
"adapter = HTTPAdapter(max_retries=retries)\n",
"session.mount('https://', adapter)\n",
"\n",
"# Setup logging\n",
"logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"# Verify that a pooled database connection can be obtained\n",
"def init_db():\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        db_pool.putconn(conn)  # return the connection to the pool instead of closing it\n",
"        logger.info(\"Database connection established successfully.\")\n",
"    except psycopg2.Error as e:\n",
"        logger.error(f\"Failed to connect to the database: {e}\")\n",
"\n",
"\n",
"init_db()\n",
"\n",
"# Check if the email is valid\n",
"def is_valid_email(email):\n",
"    invalid_patterns = [\n",
"        r'\\.png', r'\\.jpg', r'\\.jpeg', r'\\.gif', r'\\.bmp', r'^no-reply@',\n",
"        r'^prueba@', r'^\\d+[a-z]*@'\n",
"    ]\n",
"    typo_domains = [\"gmil.com\", \"gmal.com\", \"gmaill.com\", \"gnail.com\"]\n",
"    MIN_EMAIL_LENGTH = 6\n",
"    MAX_EMAIL_LENGTH = 254\n",
"\n",
"    if len(email) < MIN_EMAIL_LENGTH or len(email) > MAX_EMAIL_LENGTH:\n",
"        return False\n",
"    if '@' not in email:\n",
"        return False\n",
"    for pattern in invalid_patterns:\n",
"        if re.search(pattern, email, re.IGNORECASE):\n",
"            return False\n",
"    domain = email.split('@')[1]\n",
"    if domain in typo_domains or not re.match(r\"^[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$\", domain):\n",
"        return False\n",
"    return True\n",
"\n",
"# Function to find and validate unique emails in a text\n",
"def find_emails(html_text):\n",
"    email_regex = re.compile(r'\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,7}\\b')\n",
"    all_emails = set(email_regex.findall(html_text))\n",
"    valid_emails = {email for email in all_emails if is_valid_email(email)}\n",
"\n",
"    return valid_emails\n",
"\n",
"# Function to save search results to PostgreSQL database\n",
"def save_to_db(search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date):\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        with conn.cursor() as cursor:\n",
"            cursor.execute(\"\"\"\n",
"                INSERT INTO emails (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date)\n",
"                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n",
"            \"\"\", (search_query, email, page_title, url, meta_description, http_status, scrape_duration, scrape_date))\n",
"            cursor.execute(\"\"\"\n",
"                UPDATE search_terms SET last_processed_at = %s, fetched_emails = fetched_emails + 1\n",
"                WHERE term = %s AND fetched_emails < 30\n",
"            \"\"\", (scrape_date, search_query))\n",
"        conn.commit()\n",
"        db_pool.putconn(conn)\n",
"        logger.info(f\"Successfully saved data to the database for email: {email}\")\n",
"    except Exception as e:\n",
"        logger.error(f\"Failed to save data to the database: {e}\")\n",
"\n",
"# Function to scrape emails using Google Search\n",
"def scrape_emails(search_query, num_results=10):\n",
"    results = []\n",
"    search_params = {'q': search_query, 'num': num_results, 'start': 0}\n",
"\n",
"    # Fetch at least one page, then one page per 10 requested results\n",
"    for _ in range(max(1, int(num_results) // 10)):\n",
"        try:\n",
"            start_time = datetime.now()\n",
"            response = session.get('https://www.google.com/search', params=search_params)\n",
"            http_status = response.status_code\n",
"            response.encoding = 'utf-8'\n",
"            soup = BeautifulSoup(response.text, 'html.parser')\n",
"            page_title = soup.title.string if soup.title else 'No Title Found'\n",
"            meta_description = soup.find('meta', attrs={'name': 'description'})\n",
"            meta_description = meta_description['content'] if meta_description else 'No Description Found'\n",
"            scrape_duration = datetime.now() - start_time\n",
"\n",
"            emails = find_emails(response.text)\n",
"            for email in emails:\n",
"                if is_valid_email(email):\n",
"                    results.append((search_query, email, page_title, response.url, meta_description, http_status, str(scrape_duration), str(datetime.now())))\n",
"                    save_to_db(search_query, email, page_title, response.url, meta_description, http_status, str(scrape_duration), str(datetime.now()))\n",
"\n",
"            search_params['start'] += 10\n",
"\n",
"        except Exception as e:\n",
"            # 'response' may be unbound if the request itself failed, so log the query instead\n",
"            logger.error(f\"Failed to scrape results for '{search_query}': {e}\")\n",
"\n",
"    return pd.DataFrame(results, columns=[\"Search Query\", \"Email\", \"Page Title\", \"URL\", \"Meta Description\", \"HTTP Status\", \"Scrape Duration\", \"Scrape Date\"])\n",
"\n",
"# Function to generate AI-based email content\n",
"def generate_ai_content(lead_info):\n",
"    prompt = f\"\"\"\n",
"    Generate a personalized email for a lead using the following information: {lead_info}.\n",
"    The email should include an engaging subject line, a warm greeting, a value proposition, key benefits, and a call-to-action.\n",
"    \"\"\"\n",
"\n",
"    try:\n",
"        response = openai.Completion.create(\n",
"            model=OPENAI_MODEL,\n",
"            prompt=prompt,\n",
"            max_tokens=500,\n",
"            n=1,\n",
"            stop=None\n",
"        )\n",
"        content = response.choices[0].text.strip()\n",
"\n",
"        # Expect the first blank line to separate the subject from the body\n",
"        if \"\\n\\n\" in content:\n",
"            subject, email_body = content.split(\"\\n\\n\", 1)\n",
"            return subject, email_body\n",
"        else:\n",
"            logger.error(\"AI-generated content is missing subject or body.\")\n",
"            return None, None\n",
"    except openai.error.APIError as e:\n",
"        logger.error(f\"OpenAI API error: {e}\")\n",
"        return None, None\n",
"    except Exception as e:\n",
"        logger.error(f\"Unexpected error: {e}\")\n",
"        return None, None\n",
"\n",
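"# NOTE: save_lead() and save_generated_email() are called by bulk_process_and_send() below\n",
"# but were never defined in the original notebook. The two helpers here are a minimal,\n",
"# assumed sketch: the 'leads' and 'generated_emails' tables and their columns are\n",
"# hypothetical and should be adapted to the real schema.\n",
"def save_lead(search_term, email):\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        with conn.cursor() as cursor:\n",
"            cursor.execute(\n",
"                \"INSERT INTO leads (search_term, email, created_at) VALUES (%s, %s, %s)\",\n",
"                (search_term, email, datetime.now()))\n",
"        conn.commit()\n",
"        db_pool.putconn(conn)\n",
"    except Exception as e:\n",
"        logger.error(f\"Failed to save lead {email}: {e}\")\n",
"\n",
"def save_generated_email(search_term, email, body_html, url, subject):\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        with conn.cursor() as cursor:\n",
"            cursor.execute(\n",
"                \"INSERT INTO generated_emails (search_term, email, subject, body_html, source_url, created_at) VALUES (%s, %s, %s, %s, %s, %s)\",\n",
"                (search_term, email, subject, body_html, url, datetime.now()))\n",
"        conn.commit()\n",
"        db_pool.putconn(conn)\n",
"    except Exception as e:\n",
"        logger.error(f\"Failed to save generated email for {email}: {e}\")\n",
"\n",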
"# Function to send an email via AWS SES\n",
"def send_email_via_ses(subject, body_html, to_address, from_address, reply_to):\n",
"    try:\n",
"        response = ses_client.send_email(\n",
"            Destination={\n",
"                'ToAddresses': [to_address]\n",
"            },\n",
"            Message={\n",
"                'Body': {\n",
"                    'Html': {\n",
"                        'Charset': 'UTF-8',\n",
"                        'Data': body_html\n",
"                    }\n",
"                },\n",
"                'Subject': {\n",
"                    'Charset': 'UTF-8',\n",
"                    'Data': subject\n",
"                }\n",
"            },\n",
"            Source=from_address,\n",
"            ReplyToAddresses=[reply_to]\n",
"        )\n",
"        logger.info(f\"Email sent successfully to {to_address}. Message ID: {response['MessageId']}\")\n",
"    except NoCredentialsError:\n",
"        logger.error(\"AWS credentials not available.\")\n",
"    except PartialCredentialsError:\n",
"        logger.error(\"Incomplete AWS credentials provided.\")\n",
"    except Exception as e:\n",
"        logger.error(f\"Failed to send email to {to_address}: {e}\")\n",
"\n",
"# Function to fetch search terms from the database\n",
"def fetch_search_terms():\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        with conn.cursor() as cursor:\n",
"            cursor.execute(\"SELECT id, term, status, fetched_emails FROM search_terms\")\n",
"            search_terms = cursor.fetchall()\n",
"        db_pool.putconn(conn)\n",
"        return pd.DataFrame(search_terms, columns=[\"ID\", \"Search Term\", \"Status\", \"Fetched Emails\"])\n",
"    except psycopg2.Error as e:\n",
"        logger.error(f\"Failed to fetch search terms: {e}\")\n",
"        return pd.DataFrame()\n",
"\n",
"# Function to fetch email templates from the database\n",
"def fetch_templates():\n",
"    try:\n",
"        conn = db_pool.getconn()\n",
"        with conn.cursor() as cursor:\n",
"            cursor.execute(\"SELECT id, template_name, subject, body_html FROM email_templates\")\n",
"            templates = cursor.fetchall()\n",
"        db_pool.putconn(conn)\n",
"        return pd.DataFrame(templates, columns=[\"ID\", \"Template Name\", \"Subject\", \"Body HTML\"])\n",
"    except psycopg2.Error as e:\n",
"        logger.error(f\"Failed to fetch templates: {e}\")\n",
"        return pd.DataFrame()\n",
"\n",
"# Function to fetch a specific template by ID\n",
"def fetch_template(template_id):\n",
"    templates = fetch_templates()\n",
"    if not templates.empty and template_id in templates['ID'].tolist():\n",
"        selected_template = templates.loc[templates['ID'] == template_id]\n",
"        return selected_template['Subject'].item(), selected_template['Body HTML'].item()\n",
"    logger.error(f\"Template ID {template_id} is invalid or has empty fields.\")\n",
"    return None, None\n",
"\n",
"# Function to process and send emails in bulk with logging\n",
"def process_and_send_with_logging(template_id, name, from_email, reply_to, use_ai_customizer):\n",
"    logger.info(f\"Starting email campaign with template ID: {template_id}\")\n",
"    # The original body referenced selected_terms, num_emails and auto_send, which are not\n",
"    # defined in this scope. Process every known search term with conservative defaults;\n",
"    # when the AI customizer is enabled, pass template_id=None so content is generated per lead.\n",
"    terms_df = fetch_search_terms()\n",
"    term_ids = terms_df['ID'].tolist() if not terms_df.empty else []\n",
"    result_message = bulk_process_and_send(term_ids, None if use_ai_customizer else template_id, 10, False, from_email, reply_to)\n",
"    logger.info(result_message)\n",
"    return result_message\n",
"\n",
"# Bulk processing and sending emails function\n",
"def bulk_process_and_send(selected_terms, template_id, num_emails, auto_send, from_email, reply_to):\n",
"    total_processed = 0\n",
"    try:\n",
"        for term_id in selected_terms:\n",
"            conn = db_pool.getconn()\n",
"            with conn.cursor() as cursor:\n",
"                cursor.execute('SELECT term FROM search_terms WHERE id=%s', (term_id,))\n",
"                search_term = cursor.fetchone()[0]\n",
"                cursor.execute('UPDATE search_terms SET status=%s WHERE id=%s', ('processing', term_id))\n",
"            conn.commit()\n",
"            db_pool.putconn(conn)\n",
"\n",
"            emails_df = scrape_emails(search_term, num_results=num_emails)\n",
"            logger.info(f\"Scraped {len(emails_df)} emails for search term '{search_term}'\")\n",
"\n",
"            if emails_df.empty:\n",
"                logger.warning(f\"No emails found for search term: {search_term}\")\n",
"                continue\n",
"\n",
"            for _, email_data in emails_df.iterrows():\n",
"                email = email_data['Email']\n",
"                save_lead(search_term, email)\n",
"\n",
"            if template_id is None:\n",
"                for _, email_data in emails_df.iterrows():\n",
"                    email = email_data['Email']\n",
"                    # Build the lead context from values available in this scope\n",
"                    lead_info = {\"search_term\": search_term, \"from_email\": from_email, \"reply_to\": reply_to, \"url\": email_data.get('URL', '')}\n",
"                    subject, generated_email = generate_ai_content(lead_info)\n",
"                    if generated_email:\n",
"                        save_generated_email(search_term, email, generated_email, email_data.get('URL', ''), subject)\n",
"                        if auto_send:\n",
"                            send_email_via_ses(subject, generated_email, email, from_email, reply_to)\n",
"                            logger.info(f\"Email sent to {email}\")\n",
"            else:\n",
"                subject, body_html = fetch_template(template_id)\n",
"                for _, email_data in emails_df.iterrows():\n",
"                    email = email_data['Email']\n",
"                    if subject and body_html:\n",
"                        save_generated_email(search_term, email, body_html, email_data.get('URL', ''), subject)\n",
"                        if auto_send:\n",
"                            send_email_via_ses(subject, body_html, email, from_email, reply_to)\n",
"                            logger.info(f\"Email sent to {email}\")\n",
"\n",
"            total_processed += len(emails_df)\n",
"            logger.info(f\"Processed {len(emails_df)} emails for search term '{search_term}'\")\n",
"\n",
"        return f\"Processed and sent {total_processed} emails successfully.\" if auto_send else f\"Processed {total_processed} emails successfully.\"\n",
"\n",
"    except Exception as e:\n",
"        logger.error(f\"Error during bulk process and send: {e}\")\n",
"        return \"An error occurred during processing.\"\n",
"\n",
"# Populate the valid_templates list\n",
"valid_templates = fetch_templates()\n",
"\n",
"with gr.Blocks() as gradio_app:\n",
"    gr.Markdown(\"# Email Campaign Management System\")\n",
"\n",
"    # Tab for Searching Emails\n",
"    with gr.Tab(\"Search Emails\"):\n",
"        search_query = gr.Textbox(label=\"Search Query\", placeholder=\"e.g., 'Potential Customers in Madrid'\")\n",
"        num_results = gr.Slider(1, 100, value=10, step=1, label=\"Number of Results\")\n",
"        search_button = gr.Button(\"Search\")\n",
"        results = gr.Dataframe(headers=[\"Search Query\", \"Email\", \"Page Title\", \"URL\", \"Meta Description\", \"HTTP Status\", \"Scrape Duration\", \"Scrape Date\"])\n",
"\n",
"        search_button.click(scrape_emails, inputs=[search_query, num_results], outputs=[results])\n",
"\n",
"    # Tab for Creating Email Templates\n",
"    with gr.Tab(\"Create Email Template\"):\n",
"        template_name = gr.Textbox(label=\"Template Name\", placeholder=\"e.g., 'Welcome Email'\")\n",
"        subject = gr.Textbox(label=\"Email Subject\", placeholder=\"e.g., 'Welcome to Our Service'\")\n",
"        body_html = gr.Textbox(label=\"Email Content (HTML)\", placeholder=\"Enter your email content here...\", lines=8)\n",
"        create_template_button = gr.Button(\"Create Template\")\n",
"        template_status = gr.Textbox(label=\"Template Creation Status\", interactive=False)\n",
"\n",
"        def create_email_template(template_name, subject, body_html):\n",
"            try:\n",
"                conn = db_pool.getconn()\n",
"                with conn.cursor() as cursor:\n",
"                    cursor.execute(\"\"\"\n",
"                        INSERT INTO email_templates (template_name, subject, body_html)\n",
"                        VALUES (%s, %s, %s)\n",
"                    \"\"\", (template_name, subject, body_html))\n",
"                conn.commit()\n",
"                db_pool.putconn(conn)\n",
"                return \"Template created successfully.\"\n",
"            except psycopg2.Error as e:\n",
"                logger.error(f\"Failed to create template: {e}\")\n",
"                return f\"Error creating template: {e}\"\n",
"\n",
"        create_template_button.click(create_email_template, inputs=[template_name, subject, body_html], outputs=[template_status])\n",
"\n",
"    # Tab for Generating and Sending Emails\n",
"    with gr.Tab(\"Generate and Send Emails\"):\n",
"        with gr.Row():\n",
"            # valid_templates is a DataFrame, so expose its template IDs as the dropdown choices\n",
"            template_id = gr.Dropdown(choices=valid_templates['ID'].tolist() if not valid_templates.empty else [], label=\"Select Email Template\")\n",
"            use_ai_customizer = gr.Checkbox(label=\"AI Customizer\", value=False)\n",
"\n",
"        with gr.Row():\n",
"            name = gr.Textbox(label=\"Your Name\", value=\"Sami Halawa | IA Prof\", interactive=False)\n",
"            from_email = gr.Textbox(label=\"From Email\", value=\"hello@indosy.com\", interactive=False)\n",
"            reply_to = gr.Textbox(label=\"Reply To\", value=\"hello@indosy.com\", interactive=False)\n",
"\n",
"        with gr.Row():\n",
"            subject = gr.Textbox(label=\"Email Subject\", placeholder=\"e.g., 'Welcome to Our Service'\")\n",
"            body_html = gr.HTML(label=\"Email Content (Dynamic Preview)\", value=\"\")\n",
"\n",
"        preview_button = gr.Button(\"Preview Emails\")\n",
"        preview_results = gr.Dataframe(headers=[\"Sample Email 1\", \"Sample Email 2\", \"Sample Email 3\"])\n",
"\n",
"        def generate_preview_emails(template_id, name, from_email, reply_to, use_ai_customizer):\n",
"            emails = []\n",
"            for i in range(3):  # Generate 3 sample emails\n",
"                # generate_ai_content() takes a single lead_info dict, not individual fields\n",
"                lead_info = {\"name\": name, \"from_email\": from_email, \"reply_to\": reply_to, \"template_id\": template_id, \"use_ai_customizer\": use_ai_customizer}\n",
"                _, email_body = generate_ai_content(lead_info)\n",
"                emails.append(email_body)\n",
"            return pd.DataFrame([emails], columns=[\"Sample Email 1\", \"Sample Email 2\", \"Sample Email 3\"])\n",
"\n",
"        preview_button.click(generate_preview_emails,\n",
"                             inputs=[template_id, name, from_email, reply_to, use_ai_customizer],\n",
"                             outputs=[preview_results])\n",
"\n",
"        accept_button = gr.Button(\"Accept and Start\")\n",
"\n",
"        accept_button.click(process_and_send_with_logging,\n",
"                            inputs=[template_id, name, from_email, reply_to, use_ai_customizer],\n",
"                            outputs=[gr.Textbox(label=\"Status\", interactive=False)])\n",
"\n",
"    # Tab for Bulk Process and Send\n",
"    with gr.Tab(\"Bulk Process and Send\"):\n",
"        # Fetch the search terms once and guard against an empty result\n",
"        search_terms_df = fetch_search_terms()\n",
"        search_term_list = gr.Dataframe(search_terms_df, headers=[\"ID\", \"Search Term\", \"Status\", \"Fetched Emails\"])\n",
"        selected_terms = gr.CheckboxGroup(label=\"Select Search Queries to Process\", choices=search_terms_df['ID'].tolist() if not search_terms_df.empty else [])\n",
"        num_emails = gr.Slider(1, 100, value=10, step=1, label=\"Number of Emails per Search Term\")\n",
"        auto_send = gr.Checkbox(label=\"Auto Send Emails After Processing\", value=False)\n",
"        template_id = gr.Dropdown(choices=valid_templates['ID'].tolist() if not valid_templates.empty else [], label=\"Select Email Template\")\n",
"        from_email = gr.Textbox(label=\"From Email\", value=\"hello@indosy.com\", interactive=False)\n",
"        reply_to = gr.Textbox(label=\"Reply To\", value=\"hello@indosy.com\", interactive=False)\n",
"        process_send_button = gr.Button(\"Process and Send Selected Queries\")\n",
"        process_status = gr.Textbox(label=\"Process Status\", interactive=False)\n",
"\n",
"        process_send_button.click(bulk_process_and_send,\n",
"                                  inputs=[selected_terms, template_id, num_emails, auto_send, from_email, reply_to],\n",
"                                  outputs=[process_status])\n",
"\n",
"gradio_app.launch(share=True)" ] },
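{ "cell_type": "markdown", "id": "schema-setup-note", "metadata": {}, "source": [ "**Assumed database schema.** The app above expects `search_terms`, `emails` and `email_templates` tables, plus the `leads` and `generated_emails` tables used by the helper functions. The next cell is a minimal, hedged sketch of that schema inferred from the queries in the main cell; the column types (and the last two tables entirely) are assumptions, so adjust them to the real schema before running it." ] },
{ "cell_type": "code", "execution_count": null, "id": "schema-setup-cell", "metadata": {}, "outputs": [], "source": [
"# Minimal schema sketch inferred from the queries above; names/types are assumptions.\n",
"SCHEMA_SQL = \"\"\"\n",
"CREATE TABLE IF NOT EXISTS search_terms (\n",
"    id SERIAL PRIMARY KEY,\n",
"    term TEXT NOT NULL,\n",
"    status TEXT DEFAULT 'pending',\n",
"    fetched_emails INTEGER DEFAULT 0,\n",
"    last_processed_at TIMESTAMP\n",
");\n",
"CREATE TABLE IF NOT EXISTS emails (\n",
"    id SERIAL PRIMARY KEY, search_query TEXT, email TEXT, page_title TEXT, url TEXT,\n",
"    meta_description TEXT, http_status INTEGER, scrape_duration TEXT, scrape_date TIMESTAMP\n",
");\n",
"CREATE TABLE IF NOT EXISTS email_templates (\n",
"    id SERIAL PRIMARY KEY, template_name TEXT, subject TEXT, body_html TEXT\n",
");\n",
"CREATE TABLE IF NOT EXISTS leads (\n",
"    id SERIAL PRIMARY KEY, search_term TEXT, email TEXT, created_at TIMESTAMP\n",
");\n",
"CREATE TABLE IF NOT EXISTS generated_emails (\n",
"    id SERIAL PRIMARY KEY, search_term TEXT, email TEXT, subject TEXT,\n",
"    body_html TEXT, source_url TEXT, created_at TIMESTAMP\n",
");\n",
"\"\"\"\n",
"\n",
"conn = db_pool.getconn()\n",
"with conn.cursor() as cursor:\n",
"    cursor.execute(SCHEMA_SQL)\n",
"conn.commit()\n",
"db_pool.putconn(conn)" ] },
{ "cell_type": "code", "execution_count": 4, "id": "ef432ebd-0814-486e-9cbb-0f5a8cc3c962", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: googlesearch-python in 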
/Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (1.2.5)\n", "Requirement already satisfied: beautifulsoup4>=4.9 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from googlesearch-python) (4.12.3)\n", "Requirement already satisfied: requests>=2.20 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from googlesearch-python) (2.32.2)\n", "Requirement already satisfied: soupsieve>1.2 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from beautifulsoup4>=4.9->googlesearch-python) (2.5)\n", "Requirement already satisfied: charset-normalizer<4,>=2 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from requests>=2.20->googlesearch-python) (3.3.2)\n", "Requirement already satisfied: idna<4,>=2.5 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from requests>=2.20->googlesearch-python) (3.7)\n", "Requirement already satisfied: urllib3<3,>=1.21.1 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from requests>=2.20->googlesearch-python) (2.2.1)\n", "Requirement already satisfied: certifi>=2017.4.17 in /Users/samihalawa/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from requests>=2.20->googlesearch-python) (2024.2.2)\n" ] } ], "source": [ "!pip install googlesearch-python" ] }, { "cell_type": "code", "execution_count": null, "id": "bcc27a7e-3958-44f5-b287-ea84eb4a5749", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }