# Spaces: Sleeping  (hosting-page status text captured with the file; not program code)
import os | |
from dotenv import load_dotenv | |
import streamlit as st | |
from crewai import Agent, Task, Crew | |
from crewai_tools import SerperDevTool, WebsiteSearchTool, FileReadTool | |
from openai import OpenAI | |
import tempfile | |
import time | |
# ---------------------------------------------------------------------------
# Module-level setup: environment, OpenAI client, CrewAI tools/agents/tasks.
#
# Streamlit re-executes this script from top to bottom on every user
# interaction, so any statement here that performs a remote call is repeated
# on each rerun unless it is cached.
# ---------------------------------------------------------------------------

# Load environment variables
load_dotenv()

# Get API keys from environment variables
# NOTE(review): SERPER_API_KEY is not referenced again in this file;
# presumably SerperDevTool reads the environment variable itself - confirm.
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Initialize OpenAI API
client = OpenAI(api_key=OPENAI_API_KEY)


@st.cache_resource
def _create_strategy_assistant():
    """Create the "Climate Strategy Assistant" once per server process.

    Previously the create call ran directly at module level, which registered
    a brand-new remote assistant on every Streamlit rerun (every button
    click). Caching the resource keeps a single assistant object and returns
    it on later reruns.
    """
    return client.beta.assistants.create(
        name="Climate Strategy Assistant",
        instructions="You are an expert in climate strategy. Analyze climate strategy documents and provide relevant insights.",
        model="gpt-4o",
        tools=[{"type": "file_search"}],
    )


# Create assistant
# The module-level name is kept for backward compatibility; note that
# process_file_with_assistant() builds its own local assistant and does not
# use this one.
assistant = _create_strategy_assistant()

# Instantiate tools
file_tool = FileReadTool(file_path='analysis.txt')  # Adjusted to read analysis.txt
search_tool = SerperDevTool()
web_rag_tool = WebsiteSearchTool()

# Create agents
# The analyst only gets the file-reading tool (analysis.txt); the researcher
# only gets the web search tools.
analyst = Agent(
    role='Climate Strategy Analyst',
    goal='Analyze the climate strategy plan to identify key requirements and objectives.',
    backstory='An expert in climate strategy with experience in sustainable solutions.',
    tools=[file_tool],
    verbose=True
)
researcher = Agent(
    role='Climate Tech Researcher',
    goal='Find and list specific climate tech companies that provide solutions aligning with the strategy plan.',
    backstory='A researcher specialized in identifying and evaluating climate technology solutions, with a focus on practical implementations.',
    tools=[search_tool, web_rag_tool],
    verbose=True
)

# Define tasks
analyze_strategy = Task(
    description='Analyze the climate strategy plan from analysis.txt and extract key requirements.',
    expected_output='A detailed list of key requirements and objectives from the climate strategy plan.',
    agent=analyst
)
# The path given as output_file here is the same file run_crew() reads back.
search_companies = Task(
    description='Search for and list specific climate tech companies that offer solutions meeting the extracted strategy requirements. Provide company names, brief descriptions of their solutions, and how they align with the strategy needs.',
    expected_output='A detailed list of climate tech companies, including company names, solution descriptions, and how they align with the strategy requirements.',
    agent=researcher,
    output_file='research_results/company_recommendations.md'
)

# Assemble a crew with planning enabled
crew = Crew(
    agents=[analyst, researcher],
    tasks=[analyze_strategy, search_companies],
    verbose=True,
    planning=True,
)
def process_file_with_assistant(file, file_type, category, reports_needed, user_feedback):
    """Analyze a file with the OpenAI Assistants API and return the reply text.

    The function uploads ``file``, creates a dedicated assistant with the
    ``file_search`` tool, posts one user message with the file attached,
    starts a run, polls it once per second until it settles, and then reads
    the assistant's answer from the thread. Progress is reported via
    ``print``.

    Args:
        file: Value passed straight to ``client.files.create(file=...)``.
        file_type: Label used in log lines and in the assistant's name and
            instructions (e.g. ``"PDF"``).
        category: Topic the analysis should focus on.
        reports_needed: Iterable of report names; joined with ``", "`` and
            embedded in the assistant instructions and the user message.
        user_feedback: Earlier feedback text embedded in the instructions.

    Returns:
        The text value of the first assistant message yielded by the thread's
        message list, or ``None`` when there is no assistant message or when
        any step raises (the error is printed, not propagated).
    """
    print(f"Starting {file_type} processing with Assistant")
    # Joined once; the same text is needed in two prompts below.
    reports_text = ', '.join(reports_needed)
    try:
        # Upload the file to OpenAI
        uploaded_file = client.files.create(
            file=file,
            purpose='assistants'
        )
        print(f"File uploaded successfully. File ID: {uploaded_file.id}")

        # Create an assistant
        assistant = client.beta.assistants.create(
            name=f"{file_type} Analyzer",
            instructions=f"You are an expert in analyzing {file_type} files, focusing on {category}. Provide insights and summaries of the content based on the following reports: {reports_text}. Consider the user's previous feedback: {user_feedback}. Provide your response in plain text format without any formatting, bullet points, numbering, or source citations. Do not use asterisks, hashes, or any other special characters for emphasis or structure.",
            model="gpt-4o",
            tools=[{"type": "file_search"}]
        )
        print(f"Assistant created. Assistant ID: {assistant.id}")

        # Create a thread
        thread = client.beta.threads.create()
        print(f"Thread created. Thread ID: {thread.id}")

        # Add a message to the thread
        message = client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=f"Please analyze this file and provide insights for the {category} category, focusing on the following reports: {reports_text}.",
            attachments=[
                {"file_id": uploaded_file.id, "tools": [{"type": "file_search"}]}
            ]
        )
        print(f"Message added to thread. Message ID: {message.id}")

        # Run the assistant
        run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant.id,
            additional_instructions="Use the uploaded file to answer the question. Provide your response in plain text format without any formatting, bullet points, numbering, or source citations. Do not use asterisks, hashes, or any other special characters for emphasis or structure."
        )
        print(f"Run created. Run ID: {run.id}")

        # Wait for the run to settle. Waiting only for 'completed' would spin
        # forever if the run fails, is cancelled, expires, ends incomplete,
        # or stops to request a tool output, so every such status ends the loop.
        settled_statuses = {
            'completed', 'failed', 'cancelled', 'expired',
            'incomplete', 'requires_action',
        }
        while run.status not in settled_statuses:
            time.sleep(1)
            run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
            print(f"Run status: {run.status}")

        if run.status != 'completed':
            # Routed through the handler below, so the caller still gets None.
            raise RuntimeError(
                f"run ended with status '{run.status}' "
                f"(last_error={getattr(run, 'last_error', None)})"
            )

        # Retrieve the messages
        messages = client.beta.threads.messages.list(thread_id=thread.id)

        # Extract the assistant's response
        analysis_result = next(
            (msg.content[0].text.value for msg in messages if msg.role == 'assistant'),
            None,
        )
        print(f"{file_type} analysis completed successfully")
        return analysis_result
    except Exception as e:
        # Best-effort boundary: callers treat None as "processing failed".
        print(f"Error in processing {file_type}: {str(e)}")
        return None
