"""Travel-data analyzer tool.

Builds (or loads) a LlamaIndex vector index over MongoDB ``travelrecords``
documents and answers statistical/comparative questions about them by
retrieving relevant records and prompting an Anthropic model.
"""

import os
import asyncio
import traceback

from llama_index.core import (
    Document,
    VectorStoreIndex,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pymongo import MongoClient
from anthropic import AsyncAnthropic

# Configure shared LlamaIndex settings once at import time.  Failure here is
# deliberately non-fatal: initialize_analyzer_tool() surfaces problems later.
try:
    Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
    Settings.chunk_size = 512
except Exception as e:
    print(f"Error initializing LlamaIndex settings in analyzer tool: {e}")

# Async Anthropic client; reads ANTHROPIC_API_KEY from the environment.
async_anthropic_client = AsyncAnthropic()

MONGO_URI = os.getenv("MONGODB_URI")
INDEX_PERSIST_DIR = "./travel_data_index"

# Module-level singletons populated lazily by initialize_analyzer_tool().
mongo_client_analyzer = None
db_analyzer = None
travel_records_collection_analyzer = None
llama_index_instance_analyzer = None


def _record_to_document(record: dict) -> Document:
    """Convert one MongoDB travel record into a LlamaIndex Document.

    The document text is a human-readable summary of the trip; the metadata
    carries the fields useful for filtering, with missing (None) values
    dropped so they never appear as the string "None".
    """
    content_parts = [
        f"Traveler Name: {record.get('name', 'N/A')}",
        f"Destination: {record.get('destinationName', 'N/A')}",
        f"Country: {record.get('destinationCountry', 'N/A')}",
        f"Start Date: {record.get('startDate', 'N/A')}",
        f"End Date: {record.get('endDate', 'N/A')}",
        f"Duration (days): {record.get('tripDurationDays', 'N/A')}",
        f"Highlights: {record.get('highlights', 'N/A')}",
        f"Latitude: {record.get('latitude', 'N/A')}",
        f"Longitude: {record.get('longitude', 'N/A')}",
    ]
    document_text = "\n".join(filter(None, content_parts))

    start_date = record.get('startDate')
    metadata = {
        "traveler_name": record.get('name'),
        "destination_city": record.get('destinationName'),
        "destination_country": record.get('destinationCountry'),
        # Only stringify when present; str(None) would produce the literal
        # string "None" and incorrectly survive the None-filter below.
        "start_date": str(start_date) if start_date is not None else None,
        "latitude": record.get('latitude'),
        "longitude": record.get('longitude'),
    }
    filtered_metadata = {k: v for k, v in metadata.items() if v is not None}
    return Document(text=document_text, metadata=filtered_metadata)


async def initialize_analyzer_tool() -> bool:
    """Connect to MongoDB and load or build the LlamaIndex vector index.

    Idempotent: safe to call repeatedly; returns early once both the Mongo
    client and the index are ready.  All blocking driver / index work is
    pushed onto worker threads so the event loop is never stalled.

    Returns:
        True on success, False if MongoDB or the index is unavailable.
    """
    global mongo_client_analyzer, db_analyzer, travel_records_collection_analyzer, llama_index_instance_analyzer

    if llama_index_instance_analyzer is not None and mongo_client_analyzer is not None:
        # Already initialized
        print("Analyzer tool already initialized.")
        return True

    if not MONGO_URI:
        print("FATAL: MONGODB_URI environment variable not set for Analyzer tool.")
        return False

    if mongo_client_analyzer is None:
        try:
            mongo_client_analyzer = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)  # Add timeout
            # ping / count_documents are blocking network calls; run them off
            # the event loop.
            await asyncio.to_thread(mongo_client_analyzer.admin.command, 'ping')
            db_analyzer = mongo_client_analyzer.get_database()
            travel_records_collection_analyzer = db_analyzer.get_collection("travelrecords")
            print("MongoDB connection successful for Analyzer Tool.")
            record_count = await asyncio.to_thread(
                travel_records_collection_analyzer.count_documents, {}
            )
            if record_count == 0:  # Check if collection is empty
                print("Warning: 'travelrecords' collection is empty in MongoDB for Analyzer Tool.")
        except Exception as e:
            print(f"FATAL: Could not connect to MongoDB or get collection for Analyzer Tool.\nError: {e}")
            mongo_client_analyzer = None
            db_analyzer = None
            travel_records_collection_analyzer = None
            return False

    try:
        if os.path.exists(INDEX_PERSIST_DIR) and os.listdir(INDEX_PERSIST_DIR):
            # A persisted index exists on disk — load it instead of rebuilding.
            print(f"Loading LlamaIndex from {INDEX_PERSIST_DIR} for Analyzer Tool...")
            storage_context = StorageContext.from_defaults(persist_dir=INDEX_PERSIST_DIR)
            llama_index_instance_analyzer = await asyncio.to_thread(
                load_index_from_storage, storage_context
            )
            print("LlamaIndex loaded successfully for Analyzer Tool.")
        else:
            print(f"LlamaIndex not found or directory empty at {INDEX_PERSIST_DIR}. Attempting to build for Analyzer Tool...")
            if travel_records_collection_analyzer is None:
                print("Cannot build LlamaIndex: MongoDB collection 'travelrecords' is not available.")
                return False

            all_records_cursor = travel_records_collection_analyzer.find({})
            # Materializing the cursor performs the actual network reads.
            all_records = await asyncio.to_thread(list, all_records_cursor)

            if not all_records:
                print("No records found in 'travelrecords' collection to build the index. Creating empty index for Analyzer Tool.")
                llama_index_instance_analyzer = VectorStoreIndex.from_documents([], show_progress=False)
                return True

            documents = [_record_to_document(record) for record in all_records]

            if not documents:
                print("No documents were created from MongoDB records for Analyzer Tool. Creating empty index.")
                llama_index_instance_analyzer = VectorStoreIndex.from_documents([], show_progress=False)
                return True

            print(f"Building LlamaIndex from {len(documents)} travel records for Analyzer Tool...")
            # Embedding + persisting is CPU/IO heavy; keep it off the loop.
            llama_index_instance_analyzer = await asyncio.to_thread(
                VectorStoreIndex.from_documents, documents, show_progress=True
            )
            await asyncio.to_thread(
                llama_index_instance_analyzer.storage_context.persist,
                persist_dir=INDEX_PERSIST_DIR,
            )
            print(f"LlamaIndex built and persisted to {INDEX_PERSIST_DIR} for Analyzer Tool.")
        return True
    except Exception as e:
        print(f"Error loading or building LlamaIndex for Analyzer Tool: {e}\n{traceback.format_exc()}")
        llama_index_instance_analyzer = None
        return False


