Soumik555 commited on
Commit
133bf59
·
1 Parent(s): 3ab74be

DB Chat for pro users only

Browse files
Files changed (1) hide show
  1. orchestrator_functions.py +109 -276
orchestrator_functions.py CHANGED
@@ -3,6 +3,7 @@ import asyncio
3
  import logging
4
  import os
5
  import threading
 
6
  import uuid
7
  from fastapi.encoders import jsonable_encoder
8
  import numpy as np
@@ -344,164 +345,19 @@ def langchain_csv_chart(csv_url: str, question: str, chart_required: bool):
344
 
345
 
346
 
347
- ###########################################################################################################################
348
 
349
-
350
-
351
-
352
- # async def csv_chart(csv_url: str, query: str):
353
- # """
354
- # Generate a chart based on the provided CSV URL and query.
355
- # Parameters:
356
- # - csv_url (str): The URL of the CSV file.
357
- # - query (str): The query for generating the chart.
358
- # Returns:
359
- # - dict: A dictionary containing the generated chart image URL.
360
- # Example:
361
- # - csv_url: "https://example.com/data.csv"
362
- # - query: "Generate a bar chart showing sales by region."
363
- # Returns:
364
- # - dict: {"image_url": "https://example.com/chart.png"}.
365
-
366
- # """
367
-
368
- # try:
369
- # # First try Groq-based chart generation
370
- # try:
371
- # groq_result = await asyncio.to_thread(groq_chart, csv_url, query)
372
- # logger.info(f"Generated Chart (Groq): {groq_result}")
373
-
374
- # if groq_result != 'Chart not generated':
375
- # unique_file_name = f'{str(uuid.uuid4())}.png'
376
- # image_public_url = await upload_file_to_supabase(groq_result, unique_file_name)
377
- # logger.info(f"Image uploaded to Supabase: {image_public_url}")
378
- # return {"image_url": image_public_url}
379
-
380
- # except Exception as groq_error:
381
- # logger.info(f"Groq chart generation failed, falling back to Langchain: {str(groq_error)}")
382
-
383
- # # Fallback to Langchain if Groq fails
384
- # try:
385
- # langchain_paths = await asyncio.to_thread(langchain_csv_chart, csv_url, query, True)
386
- # logger.info("Fallback langchain chart result:", langchain_paths)
387
-
388
- # if isinstance(langchain_paths, list) and len(langchain_paths) > 0:
389
- # unique_file_name = f'{str(uuid.uuid4())}.png'
390
- # logger.info("Uploading the chart to supabase...")
391
- # image_public_url = await upload_file_to_supabase(langchain_paths[0], unique_file_name)
392
- # logger.info("Image uploaded to Supabase and Image URL is... ", image_public_url)
393
- # return {"image_url": image_public_url}
394
-
395
- # except Exception as langchain_error:
396
- # logger.info(f"Langchain chart generation also failed: {str(langchain_error)}")
397
- # try:
398
- # # Last resort: Try with the gemini langchain agent
399
- # logger.info("Trying with the gemini langchain agent...")
400
- # lc_gemini_chart_result = await asyncio.to_thread(langchain_gemini_csv_handler, csv_url, query, True)
401
- # if lc_gemini_chart_result is not None:
402
- # clean_path = lc_gemini_chart_result.strip()
403
- # unique_file_name = f'{str(uuid.uuid4())}.png'
404
- # logger.info("Uploading the chart to supabase...")
405
- # image_public_url = await upload_file_to_supabase(clean_path, unique_file_name)
406
- # logger.info("Image uploaded to Supabase and Image URL is... ", image_public_url)
407
- # return {"image_url": image_public_url}
408
- # except Exception as gemini_error:
409
- # logger.info(f"Gemini Langchain chart generation also failed: {str(gemini_error)}")
410
-
411
- # # If both methods fail
412
- # return {"error": "Could not generate the chart, please try again."}
413
-
414
- # except Exception as e:
415
- # logger.info(f"Critical chart error: {str(e)}")
416
- # return {"error": "Internal system error"}
417
-
418
-
419
-
420
-
421
-
422
-
423
-
424
-
425
- # async def csv_chat(csv_url: str, query: str):
426
- # """
427
- # Generate a response based on the provided CSV URL and query.
428
- # Parameters:
429
- # - csv_url (str): The URL of the CSV file.
430
- # - query (str): The query for generating the response.
431
- # Returns:
432
- # - dict: A dictionary containing the generated response.
433
- # Example:
434
- # - csv_url: "https://example.com/data.csv"
435
- # - query: "What is the total sales for the year 2022?"
436
- # Returns:
437
- # - dict: {"answer": "The total sales for 2022 is $100,000."}.
438
- # """
439
- # try:
440
- # updated_query = f"{query} and Do not show any charts or graphs."
441
-
442
- # # Process with Groq first
443
- # try:
444
- # groq_answer = await asyncio.to_thread(groq_chat, csv_url, updated_query)
445
- # logger.info("groq_answer:", groq_answer)
446
-
447
- # if process_answer(groq_answer) == "Empty response received." or groq_answer == None:
448
- # return {"answer": "Sorry, I couldn't find relevant data..."}
449
-
450
- # if process_answer(groq_answer) or groq_answer == None:
451
- # raise Exception("Groq response not usable, falling back to LangChain")
452
-
453
- # return {"answer": jsonable_encoder(groq_answer)}
454
-
455
- # except Exception as groq_error:
456
- # logger.info(f"Groq error, falling back to LangChain: {str(groq_error)}")
457
-
458
- # # Process with LangChain if Groq fails
459
- # try:
460
- # lang_answer = await asyncio.to_thread(
461
- # langchain_csv_chat, csv_url, query, False
462
- # )
463
- # if not process_answer(lang_answer):
464
- # return {"answer": jsonable_encoder(lang_answer)}
465
- # return {"answer": "Sorry, I couldn't find relevant data..."}
466
- # except Exception as langchain_error:
467
- # logger.info(f"LangChain processing error: {str(langchain_error)}")
468
-
469
- # # last resort: Try with the gemini langchain agent
470
- # try:
471
- # gemini_answer = await asyncio.to_thread(
472
- # langchain_gemini_csv_handler, csv_url, query, False
473
- # )
474
- # if not process_answer(gemini_answer):
475
- # return {"answer": jsonable_encoder(gemini_answer)}
476
- # return {"answer": "Sorry, I couldn't find relevant data..."}
477
- # except Exception as gemini_error:
478
- # logger.info(f"Gemini Langchain processing error: {str(gemini_error)}")
479
- # return {"answer": "error"}
480
-
481
- # except Exception as e:
482
- # logger.info(f"Error processing request: {str(e)}")
483
- # return {"answer": "error"}
484
-
485
-
486
-
487
-
488
-
489
-
490
-
491
- ####################################### Start with lc_gemini #######################################
492
-
493
-
494
- async def csv_chat(csv_url: str, query: str):
495
  """
496
  Generate a response based on the provided CSV URL and query.
497
- Prioritizes LangChain-Groq, then raw Groq, and finally LangChain-Gemini as fallback.
498
 
499
  Parameters:
500
  - csv_url (str): The URL of the CSV file.
501
  - query (str): The query for generating the response.
502
 
503
  Returns:
504
- - dict: A dictionary containing the generated response.
505
 
506
  Example:
507
  - csv_url: "https://example.com/data.csv"
@@ -509,159 +365,136 @@ async def csv_chat(csv_url: str, query: str):
509
  Returns:
510
  - dict: {"answer": "The total sales for 2022 is $100,000."}
511
  """
 
 
 
 
