"""AI-powered lead scraper and personalized e-mail campaign tool (Gradio app).

The module:

* searches the web for leads with ScrapeGraphAI's ``SearchGraph``,
* asks a Hugging Face hosted LLM to write per-lead e-mail sections,
* sends the resulting messages through a user-configured SMTP server,
* exposes everything through a three-tab Gradio interface (``demo``).

Configuration is read from the environment (optionally via a ``.env`` file):

* ``HUGGINGFACEHUB_API_TOKEN`` - token for the LLM and embedding endpoints.
* ``SMTP_USERNAME`` / ``SMTP_PASSWORD`` / ``SMTP_FROM_EMAIL`` - optional
  defaults for the SMTP form fields. Secrets must never be committed to the
  source file.
"""

import html
import json
import logging
import os
import re
import smtplib
import subprocess
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import gradio as gr
from dotenv import load_dotenv
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain_huggingface import HuggingFaceEndpoint
from scrapegraphai.graphs import SmartScraperGraph, SearchGraph
from scrapegraphai.utils import prettify_exec_info

logger = logging.getLogger(__name__)


def _install_playwright_browsers():
    """Best-effort install of the Playwright browsers and OS dependencies.

    A missing ``playwright`` executable is logged instead of aborting the
    import, so the e-mail features of the app stay usable.
    """
    for command in (["playwright", "install"], ["playwright", "install-deps"]):
        try:
            subprocess.run(command, check=False)
        except OSError as exc:
            logger.warning("Could not run %s: %s", " ".join(command), exc)


# Ensure Playwright installs required browsers and dependencies
_install_playwright_browsers()

# Load environment variables
load_dotenv()
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')

# Initialize the model instances
repo_id = "Qwen/Qwen2.5-72B-Instruct"

llm_model_instance = HuggingFaceEndpoint(
    repo_id=repo_id,
    temperature=0.5,
    max_new_tokens=128,
    huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN,
)

embedder_model_instance = HuggingFaceInferenceAPIEmbeddings(
    api_key=HUGGINGFACEHUB_API_TOKEN,
    model_name="sentence-transformers/all-MiniLM-l6-v2",
)

graph_config = {
    "llm": {
        "model_instance": llm_model_instance,
        "model_tokens": 100000,
    },
    "embeddings": {"model_instance": embedder_model_instance},
}

# First character that can open a JSON object or array.
_JSON_START_RE = re.compile(r"[\[{]")

# Deliberately permissive address check: one "@", a dot in the domain and no
# whitespace (which also rules out header injection through newlines).
_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")

# Text substituted for a template section when nothing could be generated.
# These must NOT be the placeholders themselves, otherwise a literal
# "{value_prop}" ends up in the message that is sent.
_SECTION_FALLBACKS = {
    'intro': "",
    'value_prop': "",
    'name': "there",
}

# Default e-mail template shown in the UI.
# NOTE(review): the message is sent with the "html" MIME subtype, but this
# default contains no markup, so mail clients will collapse the line breaks.
# Confirm whether the original template carried HTML tags.
DEFAULT_EMAIL_BODY = """

{intro}

{value_prop}

At Pime.AI, we provide end-to-end AI solutions tailored to your specific needs:

We handle everything from initial analysis to implementation and ongoing optimization, all customized to your existing software environment.

What marketing process would you most like to automate or enhance?

Book a 15-min discovery call: https://calendly.com/sami-halawa

Visit our website: https://pime.ai or contact me directly via WhatsApp: https://wa.me/34679794037

Regards,
Sami Halawa

"""


#######
def clean_json_string(json_str):
    """Strip any text surrounding the first JSON value in ``json_str``.

    The scan starts at the earliest ``{`` or ``[`` so that a prefixed array
    of objects is kept whole, and stops where that JSON value ends so that
    trailing commentary (for example a closing code fence) is dropped.

    Args:
        json_str: Raw text, typically an LLM response.

    Returns:
        The substring holding the first JSON object/array, or ``json_str``
        unchanged when it is not a string, contains no JSON marker, or the
        text after the marker is not valid JSON.
    """
    if not isinstance(json_str, str):
        return json_str

    marker = _JSON_START_RE.search(json_str)
    if marker is None:
        return json_str  # Return original if no JSON markers found

    candidate = json_str[marker.start():]
    try:
        # raw_decode tolerates trailing text and reports where the value ends.
        _, end = json.JSONDecoder().raw_decode(candidate)
    except json.JSONDecodeError:
        return json_str  # Return original if cleaning results in invalid JSON
    return candidate[:end]


