|
import os |
|
import threading |
|
import uuid |
|
from dotenv import load_dotenv |
|
from langchain_groq import ChatGroq |
|
import pandas as pd |
|
from pandasai import SmartDataframe |
|
import numpy as np |
|
import logging |
|
from csv_service import clean_data |
|
from util_service import handle_out_of_range_float |
|
|
|
load_dotenv() |
|
|
|
|
|
current_groq_key_index = 0 |
|
current_groq_key_lock = threading.Lock() |
|
|
|
|
|
groq_api_keys = os.getenv("GROQ_API_KEYS").split(",") |
|
model_name = os.getenv("GROQ_LLM_MODEL") |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
def groq_chat(csv_url: str, question: str): |
|
global current_groq_key_index, current_groq_key_lock |
|
|
|
while True: |
|
with current_groq_key_lock: |
|
if current_groq_key_index >= len(groq_api_keys): |
|
return {"error": "All API keys exhausted."} |
|
current_api_key = groq_api_keys[current_groq_key_index] |
|
|
|
try: |
|
|
|
cache_db_path = "/workspace/cache/cache_db_0.11.db" |
|
if os.path.exists(cache_db_path): |
|
try: |
|
os.remove(cache_db_path) |
|
except Exception as e: |
|
logger.info(f"Error deleting cache DB file: {e}") |
|
|
|
data = clean_data(csv_url) |
|
llm = ChatGroq(model=model_name, api_key=current_api_key) |
|
|
|
chart_filename = f"chart_{uuid.uuid4()}.png" |
|
chart_path = os.path.join("generated_charts", chart_filename) |
|
|
|
|
|
df = SmartDataframe( |
|
data, |
|
config={ |
|
'llm': llm, |
|
'save_charts': True, |
|
'open_charts': False, |
|
'save_charts_path': os.path.dirname(chart_path), |
|
'custom_chart_filename': chart_filename |
|
} |
|
) |
|
|
|
answer = df.chat(question) |
|
|
|
|
|
if isinstance(answer, pd.DataFrame): |
|
processed = answer.apply(handle_out_of_range_float).to_dict(orient="records") |
|
elif isinstance(answer, pd.Series): |
|
processed = answer.apply(handle_out_of_range_float).to_dict() |
|
elif isinstance(answer, list): |
|
processed = [handle_out_of_range_float(item) for item in answer] |
|
elif isinstance(answer, dict): |
|
processed = {k: handle_out_of_range_float(v) for k, v in answer.items()} |
|
else: |
|
processed = {"answer": str(handle_out_of_range_float(answer))} |
|
|
|
return processed |
|
|
|
except Exception as e: |
|
error_message = str(e) |
|
if "429" in error_message: |
|
with current_groq_key_lock: |
|
current_groq_key_index += 1 |
|
if current_groq_key_index >= len(groq_api_keys): |
|
logger.info("All API keys exhausted.") |
|
return None |
|
else: |
|
logger.info(f"Error with API key index {current_groq_key_index}: {error_message}") |
|
return None |