512
  try:
513
- updated_query = f"{query} and Do not show any charts or graphs."
 
 
 
514
 
515
- # --- 1. First Attempt: LangChain Groq ---
516
- try:
517
- lang_groq_answer = await asyncio.to_thread(
518
- langchain_csv_chat, csv_url, updated_query, False
519
- )
520
- logger.info("LangChain-Groq answer:", lang_groq_answer)
521
-
522
- if lang_groq_answer is not None:
523
- return {"answer": jsonable_encoder(lang_groq_answer)}
524
-
525
- raise Exception("LangChain-Groq response not usable, falling back to raw Groq")
526
-
527
- except Exception as lang_groq_error:
528
- logger.info(f"LangChain-Groq error: {str(lang_groq_error)}")
529
-
530
- # --- 2. Second Attempt: Raw Groq Chat ---
531
- try:
532
- raw_groq_answer = await asyncio.to_thread(groq_chat, csv_url, updated_query)
533
- logger.info("Raw Groq answer:", raw_groq_answer)
534
-
535
- if process_answer(raw_groq_answer) == "Empty response received." or raw_groq_answer is None:
536
- raise Exception("Raw Groq response not usable, falling back to LangChain-Gemini")
537
-
538
- if process_answer(raw_groq_answer):
539
- raise Exception("Raw Groq response not usable, falling back to LangChain-Gemini")
540
-
541
- return {"answer": jsonable_encoder(raw_groq_answer)}
542
-
543
- except Exception as raw_groq_error:
544
- logger.info(f"Raw Groq error: {str(raw_groq_error)}")
545
-
546
- # --- 3. Final Attempt: LangChain Gemini ---
547
- try:
548
- gemini_answer = await asyncio.to_thread(
549
- langchain_gemini_csv_handler, csv_url, updated_query, False
550
- )
551
- logger.info("LangChain-Gemini answer:", gemini_answer)
552
-
553
- if gemini_answer is not None:
554
- return {"answer": jsonable_encoder(gemini_answer)}
555
-
556
- raise Exception("All fallbacks exhausted")
557
-
558
- except Exception as gemini_error:
559
- logger.info(f"LangChain-Gemini error: {str(gemini_error)}")
560
- return {"answer": "Sorry, I couldn't find relevant data..."}
561
-
562
- except Exception as e:
563
- logger.info(f"Unexpected error: {str(e)}")
564
- return {"answer": "error"}
565
 
 
 
 
 
 
 
 
 
 
566
 
