Spaces:
Runtime error
Runtime error
import base64
import json
import os
import threading
import warnings
from datetime import datetime
from typing import Any, Dict, List, Optional, Type

import requests
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query as QueryParam
from langchain.agents import AgentExecutor, Tool, create_openai_tools_agent
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydantic import BaseModel, Field
from qdrant_client import QdrantClient
# Silence the Qdrant client/server version-mismatch warning.
warnings.filterwarnings("ignore", message="Qdrant client version.*is incompatible.*")

# Load variables from a local .env file (if present) before configuration is read.
load_dotenv()

app = FastAPI(title="AI Agent with Document Search, Project Management and Session Memory")

# --- Configuration: environment variables with local-development defaults ---
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
QDRANT_COLLECTION_NAME = os.environ.get("QDRANT_COLLECTION_NAME", "vatsav_test_1")
QDRANT_HOST = os.environ.get("QDRANT_HOST", "127.0.0.1")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))

# --- Shared clients, created once at import time and reused by every request ---
embedding_model = OpenAIEmbeddings(
    model="text-embedding-3-large",
    openai_api_key=OPENAI_API_KEY,
)
qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=OPENAI_API_KEY)
# === USER SESSION MANAGEMENT ===
# Process-local session stores keyed by userLoginId. Plain dicts live only as
# long as this process and are not shared between workers
# (in production, use Redis or a database).
user_memories: Dict[int, ConversationBufferMemory] = dict()  # LangChain memory handed to the agent
user_chat_history: Dict[int, List[Dict]] = dict()  # display log served by /get-chat-history
def get_or_create_user_memory(user_login_id: int) -> ConversationBufferMemory:
    """Return the user's conversation memory, creating it on first use.

    Creating a fresh memory also resets the user's display chat log, so the
    two per-user stores start out together.

    Args:
        user_login_id: Key identifying the user's session.

    Returns:
        The user's ``ConversationBufferMemory`` (messages returned as message
        objects under the ``chat_history`` key).
    """
    memory = user_memories.get(user_login_id)
    if memory is None:
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
        )
        user_memories[user_login_id] = memory
        user_chat_history[user_login_id] = []
    return memory
def add_to_chat_history(user_login_id: int, user_message: str, ai_response: str):
    """Append one user/assistant exchange to the user's display chat log.

    Both entries share a single ISO-8601 timestamp (naive local time from
    ``datetime.now()``). Their ``id`` values continue from the current log
    length (1-based).

    Args:
        user_login_id: Key of the user's log; the log is created if missing.
        user_message: Text the user sent.
        ai_response: Text the assistant answered.
    """
    log = user_chat_history.setdefault(user_login_id, [])
    stamp = datetime.now().isoformat()
    for role, text in (("user", user_message), ("assistant", ai_response)):
        log.append({
            "id": len(log) + 1,
            "type": role,
            "message": text,
            "timestamp": stamp,
        })
# === INPUT SCHEMAS ===
# Request body of POST /chat-documents. FastAPI's own `Query` is imported as
# `QueryParam`, presumably to avoid clashing with this model's name.
class Query(BaseModel):
    message: str  # question passed verbatim to search_documents
# Request body of POST /list-projects.
class ProjectRequest(BaseModel):
    userLoginId: int
    orgId: int
    auth_token: str  # raw API token; combined with userLoginId by get_encoded_auth_token
# Request body of POST /bot.
class AgentQuery(BaseModel):
    message: str
    userLoginId: int  # required: keys the per-user memory and chat history
    orgId: Optional[int] = None  # the project tool falls back to org 1 when omitted
    auth_token: Optional[str] = None  # without it the project tool reports that authentication is required
# Response model of GET /get-chat-history/{user_login_id}.
class ChatHistoryResponse(BaseModel):
    user_login_id: int
    total_messages: int  # size of the full stored history, not just the returned slice
    messages: List[Dict[str, Any]]  # entries shaped as built by add_to_chat_history
