autoscraper / app.py
samihalawa's picture
m
a70de9b
raw
history blame
20.3 kB
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv
from scrapegraphai.graphs import SmartScraperGraph, SearchGraph
from scrapegraphai.utils import prettify_exec_info
from langchain_huggingface import HuggingFaceEndpoint
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
import gradio as gr
import subprocess
import json
import re
import time
# Make sure the browsers Playwright drives are installed before any scraping.
subprocess.run(["playwright", "install"])
#subprocess.run(["playwright", "install-deps"])

# Read variables from a .env file into the environment, then pick up the token.
load_dotenv()
HUGGINGFACEHUB_API_TOKEN = os.environ.get('HUGGINGFACEHUB_API_TOKEN')

# Text-generation model shared by the search graph and the email personalizer.
repo_id = "Qwen/Qwen2.5-72B-Instruct"
llm_model_instance = HuggingFaceEndpoint(
    huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN,
    repo_id=repo_id,
    max_tokens=128,
    temperature=0.5,
)

# Embedding model handed to the graph alongside the LLM.
embedder_model_instance = HuggingFaceInferenceAPIEmbeddings(
    model_name="sentence-transformers/all-MiniLM-l6-v2",
    api_key=HUGGINGFACEHUB_API_TOKEN,
)

# Configuration passed to SearchGraph.
graph_config = dict(
    llm=dict(model_instance=llm_model_instance, model_tokens=100000),
    embeddings=dict(model_instance=embedder_model_instance),
)
#######
def clean_json_string(json_str):
    """
    Extract the first JSON object or array embedded in a text response.

    Scans the text from left to right and, at every '{' or '[', tries to
    decode one complete JSON value starting there. The first position that
    decodes successfully wins, so prefixes ("Here are the leads: ..."),
    trailing commentary and markdown code fences are all dropped.

    Args:
        json_str (str): Raw text that may contain JSON.

    Returns:
        str: The exact substring holding the first decodable JSON object or
        array, or the original string unchanged if none is found.
    """
    decoder = json.JSONDecoder()
    for start, char in enumerate(json_str):
        if char not in '{[':
            continue
        try:
            # raw_decode stops at the end of the value and ignores what follows
            _, end = decoder.raw_decode(json_str, start)
        except json.JSONDecodeError:
            # Not valid JSON from this marker; try the next one
            continue
        return json_str[start:end]
    return json_str  # Return original if no JSON value could be decoded
def search_for_leads(search_query, number_of_leads=20):
    """
    Search for leads using SearchGraph based on search terms.

    The graph result is parsed as JSON when it is a string. If that fails,
    the LLM is asked once to restructure the text; if that also fails, a
    single placeholder lead carrying the first 200 characters is returned.

    Args:
        search_query (str): Keywords interpolated into the search prompt.
        number_of_leads (int): Number of leads the prompt asks for.

    Returns:
        tuple: (leads, info). On success, a list of lead dicts and the value
        of prettify_exec_info(). If anything raises, an empty list and an
        "Error searching for leads: ..." message.
    """
    search_prompt = f"""
Search for businesses or professionals related to "{search_query}".
For each result, gather the following information:
1. Full name of person (if available)
2. Job title/position (if available)
3. Company name
4. Company website URL
5. Email address (if available)
6. Phone number (if available)
7. LinkedIn profile URL (if available)
8. Company industry or sector
9. Company size or employee count (if available)
Return results as a structured JSON array with at least {number_of_leads} leads if possible.
Each lead should be a JSON object with the fields above.
"""
    try:
        # Initialize SearchGraph with our search query
        search_graph = SearchGraph(
            prompt=search_prompt,
            config=graph_config
        )
        # Execute the search
        result = search_graph.run()
        exec_info = search_graph.get_execution_info()

        # Process the search results
        if isinstance(result, str):
            try:
                leads = json.loads(clean_json_string(result))
            except json.JSONDecodeError:
                # Not valid JSON: ask the LLM to extract structured data
                extract_prompt = f"""
Extract structured lead information from this text:
{result}
Return ONLY a JSON array with each lead having these fields (if available):
name, job_title, company, email, phone, linkedin, industry, company_size, website
"""
                structured_result = llm_model_instance.invoke(extract_prompt)
                try:
                    leads = json.loads(clean_json_string(structured_result))
                except Exception:
                    # Best effort: any failure here falls back to a minimal
                    # structure, but interpreter-exit signals still propagate.
                    leads = [{"company": "Result parsing failed", "info": result[:200]}]
        else:
            leads = result

        # Ensure leads is a list
        if not isinstance(leads, list):
            leads = [leads]

        enhanced_leads = [_normalize_lead(lead) for lead in leads]
        return enhanced_leads, prettify_exec_info(exec_info)
    except Exception as e:
        return [], f"Error searching for leads: {str(e)}"