def _build_search_prompt(search_query, number_of_leads):
    """Return the natural-language prompt handed to ``SearchGraph``."""
    return (
        f'Search for businesses or professionals related to "{search_query}".\n'
        "For each result, gather the following information:\n"
        "1. Full name of person (if available)\n"
        "2. Job title/position (if available)\n"
        "3. Company name\n"
        "4. Company website URL\n"
        "5. Email address (if available)\n"
        "6. Phone number (if available)\n"
        "7. LinkedIn profile URL (if available)\n"
        "8. Company industry or sector\n"
        "9. Company size or employee count (if available)\n"
        "\n"
        f"Return results as a structured JSON array with at least {number_of_leads} leads if possible.\n"
        "Each lead should be a JSON object with the fields above.\n"
    )


def _parse_leads(result):
    """Turn a raw ``SearchGraph`` result into Python data.

    Non-string results are returned untouched. Strings are parsed as JSON;
    when that fails the LLM is asked to extract a JSON array from the text,
    and as a last resort a single placeholder record is returned.
    """
    if not isinstance(result, str):
        return result

    try:
        return json.loads(clean_json_string(result))
    except json.JSONDecodeError:
        pass

    # The result is free text: use the LLM to extract structured data.
    extract_prompt = (
        "Extract structured lead information from this text:\n"
        f"{result}\n"
        "\n"
        "Return ONLY a JSON array with each lead having these fields (if available):\n"
        "name, job_title, company, email, phone, linkedin, industry, company_size, website\n"
    )
    structured_result = llm_model_instance.invoke(extract_prompt)
    try:
        return json.loads(clean_json_string(structured_result))
    except (ValueError, TypeError):
        # Last resort: create a minimal structure
        return [{"company": "Result parsing failed", "info": result[:200]}]


def _normalize_leads(leads):
    """Coerce parsed search output into a list of lead dicts.

    Unwraps a single-key wrapper object such as ``{"leads": [...]}``, wraps
    a lone lead in a list, drops entries that are not dicts, and guarantees
    the ``name``, ``job_title``, ``company`` and ``industry`` keys exist.
    """
    if isinstance(leads, dict) and len(leads) == 1:
        (only_value,) = leads.values()
        if isinstance(only_value, list) and all(
            isinstance(item, dict) for item in only_value
        ):
            leads = only_value

    if not isinstance(leads, list):
        leads = [leads]

    enhanced_leads = []
    for lead in leads:
        if not isinstance(lead, dict):
            logger.warning("Ignoring non-object lead entry: %r", lead)
            continue
        lead.setdefault('name', '')
        if 'job_title' not in lead:
            # Sometimes position is used instead
            lead['job_title'] = lead.get('position', '')
        lead.setdefault('company', '')
        lead.setdefault('industry', '')
        enhanced_leads.append(lead)
    return enhanced_leads


def search_for_leads(search_query, number_of_leads=20):
    """Search for leads using SearchGraph based on search terms.

    Args:
        search_query: Keywords describing the businesses or people to find.
        number_of_leads: Minimum number of leads requested from the model.

    Returns:
        A ``(leads, info)`` tuple. ``leads`` is a list of dicts (empty on
        failure); ``info`` is the prettified execution info, or an error
        message starting with ``"Error searching for leads:"``.
    """
    if not search_query or not str(search_query).strip():
        return [], "Error searching for leads: search query is empty"

    try:
        search_graph = SearchGraph(
            prompt=_build_search_prompt(search_query, number_of_leads),
            config=graph_config,
        )
        result = search_graph.run()
        exec_info = search_graph.get_execution_info()

        enhanced_leads = _normalize_leads(_parse_leads(result))
        return enhanced_leads, prettify_exec_info(exec_info)
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        logger.exception("Lead search failed")
        return [], f"Error searching for leads: {str(e)}"