# PDF-specific entry point; the Streamlit page calls this with the uploaded file.
def process_pdf_with_assistant(uploaded_file):
    """Analyze a PDF via process_file_with_assistant with fixed climate-strategy settings.

    Returns whatever process_file_with_assistant returns for the given file.
    """
    pdf_settings = {
        "file_type": "PDF",
        "category": "climate strategy",
        "reports_needed": ["key points summary"],
        "user_feedback": "",
    }
    return process_file_with_assistant(file=uploaded_file, **pdf_settings)
def save_processed_content(content, filename='analysis.txt'):
    """Write ``content`` to ``filename``, replacing any existing file.

    When ``content`` is ``None`` nothing is written; a notice is printed
    instead. The function always returns ``None``.
    """
    # Guard clause: nothing to persist.
    if content is None:
        print("No content to save.")
        return

    with open(filename, 'w') as out_file:
        out_file.write(content)
def run_crew():
    """Run the crew and return the company recommendations as text.

    Returns:
        The contents of ``research_results/company_recommendations.md`` when
        that file exists after the run; otherwise the string form of the value
        returned by ``crew.kickoff()``.
    """
    # Execute all crew tasks
    result = crew.kickoff()

    # Prefer the output file; fall back to the in-memory result rather than
    # crashing with FileNotFoundError if the file was not written.
    try:
        with open('research_results/company_recommendations.md', 'r') as f:
            return f.read()
    except FileNotFoundError:
        return str(result)
# ---------------------------------------------------------------------------
# Streamlit page: upload a PDF -> summarize it -> run the crew on the summary.
# ---------------------------------------------------------------------------

# Initialize session state variables
if 'pdf_processed' not in st.session_state:
    st.session_state.pdf_processed = False
if 'processed_content' not in st.session_state:
    st.session_state.processed_content = None
# (name, size) of the PDF the stored analysis belongs to; used to notice when
# the user swaps in a different file.
if 'processed_file_key' not in st.session_state:
    st.session_state.processed_file_key = None

# Streamlit interface
st.title("Climate Tech Company Finder")
st.write("Upload a PDF to analyze climate strategy and find relevant companies.")

# PDF Upload
uploaded_pdf = st.file_uploader("Upload your climate strategy PDF", type=["pdf"])

if uploaded_pdf is not None:
    current_file_key = (uploaded_pdf.name, uploaded_pdf.size)

    # Without this reset, a previously processed PDF kept 'pdf_processed' set,
    # so a newly uploaded file was never analyzed and the old summary was
    # shown for it.
    if st.session_state.pdf_processed and st.session_state.processed_file_key != current_file_key:
        st.session_state.pdf_processed = False
        st.session_state.processed_content = None
        st.session_state.processed_file_key = None

    # Button to trigger PDF analysis; once processed, the section stays
    # visible on later reruns without another click.
    if st.button("Run PDF Analysis") or st.session_state.pdf_processed:
        if not st.session_state.pdf_processed:
            with st.spinner("Processing PDF..."):
                # Rewind in case an earlier attempt already consumed the
                # stream (harmless when it is already at the start).
                uploaded_pdf.seek(0)
                # Process PDF with Assistants API
                st.session_state.processed_content = process_pdf_with_assistant(uploaded_pdf)
                if st.session_state.processed_content:
                    # Save processed content to analysis.txt
                    save_processed_content(st.session_state.processed_content, 'analysis.txt')
                    st.session_state.pdf_processed = True
                    st.session_state.processed_file_key = current_file_key
                    st.success("PDF processed successfully.")
                else:
                    st.error("Failed to process the PDF. Please try again.")

        if st.session_state.pdf_processed:
            # Display the PDF analysis to the user
            st.subheader("PDF Analysis")
            st.text_area("Key Points Summary", st.session_state.processed_content, height=200)

            # Only show the crew analysis button after PDF analysis is complete
            if st.button("Run Crew Analysis"):
                with st.spinner("Running crew analysis..."):
                    result = run_crew()
                    st.subheader("Company Recommendations")
                    st.markdown(result)  # Use markdown to preserve formatting
else:
    st.info("Please upload a climate strategy PDF to begin.")