def _normalize_lead(lead):
    """Return the lead as a dict with name, job_title, company and industry set."""
    if not isinstance(lead, dict):
        # A stray string/number in the result must not abort the whole
        # search; keep its text in the same shape as the parse fallback.
        lead = {"company": "", "info": str(lead)[:200]}
    lead.setdefault('name', '')
    # Sometimes position is used instead
    lead.setdefault('job_title', lead.get('position', ''))
    lead.setdefault('company', '')
    lead.setdefault('industry', '')
    return lead
def send_email(smtp_server, smtp_port, use_ssl, username, password,
               from_email, to_email, subject, body):
    """
    Send an HTML email using the provided SMTP settings.

    Args:
        smtp_server (str): SMTP host name.
        smtp_port (int): SMTP port.
        use_ssl (bool): If true, connect with SMTP_SSL; otherwise connect
            in plain text and upgrade the session with STARTTLS.
        username (str): Login user name.
        password (str): Login password.
        from_email (str): Value of the From header.
        to_email (str): Value of the To header.
        subject (str): Value of the Subject header.
        body (str): Message body, attached as text/html.

    Returns:
        tuple: (True, "Email sent successfully") on success, otherwise
        (False, "Error sending email: <reason>"). Never raises for
        ordinary errors.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = from_email
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html'))

        smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        # The context manager sends QUIT and closes the socket even when
        # starttls/login/send_message raises, so failed sends do not leak
        # connections during a bulk campaign.
        with smtp_class(smtp_server, smtp_port) as server:
            if not use_ssl:
                server.starttls()
            server.login(username, password)
            server.send_message(msg)
        return True, "Email sent successfully"
    except Exception as e:
        return False, f"Error sending email: {str(e)}"
def generate_personalized_content(lead, template_markers):
    """
    Build the per-lead text sections that get spliced into the email template.

    Args:
        lead (dict): The lead information ('name', 'company', 'job_title' are read)
        template_markers (dict): Marker defaults; 'value_prop' is the fallback
            used when no value proposition can be generated
    Returns:
        dict: {'intro': ..., 'value_prop': ...} for this lead
    """
    company = lead.get('company', '')
    job_title = lead.get('job_title', '')

    # --- Introduction: ask the LLM first, fall back to canned sentences ---
    intro_prompt = f"""
Write a personalized introduction paragraph for a marketing email based on these details:
- Person's name: {lead.get('name', '')}
- Company: {company}
- Job title: {job_title}
The email is about AI services from Pime.AI. Keep it concise (2-3 sentences), professional, and mention their company if available.
Don't include any generic placeholders.
"""
    try:
        intro_text = llm_model_instance.invoke(intro_prompt).strip()
    except Exception:
        recipient = lead.get('name')
        if not recipient:
            intro_text = "Hi there, I wanted to introduce how our AI services could enhance your marketing operations."
        elif company:
            intro_text = f"Hi {recipient}, I noticed {company} and wanted to share how our AI services could benefit your operations."
        else:
            intro_text = f"Hi {recipient}, I wanted to introduce how our AI services could enhance your marketing operations."
    sections = {'intro': intro_text}

    # --- Value proposition: only attempted when a company is known ---
    if not company:
        sections['value_prop'] = template_markers.get('value_prop', '')
        return sections

    role_clause = "in the role of " + job_title if job_title else ""
    value_prompt = f"""
