srivatsavdamaraju commited on
Commit
f9c4f38
·
verified ·
1 Parent(s): 5846b08

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +599 -755
app.py CHANGED
@@ -1,9 +1,9 @@
 
1
  from fastapi import FastAPI, HTTPException, Query as QueryParam
2
  from pydantic import BaseModel, Field
3
  from langchain_openai import ChatOpenAI, OpenAIEmbeddings
4
  from qdrant_client import QdrantClient
5
- from langchain.agents import Tool, AgentExecutor, create_openai_tools_agent, initialize_agent
6
- from langchain.agents.agent_types import AgentType
7
  from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
8
  from langchain.memory import ConversationBufferMemory
9
  from typing import Optional, List, Dict, Any
@@ -27,12 +27,13 @@ import psycopg2
27
  from pandasai import SmartDataframe
28
  from pandasai.llm.openai import OpenAI as PandasOpenAI
29
 
30
- from fastapi import FastAPI, Request, UploadFile, File
 
 
 
31
  from fastapi.responses import JSONResponse
32
- from qdrant_client.models import VectorParams, Distance, PointStruct, Filter, SearchRequest
33
- from langchain.document_loaders import PyPDFLoader, TextLoader, CSVLoader, Docx2txtLoader, BSHTMLLoader
34
- from langchain.text_splitter import RecursiveCharacterTextSplitter
35
- import tempfile
36
 
37
  # Import your existing S3 connection details
38
  from retrive_secrects import * # CONNECTIONS_HOST, etc.
@@ -42,20 +43,10 @@ warnings.filterwarnings("ignore", message="Qdrant client version.*is incompatibl
42
 
43
  load_dotenv()
44
 
45
- app = FastAPI(title="AI Agent with Enhanced Tools and Session Management")
46
-
47
- OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
48
- if not OPENAI_API_KEY:
49
- raise ValueError("❌ OPENAI_API_KEY not set in environment variables")
50
-
51
- # Qdrant Configuration
52
- API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIiwiZXhwIjoxNzY0MTQ5OTc3fQ.l_2R-Eyb_530887EGLUkawZQamhPGVklDMlaVs0bDqo"
53
- URL = "https://09476415-f871-4664-9c92-2f7f17c223ee.eu-central-1-0.aws.cloud.qdrant.io"
54
-
55
- # Initialize Qdrant client
56
- client = QdrantClient(url=URL, api_key=API_KEY)
57
 
58
  # Environment variables
 
59
  QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "vatsav_test_1")
60
  QDRANT_HOST = os.getenv("QDRANT_HOST", "127.0.0.1")
61
  QDRANT_PORT = int(os.getenv("QDRANT_PORT", 6333))
@@ -66,10 +57,24 @@ REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")
66
  REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
67
  REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
68
 
69
- # S3 Constants
70
  S3_Bucket_Name = 'ingenspark-user-files'
71
  S3_Raw_Files_Folder = 'User-Uploaded-Raw-Files'
72
  S3_Modified_Files_Folder = 'Modified-Files/'
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  cloud_front_url = "https://files.dev.ingenspark.com/"
74
 
75
  # Initialize Redis client
@@ -77,16 +82,19 @@ def get_redis_client():
77
  """Initialize Redis client with fallback to local Redis"""
78
  try:
79
  if REDIS_URL:
 
80
  redis_client = redis.from_url(
81
  REDIS_URL,
82
  decode_responses=True,
83
  socket_connect_timeout=5,
84
  socket_timeout=5
85
  )
 
86
  redis_client.ping()
87
  print(f"✅ Connected to deployed Redis: {REDIS_URL}")
88
  return redis_client
89
  else:
 
90
  redis_client = redis.StrictRedis(
91
  host=REDIS_HOST,
92
  port=REDIS_PORT,
@@ -95,6 +103,7 @@ def get_redis_client():
95
  socket_connect_timeout=5,
96
  socket_timeout=5
97
  )
 
98
  redis_client.ping()
99
  print(f"✅ Connected to local Redis: {REDIS_HOST}:{REDIS_PORT}")
100
  return redis_client
@@ -102,23 +111,35 @@ def get_redis_client():
102
  print(f"❌ Redis connection failed: {e}")
103
  raise HTTPException(status_code=500, detail=f"Redis connection failed: {str(e)}")
104
 
 
105
  redis_client = get_redis_client()
106
 
107
  # Initialize models
108
  embedding_model = OpenAIEmbeddings(
109
- model="text-embedding-3-large",#3072
110
  openai_api_key=OPENAI_API_KEY,
111
  )
112
 
 
113
  llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=OPENAI_API_KEY)
114
 
115
- # === ENHANCED DATA READING FUNCTIONS ===
116
 
117
  def read_parquet_file_from_s3(ufuid=None, columns_list=None, records_count=None, file_location=''):
118
  """
119
- Enhanced version that reads Parquet files from S3 using Dask and returns as Pandas DataFrame.
 
 
 
 
 
 
 
 
 
120
  """
121
  try:
 