# === UTILITY FUNCTIONS ===
def get_encoded_auth_token(user: int, token: str) -> str:
    """Build the base64 ``"<user>:<token>"`` credential sent as HTTP Basic auth.

    Args:
        user: User login ID placed before the colon.
        token: API token placed after the colon.

    Returns:
        The UTF-8 encoded pair, base64-encoded and returned as ``str``.
    """
    credential = "{}:{}".format(user, token).encode("utf-8")
    encoded = base64.b64encode(credential)
    return encoded.decode("utf-8")
def fetch_user_projects(userLoginId: int, orgId: int, auth_token: str, timeout: float = 30.0):
    """Fetch the projects visible to a user from the fetchUserProjects endpoint.

    Args:
        userLoginId: Login ID of the user whose projects are requested.
        orgId: Organisation ID sent alongside the user ID.
        auth_token: Already-encoded Basic credential (see
            ``get_encoded_auth_token``), sent in the ``authorization`` header.
        timeout: Seconds to wait for the upstream API. Without a timeout a
            stalled upstream would block the calling worker thread forever.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        HTTPException: If the request fails, the API returns an error status,
            or the body is not valid JSON. The upstream status code is
            propagated when an HTTP error response was received; otherwise 500.
    """
    url = "https://japidemo.dev.ingenspark.com/fetchUserProjects"
    payload = {
        "userLoginId": userLoginId,
        "orgId": orgId
    }
    headers = {
        'accept': 'application/json, text/plain, */*',
        'authorization': f'Basic {auth_token}',
        'content-type': 'application/json; charset=UTF-8'
    }
    # The credential is deliberately not printed: it is a reusable secret.
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # e.response is set for HTTP error statuses (raise_for_status). It is
        # None for connection errors, timeouts and JSON decode errors, so those
        # map to 500 instead of echoing a misleading 200 from a bad body.
        upstream = getattr(e, "response", None)
        status = upstream.status_code if upstream is not None else 500
        raise HTTPException(status_code=status, detail=str(e)) from e
def _project_entry(project: dict, label: str) -> dict:
    """Normalise one raw API project record into the fields shown to the user."""
    created = project.get("created_dttm")
    return {
        "type": label,
        "projectNm": project.get("projectNm", "N/A"),
        "projectId": project.get("projectId", "N/A"),
        # Keep only the text before the first '.', dropping a fractional-seconds
        # suffix from the timestamp string (format assumed from the API).
        "created_dttm": str(created).split('.')[0] if created is not None else "N/A",
        "description": project.get("description", "N/A"),
        "categoryName": project.get("categoryName", "N/A"),
    }


def format_project_response(data: dict) -> str:
    """Render the fetchUserProjects response as a numbered, human-readable list.

    The user's own projects (``data.Myprojects``) are listed first, followed by
    other accessible projects (``data.Otherprojects``). A missing or null
    ``data`` object or project list counts as empty, and missing fields are
    shown as ``N/A`` instead of raising ``KeyError``/``AttributeError``.

    Args:
        data: Decoded JSON body returned by ``fetch_user_projects``.

    Returns:
        The formatted listing, or a "No projects found." message.
    """
    payload = (data or {}).get("data") or {}
    all_projects = [
        _project_entry(project, "Your Project")
        for project in payload.get("Myprojects") or []
    ] + [
        _project_entry(project, "Other Project")
        for project in payload.get("Otherprojects") or []
    ]
    if not all_projects:
        return "β No projects found."
    # Build the formatted string
    result = [f"β You have access to {len(all_projects)} project(s):\n"]
    for i, project in enumerate(all_projects, 1):
        result.append(f"{i}. Project Name: {project['projectNm']} ({project['type']})")
        result.append(f" Project ID: {project['projectId']}")
        result.append(f" Created On: {project['created_dttm']}")
        result.append(f" Description: {project['description']}")
        result.append(f" Category: {project['categoryName']}\n")
    return "\n".join(result)