Write a short, personalized paragraph (2-3 sentences) explaining how AI solutions from Pime.AI
could specifically benefit a company named {company}
{role_clause}.
Focus on specific value propositions relevant to this type of company.
Keep it concise and professional.
"""
    try:
        sections['value_prop'] = llm_model_instance.invoke(value_prompt).strip()
    except Exception:
        # Generation failed: use the caller-supplied default
        sections['value_prop'] = template_markers.get('value_prop', '')
    return sections
def send_bulk_emails(leads, smtp_settings, email_template, delay=5):
    """
    Send emails to multiple leads with a delay between each email.

    Args:
        leads (list): Lead dicts; 'email' is required to send, 'name' is optional.
        smtp_settings (dict): Keys 'server', 'port', 'use_ssl', 'username',
            'password', 'from_email'.
        email_template (dict): Keys 'subject' and 'body'. The body may contain
            the markers {intro}, {value_prop} and {name}.
        delay (int | float): Seconds to sleep after each send except the last lead.

    Returns:
        list: One dict per lead with 'name' and 'status'; leads that were
        attempted also carry 'email' and 'message'.
    """
    results = []
    # Extract template markers - sections that will be customized
    template_markers = {
        'intro': "{intro}",
        'value_prop': "{value_prop}",
        'name': "{name}"
    }
    last_index = len(leads) - 1
    for i, lead in enumerate(leads):
        # `or` rather than a .get default: a lead's name can be present but
        # empty, and that should still be labelled "Lead N".
        display_name = lead.get('name') or f"Lead {i+1}"

        # Skip leads without email
        if not lead.get('email'):
            results.append({"name": display_name, "status": "Skipped - No email address"})
            continue

        # Get personalized content for this lead
        personalized_content = generate_personalized_content(lead, template_markers)

        # Start with the template body and replace every marker
        personalized_body = email_template['body']
        for marker, content in personalized_content.items():
            # When generation falls back to the marker default, the content
            # is the placeholder itself; blank it so a literal "{value_prop}"
            # never reaches the recipient.
            if not content or content == template_markers.get(marker):
                content = ""
            personalized_body = personalized_body.replace(f"{{{marker}}}", str(content))

        # Ensure name replacement still works for backward compatibility
        personalized_body = personalized_body.replace("{name}", lead.get('name') or "there")

        # Send the email
        success, message = send_email(
            smtp_settings['server'],
            smtp_settings['port'],
            smtp_settings['use_ssl'],
            smtp_settings['username'],
            smtp_settings['password'],
            smtp_settings['from_email'],
            lead['email'],
            email_template['subject'],
            personalized_body
        )
        results.append({
            "name": display_name,
            "email": lead['email'],
            "status": "Sent" if success else "Failed",
            "message": message
        })

        # Add delay between emails
        if i < last_index:
            time.sleep(delay)
    return results
# Gradio interface
#
# SMTP account details are read from the environment (SMTP_USERNAME,
# SMTP_PASSWORD, SMTP_FROM_EMAIL) instead of being hard-coded as widget
# defaults. The password is never pre-filled into the page: when the field is
# left blank, the handlers fall back to SMTP_PASSWORD on the server side.
with gr.Blocks() as demo:
    gr.Markdown("# AI-Powered Lead Scraper & Email Tool")

    with gr.Tab("Lead Search"):
        gr.Markdown("""