122
  conn = psycopg2.connect(
123
  host=CONNECTIONS_HOST,
124
  database=CONNECTIONS_DB,
@@ -135,18 +156,22 @@ def read_parquet_file_from_s3(ufuid=None, columns_list=None, records_count=None,
135
  raise ValueError(f"No file found for ufuid: {ufuid}")
136
  file_name, s3_file_path = file
137
  else:
 
138
  file_location = re.sub(r'\.parquet(?!$)', '', file_location)
139
  s3_file_path = file_location if file_location.endswith('.parquet') else file_location + '.parquet'
140
 
 
141
  s3_file_path = urllib.parse.unquote(s3_file_path.split(f"{S3_Bucket_Name}/")[-1])
142
  if not s3_file_path.endswith('.parquet'):
143
  s3_file_path += '.parquet'
144
 
 
145
  if columns_list and not isinstance(columns_list, list):
146
  columns_list = [col.strip(' "\'') for col in columns_list.split(',')]
147
 
148
  print(f"\n{'!' * 100}\nReading from: s3://{S3_Bucket_Name}/{s3_file_path}\n")
149
 
 
150
  ddf = dd.read_parquet(
151
  f"s3://{S3_Bucket_Name}/{s3_file_path}",
152
  engine="pyarrow",
@@ -154,9 +179,10 @@ def read_parquet_file_from_s3(ufuid=None, columns_list=None, records_count=None,
154
  assume_missing=True
155
  )
156
 
157
- ddf = ddf.repartition(npartitions=8)
158
- print("Reading Parquet file from S3 completed successfully.")
159
 
 
160
  cursor.close()
161
  conn.close()
162
 
@@ -164,142 +190,56 @@ def read_parquet_file_from_s3(ufuid=None, columns_list=None, records_count=None,
164
 
165
  except Exception as e:
166
  print(f"❌ Error reading Parquet file: {e}")
167
- return pd.DataFrame()
168
-
169
- # === ENHANCED TOOL CLASSES ===
170
 
171
- class PandasAITool:
172
- """Enhanced PandasAI tool with better error handling and context"""
 
173
 
174
- def __init__(self, df: pd.DataFrame = None):
175
- if not OPENAI_API_KEY:
176
- raise ValueError("OPENAI_API_KEY is not set in environment variables.")
177
-
178
- self.llm = PandasOpenAI(api_token=OPENAI_API_KEY)
179
- self.df = df
180
- self.sdf = SmartDataframe(df, config={"llm": self.llm}) if df is not None else None
181
-
182
- def run(self, query_with_filepath: str) -> str:
183
- """
184
- Enhanced run method that can handle both direct queries and file loading
185
- Input format: 'filepath|query' or just 'query' if DataFrame already loaded
186
- """
187
- try:
188
- # Parse input to extract filepath and query if both provided
189
- if '|' in query_with_filepath:
190
- parts = query_with_filepath.split('|', 1)
191
- if len(parts) == 2:
192
- filepath, query = parts
193
- filepath = filepath.strip()
194
- query = query.strip()
195
-
196
- # Load data from file
197
- if filepath.isdigit():
198
- data = read_parquet_file_from_s3(ufuid=int(filepath))
199
- else:
200
- data = read_parquet_file_from_s3(file_location=filepath)
201
-
202
- if data.empty:
203
- return "❌ No data found or failed to load the file. Please check the file path or ufuid."
204
-
205
- # Update the dataframe and SmartDataframe
206
- self.df = data
207
- self.sdf = SmartDataframe(data, config={"llm": self.llm})
208
- else:
209
- query = query_with_filepath
210
- else:
211
- query = query_with_filepath
212
-
213
- if self.sdf is None:
214
- return "❌ No dataset loaded. Please provide a file path with your query."
215
-
216
- print(f"\n[PandasAI Query]: {query}")
217
- result = self.sdf.chat(query)
218
- print(f"[PandasAI Result]: {result}")
219
-
220
- # Handle different types of results
221
- if isinstance(result, str):
222
- return f"📊 Analysis Result:\n{result}"
223
- elif isinstance(result, (pd.DataFrame, pd.Series)):
224
- return f"📊 Analysis Result:\n{result.to_string()}"
225
- else:
226
- return f"📊 Analysis Result:\n{str(result)}"
227
-
228
- except Exception as e:
229
- error_msg = f"❌ Error in PandasAI Tool: {str(e)}"
230
- print(error_msg)
231
- return error_msg
232
 
 
 
233
 
234
- class EnhancedDocumentSearchTool:
235
- """Enhanced document search tool that works with both local and remote Qdrant"""
236
-
237
- def __init__(self, collection_name: str = None):
238
- self.collection_name = collection_name or QDRANT_COLLECTION_NAME
239
- self.client = client
240
- self.embeddings = embedding_model
241
- self.llm = llm
242
-
243
- def run(self, query: str) -> str:
244
- """
245
- Enhanced document search with better context handling
246
- Can handle: 'search query' or 'ufuid search query'
247
- """
248
- try:
249
- # Check if query starts with a ufuid
250
- parts = query.strip().split(maxsplit=1)
251
- if len(parts) >= 2 and parts[0].isdigit():
252
- ufuid = int(parts[0])
253
- search_query = parts[1]
254
- # Could potentially filter by ufuid if metadata supports it
255
- else:
256
- search_query = query
257
-
258
- print(f"[DocumentSearchTool] Searching for: {search_query}")
259
-
260
- # Generate embedding for the query
261
- query_vector = self.embeddings.embed_query(search_query)
262
-
263
- # Search in Qdrant
264
- search_result = self.client.search(
265
- collection_name=self.collection_name,
266
- query_vector=query_vector,
267
- limit=5,
268
- )
269
-
270
- if not search_result:
271
- return "❌ No relevant information found in the knowledge base."
272
-
273
- # Extract context and sources
274
- context_texts = []
275
- sources = []
276
-
277
- for hit in search_result:
278
- context_texts.append(hit.payload.get("text", str(hit.payload)))
279
- sources.append(hit.payload.get("source", "unknown"))
280
-
281
- context = "\n\n".join(context_texts)
282
- unique_sources = list(set(sources))
283
-
284
- # Use LLM to provide a comprehensive answer
285
- prompt = f"""Based on the following context from documents, answer this query: {search_query}
286
 
287
- Context from documents:
288
- {context}
289
 
290
- Please provide a comprehensive and helpful answer based on the context above. If the context doesn't contain enough information to fully answer the query, mention this clearly but provide what information is available."""
 
291
 
292
- response = self.llm.invoke(prompt)
293
-
294
- result = f"{response.content}\n\n📚 Sources: {', '.join(unique_sources)}"
295
- print(f"[DocumentSearchTool Result]: {result}")
296
- return result
297
-
298
- except Exception as e:
299
- error_msg = f"❌ Error in Document Search Tool: {str(e)}"
300
- print(error_msg)
301
- return error_msg
 
302
 
 
 
 
 
303
 
304
  # === INPUT SCHEMAS ===
305
 
@@ -318,6 +258,10 @@ class BotQuery(BaseModel):
318
  session_id: Optional[str] = None
319
  message: str
320
 
 
 
 
 
321
  class SessionResponse(BaseModel):
322
  session_id: str
323
  userLoginId: int
@@ -329,7 +273,7 @@ class SessionResponse(BaseModel):
329
  class MessageResponse(BaseModel):
330
  message_id: str
331
  session_id: str
332
- role: str
333
  message: str
334
  timestamp: str
335
 
@@ -344,112 +288,34 @@ class UpdateSessionTitleRequest(BaseModel):
344
  # === SESSION MANAGEMENT FUNCTIONS ===
345
 
346
 
347
- def get_user_sessions(userLoginId: int) -> List[dict]:
348
- """Get all sessions for a user - returns stored titles, not generated ones"""
349
- sessions = []
350
-
351
- # Scan for all session keys
352
- for key in redis_client.scan_iter(match="session:*"):
353
- session_data = redis_client.get(key)
354
- if session_data:
355
- try:
356
- session = json.loads(session_data)
357
- if session["userLoginId"] == userLoginId:
358
- # Use the stored title, don't override it
359
- # Only generate title if it's missing or still "New Chat"
360
- current_title = session.get("title", "New Chat")
361
-
362
- # Only auto-generate if title is missing or default
363
- if not current_title or current_title == "New Chat":
364
- session["title"] = generate_session_title(session["session_id"])
365
- # Update the session in Redis with the new title
366
- redis_client.setex(
367
- key,
368
- 86400, # 24 hours
369
- json.dumps(session)
370
- )
371
-
372
- sessions.append(session)
373
- except json.JSONDecodeError:
374
- # Skip corrupted session data
375
- continue
376
-
377
- # Sort sessions by created_at (most recent first)
378
- sessions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
379
- return sessions
380
 
381
 
382
- def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
383
- """Update the title of an existing session with custom name"""
384
- try:
385
- # Get current session data from Redis
386
- redis_key = f"session:{session_id}"
387
- session_data = redis_client.get(redis_key)
388
-
389
- if not session_data:
390
- raise HTTPException(status_code=404, detail="Session not found or expired")
391
-
392
- # Parse current session data
393
- session = json.loads(session_data)
394
-
395
- # Validate new title
396
- new_title = request.new_title.strip()
397
- if not new_title:
398
- raise HTTPException(status_code=400, detail="New title cannot be empty")
399
-
400
- if len(new_title) > 100:
401
- raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")
402
-
403
- # Get the actual old title from Redis (not from generate_session_title)
404
- old_title = session.get("title", "New Chat")
405
-
406
- # Update the title
407
- session["title"] = new_title
408
- session["last_updated"] = datetime.now().isoformat()
409
-
410
- # Save updated session back to Redis with same TTL
411
- redis_client.setex(
412
- redis_key,
413
- 86400, # 24 hours TTL
414
- json.dumps(session)
415
- )
416
-
417
- # Verify the update by reading back from Redis
418
- updated_session_data = redis_client.get(redis_key)
419
- if updated_session_data:
420
- updated_session = json.loads(updated_session_data)
421
- actual_new_title = updated_session.get("title", "Unknown")
422
- else:
423
- raise HTTPException(status_code=500, detail="Failed to verify session update")
424
-
425
- print(f"Title update: {old_title} -> {actual_new_title}") # Debug log
426
-
427
- return {
428
- "message": "Session title updated successfully",
429
- "session_id": session_id,
430
- "old_title": old_title,
431
- "new_title": actual_new_title,
432
- "last_updated": session.get("last_updated"),
433
- "success": True
434
- }
435
-
436
- except HTTPException:
437
- raise
438
- except json.JSONDecodeError:
439
- raise HTTPException(status_code=500, detail="Invalid session data format in Redis")
440
- except Exception as e:
441
- raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
442
 
443
 
444
- # Also update the UpdateSessionTitleRequest model to be more explicit
445
- class UpdateSessionTitleRequest(BaseModel):
446
- new_title: str = Field(
447
- ...,
448
- min_length=1,
449
- max_length=100,
450
- description="New title for the session",
451
- example="My Custom Chat Title"
452
- )
453
 
454
  def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict:
455
  """Create a new chat session"""
@@ -461,12 +327,29 @@ def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict:
461
  "auth_token": auth_token,
462
  "created_at": datetime.now().isoformat(),
463
  "status": "active",
464
- "title": "New Chat"
465
  }
466
 
467
- redis_client.setex(f"session:{session_id}", 86400, json.dumps(session_data))
468
- redis_client.setex(f"chat:{session_id}", 86400, json.dumps([]))
469
- redis_client.setex(f"memory:{session_id}", 86400, json.dumps([]))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470
 
471
  return session_data
472
 
@@ -488,25 +371,70 @@ def add_message_to_session(session_id: str, role: str, message: str) -> str:
488
  "timestamp": datetime.now().isoformat()
489
  }
490
 
 
491
  chat_history = redis_client.get(f"chat:{session_id}")
492
- messages = json.loads(chat_history) if chat_history else []
 
 
 
 
 
493
  messages.append(message_data)
494
 
495
- redis_client.setex(f"chat:{session_id}", 86400, json.dumps(messages))
 
 
 
 
 
 
496
  return message_id
497
 
498
  def get_session_memory(session_id: str) -> List[Dict]:
499
  """Get conversation memory for session"""
500
  memory_data = redis_client.get(f"memory:{session_id}")
501
- return json.loads(memory_data) if memory_data else []
 
 
502
 
503
  def update_session_memory(session_id: str, messages: List[Dict]):
504
  """Update conversation memory for session"""
505
- redis_client.setex(f"memory:{session_id}", 86400, json.dumps(messages))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
506
 
507
  def generate_session_title(session_id: str) -> str:
508
  """Generate a title for the session based on chat history"""
509
  try:
 
510
  chat_data = redis_client.get(f"chat:{session_id}")
511
  if not chat_data:
512
  return "New Chat"
@@ -515,6 +443,7 @@ def generate_session_title(session_id: str) -> str:
515
  if not messages:
516
  return "New Chat"
517
 
 
518
  first_user_message = None
519
  for msg in messages:
520
  if msg["role"] == "user":
@@ -524,41 +453,59 @@ def generate_session_title(session_id: str) -> str:
524
  if not first_user_message:
525
  return "New Chat"
526
 
 
527
  title_prompt = f"""Generate a short, descriptive title (maximum 6 words) for a chat conversation that starts with this message:
528
 
529
  "{first_user_message[:200]}"
530
 
531
- Return only the title, no quotes or additional text."""
532
 
533
- response = llm.invoke(title_prompt)
534
- title = response.content.strip().replace('"', '').replace("'", "")
535
-
536
- if len(title) > 50:
537
- title = title[:47] + "..."
538
-
539
- return title if title else "New Chat"
 
 
 
 
 
 
 
 
 
540
 
541
  except Exception as e:
542
  print(f"Error in generate_session_title: {e}")
543
  return "New Chat"
544
 
545
- def update_session_title(session_id: str):
546
- """Update session title after first message"""
547
- try:
548
- session_data = redis_client.get(f"session:{session_id}")
549
- if not session_data:
550
- return
551
-
552
- session = json.loads(session_data)
553
-
554
- if session.get("title", "New Chat") == "New Chat":
555
- new_title = generate_session_title(session_id)
556
- session["title"] = new_title
557
-
558
- redis_client.setex(f"session:{session_id}", 86400, json.dumps(session))
559
-
560
- except Exception as e:
561
- print(f"Error updating session title: {e}")
 
 
 
 
 
 
 
 
562
 
563
  # === UTILITY FUNCTIONS ===
564
 
@@ -568,7 +515,11 @@ def get_encoded_auth_token(user: int, token: str) -> str:
568
 
569
  def fetch_user_projects(userLoginId: int, orgId: int, auth_token: str):
570
  url = "https://japidemo.dev.ingenspark.com/fetchUserProjects"
571
- payload = {"userLoginId": userLoginId, "orgId": orgId}
 
 
 
 
572
  headers = {
573
  'accept': 'application/json, text/plain, */*',
574
  'authorization': f'Basic {auth_token}',
@@ -612,6 +563,7 @@ def format_project_response(data: dict) -> str:
612
  if not all_projects:
613
  return "❌ No projects found."
614
 
 
615
  result = [f"✅ You have access to {len(all_projects)} project(s):\n"]
616
  for i, project in enumerate(all_projects, 1):
617
  result.append(f"{i}. Project Name: {project['projectNm']} ({project['type']})")
@@ -621,162 +573,254 @@ def format_project_response(data: dict) -> str:
621
  result.append(f" Category: {project['categoryName']}\n")
622
  return "\n".join(result)
623
 
624
- # Global variables for auth context
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
625
  _current_user_id = None
626
  _current_org_id = None
627
  _current_auth_token = None
628
 
629
  def get_user_projects(userLoginId: str) -> str:
630
- """Get list of projects for a user"""
631
  try:
 
632
  if _current_auth_token and _current_user_id:
633
  user_id = _current_user_id
634
  org_id = _current_org_id or 1
635
  auth_token = _current_auth_token
636
  else:
637
- return "❌ Authentication token required."
638
 
 
639
  encoded_token = get_encoded_auth_token(user_id, auth_token)
 
 
640
  data = fetch_user_projects(user_id, org_id, encoded_token)
641
- return format_project_response(data)
642
 
 
 
 
 
 
 
643
  except Exception as e:
644
  return f"❌ Error fetching projects: {str(e)}"
645
 
646
- # === CREATE ENHANCED AGENT ===
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
647
 
648
- def create_enhanced_agent_with_session_memory(session_id: str):
649
- """Create enhanced agent with all tools and session memory"""
 
 
650
 
651
  # Get memory from Redis
652
  memory_messages = get_session_memory(session_id)
653
 
654
- # Initialize tools with enhanced capabilities
655
- pandas_tool = PandasAITool()
656
- doc_search_tool = EnhancedDocumentSearchTool()
657
-
658
- # Create tool wrappers
659
- tools = [
660
- Tool(
661
- name="enhanced_pandas_analysis",
662
- description="""Use this tool for advanced data analysis on CSV/Parquet files using PandasAI.
663
- Input format: 'filepath|query' where:
664
- - filepath: S3 file path or ufuid (e.g., 'User-Uploaded-Raw-Files/data.csv' or '123')
665
- - query: Natural language question about the data
666
-
667
- Examples:
668
- - 'User-Uploaded-Raw-Files/mydata.csv|What are the top 5 values?'
669
- - '123|Show me summary statistics'
670
- - 'Modified-Files/processed_data|What are the trends in sales data?'""",
671
- func=pandas_tool.run
672
- ),
673
- Tool(
674
- name="enhanced_document_search",
675
- description="""Use this tool to search through ingested documents and knowledge base.
676
- Input can be:
677
- - Simple search query: 'search terms'
678
- - With ufuid: 'ufuid search terms' (e.g., '9 list all files')
679
-
680
- Perfect for answering questions about uploaded documents, manuals, or stored content.""",
681
- func=doc_search_tool.run
682
- ),
683
- Tool(
684
- name="get_user_projects",
685
- description="""Use this tool to get the list of projects for a user.
686
- Input should be the userLoginId as a string (e.g., '25').
687
- Perfect for when users ask about their projects or need project information.""",
688
- func=get_user_projects
689
- )
690
- ]
691
-
692
- # Create agent prompt
693
  agent_prompt = ChatPromptTemplate.from_messages([
694
- ("system", """You are an advanced AI assistant with access to powerful data analysis and document search capabilities:
695
-
696
- 🔧 **Available Tools:**
697
- 1. **Enhanced Pandas Analysis**: Analyze CSV/Parquet files with natural language queries
698
- 2. **Enhanced Document Search**: Search through uploaded documents and knowledge base
699
- 3. **Project Management**: Get user project information
700
-
701
- 💡 **Your Capabilities:**
702
- - Perform sophisticated data analysis on datasets using PandasAI
703
- - Search and retrieve information from document knowledge bases
704
- - Help users manage and find their projects
705
- - Remember conversation context and provide personalized assistance
706
- - Handle complex multi-step queries involving multiple tools
707
-
708
- 📋 **Guidelines:**
709
- - Use pandas analysis when users ask about data insights, statistics, trends, or file analysis
710
- - Use document search when users ask about specific content, documentation, or information
711
- - Use project tools when users need project-related information
712
- - Always provide clear, comprehensive, and well-formatted responses
713
  - Reference previous conversation context when relevant
714
- - If you need clarification, ask specific questions to help the user better
 
 
715
 
716
- 🎯 **Response Quality:**
717
- - Be thorough but concise
718
- - Use proper formatting for readability
719
- - Explain your reasoning when using tools
720
- - Provide actionable insights and next steps when appropriate
721
-
722
- Remember: You have access to both the user's conversation history and powerful analysis tools to provide the best possible assistance."""),
723
  MessagesPlaceholder(variable_name="chat_history"),
724
  ("user", "{input}"),
725
  MessagesPlaceholder(variable_name="agent_scratchpad"),
726
  ])
727
 
728
- # Create memory object and load existing messages
729
  memory = ConversationBufferMemory(
730
  memory_key="chat_history",
731
  return_messages=True
732
  )
733
 
 
734
  for msg in memory_messages:
735
  if msg["role"] == "user":
736
  memory.chat_memory.add_user_message(msg["message"])
737
  else:
738
  memory.chat_memory.add_ai_message(msg["message"])
739
 
740
- # Create the enhanced agent
 
 
 
741
  agent = create_openai_tools_agent(llm, tools, agent_prompt)
742
 
743
- # Create agent executor with memory
744
  agent_executor = AgentExecutor(
745
  agent=agent,
746
  tools=tools,
747
  verbose=True,
748
- memory=memory,
749
- max_iterations=5,
750
- early_stopping_method="generate"
751
  )
752
 
753
  return agent_executor, memory
754
 
755
- # === MIDDLEWARE ===
756
 
757
- @app.middleware("http")
758
- async def add_success_flag(request: Request, call_next):
759
- response = await call_next(request)
 
 
 
 
 
760
 
761
- if "application/json" in response.headers.get("content-type", ""):
762
- try:
763
- body = b"".join([chunk async for chunk in response.body_iterator])
764
- data = json.loads(body.decode())
765
- data["success"] = 200 <= response.status_code < 300
766
- response = JSONResponse(
767
- content=data,
768
- status_code=response.status_code,
769
- headers={k: v for k, v in response.headers.items() if k.lower() != "content-length"},
770
- )
771
- except Exception:
772
- pass
773
- return response
774
 
775
- # === MAIN BOT ENDPOINT ===
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
776
 
777
  @app.post("/bot")
778
- def enhanced_chat_with_bot(query: BotQuery):
779
- """Enhanced main bot endpoint with combined tools and session management"""
780
  try:
781
  # Set global auth context for tools
782
  global _current_user_id, _current_org_id, _current_auth_token
@@ -797,13 +841,11 @@ def enhanced_chat_with_bot(query: BotQuery):
797
  # Add user message to session
798
  user_message_id = add_message_to_session(session_id, "user", query.message)
799
 
800
- # Create enhanced agent with session memory
801
- agent_executor, memory = create_enhanced_agent_with_session_memory(session_id)
802
 
803
- # Process the query with enhanced agent
804
- print(f"\n🤖 Processing query: {query.message}")
805
  result = agent_executor.invoke({"input": query.message})
806
- print(f"🎯 Agent response: {result['output']}")
807
 
808
  # Add AI response to session
809
  ai_message_id = add_message_to_session(session_id, "assistant", result["output"])
@@ -836,9 +878,7 @@ def enhanced_chat_with_bot(query: BotQuery):
836
  "message": query.message,
837
  "answer": result["output"],
838
  "userLoginId": query.userLoginId,
839
- "agent_type": "enhanced_multi_tool",
840
- "tools_available": ["enhanced_pandas_analysis", "enhanced_document_search", "get_user_projects"],
841
- "timestamp": datetime.now().isoformat()
842
  }
843
 
844
  except Exception as e:
@@ -847,65 +887,16 @@ def enhanced_chat_with_bot(query: BotQuery):
847
  _current_org_id = None
848
  _current_auth_token = None
849
 
850
- error_msg = f"Error processing enhanced chat: {str(e)}"
851
- print(error_msg)
852
- raise HTTPException(status_code=500, detail=error_msg)
853
-
854
- # === SESSION MANAGEMENT ENDPOINTS ===
855
-
856
- @app.post("/sessions", response_model=SessionResponse)
857
- def create_new_session(userLoginId: int, orgId: int, auth_token: str):
858
- """Create a new chat session"""
859
- try:
860
- session_data = create_session(userLoginId, orgId, auth_token)
861
- return SessionResponse(**session_data)
862
- except Exception as e:
863
- raise HTTPException(status_code=500, detail=f"Error creating session: {str(e)}")
864
-
865
- @app.get("/sessions")
866
- def list_user_sessions(userLoginId: int):
867
- """List all sessions for a user"""
868
- try:
869
- sessions = []
870
- for key in redis_client.scan_iter(match="session:*"):
871
- session_data = redis_client.get(key)
872
- if session_data:
873
- session = json.loads(session_data)
874
- if session["userLoginId"] == userLoginId:
875
- session["title"] = generate_session_title(session["session_id"])
876
- sessions.append(session)
877
-
878
- sessions.sort(key=lambda x: x["created_at"], reverse=True)
879
- return {
880
- "userLoginId": userLoginId,
881
- "total_sessions": len(sessions),
882
- "sessions": sessions
883
- }
884
- except Exception as e:
885
- raise HTTPException(status_code=500, detail=f"Error fetching sessions: {str(e)}")
886
-
887
- @app.delete("/sessions/{session_id}")
888
- def delete_user_session(session_id: str):
889
- """Delete/close a session"""
890
- try:
891
- get_session(session_id)
892
- redis_client.delete(f"session:{session_id}")
893
- redis_client.delete(f"chat:{session_id}")
894
- redis_client.delete(f"memory:{session_id}")
895
-
896
- return {
897
- "message": f"Session {session_id} deleted successfully",
898
- "session_id": session_id
899
- }
900
- except Exception as e:
901
- raise HTTPException(status_code=500, detail=f"Error deleting session: {str(e)}")
902
 
903
  @app.get("/sessions/{session_id}/history", response_model=ChatHistoryResponse)
904
- def get_session_history(session_id: str, n: int = QueryParam(50, description="Number of recent messages")):
905
  """Get chat history for a session"""
906
  try:
 
907
  get_session(session_id)
908
 
 
909
  chat_data = redis_client.get(f"chat:{session_id}")
910
  if not chat_data:
911
  return ChatHistoryResponse(
@@ -915,7 +906,11 @@ def get_session_history(session_id: str, n: int = QueryParam(50, description="Nu
915
  )
916
 
917
  messages = json.loads(chat_data)
 
 
918
  recent_messages = messages[-n:] if len(messages) > n else messages
 
 
919
  message_responses = [MessageResponse(**msg) for msg in recent_messages]
920
 
921
  return ChatHistoryResponse(
@@ -926,49 +921,19 @@ def get_session_history(session_id: str, n: int = QueryParam(50, description="Nu
926
 
927
  except Exception as e:
928
  raise HTTPException(status_code=500, detail=f"Error fetching chat history: {str(e)}")
 
 
929
 
930
- # @app.put("/sessions/{session_id}/title")
931
- # def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
932
- # """Update the title of an existing session"""
933
- # try:
934
- # session_data = redis_client.get(f"session:{session_id}")
935
- # if not session_data:
936
- # raise HTTPException(status_code=404, detail="Session not found or expired")
937
-
938
- # session = json.loads(session_data)
939
-
940
- # new_title = request.new_title.strip()
941
- # if not new_title:
942
- # raise HTTPException(status_code=400, detail="New title cannot be empty")
943
-
944
- # if len(new_title) > 100:
945
- # raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")
946
-
947
- # old_title = session.get("title", "New Chat")
948
- # session["title"] = new_title
949
- # session["last_updated"] = datetime.now().isoformat()
950
-
951
- # redis_client.setex(f"session:{session_id}", 86400, json.dumps(session))
952
-
953
- # return {
954
- # "message": "Session title updated successfully",
955
- # "session_id": session_id,
956
- # "old_title": old_title,
957
- # "new_title": new_title
958
- # }
959
-
960
- # except HTTPException:
961
- # raise
962
- # except Exception as e:
963
- # raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
964
  @app.put("/sessions/{session_id}/title")
965
  def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
966
- """Update the title of an existing session with custom name"""
967
  try:
968
- # Get current session data from Redis
969
- redis_key = f"session:{session_id}"
970
- session_data = redis_client.get(redis_key)
971
 
 
 
972
  if not session_data:
973
  raise HTTPException(status_code=404, detail="Session not found or expired")
974
 
@@ -984,302 +949,215 @@ def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRe
984
  raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")
985
 
986
  # Update the title
987
- old_title = session.get("title", "New Chat")
988
  session["title"] = new_title
989
  session["last_updated"] = datetime.now().isoformat()
990
 
991
- # Save updated session back to Redis with same TTL
992
  redis_client.setex(
993
- redis_key,
994
  86400, # 24 hours TTL
995
  json.dumps(session)
996
  )
997
 
998
- # Verify the update by reading back from Redis
999
- updated_session_data = redis_client.get(redis_key)
1000
- if updated_session_data:
1001
- updated_session = json.loads(updated_session_data)
1002
- actual_new_title = updated_session.get("title", "Unknown")
1003
- else:
1004
- raise HTTPException(status_code=500, detail="Failed to verify session update")
1005
-
1006
  return {
1007
  "message": "Session title updated successfully",
1008
  "session_id": session_id,
1009
  "old_title": old_title,
1010
- "new_title": actual_new_title,
1011
- "last_updated": session.get("last_updated"),
1012
- "success": True
1013
  }
1014
 
1015
  except HTTPException:
1016
  raise
1017
- except json.JSONDecodeError:
1018
- raise HTTPException(status_code=500, detail="Invalid session data format in Redis")
1019
  except Exception as e:
1020
  raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
1021
 
1022
-
1023
- # Alternative endpoint for auto-generating titles based on chat history
1024
- @app.put("/sessions/{session_id}/title/auto-generate")
1025
- def auto_generate_session_title(session_id: str):
1026
- """Automatically generate and update session title based on chat history"""
1027
- try:
1028
- # Verify session exists
1029
- redis_key = f"session:{session_id}"
1030
- session_data = redis_client.get(redis_key)
1031
-
1032
- if not session_data:
1033
- raise HTTPException(status_code=404, detail="Session not found or expired")
1034
-
1035
- session = json.loads(session_data)
1036
- old_title = session.get("title", "New Chat")
1037
-
1038
- # Generate new title based on chat history
1039
- new_title = generate_session_title(session_id)
1040
-
1041
- # Update session
1042
- session["title"] = new_title
1043
- session["last_updated"] = datetime.now().isoformat()
1044
-
1045
- # Save updated session back to Redis
1046
- redis_client.setex(
1047
- redis_key,
1048
- 86400, # 24 hours TTL
1049
- json.dumps(session)
1050
- )
1051
-
1052
- return {
1053
- "message": "Session title auto-generated successfully",
1054
- "session_id": session_id,
1055
- "old_title": old_title,
1056
- "new_title": new_title,
1057
- "method": "auto_generated",
1058
- "success": True
1059
- }
1060
-
1061
- except HTTPException:
1062
- raise
1063
- except Exception as e:
1064
- raise HTTPException(status_code=500, detail=f"Error auto-generating session title: {str(e)}")
1065
- # === DIRECT TOOL ENDPOINTS ===
1066
-
1067
  @app.post("/chat-documents")
1068
  def chat_documents_only(query: Query):
1069
- """Direct enhanced document search without agent"""
1070
  try:
1071
- doc_search_tool = EnhancedDocumentSearchTool()
1072
- result = doc_search_tool.run(query.message)
1073
  return {
1074
  "message": query.message,
1075
  "answer": result,
1076
- "tool_used": "enhanced_document_search",
1077
- "timestamp": datetime.now().isoformat()
1078
  }
1079
  except Exception as e:
1080
  return {
1081
  "message": query.message,
1082
- "answer": f"Error occurred: {str(e)}",
1083
- "tool_used": "enhanced_document_search",
1084
- "error": True,
1085
- "timestamp": datetime.now().isoformat()
1086
  }
1087
 
1088
  @app.post("/list-projects")
1089
  def list_projects(request: ProjectRequest):
1090
  """Direct project listing without agent"""
1091
  try:
 
1092
  encoded_token = get_encoded_auth_token(request.userLoginId, request.auth_token)
 
 
1093
  data = fetch_user_projects(request.userLoginId, request.orgId, encoded_token)
 
 
1094
  formatted = format_project_response(data)
1095
  return {
1096
  "projects": formatted,
1097
- "tool_used": "project_list",
1098
- "timestamp": datetime.now().isoformat()
1099
  }
1100
  except Exception as e:
1101
  return {
1102
- "error": f"Error occurred: {str(e)}",
1103
- "tool_used": "project_list",
1104
- "timestamp": datetime.now().isoformat()
1105
  }
1106
 
1107
  @app.post("/chat-with-pandas-agent")
1108
- def chat_with_enhanced_pandas_agent(request: Query):
1109
- """Direct enhanced pandas AI agent endpoint for data analysis"""
1110
  try:
1111
- pandas_tool = PandasAITool()
1112
- result = pandas_tool.run(request.message)
1113
 
1114
  return {
1115
- "query": request.message,
 
1116
  "answer": result,
1117
- "tool_used": "enhanced_pandas_analysis",
1118
  "timestamp": datetime.now().isoformat()
1119
  }
1120
 
1121
  except Exception as e:
1122
- error_msg = f"Error occurred: {str(e)}"
1123
  return {
1124
- "query": request.message,
 
1125
  "answer": error_msg,
1126
- "tool_used": "enhanced_pandas_analysis",
1127
  "error": True,
1128
  "timestamp": datetime.now().isoformat()
1129
  }
1130
 
1131
- # === COLLECTION MANAGEMENT ENDPOINTS ===
1132
-
1133
- class CollectionRequest(BaseModel):
1134
- name: str
1135
- vector_size: int
1136
- distance: str = "Cosine"
1137
-
1138
- class UpdateCollectionRequest(BaseModel):
1139
- vector_size: int = None
1140
- distance: str = None
1141
-
1142
- @app.post("/collections/")
1143
- def create_collection(req: CollectionRequest):
1144
- """Create a new Qdrant collection"""
1145
- distance_map = {
1146
- "Cosine": Distance.COSINE,
1147
- "Euclid": Distance.EUCLID,
1148
- "Dot": Distance.DOT,
1149
- }
1150
- if req.distance not in distance_map:
1151
- raise HTTPException(status_code=400, detail="Invalid distance metric")
1152
-
1153
  try:
1154
- client.recreate_collection(
1155
- collection_name=req.name,
1156
- vectors_config=VectorParams(size=req.vector_size, distance=distance_map[req.distance]),
 
 
 
 
 
 
 
 
 
1157
  )
1158
- return {"message": f"Collection '{req.name}' created successfully"}
1159
- except Exception as e:
1160
- raise HTTPException(status_code=500, detail=str(e))
1161
-
1162
- @app.get("/collections/")
1163
- def list_collections():
1164
- """List all Qdrant collections"""
1165
- try:
1166
- collections = client.get_collections()
1167
- return collections.dict()
1168
- except Exception as e:
1169
- raise HTTPException(status_code=500, detail=str(e))
1170
-
1171
- @app.get("/collections/{name}")
1172
- def get_collection(name: str):
1173
- """Get collection information"""
1174
- try:
1175
- collection_info = client.get_collection(collection_name=name)
1176
- return collection_info.dict()
1177
  except Exception as e:
1178
- raise HTTPException(status_code=404, detail=f"Collection '{name}' not found: {str(e)}")
1179
-
1180
- @app.delete("/collections/{name}")
1181
- def delete_collection(name: str):
1182
- """Delete a collection"""
1183
  try:
1184
- client.delete_collection(collection_name=name)
1185
- return {"message": f"Collection '{name}' deleted successfully"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1186
  except Exception as e:
1187
- raise HTTPException(status_code=404, detail=f"Collection '{name}' not found: {str(e)}")
1188
-
1189
- # === DOCUMENT INGESTION ENDPOINTS ===
1190
-
1191
- @app.post("/ingest/{collection_name}")
1192
- async def ingest_file(collection_name: str, file: UploadFile = File(...)):
1193
- """Ingest documents into Qdrant collection"""
1194
- suffix = os.path.splitext(file.filename)[-1].lower()
1195
- with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
1196
- tmp.write(await file.read())
1197
- tmp_path = tmp.name
1198
-
1199
- try:
1200
- # Select loader based on file suffix
1201
- if suffix == ".pdf":
1202
- loader = PyPDFLoader(tmp_path)
1203
- elif suffix in [".txt", ".md"]:
1204
- loader = TextLoader(tmp_path)
1205
- elif suffix == ".csv":
1206
- loader = CSVLoader(file_path=tmp_path)
1207
- elif suffix == ".docx":
1208
- loader = Docx2txtLoader(tmp_path)
1209
- elif suffix == ".html":
1210
- loader = BSHTMLLoader(file_path=tmp_path)
1211
- else:
1212
- raise HTTPException(status_code=400, detail=f"Unsupported file type: {suffix}")
1213
-
1214
- docs = loader.load()
1215
- splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
1216
- chunks = splitter.split_documents(docs)
1217
-
1218
- texts = [chunk.page_content for chunk in chunks]
1219
- embeddings = embedding_model.embed_documents(texts)
1220
-
1221
- # Verify embedding dimension
1222
- collection_info = client.get_collection(collection_name=collection_name)
1223
- expected_dim = collection_info.config.params.vectors.size
1224
- if len(embeddings[0]) != expected_dim:
1225
- raise HTTPException(
1226
- status_code=400,
1227
- detail=f"Embedding dimension mismatch: expected {expected_dim}, got {len(embeddings[0])}",
1228
- )
1229
-
1230
- points = [
1231
- PointStruct(
1232
- id=str(uuid.uuid4()),
1233
- vector=embeddings[i],
1234
- payload={"text": texts[i], "source": file.filename},
1235
- )
1236
- for i in range(len(texts))
1237
- ]
1238
-
1239
- client.upsert(collection_name=collection_name, points=points)
1240
 
1241
- except HTTPException as he:
1242
- raise he
1243
- except Exception as e:
1244
- raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}")
1245
- finally:
1246
- os.remove(tmp_path)
1247
-
1248
- return {"message": f"'{file.filename}' ingested into '{collection_name}' successfully"}
1249
-
1250
- @app.get("/search/{collection_name}")
1251
- def search_collection(
1252
- collection_name: str,
1253
- query: str = QueryParam(..., description="Search query"),
1254
- top_k: int = 5
1255
- ):
1256
- """Search documents in collection"""
1257
  try:
1258
- query_vector = embedding_model.embed_query(query)
1259
- search_result = client.search(
1260
- collection_name=collection_name,
1261
- query_vector=query_vector,
1262
- limit=top_k,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1263
  )
1264
-
1265
- results = [
1266
- {
1267
- "score": hit.score,
1268
- "payload": hit.payload,
1269
- }
1270
- for hit in search_result
1271
- ]
1272
-
1273
  return {
1274
- "query": query,
1275
- "results": results,
1276
- "timestamp": datetime.now().isoformat()
 
1277
  }
1278
-
 
 
1279
  except Exception as e:
1280
- raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
1281
-
1282
- # === HEALTH AND INFO ENDPOINTS ===
1283
 
1284
  @app.get("/redis-info")
1285
  def redis_info():
@@ -1301,63 +1179,29 @@ def redis_info():
1301
 
1302
  @app.get("/health")
1303
  def health():
1304
- """Health check endpoint"""
1305
  try:
1306
  redis_client.ping()
1307
  redis_status = "connected"
1308
  except:
1309
  redis_status = "disconnected"
1310
 
1311
- try:
1312
- client.get_collections()
1313
- qdrant_status = "connected"
1314
- except:
1315
- qdrant_status = "disconnected"
1316
-
1317
  return {
1318
  "status": "ok",
1319
- "tools": ["enhanced_pandas_analysis", "enhanced_document_search", "get_user_projects"],
1320
- "agent": "enhanced_multi_tool",
1321
  "session_management": "enabled",
1322
  "redis_status": redis_status,
1323
- "qdrant_status": qdrant_status,
1324
- "pandas_ai": "enhanced",
1325
- "total_sessions": len(list(redis_client.scan_iter(match="session:*"))),
1326
- "timestamp": datetime.now().isoformat()
1327
- }
1328
-
1329
- @app.get("/")
1330
- def root():
1331
- """Root endpoint with API information"""
1332
- return {
1333
- "message": "Enhanced AI Agent API with Multi-Tool Support",
1334
- "version": "2.0",
1335
- "features": [
1336
- "Enhanced PandasAI data analysis",
1337
- "Advanced document search with Qdrant",
1338
- "Project management integration",
1339
- "Session-based conversation memory",
1340
- "Redis-backed session storage",
1341
- "Multi-tool agent with context awareness"
1342
- ],
1343
- "endpoints": {
1344
- "chat": "/bot",
1345
- "sessions": "/sessions",
1346
- "direct_tools": ["/chat-documents", "/list-projects", "/chat-with-pandas-agent"],
1347
- "collections": "/collections/",
1348
- "health": "/health"
1349
- },
1350
- "timestamp": datetime.now().isoformat()
1351
  }
1352
 
1353
  if __name__ == "__main__":
1354
  import uvicorn
1355
  try:
1356
- print("Starting Enhanced AI Agent API...")
1357
- print("Features: Enhanced PandasAI, Document Search, Project Management")
1358
- print("Session Management: Redis-backed with conversation memory")
1359
  uvicorn.run(app, host="0.0.0.0", port=8000)
1360
  except KeyboardInterrupt:
1361
- print("\nServer stopped gracefully")
1362
  except Exception as e:
1363
- print(f"Server error: {e}")
 
 
 
1
+
2
  from fastapi import FastAPI, HTTPException, Query as QueryParam
3
  from pydantic import BaseModel, Field
4
  from langchain_openai import ChatOpenAI, OpenAIEmbeddings
5
  from qdrant_client import QdrantClient
6
+ from langchain.agents import Tool, AgentExecutor, create_openai_tools_agent
 
7
  from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
8
  from langchain.memory import ConversationBufferMemory
9
  from typing import Optional, List, Dict, Any
 
27
  from pandasai import SmartDataframe
28
  from pandasai.llm.openai import OpenAI as PandasOpenAI
29
 
30
+
31
+
32
+
33
+ from fastapi import FastAPI, Request
34
  from fastapi.responses import JSONResponse
35
+ import json
36
+
 
 
37
 
38
  # Import your existing S3 connection details
39
  from retrive_secrects import * # CONNECTIONS_HOST, etc.
 
43
 
44
  load_dotenv()
45
 
46
+ app = FastAPI(title="AI Agent with Redis Session Management and Pandas AI")
 
 
 
 
 
 
 
 
 
 
 
47
 
48
  # Environment variables
49
+ OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
50
  QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "vatsav_test_1")
51
  QDRANT_HOST = os.getenv("QDRANT_HOST", "127.0.0.1")
52
  QDRANT_PORT = int(os.getenv("QDRANT_PORT", 6333))
 
57
  REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
58
  REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
59
 
60
+ # S3 Constants (from your original code)
61
  S3_Bucket_Name = 'ingenspark-user-files'
62
  S3_Raw_Files_Folder = 'User-Uploaded-Raw-Files'
63
  S3_Modified_Files_Folder = 'Modified-Files/'
64
+ S3_Output_Files_Folder = 'Output-Files/'
65
+ S3_Published_Results_Folder = 'Published-Results/'
66
+ S3_Ingen_Customer_Output = 'Ingen-Customer/'
67
+ Dominant_Segmentation_Output = 'Dominant-Segmentation/'
68
+ Trend_Segmentation_Output = 'Trend-Segmentation/'
69
+ Decile_Quartile_segmentation_Output = 'Decile-Quartile-Segmentation/'
70
+ Combined_Segmentation_Output = 'Combine-Segmentation/'
71
+ Custom_Segmentation_Output = 'Custom-Segmentation/'
72
+ Customer_360_Output = 'Customer-360/'
73
+ Merge_file_folder = S3_Modified_Files_Folder + 'IngenData-Merged-Tables/'
74
+ S3_Dev_Doc_Images_Folder = 'Developers-Documentation-Images/'
75
+ S3_Temporary_Files_Folder = S3_Raw_Files_Folder
76
+ S3_App_Specific_Data = 'Application-Specific-Data/'
77
+ S3_Transformation_Tables_Folder = 'Modified-Files/Modified-Tables/Transformation-Tables/'
78
  cloud_front_url = "https://files.dev.ingenspark.com/"
79
 
80
  # Initialize Redis client
 
82
  """Initialize Redis client with fallback to local Redis"""
83
  try:
84
  if REDIS_URL:
85
+ # Use deployed Redis URL
86
  redis_client = redis.from_url(
87
  REDIS_URL,
88
  decode_responses=True,
89
  socket_connect_timeout=5,
90
  socket_timeout=5
91
  )
92
+ # Test connection
93
  redis_client.ping()
94
  print(f"✅ Connected to deployed Redis: {REDIS_URL}")
95
  return redis_client
96
  else:
97
+ # Use local Redis
98
  redis_client = redis.StrictRedis(
99
  host=REDIS_HOST,
100
  port=REDIS_PORT,
 
103
  socket_connect_timeout=5,
104
  socket_timeout=5
105
  )
106
+ # Test connection
107
  redis_client.ping()
108
  print(f"✅ Connected to local Redis: {REDIS_HOST}:{REDIS_PORT}")
109
  return redis_client
 
111
  print(f"❌ Redis connection failed: {e}")
112
  raise HTTPException(status_code=500, detail=f"Redis connection failed: {str(e)}")
113
 
114
+ # Initialize Redis client
115
  redis_client = get_redis_client()
116
 
117
  # Initialize models
118
  embedding_model = OpenAIEmbeddings(
119
+ model="text-embedding-3-large",
120
  openai_api_key=OPENAI_API_KEY,
121
  )
122
 
123
+ qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
124
  llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=OPENAI_API_KEY)
125
 
126
+ # === PANDAS AI FUNCTIONS ===
127
 
128
  def read_parquet_file_from_s3(ufuid=None, columns_list=None, records_count=None, file_location=''):
129
  """
130
+ Reads a Parquet file from S3 using Dask and returns it as a Pandas DataFrame.
131
+
132
+ Parameters:
133
+ ufuid (int): Optional user_file_upload_id to fetch S3 path from DB.
134
+ columns_list (list/str): Columns to read.
135
+ records_count (int): Not used currently.
136
+ file_location (str): Direct file path in S3.
137
+
138
+ Returns:
139
+ pandas.DataFrame
140
  """
141
  try:
142
+ # Connect to PostgreSQL
143
  conn = psycopg2.connect(
144
  host=CONNECTIONS_HOST,
145
  database=CONNECTIONS_DB,
 
156
  raise ValueError(f"No file found for ufuid: {ufuid}")
157
  file_name, s3_file_path = file
158
  else:
159
+ # Normalize input path
160
  file_location = re.sub(r'\.parquet(?!$)', '', file_location)
161
  s3_file_path = file_location if file_location.endswith('.parquet') else file_location + '.parquet'
162
 
163
+ # Extract relative S3 path
164
  s3_file_path = urllib.parse.unquote(s3_file_path.split(f"{S3_Bucket_Name}/")[-1])
165
  if not s3_file_path.endswith('.parquet'):
166
  s3_file_path += '.parquet'
167
 
168
+ # Parse columns if given as comma-separated string
169
  if columns_list and not isinstance(columns_list, list):
170
  columns_list = [col.strip(' "\'') for col in columns_list.split(',')]
171
 
172
  print(f"\n{'!' * 100}\nReading from: s3://{S3_Bucket_Name}/{s3_file_path}\n")
173
 
174
+ # Read using Dask
175
  ddf = dd.read_parquet(
176
  f"s3://{S3_Bucket_Name}/{s3_file_path}",
177
  engine="pyarrow",
 
179
  assume_missing=True
180
  )
181
 
182
+ ddf = ddf.repartition(npartitions=8) # Optimize for processing
183
+ print("Reading Parquet file from S3 completed successfully.")
184
 
185
+ # Close database connection
186
  cursor.close()
187
  conn.close()
188
 
 
190
 
191
  except Exception as e:
192
  print(f"❌ Error reading Parquet file: {e}")
193
+ return pd.DataFrame() # Return empty DataFrame on error
 
 
194
 
195
def pandas_agent(filepath: str, query: str) -> str:
    """
    Answer a natural-language question about a dataset using PandasAI.

    Parameters:
        filepath (str): An all-digit string is converted to int and passed to
            read_parquet_file_from_s3 as ``ufuid``; any other value is passed
            as ``file_location``.
        query (str): Question forwarded to ``SmartDataframe.chat``.

    Returns:
        str: The analysis result prefixed with "📊 Analysis Result:", or a
        "❌ ..." message when the frame is empty, OPENAI_API_KEY is unset, or
        any exception is raised along the way.
    """
    try:
        # A purely numeric identifier selects the lookup-by-ufuid path.
        loader_kwargs = (
            {"ufuid": int(filepath)} if filepath.isdigit() else {"file_location": filepath}
        )
        data = read_parquet_file_from_s3(**loader_kwargs)

        if data.empty:
            return "❌ No data found or failed to load the file. Please check the file path or ufuid."

        if not OPENAI_API_KEY:
            return "❌ OPENAI_API_KEY is not set in environment variables."

        smart_frame = SmartDataframe(
            data,
            config={"llm": PandasOpenAI(api_token=OPENAI_API_KEY)},
        )

        print(f"🔍 Processing query: {query}")
        answer = smart_frame.chat(query)

        # Tabular answers are rendered in full; everything else is stringified.
        if isinstance(answer, (pd.DataFrame, pd.Series)):
            rendered = answer.to_string()
        else:
            rendered = answer if isinstance(answer, str) else str(answer)
        return f"📊 Analysis Result:\n{rendered}"

    except Exception as exc:
        failure = f"❌ Error in pandas_agent: {str(exc)}"
        print(failure)
        return failure
243
 
244
  # === INPUT SCHEMAS ===
245
 
 
258
  session_id: Optional[str] = None
259
  message: str
260
 
261
class PandasAgentQuery(BaseModel):
    # Request schema pairing a dataset reference with a question about it.
    # filepath: S3 file path, or a ufuid given as a string.
    filepath: str = Field(..., description="S3 file path or ufuid")
    # query: the natural-language question to ask about the data.
    query: str = Field(..., description="Natural language query about the data")
264
+
265
  class SessionResponse(BaseModel):
266
  session_id: str
267
  userLoginId: int
 
273
  class MessageResponse(BaseModel):
274
  message_id: str
275
  session_id: str
276
+ role: str # "user" or "assistant"
277
  message: str
278
  timestamp: str
279
 
 
288
  # === SESSION MANAGEMENT FUNCTIONS ===
289
 
290
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
291
 
292
 
293
@app.middleware("http")
async def add_success_flag(request: Request, call_next):
    """
    Add a boolean ``success`` key to JSON-object responses.

    ``success`` is True for 2xx status codes and False otherwise. Responses
    whose content type is not JSON are returned untouched. JSON bodies that
    are not objects (e.g. a top-level list) or cannot be parsed are passed
    through with their original bytes.

    Parameters:
        request (Request): Incoming request, forwarded to ``call_next``.
        call_next: Callable that produces the downstream response.

    Returns:
        The downstream response, rebuilt when its body had to be read.
    """
    # Local import: the module-level imports only bring in JSONResponse.
    from fastapi.responses import Response

    response = await call_next(request)

    # Only modify JSON responses
    if "application/json" not in response.headers.get("content-type", ""):
        return response

    # The body iterator can only be consumed once. After this point the
    # original response object no longer has a body to send, so every path
    # below must build a new response from ``raw_body``.
    raw_body = b"".join([chunk async for chunk in response.body_iterator])

    # Content-Length is dropped so the new response recomputes it.
    passthrough_headers = {
        k: v for k, v in response.headers.items() if k.lower() != "content-length"
    }

    try:
        data = json.loads(raw_body.decode())
    except (UnicodeDecodeError, ValueError):
        data = None

    if isinstance(data, dict):
        data["success"] = 200 <= response.status_code < 300
        return JSONResponse(
            content=data,
            status_code=response.status_code,
            headers=passthrough_headers,
        )

    # Not a JSON object: hand back the exact bytes that were read.
    return Response(
        content=raw_body,
        status_code=response.status_code,
        headers=passthrough_headers,
    )
316
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
317
 
318
 
 
 
 
 
 
 
 
 
 
319
 
320
  def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict:
321
  """Create a new chat session"""
 
327
  "auth_token": auth_token,
328
  "created_at": datetime.now().isoformat(),
329
  "status": "active",
330
+ "title": "New Chat" # Default title, will be updated after first message
331
  }
332
 
333
+ # Store session in Redis with 24 hour TTL
334
+ redis_client.setex(
335
+ f"session:{session_id}",
336
+ 86400, # 24 hours
337
+ json.dumps(session_data)
338
+ )
339
+
340
+ # Initialize empty chat history
341
+ redis_client.setex(
342
+ f"chat:{session_id}",
343
+ 86400, # 24 hours
344
+ json.dumps([])
345
+ )
346
+
347
+ # Initialize conversation memory
348
+ redis_client.setex(
349
+ f"memory:{session_id}",
350
+ 86400, # 24 hours
351
+ json.dumps([])
352
+ )
353
 
354
  return session_data
355
 
 
371
  "timestamp": datetime.now().isoformat()
372
  }
373
 
374
+ # Get current chat history
375
  chat_history = redis_client.get(f"chat:{session_id}")
376
+ if chat_history:
377
+ messages = json.loads(chat_history)
378
+ else:
379
+ messages = []
380
+
381
+ # Add new message
382
  messages.append(message_data)
383
 
384
+ # Update chat history in Redis with extended TTL
385
+ redis_client.setex(
386
+ f"chat:{session_id}",
387
+ 86400, # 24 hours
388
+ json.dumps(messages)
389
+ )
390
+
391
  return message_id
392
 
393
  def get_session_memory(session_id: str) -> List[Dict]:
394
  """Get conversation memory for session"""
395
  memory_data = redis_client.get(f"memory:{session_id}")
396
+ if memory_data:
397
+ return json.loads(memory_data)
398
+ return []
399
 
400
  def update_session_memory(session_id: str, messages: List[Dict]):
401
  """Update conversation memory for session"""
402
+ redis_client.setex(
403
+ f"memory:{session_id}",
404
+ 86400, # 24 hours
405
+ json.dumps(messages)
406
+ )
407
+
408
def update_session_title(session_id: str):
    """
    Replace the default "New Chat" title of a session with a generated one.

    Does nothing when the session record is absent or its title is already
    something other than "New Chat". Any exception is printed and swallowed
    so that a failed title update never fails the calling request.
    """
    try:
        session_key = f"session:{session_id}"
        raw_session = redis_client.get(session_key)
        if not raw_session:
            return

        session = json.loads(raw_session)

        # A title that is no longer the default is left untouched.
        if session.get("title", "New Chat") != "New Chat":
            return

        session["title"] = generate_session_title(session_id)

        # Write the record back with a fresh 24-hour TTL.
        redis_client.setex(session_key, 86400, json.dumps(session))

    except Exception as e:
        # Best effort only: report and carry on.
        print(f"Error updating session title: {e}")
433
 
434
  def generate_session_title(session_id: str) -> str:
435
  """Generate a title for the session based on chat history"""
436
  try:
437
+ # Get chat history
438
  chat_data = redis_client.get(f"chat:{session_id}")
439
  if not chat_data:
440
  return "New Chat"
 
443
  if not messages:
444
  return "New Chat"
445
 
446
+ # Get first user message for title generation
447
  first_user_message = None
448
  for msg in messages:
449
  if msg["role"] == "user":
 
453
  if not first_user_message:
454
  return "New Chat"
455
 
456
+ # Generate title using LLM
457
  title_prompt = f"""Generate a short, descriptive title (maximum 6 words) for a chat conversation that starts with this message:
458
 
459
  "{first_user_message[:200]}"
460
 
461
+ Return only the title, no quotes or additional text. The title should capture the main topic or intent of the conversation."""
462
 
463
+ try:
464
+ response = llm.invoke(title_prompt)
465
+ title = response.content.strip()
466
+
467
+ # Clean and limit title
468
+ title = title.replace('"', '').replace("'", "")
469
+ if len(title) > 50:
470
+ title = title[:47] + "..."
471
+
472
+ return title if title else "New Chat"
473
+
474
+ except Exception as e:
475
+ print(f"Error generating title: {e}")
476
+ # Fallback: use first few words of the message
477
+ words = first_user_message.split()[:4]
478
+ return " ".join(words) + ("..." if len(words) >= 4 else "")
479
 
480
  except Exception as e:
481
  print(f"Error in generate_session_title: {e}")
482
  return "New Chat"
483
 
484
def get_user_sessions(userLoginId: int) -> List[dict]:
    """
    Return every stored session that belongs to ``userLoginId``.

    The stored title is kept as-is. A title is generated from the chat
    history only when the record still carries the default "New Chat" title
    (or none at all). This avoids one LLM call per session on every listing
    and keeps titles that were saved explicitly.

    Parameters:
        userLoginId (int): Owner whose sessions are returned.

    Returns:
        List[dict]: Session records, most recently created first. Records
        that cannot be decoded as JSON are skipped.
    """
    sessions = []
    for key in redis_client.scan_iter(match="session:*"):
        session_data = redis_client.get(key)
        if not session_data:
            # The key may have expired between the scan and the read.
            continue

        try:
            session = json.loads(session_data)
        except json.JSONDecodeError:
            # One corrupt record should not break the whole listing.
            continue

        if session.get("userLoginId") != userLoginId:
            continue

        session_id = session.get("session_id")
        if session_id and session.get("title", "New Chat") == "New Chat":
            session["title"] = generate_session_title(session_id)

        sessions.append(session)

    # Most recent first; records without a timestamp sort last.
    sessions.sort(key=lambda s: s.get("created_at", ""), reverse=True)
    return sessions
500
+
501
def delete_session(session_id: str):
    """Remove the session record, chat history and memory keys of ``session_id``."""
    # Same three keys, deleted in the same order: session, chat, memory.
    for prefix in ("session", "chat", "memory"):
        redis_client.delete(f"{prefix}:{session_id}")
509
 
510
  # === UTILITY FUNCTIONS ===
511
 
 
515
 
516
  def fetch_user_projects(userLoginId: int, orgId: int, auth_token: str):
517
  url = "https://japidemo.dev.ingenspark.com/fetchUserProjects"
518
+ payload = {
519
+ "userLoginId": userLoginId,
520
+ "orgId": orgId
521
+ }
522
+
523
  headers = {
524
  'accept': 'application/json, text/plain, */*',
525
  'authorization': f'Basic {auth_token}',
 
563
  if not all_projects:
564
  return "❌ No projects found."
565
 
566
+ # Build the formatted string
567
  result = [f"✅ You have access to {len(all_projects)} project(s):\n"]
568
  for i, project in enumerate(all_projects, 1):
569
  result.append(f"{i}. Project Name: {project['projectNm']} ({project['type']})")
 
573
  result.append(f" Category: {project['categoryName']}\n")
574
  return "\n".join(result)
575
 
576
+ # === TOOL FUNCTIONS ===
577
+
578
def search_documents(query: str) -> str:
    """
    Answer ``query`` from the documents stored in the Qdrant collection.

    The query is embedded, the five nearest points are fetched, and the LLM
    is asked to answer from their ``text`` payloads. The distinct ``source``
    payload values are appended in the order they were first retrieved.

    Parameters:
        query (str): Search query or question about the documents.

    Returns:
        str: The LLM answer followed by a "Sources:" line, a "no relevant
        information" message when nothing usable was found, or an
        "Error searching documents: ..." message on any exception.
    """
    try:
        query_vector = embedding_model.embed_query(query)

        search_result = qdrant_client.search(
            collection_name=QDRANT_COLLECTION_NAME,
            query_vector=query_vector,
            limit=5,
        )

        if not search_result:
            return "No relevant information found in the knowledge base."

        context_texts = []
        sources = []
        for hit in search_result:
            # A point without a payload or without text is skipped, so it
            # does not turn the whole lookup into an error.
            payload = hit.payload or {}
            text = payload.get("text")
            if not text:
                continue
            context_texts.append(text)
            sources.append(str(payload.get("source", "unknown")))

        if not context_texts:
            return "No relevant information found in the knowledge base."

        context = "\n\n".join(context_texts)
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # "Sources:" line is stable between calls.
        unique_sources = list(dict.fromkeys(sources))

        # NOTE(review): prompt lines are assumed to be flush-left in the
        # original; the diff rendering does not preserve in-string indentation.
        prompt = (
            f"Based on the following context, answer the message: {query}\n"
            "\n"
            "Context:\n"
            f"{context}\n"
            "\n"
            "Please provide a comprehensive answer based on the context above. "
            "If the context doesn't contain enough information to answer the message, say so clearly."
        )

        response = llm.invoke(prompt)

        return f"{response.content}\n\nSources: {', '.join(unique_sources)}"

    except Exception as e:
        return f"Error searching documents: {str(e)}"
620
+
621
+ # Global variables to store auth context (for tool functions)
622
  _current_user_id = None
623
  _current_org_id = None
624
  _current_auth_token = None
625
 
626
  def get_user_projects(userLoginId: str) -> str:
627
+ """Get list of projects for a user."""
628
  try:
629
+ # Use global auth context if available
630
  if _current_auth_token and _current_user_id:
631
  user_id = _current_user_id
632
  org_id = _current_org_id or 1
633
  auth_token = _current_auth_token
634
  else:
635
+ return "❌ Authentication token required. Please provide auth_token in your request."
636
 
637
+ # Encode auth token using the actual user ID and provided token
638
  encoded_token = get_encoded_auth_token(user_id, auth_token)
639
+
640
+ # Fetch projects
641
  data = fetch_user_projects(user_id, org_id, encoded_token)
 
642
 
643
+ # Format and return the project list
644
+ formatted = format_project_response(data)
645
+ return formatted
646
+
647
+ except ValueError:
648
+ return "❌ Invalid userLoginId format. Please provide a valid number."
649
  except Exception as e:
650
  return f"❌ Error fetching projects: {str(e)}"
651
 
652
def pandas_data_analysis(query_with_filepath: str) -> str:
    """
    Tool entry point for PandasAI data analysis.

    Input format: 'filepath|query'. The text before the first '|' is the S3
    path or ufuid; the text after it is the analysis question. Both parts are
    stripped of surrounding whitespace before being handed to pandas_agent.

    Returns the pandas_agent result, or a "❌ ..." message when the input is
    malformed or an exception is raised.
    """
    try:
        # Split on the first '|' only, so the question may itself contain '|'.
        pieces = query_with_filepath.split('|', 1)
        if len(pieces) != 2:
            return "❌ Invalid input format. Please use: 'filepath|query' format."

        filepath, query = (piece.strip() for piece in pieces)
        if not (filepath and query):
            return "❌ Both filepath and query are required."

        return pandas_agent(filepath, query)

    except Exception as exc:
        return f"❌ Error in pandas data analysis: {str(exc)}"
676
+
677
+ # === CREATE TOOLS ===
678
+
679
# Tool objects handed to the agent. Each wraps one of the tool functions above;
# the description is the text the model reads when deciding which tool to call.
# Description continuation lines are kept flush-left so that no source
# indentation ends up inside the text.

# Knowledge-base lookup backed by search_documents.
document_search_tool = Tool(
    name="document_search",
    func=search_documents,
    description="""Use this tool to search through ingested documents and get relevant information from the knowledge base.
Perfect for answering messages about uploaded documents, manuals, or any content that was previously stored.
Input should be a search query or message about the documents.""",
)

# Project listing backed by get_user_projects.
project_list_tool = Tool(
    name="get_user_projects",
    func=get_user_projects,
    description="""Use this tool to get the list of projects for a user.
Perfect for when users ask about their projects, want to see available projects, or need project information.
Input should be the userLoginId (e.g., '25').
Note: This tool requires authentication context to be set.""",
)

# Dataset question answering backed by pandas_data_analysis, which expects a
# single 'filepath|query' string.
pandas_analysis_tool = Tool(
    name="pandas_data_analysis",
    func=pandas_data_analysis,
    description="""Use this tool for data analysis on CSV/Parquet files using PandasAI.
Perfect for when users ask questions about data analysis, statistics, insights, or want to query their datasets.
Input format: 'filepath|query' where:
- filepath: S3 file path (e.g., 'User-Uploaded-Raw-Files/Data2004csv1754926601269756') or ufuid (e.g., '123')
- query: Natural language question about the data (e.g., 'What are the top 5 values?', 'Show me summary statistics')

Examples:
- 'User-Uploaded-Raw-Files/mydata.csv|What is this file about?'
- '123|Show me the first 5 rows'
- 'Modified-Files/processed_data|What are the most common values in column X?'
""",
)
711
 
712
+ # === AGENT SETUP ===
713
+
714
def create_agent_with_session_memory(session_id: str):
    """
    Build an agent executor whose memory is pre-filled with a session's history.

    The stored messages for session_id are fetched with get_session_memory and
    replayed into a ConversationBufferMemory (entries with role "user" as user
    messages, everything else as AI messages). The agent is an OpenAI tools
    agent over the document search, project list and pandas analysis tools.

    Returns:
        The tuple (agent_executor, memory).
    """
    # Fetch the stored history first, before anything else is constructed.
    stored_messages = get_session_memory(session_id)

    # System prompt; continuation lines are flush-left so that no source
    # indentation ends up inside the prompt text.
    system_prompt = """You are a helpful AI assistant with access to multiple tools and conversation memory:

1. **Document Search**: Search through uploaded documents and knowledge base
2. **Project Management**: Get list of user projects and project information
3. **Data Analysis**: Analyze CSV/Parquet files using PandasAI for insights, statistics, and queries

Your capabilities:
- Answer messages about documents using the document search tool
- Help users find their projects and project information
- Perform data analysis on uploaded datasets using natural language queries
- Remember previous conversations in this session
- Provide general assistance and information
- Use appropriate tools based on user queries

Guidelines:
- Use the document search tool when users ask about specific content, documentation, or information that might be in uploaded files
- Use the project tool when users ask about projects, want to see their projects, or need project-related information
- Use the pandas analysis tool when users ask about data analysis, statistics, insights, or want to query datasets
- For pandas analysis, you need both a filepath (S3 path or ufuid) and a query - ask for missing information if needed
- Reference previous conversation context when relevant
- Be clear about which tool you're using and what information you're providing
- If you're unsure which tool to use, you can ask for clarification
- Provide helpful, accurate, and well-formatted responses

Remember: Always use the most appropriate tool based on the user's message and conversation context to provide the best possible answer."""

    agent_prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    # memory_key must match the "chat_history" placeholder in the prompt.
    memory = ConversationBufferMemory(
        return_messages=True,
        memory_key="chat_history",
    )

    # Replay the stored conversation in order.
    history = memory.chat_memory
    for entry in stored_messages:
        append = (
            history.add_user_message
            if entry["role"] == "user"
            else history.add_ai_message
        )
        append(entry["message"])

    tools = [document_search_tool, project_list_tool, pandas_analysis_tool]
    agent = create_openai_tools_agent(llm, tools, agent_prompt)

    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        memory=memory,
        verbose=True,
    )

    return agent_executor, memory
779
 
780
+ # === API ENDPOINTS ===
781
 
782
@app.post("/sessions", response_model=SessionResponse)
def create_new_session(userLoginId: int, orgId: int, auth_token: str):
    """
    Create a new chat session.

    Delegates to create_session and wraps the resulting mapping in a
    SessionResponse. Any failure is reported as HTTP 500.
    """
    try:
        return SessionResponse(**create_session(userLoginId, orgId, auth_token))
    except Exception as exc:
        failure = f"Error creating session: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure)
790
 
791
@app.get("/sessions")
def list_user_sessions(userLoginId: int):
    """
    List all sessions for a user.

    Returns the user id, the number of sessions and the sessions themselves as
    produced by get_user_sessions. Any failure is reported as HTTP 500.
    """
    try:
        user_sessions = get_user_sessions(userLoginId)
        session_count = len(user_sessions)
    except Exception as exc:
        failure = f"Error fetching sessions: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure)

    return {
        "userLoginId": userLoginId,
        "total_sessions": session_count,
        "sessions": user_sessions,
    }
 
803
 
804
@app.delete("/sessions/{session_id}")
def delete_user_session(session_id: str):
    """
    Delete/close a session.

    The session is first looked up with get_session, then removed with
    delete_session, and a confirmation payload is returned.

    Raises:
        HTTPException: re-raised unchanged when the lookup or the deletion
            already produced one; any other failure is reported as HTTP 500.
    """
    try:
        # Called only so that an unknown session fails before anything is
        # deleted. NOTE(review): presumably get_session raises a 404
        # HTTPException for a missing session - confirm against its definition.
        get_session(session_id)

        delete_session(session_id)

        return {
            "message": f"Session {session_id} deleted successfully",
            "session_id": session_id
        }
    except HTTPException:
        # Keep the original status code instead of masking it as a 500,
        # matching update_session_title_endpoint.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting session: {str(e)}")
820
 
821
  @app.post("/bot")
822
+ def chat_with_bot(query: BotQuery):
823
+ """Main bot endpoint with session management"""
824
  try:
825
  # Set global auth context for tools
826
  global _current_user_id, _current_org_id, _current_auth_token
 
841
  # Add user message to session
842
  user_message_id = add_message_to_session(session_id, "user", query.message)
843
 
844
+ # Create agent with session memory
845
+ agent_executor, memory = create_agent_with_session_memory(session_id)
846
 
847
+ # Use the agent to process the query
 
848
  result = agent_executor.invoke({"input": query.message})
 
849
 
850
  # Add AI response to session
851
  ai_message_id = add_message_to_session(session_id, "assistant", result["output"])
 
878
  "message": query.message,
879
  "answer": result["output"],
880
  "userLoginId": query.userLoginId,
881
+ "agent_used": True
 
 
882
  }
883
 
884
  except Exception as e:
 
887
  _current_org_id = None
888
  _current_auth_token = None
889
 
890
+ raise HTTPException(status_code=500, detail=f"Error processing chat: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
891
 
892
  @app.get("/sessions/{session_id}/history", response_model=ChatHistoryResponse)
893
+ def get_session_history(session_id: str, n: int = QueryParam(50, description="Number of recent messages to return")):
894
  """Get chat history for a session"""
895
  try:
896
+ # Verify session exists
897
  get_session(session_id)
898
 
899
+ # Get chat history
900
  chat_data = redis_client.get(f"chat:{session_id}")
901
  if not chat_data:
902
  return ChatHistoryResponse(
 
906
  )
907
 
908
  messages = json.loads(chat_data)
909
+
910
+ # Get the last n messages (or all if less than n)
911
  recent_messages = messages[-n:] if len(messages) > n else messages
912
+
913
+ # Convert to MessageResponse objects
914
  message_responses = [MessageResponse(**msg) for msg in recent_messages]
915
 
916
  return ChatHistoryResponse(
 
921
 
922
  except Exception as e:
923
  raise HTTPException(status_code=500, detail=f"Error fetching chat history: {str(e)}")
924
+
925
+
926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
927
  @app.put("/sessions/{session_id}/title")
928
  def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
929
+ """Update the title of an existing session"""
930
  try:
931
+ # Verify that the session_id in URL matches the one in request body
932
+ if session_id != request.session_id:
933
+ raise HTTPException(status_code=400, detail="Session ID in URL and request body must match")
934
 
935
+ # Verify session exists and get current session data
936
+ session_data = redis_client.get(f"session:{session_id}")
937
  if not session_data:
938
  raise HTTPException(status_code=404, detail="Session not found or expired")
939
 
 
949
  raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")
950
 
951
  # Update the title
952
+ old_title = session.get("title", "Unknown")
953
  session["title"] = new_title
954
  session["last_updated"] = datetime.now().isoformat()
955
 
956
+ # Save updated session back to Redis
957
  redis_client.setex(
958
+ f"session:{session_id}",
959
  86400, # 24 hours TTL
960
  json.dumps(session)
961
  )
962
 
 
 
 
 
 
 
 
 
963
  return {
964
  "message": "Session title updated successfully",
965
  "session_id": session_id,
966
  "old_title": old_title,
967
+ "new_title": new_title
 
 
968
  }
969
 
970
  except HTTPException:
971
  raise
 
 
972
  except Exception as e:
973
  raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
974
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
975
  @app.post("/chat-documents")
976
  def chat_documents_only(query: Query):
977
+ """Direct document search without agent"""
978
  try:
979
+ result = search_documents(query.message)
 
980
  return {
981
  "message": query.message,
982
  "answer": result,
983
+ "tool_used": "document_search"
 
984
  }
985
  except Exception as e:
986
  return {
987
  "message": query.message,
988
+ "answer": f"An error occurred: {str(e)}",
989
+ "tool_used": "document_search"
 
 
990
  }
991
 
992
  @app.post("/list-projects")
993
  def list_projects(request: ProjectRequest):
994
  """Direct project listing without agent"""
995
  try:
996
+ # Use the provided auth token and userLoginId
997
  encoded_token = get_encoded_auth_token(request.userLoginId, request.auth_token)
998
+
999
+ # Fetch projects
1000
  data = fetch_user_projects(request.userLoginId, request.orgId, encoded_token)
1001
+
1002
+ # Format and return the project list
1003
  formatted = format_project_response(data)
1004
  return {
1005
  "projects": formatted,
1006
+ "tool_used": "project_list"
 
1007
  }
1008
  except Exception as e:
1009
  return {
1010
+ "error": f"An error occurred: {str(e)}",
1011
+ "tool_used": "project_list"
 
1012
  }
1013
 
1014
  @app.post("/chat-with-pandas-agent")
1015
+ def chat_with_pandas_agent(request: PandasAgentQuery):
1016
+ """Direct pandas AI agent endpoint for data analysis"""
1017
  try:
1018
+ result = pandas_agent(request.filepath, request.query)
 
1019
 
1020
  return {
1021
+ "filepath": request.filepath,
1022
+ "query": request.query,
1023
  "answer": result,
1024
+ "tool_used": "pandas_agent",
1025
  "timestamp": datetime.now().isoformat()
1026
  }
1027
 
1028
  except Exception as e:
1029
+ error_msg = f"An error occurred: {str(e)}"
1030
  return {
1031
+ "filepath": request.filepath,
1032
+ "query": request.query,
1033
  "answer": error_msg,
1034
+ "tool_used": "pandas_agent",
1035
  "error": True,
1036
  "timestamp": datetime.now().isoformat()
1037
  }
1038
 
1039
# NOTE(review): this registers the same method and path as
# update_session_title_endpoint (PUT /sessions/{session_id}/title), which is
# declared earlier in the file. Path operations are matched in declaration
# order, so this handler is presumably never reached - consider giving it its
# own path.
@app.put("/sessions/{session_id}/title")
def refresh_session_title(session_id: str):
    """
    Manually refresh/regenerate session title.

    Loads the session with get_session, asks generate_session_title for a new
    title, stores the updated session back in Redis and returns the new title.

    Raises:
        HTTPException: re-raised unchanged when a helper already produced one;
            any other failure is reported as HTTP 500.
    """
    try:
        # Also serves as the existence check for the session.
        session_data = get_session(session_id)

        new_title = generate_session_title(session_id)

        session_data["title"] = new_title
        redis_client.setex(
            f"session:{session_id}",
            86400,  # 24 hours; writing the key also restarts its expiry
            json.dumps(session_data)
        )

        return {
            "session_id": session_id,
            "new_title": new_title,
            "message": "Session title updated successfully"
        }

    except HTTPException:
        # Keep the original status code instead of masking it as a 500,
        # matching update_session_title_endpoint.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
1065
+
1066
# NOTE(review): update_session_title_endpoint is defined more than once in this
# file with the same route; this variant does not compare the URL session id
# with the one in the request body.
@app.put("/sessions/{session_id}/title")
def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
    """
    Update the title of an existing session with custom name.

    Responds 404 when the session key is absent from Redis and 400 when the
    stripped title is empty or longer than 100 characters. On success the
    session is written back with the new title and a fresh "last_updated"
    value, and both the old and new titles are returned.
    """
    redis_key = f"session:{session_id}"
    try:
        stored = redis_client.get(redis_key)
        if not stored:
            raise HTTPException(status_code=404, detail="Session not found or expired")

        session = json.loads(stored)

        # Validate the requested title before touching the stored session.
        new_title = request.new_title.strip()
        if not new_title:
            raise HTTPException(status_code=400, detail="New title cannot be empty")
        if len(new_title) > 100:
            raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")

        old_title = session.get("title", "New Chat")
        session.update(title=new_title, last_updated=datetime.now().isoformat())

        # 86400 s = 24 hours TTL; writing the key restarts its expiry.
        redis_client.setex(redis_key, 86400, json.dumps(session))

        return {
            "message": "Session title updated successfully",
            "session_id": session_id,
            "old_title": old_title,
            "new_title": new_title,
        }

    except HTTPException:
        # The 4xx responses raised above must not be rewrapped as 500.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error updating session title: {str(exc)}")

# Add this to your existing Pydantic models section
1109
# Request body for the session-title update endpoint.
class UpdateSessionTitleRequest(BaseModel):
    # Id of the session to rename (required).
    session_id: str = Field(...)
    # Replacement title (required); the endpoint strips and validates it.
    new_title: str = Field(...)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1112
 
1113
+ # Add this endpoint to your FastAPI app
1114
@app.put("/sessions/{session_id}/title")
def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
    """
    Update the title of an existing session.

    Responds 400 when the URL session id differs from the one in the request
    body, 404 when the session key is absent from Redis, and 400 when the
    stripped title is empty or longer than 100 characters. On success the
    session is written back with the new title and a fresh "last_updated"
    value, and both the old and new titles are returned.
    """
    try:
        # The id appears both in the path and in the body; they must agree.
        if request.session_id != session_id:
            raise HTTPException(status_code=400, detail="Session ID in URL and request body must match")

        session_key = f"session:{session_id}"
        raw_session = redis_client.get(session_key)
        if not raw_session:
            raise HTTPException(status_code=404, detail="Session not found or expired")
        session = json.loads(raw_session)

        new_title = request.new_title.strip()
        if not new_title:
            raise HTTPException(status_code=400, detail="New title cannot be empty")
        if len(new_title) > 100:
            raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")

        # Remember the previous title for the response before overwriting it.
        old_title = session.get("title", "Unknown")
        session["title"] = new_title
        session["last_updated"] = datetime.now().isoformat()

        one_day_seconds = 86400  # 24 hours TTL
        redis_client.setex(session_key, one_day_seconds, json.dumps(session))

        response = {
            "message": "Session title updated successfully",
            "session_id": session_id,
            "old_title": old_title,
            "new_title": new_title,
        }
        return response

    except HTTPException:
        # The 4xx responses raised above must not be rewrapped as 500.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error updating session title: {str(exc)}")
 
 
1161
 
1162
  @app.get("/redis-info")
1163
  def redis_info():
 
1179
 
1180
  @app.get("/health")
1181
  def health():
 
1182
  try:
1183
  redis_client.ping()
1184
  redis_status = "connected"
1185
  except:
1186
  redis_status = "disconnected"
1187
 
 
 
 
 
 
 
1188
  return {
1189
  "status": "ok",
1190
+ "tools": ["document_search", "project_list", "pandas_data_analysis"],
1191
+ "agent": "active",
1192
  "session_management": "enabled",
1193
  "redis_status": redis_status,
1194
+ "pandas_ai": "enabled",
1195
+ "total_sessions": len(list(redis_client.scan_iter(match="session:*")))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1196
  }
1197
 
1198
  if __name__ == "__main__":
1199
  import uvicorn
1200
  try:
 
 
 
1201
  uvicorn.run(app, host="0.0.0.0", port=8000)
1202
  except KeyboardInterrupt:
1203
+ print("\n🛑 Server stopped gracefully")
1204
  except Exception as e:
1205
+ print(f"Server error: {e}")
1206
+
1207
+ #bot4