PersonaTrip-Agent / tools /personalized_planner.py
tracyshen301
Configure Git LFS for image assets and prepare for push
ca1a2dd
raw
history blame
7.43 kB
import os
import asyncio
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pymongo import MongoClient
from anthropic import AsyncAnthropic
import requests
import base64
from PIL import Image
from io import BytesIO
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
Settings.chunk_size = 512
async_anthropic_client = AsyncAnthropic()
client = MongoClient(os.getenv("MONGODB_URI"))
db = client.get_database()
collection = db.get_collection("travelrecords")
async def get_image_b64_data(image_url: str):
def blocking_io_and_compress():
try:
response = requests.get(image_url, timeout=10)
response.raise_for_status()
content = response.content
if len(content) <= 5 * 1024 * 1024:
return response.headers.get('Content-Type','image/jpeg'), base64.b64encode(content).decode('utf-8')
print(f"⚠️ Image >5MB, compressing: {image_url}")
img = Image.open(BytesIO(content))
buffer = BytesIO()
img.save(buffer, format="JPEG", quality=70, optimize=True)
compressed_data = buffer.getvalue()
if len(compressed_data) > 5 * 1024 * 1024:
print(f"❌ Compression failed, still too large.")
return None, None
return "image/jpeg", base64.b64encode(compressed_data).decode('utf-8')
except Exception as e:
print(f"❌ Image processing error: {e}")
return None, None
return await asyncio.to_thread(blocking_io_and_compress)
async def describe_image(image_data: str, media_type: str) -> str:
if not image_data: return ""
try:
response = await async_anthropic_client.messages.create(
model="claude-3-haiku-20240307", max_tokens=75,
messages=[{"role":"user", "content":[
{"type":"image", "source":{"type":"base64", "media_type":media_type, "data":image_data}},
{"type":"text", "text":"Briefly describe this travel photo's key elements and atmosphere in one sentence."}
]}]
)
return response.content[0].text
except Exception as e:
print(f"ERROR calling vision model: {e}")
return ""
async def describe_all_images(image_urls: list) -> str:
if not image_urls: return "No images provided."
tasks = [get_image_b64_data(url) for url in image_urls]
results = await asyncio.gather(*tasks)
desc_tasks = [describe_image(img_data, media_type) for media_type, img_data in results if img_data]
descriptions = await asyncio.gather(*desc_tasks)
return "\n".join(descriptions)
async def create_personalized_plan(user_name: str, new_destination: str, trip_duration_days: int, user_request: str) -> str:
print(f"--- [Corrected Async] Starting Personalized Planner for {user_name} to {new_destination} ---")
try:
user_records = await asyncio.to_thread(list, collection.find({"name": {"$regex": user_name, "$options": "i"}}))
if not user_records: return f"I couldn't find any past travel records for {user_name}."
print(f"Found {len(user_records)} past trips for {user_name}.")
async def create_doc(record):
image_descriptions = await describe_all_images(record.get('uploadedImages', []))
text_content = (f"Trip to {record.get('destinationName', 'N/A')}: Highlights: {record.get('highlights', 'N/A')}\nImage Summary: {image_descriptions}")
return Document(text=text_content)
documents = await asyncio.gather(*[create_doc(r) for r in user_records])
print(f"Successfully created {len(documents)} documents for RAG.")
def build_and_retrieve(docs):
print("Building RAG index... You should see a progress bar now.")
index = VectorStoreIndex.from_documents(docs, show_progress=True)
return index.as_retriever(similarity_top_k=3).retrieve(f"Preferences for {new_destination}: {user_request}")
retrieved_nodes = await asyncio.to_thread(build_and_retrieve, documents)
retrieved_context = "\n\n---\n\n".join([node.get_content() for node in retrieved_nodes])
print(f"\n--- Retrieved Context for Persona ---\n{retrieved_context}\n-----------------------------------\n")
system_prompt = "You are an expert travel agent and persona analyst. Your core function is to synthesize a user's past travel preferences with their current request to generate a truly personalized and actionable travel itinerary."
final_prompt = f"""
**Mission:** Generate a hyper-personalized travel plan.
**1. Input Data:**
* **User Name:** {user_name}
* **Destination:** {new_destination}
* **Trip Duration:** {trip_duration_days} days
* **Specific Request:** "{user_request}"
* **User's Historical Travel Context (for Persona Analysis):**
---
{retrieved_context}
---
**2. Your Task (A mandatory two-step process):**
* **Step A: Define the User's Travel Persona.**
Based *only* on their historical preferences provided above, build a detailed understanding of this user's core travel style, values, and preferences.
* **Step B: Craft the Custom Itinerary.**
Using your deep understanding of the user's persona from Step A, create a day-by-day travel plan for their trip to {new_destination}. Every recommendation must align with their inferred preferences.
**3. Required Output Format (Crucial for user connection):**
1. **Greeting and Persona Summary:**
Start with a detailed summary of the user's travel persona, beginning with the phrase "Based on your past travel experiences, I've discovered you are a traveler who...". This summary should be rich with insights. For example: "Based on your past travel experiences, I've discovered you are a traveler who seeks out spectacular, awe-inspiring moments and deep cultural immersion. You appreciate both iconic, grand-scale views (like the fireworks in Tokyo and the Valley of the Kings in Luxor) and have a keen sense for authentic cuisine, while actively avoiding overrated experiences (like the cocktails in Helsinki). You balance thrilling adventures (hot air ballooning) with quiet cultural exploration and maintain a savvy, cautious approach to new environments."
2. **Introduction to the Plan:**
After the persona summary, add a transitional sentence like: "With this understanding of your unique style, I've crafted this tailored itinerary for your Paris adventure:"
3. **Personalized Itinerary:**
Finally, present the day-by-day itinerary in a clear, easy-to-read format.
"""
print("--- Calling Final LLM with direct RAG context... ---")
response_message = await async_anthropic_client.messages.create(
model="claude-3-5-sonnet-20240620",
max_tokens=4096, system=system_prompt,
messages=[{"role":"user", "content":final_prompt}]
)
return response_message.content[0].text
except Exception as e:
error_message = f"FATAL TOOL ERROR: {type(e).__name__}: {str(e)}"
print("\n\n---" + error_message + "---\n\n")
return error_message