Search for businesses and professionals based on keywords, then send personalized emails.
This tool uses SearchGraph AI to intelligently find and extract contact details.
""")
        with gr.Row():
            with gr.Column():
                search_input = gr.Textbox(
                    label="Search Keywords",
                    placeholder="marketing automation tools needed"
                )
                num_leads = gr.Slider(minimum=1, maximum=100, value=20, step=1, label="Number of Leads")
                search_button = gr.Button("Search for Leads")
            with gr.Column():
                leads_output = gr.JSON(label="Found Leads")
                exec_info_output = gr.Textbox(label="Execution Info")

    with gr.Tab("Email Settings"):
        with gr.Row():
            with gr.Column():
                gr.Markdown("### SMTP Configuration")
                smtp_server = gr.Textbox(label="SMTP Server", value="smtp.gmail.com")
                smtp_port = gr.Number(label="SMTP Port", value=587)
                use_ssl = gr.Checkbox(label="Use SSL", value=False)
                smtp_username = gr.Textbox(label="SMTP Username", value=os.getenv("SMTP_USERNAME", ""))
                smtp_password = gr.Textbox(
                    label="SMTP Password",
                    value="",
                    placeholder="Leave blank to use the SMTP_PASSWORD environment variable",
                    type="password"
                )
                from_email = gr.Textbox(label="From Email", value=os.getenv("SMTP_FROM_EMAIL", ""))
            with gr.Column():
                gr.Markdown("### Email Template")
                email_subject = gr.Textbox(
                    label="Email Subject",
                    value="Custom AI Solutions to Transform Your Marketing Operations"
                )
                email_body = gr.TextArea(
                    label="Email Body (HTML)",
                    value="""
<!-- Personalized Intro - AI Generated -->
<p>{intro}</p>
<!-- Personalized Value Proposition - AI Generated -->
<p>{value_prop}</p>
<!-- Fixed Content - Standard Offering -->
<p>At Pime.AI, we provide end-to-end AI solutions tailored to your specific needs:</p>
<ul>
<li><strong>Custom AI Applications</strong> - Develop your own proprietary AI tools for market analysis and customer insights</li>
<li><strong>Process Automation</strong> - Eliminate repetitive tasks in your marketing workflow</li>
<li><strong>Intelligent Chatbots</strong> - Customer engagement tools that integrate with your existing CRM and websites</li>
<li><strong>Content Generation Systems</strong> - Create marketing materials, product descriptions, and social posts at scale</li>
</ul>
<p>We handle everything from initial analysis to implementation and ongoing optimization, all customized to your existing software environment.</p>
<p><em>What marketing process would you most like to automate or enhance?</em></p>
<!-- Fixed Content - Contact Information -->
<p>Book a 15-min discovery call: <a href="https://calendly.com/sami-halawa">https://calendly.com/sami-halawa</a></p>
<p>Visit our website: <a href="https://pime.ai">https://pime.ai</a> or contact me directly via WhatsApp: <a href="https://wa.me/34679794037">https://wa.me/34679794037</a></p>
<p>Regards,<br>
Sami Halawa</p>
"""
                )
        gr.Markdown("### Testing")
        test_personalization_btn = gr.Button("Test Personalization")
        personalization_result = gr.HTML(label="Personalization Preview")
        gr.Markdown("### Send Test Email")
        test_email = gr.Textbox(label="Test Email Address")
        test_email_btn = gr.Button("Send Test Email")
        test_result = gr.Textbox(label="Test Result")

    with gr.Tab("Send Campaign"):
        with gr.Row():
            with gr.Column():
                email_delay = gr.Slider(minimum=1, maximum=60, value=5, step=1, label="Delay Between Emails (seconds)")
                send_campaign_btn = gr.Button("Send Emails to All Leads")
        email_status = gr.Dataframe(
            headers=["Name", "Email", "Status", "Message"],
            label="Email Status"
        )

    # Test generation of personalized email content
    def test_personalization():
        """Render an HTML preview of generated intro/value-prop text for three sample leads."""
        # Sample lead data for testing
        sample_leads = [
            {
                "name": "John Smith",
                "job_title": "Marketing Director",
                "company": "TechCorp Inc.",
                "email": "[email protected]",
                "industry": "Technology"
            },
            {
                "name": "Sarah Johnson",
                "job_title": "CMO",
                "company": "Healthcare Solutions",
                "email": "[email protected]",
                "industry": "Healthcare"
            },
            {
                "name": "Michael Lee",
                "email": "[email protected]"
                # Minimal data to test fallbacks
            }
        ]
        # Get dummy template markers for testing
        template_markers = {
            'intro': "{intro}",
            'value_prop': "{value_prop}",
            'name': "{name}"
        }
        # Test each lead
        results = []
        for lead in sample_leads:
            # Generate personalized content
            content = generate_personalized_content(lead, template_markers)
            # Create a preview of the personalized email
            preview = f"""