def send_email(smtp_server, smtp_port, use_ssl, username, password,
               from_email, to_email, subject, body, timeout=30):
    """Send an HTML e-mail using the provided SMTP settings.

    Args:
        smtp_server: Host name of the SMTP server.
        smtp_port: Port of the SMTP server.
        use_ssl: When true connect with implicit TLS (``SMTP_SSL``);
            otherwise connect in plain text and upgrade with STARTTLS.
        username: SMTP login name.
        password: SMTP login password.
        from_email: Value of the ``From`` header.
        to_email: Value of the ``To`` header.
        subject: Value of the ``Subject`` header.
        body: Message body, attached with the ``html`` MIME subtype.
        timeout: Socket timeout in seconds, so a dead server cannot hang
            the UI handler indefinitely.

    Returns:
        ``(True, "Email sent successfully")`` on success, otherwise
        ``(False, "Error sending email: ...")``.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = from_email
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html'))

        smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        # The context manager issues QUIT and closes the socket even when
        # starttls/login/send_message raise.
        with smtp_class(smtp_server, smtp_port, timeout=timeout) as server:
            if not use_ssl:
                server.starttls()
            server.login(username, password)
            server.send_message(msg)

        return True, "Email sent successfully"
    except Exception as e:
        return False, f"Error sending email: {str(e)}"


def generate_personalized_content(lead, template_markers):
    """Generate personalized content for each lead based on their information.

    Args:
        lead (dict): The lead information.
        template_markers (dict): Fallback text per section; its
            ``'value_prop'`` entry is used when no value proposition can be
            generated.

    Returns:
        dict: ``{'intro': ..., 'value_prop': ...}`` with the text for each
        customizable section.
    """
    personalized_content = {}

    company = lead.get('company', '')
    job_title = lead.get('job_title', '')

    # Build a prompt for the LLM to generate the introduction
    intro_prompt = (
        "Write a personalized introduction paragraph for a marketing email based on these details:\n"
        f"- Person's name: {lead.get('name', '')}\n"
        f"- Company: {company}\n"
        f"- Job title: {job_title}\n"
        "\n"
        "The email is about AI services from Pime.AI. Keep it concise (2-3 sentences), professional, "
        "and mention their company if available. Don't include any generic placeholders.\n"
    )

    try:
        personalized_content['intro'] = llm_model_instance.invoke(intro_prompt).strip()
    except Exception:
        # Fallback if LLM generation fails: canned text based on what we know.
        logger.warning("Intro generation failed; using canned text", exc_info=True)
        if lead.get('name') and company:
            personalized_content['intro'] = f"Hi {lead.get('name')}, I noticed {company} and wanted to share how our AI services could benefit your operations."
        elif lead.get('name'):
            personalized_content['intro'] = f"Hi {lead.get('name')}, I wanted to introduce how our AI services could enhance your marketing operations."
        else:
            personalized_content['intro'] = "Hi there, I wanted to introduce how our AI services could enhance your marketing operations."

    # Generate custom value proposition based on lead's company if available
    fallback_value_prop = template_markers.get('value_prop', '')
    if not company:
        personalized_content['value_prop'] = fallback_value_prop
        return personalized_content

    role_clause = "in the role of " + job_title if job_title else ""
    value_prompt = (
        "Write a short, personalized paragraph (2-3 sentences) explaining how AI solutions from Pime.AI "
        f"could specifically benefit a company named {company} {role_clause}. "
        "Focus on specific value propositions relevant to this type of company. "
        "Keep it concise and professional.\n"
    )
    try:
        personalized_content['value_prop'] = llm_model_instance.invoke(value_prompt).strip()
    except Exception:
        logger.warning("Value proposition generation failed", exc_info=True)
        personalized_content['value_prop'] = fallback_value_prop

    return personalized_content


def _is_plausible_email(address):
    """Return True when ``address`` is a string shaped like ``user@host.tld``."""
    return isinstance(address, str) and _EMAIL_RE.fullmatch(address.strip()) is not None


def send_bulk_emails(leads, smtp_settings, email_template, delay=5):
    """Send emails to multiple leads with a delay between each email.

    Args:
        leads: Iterable of lead dicts (``email`` is required to send).
        smtp_settings: Dict with ``server``, ``port``, ``use_ssl``,
            ``username``, ``password`` and ``from_email``.
        email_template: Dict with ``subject`` and ``body``. The body may
            contain ``{intro}``, ``{value_prop}`` and ``{name}`` markers.
        delay: Seconds to wait after each sent message (not after the last
            lead, and not after skipped leads).

    Returns:
        A list of result dicts with ``name`` and ``status`` and, for leads
        that were attempted, ``email`` and ``message``.
    """
    results = []
    leads = list(leads)
    last_index = len(leads) - 1

    for i, lead in enumerate(leads):
        default_name = f"Lead {i+1}"

        if not isinstance(lead, dict):
            results.append({"name": default_name, "status": "Skipped - Invalid lead data"})
            continue

        display_name = lead.get('name') or default_name
        email = lead.get('email')

        # Skip leads without email
        if not email:
            results.append({"name": display_name, "status": "Skipped - No email address"})
            continue

        # LLM-extracted fields are often filler such as "N/A".
        if not _is_plausible_email(email):
            results.append({
                "name": display_name,
                "email": str(email),
                "status": "Skipped - Invalid email address",
            })
            continue
        email = email.strip()

        # Get personalized content for this lead
        personalized_content = generate_personalized_content(lead, _SECTION_FALLBACKS)

        # Replace all markers with personalized content. The body is sent as
        # HTML, so scraped/generated text is escaped before insertion.
        personalized_body = email_template['body']
        for marker, content in personalized_content.items():
            personalized_body = personalized_body.replace(
                f"{{{marker}}}", html.escape(str(content), quote=False)
            )

        # Ensure name replacement still works for backward compatibility
        greeting_name = lead.get('name') or _SECTION_FALLBACKS['name']
        personalized_body = personalized_body.replace(
            "{name}", html.escape(str(greeting_name), quote=False)
        )

        # Send the email
        success, message = send_email(
            smtp_settings['server'],
            smtp_settings['port'],
            smtp_settings['use_ssl'],
            smtp_settings['username'],
            smtp_settings['password'],
            smtp_settings['from_email'],
            email,
            email_template['subject'],
            personalized_body,
        )

        results.append({
            "name": display_name,
            "email": email,
            "status": "Sent" if success else "Failed",
            "message": message,
        })

        # Add delay between emails
        if i < last_index:
            time.sleep(delay)

    return results


def test_personalization():
    """Generate personalized sections for three sample leads.

    Returns:
        A string for the "Personalization Preview" component, one section
        per sample lead.
    """
    # Sample lead data for testing
    sample_leads = [
        {
            "name": "John Smith",
            "job_title": "Marketing Director",
            "company": "TechCorp Inc.",
            "email": "john@example.com",
            "industry": "Technology",
        },
        {
            "name": "Sarah Johnson",
            "job_title": "CMO",
            "company": "Healthcare Solutions",
            "email": "sarah@example.com",
            "industry": "Healthcare",
        },
        {
            # Minimal data to test fallbacks
            "name": "Michael Lee",
            "email": "michael@example.com",
        },
    ]

    # NOTE(review): the preview is rendered by gr.HTML but contains no
    # markup, so the line breaks below will be collapsed - confirm whether
    # the original strings carried HTML tags.
    previews = []
    for lead in sample_leads:
        content = generate_personalized_content(lead, _SECTION_FALLBACKS)
        intro = html.escape(content.get('intro') or 'No intro generated', quote=False)
        value_prop = html.escape(
            content.get('value_prop') or 'No value prop generated', quote=False
        )
        previews.append(
            "\n\n"
            f"Preview for: {lead.get('name', 'Unknown')} ({lead.get('email', 'No email')})\n"
            "\n\n"
            f"Intro: {intro}\n"
            "\n"
            f"Value Proposition: {value_prop}\n"
            "\n"
        )

    # Join all previews
    return "\n\nPersonalization Test Results\n\n" + "".join(previews)


def send_test_email(server, port, ssl, username, password, from_addr, to_addr, subject, body):
    """Send the current template, unpersonalized, to a single test address.

    Returns:
        The status message produced by :func:`send_email`, or an error
        message when the port or the test address is unusable.
    """
    if not to_addr:
        return "Error sending email: no test email address provided"
    try:
        port_number = int(port)
    except (TypeError, ValueError):
        return f"Error sending email: invalid SMTP port {port!r}"

    _, message = send_email(
        server, port_number, ssl, username, password,
        from_addr, to_addr, subject, body
    )
    return message


def start_email_campaign(leads_json, server, port, ssl, username, password,
                         from_addr, subject, body, delay):
    """Send the campaign to every lead and return rows for the status table.

    Args:
        leads_json: Leads as a list of dicts, a single dict, or a JSON
            string encoding either.
        server, port, ssl, username, password, from_addr: SMTP settings.
        subject, body: E-mail template.
        delay: Seconds to wait between messages.

    Returns:
        A list of ``[name, email, status, message]`` rows.
    """
    if not leads_json:
        return [["Error", "N/A", "Error", "No leads available"]]

    # Parse leads if they're in JSON string format
    if isinstance(leads_json, str):
        try:
            leads = json.loads(leads_json)
        except ValueError:
            return [["Error", "N/A", "Failed", "Invalid leads data"]]
    else:
        leads = leads_json

    if isinstance(leads, dict):
        leads = [leads]
    if not isinstance(leads, list):
        return [["Error", "N/A", "Failed", "Invalid leads data"]]

    try:
        port_number = int(port)
        delay_seconds = int(delay)
    except (TypeError, ValueError):
        return [["Error", "N/A", "Failed", "Invalid SMTP port or delay"]]

    smtp_settings = {
        'server': server,
        'port': port_number,
        'use_ssl': ssl,
        'username': username,
        'password': password,
        'from_email': from_addr,
    }

    email_template = {
        'subject': subject,
        'body': body,
    }

    results = send_bulk_emails(leads, smtp_settings, email_template, delay_seconds)
    return [
        [r['name'], r.get('email', 'N/A'), r['status'], r.get('message', '')]
        for r in results
    ]


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# AI-Powered Lead Scraper & Email Tool")

    with gr.Tab("Lead Search"):
        gr.Markdown(
            "Search for businesses and professionals based on keywords, then send personalized emails.\n"
            "This tool uses SearchGraph AI to intelligently find and extract contact details."
        )

        with gr.Row():
            with gr.Column():
                search_input = gr.Textbox(
                    label="Search Keywords",
                    placeholder="marketing automation tools needed",
                )
                num_leads = gr.Slider(minimum=1, maximum=100, value=20, step=1, label="Number of Leads")
                search_button = gr.Button("Search for Leads")

            with gr.Column():
                leads_output = gr.JSON(label="Found Leads")
                exec_info_output = gr.Textbox(label="Execution Info")

    with gr.Tab("Email Settings"):
        with gr.Row():
            with gr.Column():
                gr.Markdown("### SMTP Configuration")
                smtp_server = gr.Textbox(label="SMTP Server", value="smtp.gmail.com")
                smtp_port = gr.Number(label="SMTP Port", value=587)
                use_ssl = gr.Checkbox(label="Use SSL", value=False)
                smtp_username = gr.Textbox(
                    label="SMTP Username",
                    value=os.getenv("SMTP_USERNAME", "eugproductions@gmail.com"),
                )
                # Never hard-code credentials: the default comes from the
                # environment and is empty when SMTP_PASSWORD is not set.
                smtp_password = gr.Textbox(
                    label="SMTP Password",
                    value=os.getenv("SMTP_PASSWORD", ""),
                    type="password",
                )
                from_email = gr.Textbox(
                    label="From Email",
                    value=os.getenv("SMTP_FROM_EMAIL", "eugproductions@gmail.com"),
                )

            with gr.Column():
                gr.Markdown("### Email Template")
                email_subject = gr.Textbox(
                    label="Email Subject",
                    value="Custom AI Solutions to Transform Your Marketing Operations",
                )
                email_body = gr.TextArea(
                    label="Email Body (HTML)",
                    value=DEFAULT_EMAIL_BODY,
                )

                gr.Markdown("### Testing")
                test_personalization_btn = gr.Button("Test Personalization")
                personalization_result = gr.HTML(label="Personalization Preview")

                gr.Markdown("### Send Test Email")
                test_email = gr.Textbox(label="Test Email Address")
                test_email_btn = gr.Button("Send Test Email")
                test_result = gr.Textbox(label="Test Result")

    with gr.Tab("Send Campaign"):
        with gr.Row():
            with gr.Column():
                email_delay = gr.Slider(minimum=1, maximum=60, value=5, step=1, label="Delay Between Emails (seconds)")
                send_campaign_btn = gr.Button("Send Emails to All Leads")

        email_status = gr.Dataframe(
            headers=["Name", "Email", "Status", "Message"],
            label="Email Status",
        )

    # Connect functions to the UI
    search_button.click(
        search_for_leads,
        inputs=[search_input, num_leads],
        outputs=[leads_output, exec_info_output],
    )

    # Handle personalization testing
    test_personalization_btn.click(
        test_personalization,
        inputs=[],
        outputs=[personalization_result],
    )

    # Handle test email
    test_email_btn.click(
        send_test_email,
        inputs=[
            smtp_server, smtp_port, use_ssl, smtp_username, smtp_password,
            from_email, test_email, email_subject, email_body
        ],
        outputs=[test_result],
    )

    # Handle campaign sending
    send_campaign_btn.click(
        start_email_campaign,
        inputs=[
            leads_output, smtp_server, smtp_port, use_ssl, smtp_username,
            smtp_password, from_email, email_subject, email_body, email_delay
        ],
        outputs=[email_status],
    )

# Launch the Gradio app
if __name__ == "__main__":
    demo.launch()