async def analyze_travel_data_with_llm(user_question: str) -> str:
    """
    Analyzes travel records using LlamaIndex and an LLM to answer statistical
    or comparative questions.  The LLM will infer necessary context (like
    departure points for 'farthest' queries) from the user_question itself.

    Args:
        user_question: The natural-language question to answer.

    Returns:
        The LLM's answer, or a human-readable error string on failure
        (this function never raises).
    """
    global llama_index_instance_analyzer

    if llama_index_instance_analyzer is None:
        # Lazy (re-)initialization: the index may not have been built yet, or a
        # previous build failed.
        print("LlamaIndex instance not available for analyzer tool. Attempting re-initialization...")
        init_success = await initialize_analyzer_tool()
        if not init_success or llama_index_instance_analyzer is None:
            return "Sorry, the travel data analysis tool is currently unavailable or could not be initialized."

    print(f"--- Analyzer Tool: Processing question: '{user_question}' ---")
    try:
        retriever = VectorIndexRetriever(
            index=llama_index_instance_analyzer,
            similarity_top_k=30,
        )
        # retrieve() is synchronous/blocking — run it on a worker thread.
        retrieved_nodes = await asyncio.to_thread(retriever.retrieve, user_question)

        if not retrieved_nodes:
            # Fall back to a generic query so analytical questions that match
            # nothing verbatim still get some context to work with.
            print(f"Initial retrieval for '{user_question}' yielded no results. Trying a broader retrieval...")
            retrieved_nodes = await asyncio.to_thread(
                retriever.retrieve, "summary of all travel records and traveler activities"
            )
            if not retrieved_nodes:
                return "I couldn't find relevant travel records in the database to answer your question, even with a broader search."

        context_str = "\n\n---\n".join([node.get_text() for node in retrieved_nodes])  # Use node.get_text()

        analysis_prompt = f"""
You are a data analyst AI. Your task is to answer the user's question based on the provided collection of travel records.

User's Question: "{user_question}"

Available Travel Records Context (a subset of all records, retrieved based on similarity to the question):
---
{context_str}
---

Your Task:
Carefully analyze ALL the provided travel records context to answer the user's question.
Consider the following when formulating your answer:
- "Most active traveler": Interpret this based on the context. It could mean the traveler with the highest number of distinct trip records mentioned, or who has visited the most unique destinations mentioned. If possible, count distinct trips per traveler name. State the basis of your conclusion if ambiguous.
- "Farthest place": If the user's question specifies a departure point (e.g., "From Helsinki, who went farthest?"), use that as your origin. If the question does not specify a departure point, you must state that "farthest" is relative and an exact answer cannot be given without an origin, but you can list some very distant-sounding locations mentioned in the records. Use your general geographical knowledge to estimate relative distances. The context includes latitude and longitude for destinations; you can mention these help in determining distance but you are not expected to perform calculations.
- "Most times to [country/city]": Count how many separate trip records exist for each traveler to the specified location *within the provided context*.
Provide a clear, concise, and direct answer. If the provided context is insufficient to definitively answer (e.g., not enough records retrieved), state that clearly (e.g., "Based on the currently available information, I cannot definitively determine... but I can tell you...").
If listing multiple travelers for a ranking, please rank them.
Base your answer ONLY on the provided travel records context and the user's question. Do not invent information or assume data beyond what's provided in the context.
"""

        print(f"--- Sending analytical prompt to LLM (context length: {len(context_str)} chars) ---")
        response = await async_anthropic_client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=2000,
            temperature=0.1,  # low temperature: analytical answers should be deterministic
            system="You are an AI data analyst. Your responses must be strictly based on the provided context from travel records and the user's question. Be precise and indicate if the context is insufficient for a definitive answer.",
            messages=[{"role": "user", "content": analysis_prompt}],
        )

        answer = (
            response.content[0].text.strip()
            if response.content and response.content[0].text
            else "I analyzed the data but could not formulate a textual answer from the LLM."
        )
        print(f"--- LLM Analysis Result: {answer[:300]}... ---")
        return answer
    except Exception as e:
        error_message = f"Error during travel data analysis with LLM: {type(e).__name__}: {str(e)}"
        print(f"{error_message}\n{traceback.format_exc()}")
        return f"Sorry, I encountered an error trying to analyze the travel data: {str(e)}"