# === TOOL FUNCTIONS ===
def search_documents(query: str) -> str:
    """Answer ``query`` from the documents stored in the Qdrant collection.

    Embeds the query and retrieves the 5 best-matching chunks from
    ``QDRANT_COLLECTION_NAME``. It then asks the LLM to answer from that
    context.

    Args:
        query: The search query or message about the documents.

    Returns:
        The LLM's answer followed by a "Sources:" line. Each distinct payload
        ``source`` appears once, in retrieval order (``"unknown"`` when a hit
        has none). Hits whose payload lacks ``text`` are skipped rather than
        aborting the search. Errors are returned as text, not raised, so the
        agent can relay them.
    """
    try:
        # Generate embedding for the query
        query_vector = embedding_model.embed_query(query)
        # Search in Qdrant
        search_result = qdrant_client.search(
            collection_name=QDRANT_COLLECTION_NAME,
            query_vector=query_vector,
            limit=5,
        )
        context_texts = []
        sources = []
        for hit in search_result or []:
            payload = hit.payload or {}
            text = payload.get("text")
            if not text:
                continue
            context_texts.append(text)
            sources.append(payload.get("source", "unknown"))
        if not context_texts:
            return "No relevant information found in the knowledge base."
        context = "\n\n".join(context_texts)
        # dict.fromkeys de-duplicates while keeping retrieval order; set() order is arbitrary.
        unique_sources = list(dict.fromkeys(sources))
        # Use the LLM directly to answer the message based on context
        prompt = f"""Based on the following context, answer the message: {query}
Context:
{context}
Please provide a comprehensive answer based on the context above. If the context doesn't contain enough information to answer the message, say so clearly."""
        response = llm.invoke(prompt)
        return f"{response.content}\n\nSources: {', '.join(unique_sources)}"
    except Exception as e:
        return f"Error searching documents: {str(e)}"
# Auth context for the project tool. A LangChain Tool callback only receives
# the single string argument chosen by the LLM, so the /bot endpoint stores the
# caller's credentials here for one agent run and resets them afterwards.
# These are process-wide globals shared by every request, not per-request storage.
_current_user_id: Optional[int] = None
_current_org_id: Optional[int] = None
_current_auth_token: Optional[str] = None
def get_user_projects(userLoginId: str) -> str:
    """Tool entry point: list the projects of the current user.

    When the auth context (``_current_user_id`` / ``_current_org_id`` /
    ``_current_auth_token``) is set, it is used and the LLM-supplied argument
    is ignored. The model therefore cannot ask for another user's projects.
    A missing org ID defaults to 1.

    Args:
        userLoginId: ID chosen by the agent, either ``'25'`` or ``'25:1'``
            (``userLoginId:orgId``). It is only validated when no auth context
            is set.

    Returns:
        The formatted project list, or a human-readable error message. The
        tool never raises, so the agent can relay the message.
    """
    # Compare with None so that valid IDs of 0 are not treated as "not logged in".
    if _current_auth_token and _current_user_id is not None:
        try:
            user_id = _current_user_id
            org_id = _current_org_id if _current_org_id is not None else 1
            # Encode auth token using the actual user ID and provided token
            encoded_token = get_encoded_auth_token(user_id, _current_auth_token)
            data = fetch_user_projects(user_id, org_id, encoded_token)
            return format_project_response(data)
        except Exception as e:
            # Fetch/format failures are reported as such. They are never
            # mislabelled as an invalid-ID error, even if they are ValueErrors.
            return f"β Error fetching projects: {str(e)}"

    # No auth context: still validate the argument so malformed input gets the
    # format error, then report that credentials are missing.
    try:
        if ":" in userLoginId:
            user_part, org_part = userLoginId.split(":", 1)
            int(user_part)
            int(org_part)
        else:
            int(userLoginId)
    except ValueError:
        return "β Invalid userLoginId format. Please provide a valid number or 'userLoginId:orgId' format."
    except Exception as e:
        return f"β Error fetching projects: {str(e)}"
    return "β Authentication token required. Please provide auth_token in your request."