567
 
568
 
569
 
 
570
 
571
 
572
- async def csv_chart(csv_url: str, query: str, chat_id: str):
573
  """
574
  Generate a chart based on the provided CSV URL and query.
575
- Prioritizes PandasAI Groq, then LangChain Gemini, and finally LangChain Groq as fallback.
576
 
577
  Parameters:
578
- - csv_url (str): The URL of the CSV file.
579
- - query (str): The query for generating the chart.
 
580
 
581
  Returns:
582
- - dict: A dictionary containing either:
583
- - {"image_url": "https://example.com/chart.png"} on success, or
584
- - {"error": "error message"} on failure
585
 
586
  Example:
587
  - csv_url: "https://example.com/data.csv"
588
- - query: "Show sales trends as a line chart"
589
  Returns:
590
  - dict: {"image_url": "https://storage.example.com/chart_uuid.png"}
591
  """
 
592
 
593
- async def upload_and_return(image_path: str, chat_id: str) -> dict:
594
- """Helper function to handle image uploads"""
595
  unique_name = f'{uuid.uuid4()}.png'
596
  public_url = await upload_file_to_supabase(image_path, unique_name, chat_id)
597
- logger.info(f"Uploaded chart: {public_url}")
598
- os.remove(image_path) # Remove the local image file after upload
 
 
 
599
  return {"image_url": public_url}
600
 
 
601
  try:
602
- # Commented out for now because aiml api is not working
603
- # try:
604
- # # --- 1. First Attempt: OpenAI ---
605
- # openai_result = await asyncio.to_thread(openai_chart, csv_url, query)
606
- # logger.info(f"OpenAI chart result:", openai_result)
607
-
608
- # if openai_result and openai_result != 'Chart not generated':
609
- # return await upload_and_return(openai_result, chat_id)
610
-
611
- # raise Exception("OpenAI failed to generate chart")
612
-
613
- # except Exception as openai_error:
614
- # logger.info(f"OpenAI failed ({str(openai_error)}), trying raw Groq...")
615
- # --- 2. Second Attempt: Raw Groq ---
616
- try:
617
- groq_result = await asyncio.to_thread(groq_chart, csv_url, query)
618
- logger.info(f"Raw Groq chart result:", groq_result)
619
-
620
- if groq_result and groq_result != 'Chart not generated':
621
- return await upload_and_return(groq_result, chat_id)
622
-
623
- raise Exception("Raw Groq failed to generate chart")
624
 
625
- except Exception as groq_error:
626
- logger.info(f"Raw Groq failed ({str(groq_error)}), trying LangChain Gemini...")
 
 
 
 
 
 
 
627
 