<div style="border: 1px solid #ccc; padding: 10px; margin-bottom: 20px;">
<h3>Preview for: {lead.get('name', 'Unknown')} ({lead.get('email', 'No email')})</h3>
<hr/>
<p><strong>Intro:</strong> {content.get('intro', 'No intro generated')}</p>
<p><strong>Value Proposition:</strong> {content.get('value_prop', 'No value prop generated')}</p>
</div>
"""
            results.append(preview)
        # Join all previews
        return "<h2>Personalization Test Results</h2>" + "".join(results)

    # Connect functions to the UI
    search_button.click(
        search_for_leads,
        inputs=[search_input, num_leads],
        outputs=[leads_output, exec_info_output]
    )

    # Handle personalization testing
    test_personalization_btn.click(
        test_personalization,
        inputs=[],
        outputs=[personalization_result]
    )

    # Handle test email
    def send_test_email(server, port, ssl, username, password, from_addr, to_addr, subject, body):
        """Send the raw template to one address and return the status message."""
        # Blank password field -> use the server-side secret
        password = password or os.getenv("SMTP_PASSWORD", "")
        success, message = send_email(
            server, int(port), ssl, username, password, from_addr, to_addr, subject, body
        )
        return message

    test_email_btn.click(
        send_test_email,
        inputs=[
            smtp_server, smtp_port, use_ssl, smtp_username, smtp_password,
            from_email, test_email, email_subject, email_body
        ],
        outputs=[test_result]
    )

    # Handle campaign sending
    def start_email_campaign(leads_json, server, port, ssl, username, password,
                             from_addr, subject, body, delay):
        """Send the template to every lead and return rows for the status table.

        Each row is [name, email, status, message], matching the headers of
        the "Email Status" dataframe.
        """
        if not leads_json:
            return [["Error", "N/A", "Error", "No leads available"]]

        # Parse leads if they're in JSON string format
        if isinstance(leads_json, str):
            try:
                leads = json.loads(leads_json)
            except json.JSONDecodeError:
                return [["Error", "N/A", "Failed", "Invalid leads data"]]
        else:
            leads = leads_json
        # A single lead object is treated as a one-element list
        if isinstance(leads, dict):
            leads = [leads]

        smtp_settings = {
            'server': server,
            'port': int(port),
            'use_ssl': ssl,
            'username': username,
            # Blank password field -> use the server-side secret
            'password': password or os.getenv("SMTP_PASSWORD", ""),
            'from_email': from_addr
        }
        email_template = {
            'subject': subject,
            'body': body
        }
        results = send_bulk_emails(leads, smtp_settings, email_template, int(delay))
        return [[r['name'], r.get('email', 'N/A'), r['status'], r.get('message', '')] for r in results]

    send_campaign_btn.click(
        start_email_campaign,
        inputs=[
            leads_output, smtp_server, smtp_port, use_ssl, smtp_username, smtp_password,
            from_email, email_subject, email_body, email_delay
        ],
        outputs=[email_status]
    )

# Launch the Gradio app
if __name__ == "__main__":
    demo.launch()