# === CREATE TOOLS ===
# Each description is shown to the model as part of the tool definition and
# guides which tool it picks.
_DOCUMENT_SEARCH_DESCRIPTION = (
    "Use this tool to search through ingested documents and get relevant information from the knowledge base.\n"
    "Perfect for answering messages about uploaded documents, manuals, or any content that was previously stored.\n"
    "Input should be a search query or message about the documents."
)
_PROJECT_LIST_DESCRIPTION = (
    "Use this tool to get the list of projects for a user.\n"
    "Perfect for when users ask about their projects, want to see available projects, or need project information.\n"
    "Input should be the userLoginId (e.g., '25') or in format 'userLoginId:orgId' (e.g., '25:1').\n"
    "Note: This tool requires authentication context to be set."
)

document_search_tool = Tool(
    func=search_documents,
    name="document_search",
    description=_DOCUMENT_SEARCH_DESCRIPTION,
)
project_list_tool = Tool(
    func=get_user_projects,
    name="get_user_projects",
    description=_PROJECT_LIST_DESCRIPTION,
)
# === AGENT SETUP ===
_AGENT_SYSTEM_PROMPT = """You are a helpful AI assistant with access to multiple tools and conversation memory:
1. **Document Search**: Search through uploaded documents and knowledge base
2. **Project Management**: Get list of user projects and project information
Your capabilities:
- Answer messages about documents using the document search tool
- Help users find their projects and project information
- Remember previous conversations in this session
- Provide general assistance and information
- Use appropriate tools based on user queries
Guidelines:
- Use the document search tool when users ask about specific content, documentation, or information that might be in uploaded files
- Use the project tool when users ask about projects, want to see their projects, or need project-related information
- Reference previous conversation context when relevant
- If users mention a userLoginId or ask about projects, use the project tool
- Be clear about which tool you're using and what information you're providing
- If you're unsure which tool to use, you can ask for clarification
- Provide helpful, accurate, and well-formatted responses
Remember: Always use the most appropriate tool based on the user's message and conversation context to provide the best possible answer."""