628
- # --- 3. Third Attempt: LangChain Gemini ---
629
- try:
630
- gemini_result = await asyncio.to_thread(
631
- langchain_gemini_csv_handler, csv_url, query, True
632
- )
633
- logger.info("LangChain Gemini chart result:", gemini_result)
634
-
635
- # --- i) If Gemini result is a string, return it ---
636
- if gemini_result and isinstance(gemini_result, str):
637
- clean_path = gemini_result.strip()
638
- return await upload_and_return(clean_path, chat_id)
639
-
640
- # --- ii) If Gemini result is a list, return the first element ---
641
- if gemini_result and isinstance(gemini_result, list) and len(gemini_result) > 0:
642
- return await upload_and_return(gemini_result[0], chat_id)
643
-
644
- raise Exception("LangChain Gemini returned empty result")
645
-
646
- except Exception as gemini_error:
647
- logger.info(f"LangChain Gemini failed ({str(gemini_error)}), trying LangChain Groq...")
648
-
649
- # --- 4. Final Attempt: LangChain Groq ---
650
- try:
651
- lc_groq_paths = await asyncio.to_thread(
652
- langchain_csv_chart, csv_url, query, True
653
- )
654
- logger.info("LangChain Groq chart result:", lc_groq_paths)
655
-
656
- if isinstance(lc_groq_paths, list) and lc_groq_paths:
657
- return await upload_and_return(lc_groq_paths[0], chat_id)
658
-
659
- return {"error": "All chart generation methods failed"}
660
-
661
- except Exception as lc_groq_error:
662
- logger.info(f"LangChain Groq failed: {str(lc_groq_error)}")
663
- return {"error": "Could not generate chart"}
664
-
665
- except Exception as e:
666
- logger.info(f"Critical error: {str(e)}")
667
- return {"error": "Internal system error"}
 
3
  import logging
4
  import os
5
  import threading
6
+ from typing import Dict
7
  import uuid
8
  from fastapi.encoders import jsonable_encoder
9
  import numpy as np
 
345
 
346
 
347
 
348
+ ####################################### Orchestrator Function for Chat #######################################
349
 
350
+ async def csv_chat(csv_url: str, query: str) -> dict:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
351
  """
352
  Generate a response based on the provided CSV URL and query.
353
+ Prioritizes LangChain-Gemini first, then falls back to LangChain-Groq.
354
 
355
  Parameters:
356
  - csv_url (str): The URL of the CSV file.
357
  - query (str): The query for generating the response.
358
 
359
  Returns:
360
+ - dict: A dictionary containing the generated response or error message.
361
 
362
  Example:
363
  - csv_url: "https://example.com/data.csv"
 
365
  Returns:
366
  - dict: {"answer": "The total sales for 2022 is $100,000."}
367
  """
368
+ updated_query = f"{query} and Do not show any charts or graphs."
369
+ error_messages = []
370
+
371
+ # --- 1. First Attempt: LangChain Gemini ---
372
  try:
373
+ gemini_answer = await asyncio.to_thread(
374
+ langchain_gemini_csv_handler, csv_url, updated_query, False
375
+ )
376
+ logger.info(f"LangChain-Gemini answer: {gemini_answer}")
377
 
378
+ if gemini_answer and is_valid_response(gemini_answer):
379
+ return {"answer": jsonable_encoder(gemini_answer)}
380
+
381
+ error_messages.append("LangChain-Gemini response not usable")
382
+ except Exception as gemini_error:
383
+ error_messages.append(f"LangChain-Gemini error: {str(gemini_error)}")
384
+ logger.error(f"LangChain-Gemini failed: {str(gemini_error)}")
385
+
386
+ # --- 2. Fallback Attempt: LangChain Groq ---
387
+ try:
388
+ lang_groq_answer = await asyncio.to_thread(
389
+ langchain_csv_chat, csv_url, updated_query, False
390
+ )
391
+ logger.info(f"LangChain-Groq answer: {lang_groq_answer}")
392
+
393
+ if lang_groq_answer and is_valid_response(lang_groq_answer):
394
+ return {"answer": jsonable_encoder(lang_groq_answer)}
395
+
396
+ error_messages.append("LangChain-Groq response not usable")
397
+ except Exception as lang_groq_error:
398
+ error_messages.append(f"LangChain-Groq error: {str(lang_groq_error)}")
399
+ logger.error(f"LangChain-Groq failed: {str(lang_groq_error)}")
400
+
401
+ # --- Final Fallback when all attempts fail ---
402
+ logger.error(f"All attempts failed. Errors: {'; '.join(error_messages)}")
403
+ return {"answer": "Sorry, I couldn't process your request with the available data."}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
404
 