def create_agent_with_memory(memory: ConversationBufferMemory):
    """Build an OpenAI tool-calling agent executor bound to one user's memory.

    The prompt is: system instructions, the memory's ``chat_history``, the user
    ``{input}``, then the agent scratchpad. A new prompt, agent and executor are
    built on every call; only ``memory`` carries state between requests.

    Args:
        memory: The user's conversation memory (key ``chat_history``).

    Returns:
        An ``AgentExecutor`` with the document-search and project-list tools,
        ``verbose=True``.
    """
    toolbox = [document_search_tool, project_list_tool]
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", _AGENT_SYSTEM_PROMPT),
            MessagesPlaceholder(variable_name="chat_history"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    return AgentExecutor(
        agent=create_openai_tools_agent(llm, toolbox, prompt),
        tools=toolbox,
        verbose=True,
        memory=memory,
    )
# === API ENDPOINTS ===
# The project tool reads credentials from the module-level _current_* globals,
# and FastAPI runs plain `def` endpoints in a thread pool. Without this lock,
# two concurrent /bot requests could overwrite each other's auth context, so
# one user's agent could call the project API with another user's token. It
# also serialises access to a user's shared memory. Holding it for the whole
# agent run serialises /bot requests; passing credentials to the tool per
# request would remove that bottleneck.
_bot_request_lock = threading.Lock()


@app.post("/bot")
def chat_with_agent(query: AgentQuery):
    """Main agent endpoint with per-user memory.

    The agent chooses between document search and the project tool. The
    caller's credentials are exposed to the project tool only for the duration
    of this request and are always cleared afterwards, including on error.

    Args:
        query: Message plus the caller's userLoginId and optional orgId/auth_token.

    Returns:
        dict with ``message``, ``answer``, ``user_login_id`` and ``agent_used``.
        On failure ``answer`` carries the error text (still an HTTP 200).
    """
    global _current_user_id, _current_org_id, _current_auth_token
    with _bot_request_lock:
        # Set auth context for the project tool
        _current_user_id = query.userLoginId
        _current_org_id = query.orgId
        _current_auth_token = query.auth_token
        try:
            memory = get_or_create_user_memory(query.userLoginId)
            agent_executor = create_agent_with_memory(memory)
            # Append the caller's IDs so the agent can see them when choosing tools.
            agent_input = query.message
            if query.userLoginId is not None:
                agent_input += f" [UserLoginId: {query.userLoginId}"
                if query.orgId is not None:
                    agent_input += f", OrgId: {query.orgId}"
                agent_input += "]"
            result = agent_executor.invoke({"input": agent_input})
            add_to_chat_history(query.userLoginId, query.message, result["output"])
        except Exception as e:
            return {
                "message": query.message,
                "answer": f"An error occurred: {str(e)}",
                "user_login_id": query.userLoginId,
                "agent_used": True
            }
        finally:
            # Clear auth context whether the run succeeded or failed.
            _current_user_id = None
            _current_org_id = None
            _current_auth_token = None
    return {
        "message": query.message,
        "answer": result["output"],
        "user_login_id": query.userLoginId,
        "agent_used": True
    }
@app.get("/get-chat-history/{user_login_id}")
def get_chat_history(
    user_login_id: int,
    n: int = QueryParam(10, description="Number of recent messages to return")
) -> ChatHistoryResponse:
    """Return the ``n`` most recent display-log messages for a user.

    Args:
        user_login_id: User whose history is requested; unknown users get an
            empty history.
        n: Number of recent messages to return. Values below 1 return none.

    Returns:
        ChatHistoryResponse whose ``total_messages`` counts the full stored
        history, not just the returned slice.

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        history = user_chat_history.get(user_login_id, [])
        # history[-0:] is the whole list and a negative n would slice from the
        # front, so only positive n selects a tail. n >= len returns everything.
        recent_messages = history[-n:] if n > 0 else []
        return ChatHistoryResponse(
            user_login_id=user_login_id,
            total_messages=len(history),
            messages=recent_messages
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching chat history: {str(e)}")
@app.delete("/clear-user-history/{user_login_id}")
def clear_user_history(user_login_id: int):
    """Forget a user's agent memory and display chat log.

    Unknown users are not an error; the call simply reports success.

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        for store in (user_memories, user_chat_history):
            store.pop(user_login_id, None)
        return {
            "message": f"User {user_login_id} chat history cleared successfully",
            "user_login_id": user_login_id,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error clearing user history: {str(e)}")
@app.get("/active-users")
def get_active_users():
    """List the users that currently hold an agent memory in this process."""
    users = list(user_memories)
    return {"active_users": users, "total_active_users": len(users)}
@app.post("/chat-documents")
def chat_documents_only(query: Query):
    """Answer a question with document search alone, bypassing the agent.

    Any exception is reported in ``answer`` rather than raised.
    """
    try:
        answer = search_documents(query.message)
    except Exception as e:
        answer = f"An error occurred: {str(e)}"
    return {
        "message": query.message,
        "answer": answer,
        "tool_used": "document_search",
    }
@app.post("/list-projects")
def list_projects(request: ProjectRequest):
    """List a user's projects directly, bypassing the agent.

    Args:
        request: userLoginId, orgId and the raw API token.

    Returns:
        ``{"projects": <formatted list>, "tool_used": "project_list"}`` on
        success, or ``{"error": <message>, "tool_used": "project_list"}`` on
        any failure (still an HTTP 200).
    """
    try:
        # The encoded token is a reusable Basic-auth credential, so it is
        # intentionally not printed or logged.
        encoded_token = get_encoded_auth_token(request.userLoginId, request.auth_token)
        data = fetch_user_projects(request.userLoginId, request.orgId, encoded_token)
        return {
            "projects": format_project_response(data),
            "tool_used": "project_list"
        }
    except Exception as e:
        return {
            "error": f"An error occurred: {str(e)}",
            "tool_used": "project_list"
        }
@app.get("/health")
def health():
    """Report the static service configuration and the current session count."""
    status = dict(
        status="ok",
        tools=["document_search", "project_list"],
        agent="active",
        user_memory_management="enabled",
    )
    status["active_users"] = len(user_memories)
    return status
if __name__ == "__main__":
    import sys

    import uvicorn

    try:
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        print("\nπ Server stopped gracefully")
    except Exception as e:
        print(f"β Server error: {e}")
        # Exit non-zero so a process supervisor sees the failure, not a clean stop.
        sys.exit(1)