405
+ def is_valid_response(response) -> bool:
406
+ """Check if the response is valid and not empty."""
407
+ if not response:
408
+ return False
409
+ if isinstance(response, str) and response.strip() == "":
410
+ return False
411
+ if isinstance(response, dict) and not response.get("answer"):
412
+ return False
413
+ return True
414
 
415
 
416
 
417
 
418
+ ####################################### Orchestrator Function for Chart #######################################
419
 
420
 
421
+ async def csv_chart(csv_url: str, query: str, chat_id: str) -> Dict[str, str]:
422
  """
423
  Generate a chart based on the provided CSV URL and query.
424
+ Prioritizes LangChain-Gemini first, then falls back to LangChain-Groq.
425
 
426
  Parameters:
427
+ - csv_url (str): URL of the CSV file
428
+ - query (str): Query for generating the chart
429
+ - chat_id (str): Chat session ID for file storage
430
 
431
  Returns:
432
+ - dict: Either {"image_url": "url"} on success or {"error": "message"} on failure
 
 
433
 
434
  Example:
435
  - csv_url: "https://example.com/data.csv"
436
+ - query: "Show sales trends as line chart"
437
  Returns:
438
  - dict: {"image_url": "https://storage.example.com/chart_uuid.png"}
439
  """
440
+ error_messages = []
441
 
442
+ async def upload_and_return(image_path: str) -> Dict[str, str]:
443
+ """Handle image upload and return public URL"""
444
  unique_name = f'{uuid.uuid4()}.png'
445
  public_url = await upload_file_to_supabase(image_path, unique_name, chat_id)
446
+ logger.info(f"Uploaded chart to: {public_url}")
447
+ try:
448
+ os.remove(image_path)
449
+ except OSError as e:
450
+ logger.warning(f"Could not delete temp file {image_path}: {str(e)}")
451
  return {"image_url": public_url}
452
 
453
+ # --- 1. First Attempt: LangChain Gemini ---
454
  try:
455
+ gemini_result = await asyncio.to_thread(
456
+ langchain_gemini_csv_handler, csv_url, query, True
457
+ )
458
+ logger.info(f"LangChain-Gemini chart result: {gemini_result}")
459
+
460
+ if gemini_result:
461
+ # Handle string or list response
462
+ if isinstance(gemini_result, str):
463
+ clean_path = gemini_result.strip()
464
+ if os.path.exists(clean_path):
465
+ return await upload_and_return(clean_path)
 
 
 
 
 
 
 
 
 
 
 
466
 
467
+ if isinstance(gemini_result, list) and gemini_result:
468
+ first_path = gemini_result[0]
469
+ if os.path.exists(first_path):
470
+ return await upload_and_return(first_path)
471
+
472
+ error_messages.append("LangChain-Gemini returned invalid result")
473
+ except Exception as gemini_error:
474
+ error_messages.append(f"LangChain-Gemini error: {str(gemini_error)}")
475
+ logger.error(f"Gemini chart failed: {str(gemini_error)}")
476
 
477
+ # --- 2. Fallback Attempt: LangChain Groq ---
478
+ try:
479
+ lc_groq_paths = await asyncio.to_thread(
480
+ langchain_csv_chart, csv_url, query, True
481
+ )
482
+ logger.info(f"LangChain-Groq chart result: {lc_groq_paths}")
483
+
484
+ if lc_groq_paths:
485
+ if isinstance(lc_groq_paths, list) and lc_groq_paths:
486
+ first_path = lc_groq_paths[0]
487
+ if os.path.exists(first_path):
488
+ return await upload_and_return(first_path)
489
+
490
+ if isinstance(lc_groq_paths, str) and os.path.exists(lc_groq_paths):
491
+ return await upload_and_return(lc_groq_paths)
492
+
493
+ error_messages.append("LangChain-Groq returned invalid result")
494
+ except Exception as lc_groq_error:
495
+ error_messages.append(f"LangChain-Groq error: {str(lc_groq_error)}")
496
+ logger.error(f"Groq chart failed: {str(lc_groq_error)}")
497
+
498
+ # --- Final Error Handling ---
499
+ logger.error(f"All chart generation failed. Errors: {'; '.join(error_messages)}")
500
+ return {"error": "Could not generate chart from the provided data"}