from flask import Flask, request, send_file, abort, jsonify, url_for, render_template, Response
from flask_cors import CORS
import pandas as pd
from sentence_transformers import SentenceTransformer, util
import torch
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional, Any
from collections import deque
import os
import logging
import atexit
from threading import Thread, Lock
import time
from datetime import datetime
from uuid import uuid4 as generate_uuid
import csv as csv_lib
import functools
import json
import re
import subprocess
import sys
import sqlite3
from dotenv import load_dotenv

# Load environment variables from .env file AT THE VERY TOP
load_dotenv()

# MODIFIED: Import from the new refactored modules
from llm_fallback import get_groq_fallback_response
from rag_system import initialize_and_get_rag_system
from rag_components import KnowledgeRAG
from utils import download_and_unzip_gdrive_file, download_gdrive_file  # MODIFIED: Import the new utility
from config import (
    RAG_SOURCES_DIR,
    RAG_STORAGE_PARENT_DIR,
    RAG_CHUNKED_SOURCES_FILENAME,
    GDRIVE_INDEX_ENABLED,
    GDRIVE_INDEX_ID_OR_URL,
    GDRIVE_USERS_CSV_ENABLED,  # NEW
    GDRIVE_USERS_CSV_ID_OR_URL  # NEW
)
# Setup logging (remains global for the app)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app_hybrid_rag.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)  # Main app logger
# --- Application Constants and Configuration ---
# MODIFIED: These are now fallbacks if users.csv is not found
ADMIN_USERNAME = os.getenv('FLASK_ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = os.getenv('FLASK_ADMIN_PASSWORD', 'fleetblox')
REPORT_PASSWORD = os.getenv('FLASK_REPORT_PASSWORD', 'e$$!@2213r423er31')
FLASK_APP_HOST = os.getenv("FLASK_HOST", "0.0.0.0")
FLASK_APP_PORT = int(os.getenv("FLASK_PORT", "5002"))
FLASK_DEBUG_MODE = os.getenv("FLASK_DEBUG", "False").lower() == "true"
_APP_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEXT_EXTRACTIONS_DIR = os.path.join(_APP_BASE_DIR, 'text_extractions')
RELATED_QUESTIONS_TO_SHOW = 10
QUESTIONS_TO_SEND_TO_GROQ_QA = 3
DB_QA_CONFIDENCE = 85
GENERAL_QA_CONFIDENCE = 85
HIGH_CONFIDENCE_THRESHOLD = 90
CHAT_HISTORY_TO_SEND = 5
CHAT_LOG_FILE = os.path.join(_APP_BASE_DIR, 'chat_history.csv')

# MODIFIED: Global variable for user data
user_df = None

logger.info(f"APP LAUNCH: Admin username loaded as '{ADMIN_USERNAME}' (fallback)")
# --- NEW: User loading from users.csv ---
def load_users_from_csv():
    global user_df
    # CHANGED: users.csv should be in assets folder
    assets_folder = os.path.join(_APP_BASE_DIR, 'assets')
    os.makedirs(assets_folder, exist_ok=True)  # Ensure assets folder exists
    users_csv_path = os.path.join(assets_folder, 'users.csv')
    try:
        if os.path.exists(users_csv_path):
            user_df = pd.read_csv(users_csv_path)
            # Ensure required columns are present
            required_cols = ['sl', 'name', 'email', 'password', 'role']
            if not all(col in user_df.columns for col in required_cols):
                logger.error(f"users.csv is missing one of the required columns: {required_cols}")
                user_df = None
                return
            user_df['email'] = user_df['email'].str.lower().str.strip()
            logger.info(f"Successfully loaded {len(user_df)} users from {users_csv_path}")
        else:
            logger.warning(f"users.csv not found at '{users_csv_path}'. Admin auth will use fallback .env credentials.")
            user_df = None
    except Exception as e:
        logger.error(f"Failed to load or process users.csv: {e}", exc_info=True)
        user_df = None
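# Illustrative users.csv layout (the column names come from required_cols above; the
# rows are made-up placeholders, not real credentials):
#
#   sl,name,email,password,role
#   1,Jane Doe,admin@example.com,changeme,admin
#   2,John Roe,user@example.com,changeme,user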
# --- NEW: Persistent Chat History Management using SQLite ---
class ChatHistoryManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.lock = Lock()
        self._create_table()
        logger.info(f"SQLite chat history manager initialized at: {self.db_path}")

    def clear_history(self, session_id: str):
        """
        Deletes the entire chat history for a given session_id.
        """
        with self.lock:
            try:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("DELETE FROM chat_histories WHERE session_id = ?", (session_id,))
                    conn.commit()
                logger.info(f"Successfully cleared history for session: {session_id}")
            except Exception as e:
                logger.error(f"Error clearing history for session {session_id}: {e}", exc_info=True)
    def _get_connection(self):
        # The timeout parameter is crucial to prevent "database is locked" errors under load.
        conn = sqlite3.connect(self.db_path, timeout=10)
        return conn

    def _create_table(self):
        with self.lock:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Use TEXT to store the history as a JSON string
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS chat_histories (
                        session_id TEXT PRIMARY KEY,
                        history TEXT NOT NULL
                    )
                """)
                conn.commit()
    def get_history(self, session_id: str, limit: int = 10) -> list:
        """
        Retrieves history from the DB and returns it as a list of dictionaries.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT history FROM chat_histories WHERE session_id = ?", (session_id,))
                row = cursor.fetchone()
                if row:
                    # Deserialize the JSON string back into a Python list
                    history_list = json.loads(row[0])
                    # Return the last 'limit' * 2 items (user + assistant messages)
                    return history_list[-(limit * 2):]
                else:
                    return []
        except Exception as e:
            logger.error(f"Error fetching history for session {session_id}: {e}", exc_info=True)
            return []
    def update_history(self, session_id: str, query: str, answer: str):
        with self.lock:
            try:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # First, get the current history
                    cursor.execute("SELECT history FROM chat_histories WHERE session_id = ?", (session_id,))
                    row = cursor.fetchone()
                    history = json.loads(row[0]) if row else []
                    # Append the new conversation turn
                    history.append({'role': 'user', 'content': query})
                    history.append({'role': 'assistant', 'content': answer})
                    # Serialize the updated list back to a JSON string
                    updated_history_json = json.dumps(history)
                    # Use INSERT OR REPLACE to either create a new row or update the existing one
                    cursor.execute("""
                        INSERT OR REPLACE INTO chat_histories (session_id, history)
                        VALUES (?, ?)
                    """, (session_id, updated_history_json))
                    conn.commit()
            except Exception as e:
                logger.error(f"Error updating history for session {session_id}: {e}", exc_info=True)
# --- EmbeddingManager for CSV QA (remains in app.py) ---
@dataclass
class QAEmbeddings:
    questions: List[str]
    question_map: List[int]
    embeddings: torch.Tensor
    df_qa: pd.DataFrame
    original_questions: List[str]
class EmbeddingManager:
    def __init__(self, model_name='all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)
        self.embeddings = {
            'general': None,
            'personal': None,
            'greetings': None
        }
        logger.info(f"EmbeddingManager initialized with model: {model_name}")
    def _process_questions(self, df: pd.DataFrame) -> Tuple[List[str], List[int], List[str]]:
        questions = []
        question_map = []
        original_questions = []
        if 'Question' not in df.columns:
            logger.warning("DataFrame for EmbeddingManager is missing 'Question' column. Cannot process questions from it.")
            return questions, question_map, original_questions
        for idx, question_text_raw in enumerate(df['Question']):
            if pd.isna(question_text_raw):
                continue
            question_text_cleaned = str(question_text_raw).strip()
            if not question_text_cleaned or question_text_cleaned.lower() == "nan":
                continue
            questions.append(question_text_cleaned)
            question_map.append(idx)
            original_questions.append(question_text_cleaned)
        return questions, question_map, original_questions
    def update_embeddings(self, general_qa: pd.DataFrame, personal_qa: pd.DataFrame, greetings_qa: pd.DataFrame):
        gen_questions, gen_question_map, gen_original_questions = self._process_questions(general_qa)
        gen_embeddings = self.model.encode(gen_questions, convert_to_tensor=True, show_progress_bar=False) if gen_questions else None
        pers_questions, pers_question_map, pers_original_questions = self._process_questions(personal_qa)
        pers_embeddings = self.model.encode(pers_questions, convert_to_tensor=True, show_progress_bar=False) if pers_questions else None
        greet_questions, greet_question_map, greet_original_questions = self._process_questions(greetings_qa)
        greet_embeddings = self.model.encode(greet_questions, convert_to_tensor=True, show_progress_bar=False) if greet_questions else None
        self.embeddings['general'] = QAEmbeddings(
            questions=gen_questions, question_map=gen_question_map, embeddings=gen_embeddings,
            df_qa=general_qa, original_questions=gen_original_questions
        )
        self.embeddings['personal'] = QAEmbeddings(
            questions=pers_questions, question_map=pers_question_map, embeddings=pers_embeddings,
            df_qa=personal_qa, original_questions=pers_original_questions
        )
        self.embeddings['greetings'] = QAEmbeddings(
            questions=greet_questions, question_map=greet_question_map, embeddings=greet_embeddings,
            df_qa=greetings_qa, original_questions=greet_original_questions
        )
        logger.info("CSV QA embeddings updated in EmbeddingManager.")
    def find_best_answers(self, user_query: str, qa_type: str, top_n: int = 5) -> Tuple[List[float], List[str], List[str], List[str], List[int]]:
        qa_data = self.embeddings[qa_type]
        if qa_data is None or qa_data.embeddings is None or len(qa_data.embeddings) == 0:
            return [], [], [], [], []
        query_embedding_tensor = self.model.encode([user_query], convert_to_tensor=True, show_progress_bar=False)
        if not isinstance(qa_data.embeddings, torch.Tensor):
            qa_data.embeddings = torch.tensor(qa_data.embeddings)  # Safeguard
        cos_scores = util.cos_sim(query_embedding_tensor, qa_data.embeddings)[0]
        top_k = min(top_n, len(cos_scores))
        if top_k == 0:
            return [], [], [], [], []
        top_scores_tensor, indices_tensor = torch.topk(cos_scores, k=top_k)
        top_confidences = [score.item() * 100 for score in top_scores_tensor]
        top_indices_mapped = []
        top_questions = []
        for idx_tensor in indices_tensor:
            item_idx = idx_tensor.item()
            if item_idx < len(qa_data.question_map) and item_idx < len(qa_data.original_questions):
                original_df_idx = qa_data.question_map[item_idx]
                if original_df_idx < len(qa_data.df_qa):
                    top_indices_mapped.append(original_df_idx)
                    top_questions.append(qa_data.original_questions[item_idx])
                else:
                    logger.warning(f"Index out of bounds: original_df_idx {original_df_idx} for df_qa length {len(qa_data.df_qa)}")
            else:
                logger.warning(f"Index out of bounds: item_idx {item_idx} for question_map/original_questions")
        valid_count = len(top_indices_mapped)
        top_confidences = top_confidences[:valid_count]
        top_questions = top_questions[:valid_count]
        top_answers = [str(qa_data.df_qa['Answer'].iloc[i]) for i in top_indices_mapped]
        top_images = [str(qa_data.df_qa['Image'].iloc[i]) if 'Image' in qa_data.df_qa.columns and pd.notna(qa_data.df_qa['Image'].iloc[i]) else None for i in top_indices_mapped]
        return top_confidences, top_questions, top_answers, top_images, top_indices_mapped
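# Illustrative call to find_best_answers (a sketch; it assumes the embedding_manager
# instance defined further below has been populated via load_qa_data_on_startup(), and
# the query text is a made-up example):
#
#   confs, questions, answers, images, row_indices = \
#       embedding_manager.find_best_answers("How do I reset my password?", 'general', top_n=5)
#   # confs are cosine similarities scaled to 0-100; the chat handler later compares them
#   # against GENERAL_QA_CONFIDENCE / DB_QA_CONFIDENCE / HIGH_CONFIDENCE_THRESHOLD.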
# --- DatabaseMonitor for personal_qa.csv placeholders (remains in app.py) ---
class DatabaseMonitor:
    def __init__(self, database_path):
        self.logger = logging.getLogger(__name__ + ".DatabaseMonitor")
        self.database_path = database_path
        self.last_modified = None
        self.last_size = None
        self.df = None
        self.lock = Lock()
        self.running = True
        self._load_database()
        self.monitor_thread = Thread(target=self._monitor_database, daemon=True)
        self.monitor_thread.start()
        self.logger.info(f"DatabaseMonitor initialized for: {database_path}")
    def _load_database(self):
        try:
            if not os.path.exists(self.database_path):
                self.logger.warning(f"Personal data file not found: {self.database_path}.")
                self.df = None
                return
            with self.lock:
                self.df = pd.read_csv(self.database_path, encoding='cp1252')
                self.last_modified = os.path.getmtime(self.database_path)
                self.last_size = os.path.getsize(self.database_path)
            self.logger.info(f"Personal data file reloaded: {self.database_path}")
        except Exception as e:
            self.logger.error(f"Error loading personal data file '{self.database_path}': {e}", exc_info=True)
            self.df = None
    def _monitor_database(self):
        while self.running:
            try:
                if not os.path.exists(self.database_path):
                    if self.df is not None:
                        self.logger.warning(f"Personal data file disappeared: {self.database_path}")
                        self.df = None
                        self.last_modified = None
                        self.last_size = None
                    time.sleep(5)
                    continue
                current_modified = os.path.getmtime(self.database_path)
                current_size = os.path.getsize(self.database_path)
                if (self.last_modified is None or current_modified != self.last_modified or
                        self.last_size is None or current_size != self.last_size):
                    self.logger.info("Personal data file change detected.")
                    self._load_database()
                time.sleep(1)
            except Exception as e:
                self.logger.error(f"Error monitoring personal data file: {e}", exc_info=True)
                time.sleep(5)
    def get_data(self, user_id):
        with self.lock:
            if self.df is not None and user_id:
                try:
                    # MODIFIED: The user_id from the frontend is the 'sl' column
                    target_id_col = 'sl'
                    if target_id_col not in self.df.columns:
                        self.logger.warning(f"'{target_id_col}' column not found in personal_data.csv (database.csv)")
                        return None
                    # Ensure the user_id is of the same type as the column
                    id_col_type = self.df[target_id_col].dtype
                    try:
                        typed_user_id = pd.Series(user_id).astype(id_col_type).iloc[0]
                    except (ValueError, TypeError):
                        self.logger.warning(f"Could not convert user_id '{user_id}' to the required type {id_col_type}")
                        return None
                    user_data = self.df[self.df[target_id_col] == typed_user_id]
                    if not user_data.empty:
                        return user_data.iloc[0].to_dict()
                except Exception as e:
                    self.logger.error(f"Error retrieving data for user_id {user_id}: {e}", exc_info=True)
        return None
    def stop(self):
        self.running = False
        if hasattr(self, 'monitor_thread') and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        self.logger.info("DatabaseMonitor stopped.")
# --- Flask App Initialization ---
app = Flask(__name__,
            static_folder='static',
            static_url_path='/static',
            template_folder='templates')
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True)

# Add this logging to debug requests
def log_request_info():
    logger.info(f'Request: {request.method} {request.path}')
    if request.method == 'POST':
        logger.info(f'Request from: {request.remote_addr}')

# --- Initialize Managers ---
embedding_manager = EmbeddingManager()
history_manager = ChatHistoryManager('chat_history.db')
database_csv_path = os.path.join(RAG_SOURCES_DIR, 'database.csv')
personal_data_monitor = DatabaseMonitor(database_csv_path)
# The FAISS RAG system is assigned during startup (see the __main__ block); define the
# global here so handlers can safely check it before initialization completes.
rag_system: Optional[KnowledgeRAG] = None
# --- Helper Functions (App specific) ---
def normalize_text(text):
    if isinstance(text, str):
        replacements = {
            '\x91': "'", '\x92': "'", '\x93': '"', '\x94': '"',
            '\x96': '-', '\x97': '-', '\x85': '...', '\x95': '-',
            '\u201c': '"', '\u201d': '"', '\u2018': "'", '\u2019': "'",
            '\u2013': '-', '\u2014': '-', '\u2026': '...', '\u2022': '-',
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
    return text
def require_admin_auth(f):
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth:
            return Response('Admin auth failed.', 401, {'WWW-Authenticate': 'Basic realm="Admin Login Required"'})
        # MODIFIED: Authenticate against users.csv
        if user_df is not None:
            user_email = auth.username.lower().strip()
            user_record = user_df[user_df['email'] == user_email]
            if not user_record.empty:
                user_data = user_record.iloc[0]
                # Important: Compare password as string
                if str(user_data['password']) == auth.password and user_data['role'] == 'admin':
                    return f(*args, **kwargs)  # Success
        # Fall back to .env credentials if users.csv could not be loaded
        elif auth.username == ADMIN_USERNAME and auth.password == ADMIN_PASSWORD:
            logger.warning("Admin authenticated using fallback .env credentials.")
            return f(*args, **kwargs)
        return Response('Admin auth failed.', 401, {'WWW-Authenticate': 'Basic realm="Admin Login Required"'})
    return decorated
def require_report_auth(f):
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or auth.username != ADMIN_USERNAME or auth.password != REPORT_PASSWORD:
            return Response('Report auth failed.', 401, {'WWW-Authenticate': 'Basic realm="Report Login Required"'})
        return f(*args, **kwargs)
    return decorated
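# Illustrative use of the auth decorators on a Flask view (a sketch only; the route
# paths for this app are not shown in this listing, and '/admin/example' is a made-up
# placeholder, not an actual endpoint):
#
#   @app.route('/admin/example', methods=['GET'])
#   @require_admin_auth
#   def admin_example():
#       return jsonify({"status": "ok"})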
def initialize_chat_log():
    if not os.path.exists(CHAT_LOG_FILE):
        with open(CHAT_LOG_FILE, 'w', newline='', encoding='utf-8') as f:
            writer = csv_lib.writer(f)
            writer.writerow(['sl', 'date_time', 'session_id', 'user_id', 'query', 'answer'])
def store_chat_history(sid: str, uid: Optional[str], query: str, resp: Dict[str, Any]):
    """
    Stores chat history in both the persistent SQLite DB and the CSV log file.
    """
    try:
        answer = str(resp.get('answer', ''))
        history_manager.update_history(sid, query, answer)
        initialize_chat_log()
        next_sl = 1
        try:
            if os.path.exists(CHAT_LOG_FILE) and os.path.getsize(CHAT_LOG_FILE) > 0:
                df_log = pd.read_csv(CHAT_LOG_FILE, on_bad_lines='skip')
                if not df_log.empty and 'sl' in df_log.columns and pd.api.types.is_numeric_dtype(df_log['sl'].dropna()):
                    if not df_log['sl'].dropna().empty:
                        next_sl = int(df_log['sl'].dropna().max()) + 1
        except Exception as e:
            logger.error(f"Error reading SL from {CHAT_LOG_FILE}: {e}", exc_info=True)
        with open(CHAT_LOG_FILE, 'a', newline='', encoding='utf-8') as f:
            csv_lib.writer(f).writerow([next_sl, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), sid, uid or "N/A", query, answer])
    except Exception as e:
        logger.error(f"Error in store_chat_history for session {sid}: {e}", exc_info=True)
def get_formatted_chat_history(session_id: str) -> List[Dict[str, str]]:
    """
    Retrieves the chat history for a session from the persistent SQLite database.
    """
    return history_manager.get_history(session_id, limit=CHAT_HISTORY_TO_SEND)
def get_qa_context_for_groq(all_questions: List[Dict]) -> str:
    valid_qa_pairs = []
    non_greeting_questions = [q for q in all_questions if q.get('source_type') != 'greetings']
    sorted_questions = sorted(non_greeting_questions, key=lambda x: x.get('confidence', 0), reverse=True)
    for qa in sorted_questions[:QUESTIONS_TO_SEND_TO_GROQ_QA]:
        answer = qa.get('answer')
        if (not pd.isna(answer) and isinstance(answer, str) and answer.strip() and
                "not available" not in answer.lower()):
            valid_qa_pairs.append(f"Q: {qa.get('question')}\nA: {answer}")
    return '\n'.join(valid_qa_pairs)
def replace_placeholders_in_answer(answer, db_data):
    if pd.isna(answer) or str(answer).strip() == '':
        return "Sorry, this information is not available yet"
    answer_str = str(answer)
    placeholders = re.findall(r'\{(\w+)\}', answer_str)
    if not placeholders:
        return answer_str
    if db_data is None:
        return "To get this specific information, please ensure you are logged in or have provided your user ID."
    missing_count = 0
    replacements_made = 0
    for placeholder in set(placeholders):
        key = placeholder.strip()
        value = db_data.get(key)
        if value is None or (isinstance(value, float) and pd.isna(value)) or str(value).strip() == '':
            answer_str = answer_str.replace(f'{{{key}}}', "not available")
            missing_count += 1
        else:
            answer_str = answer_str.replace(f'{{{key}}}', str(value))
            replacements_made += 1
    if missing_count == len(placeholders) and len(placeholders) > 0:
        return "Sorry, some specific details for you are not available at the moment."
    if "not available" in answer_str.lower() and replacements_made < len(placeholders):
        if answer_str == "not available" and len(placeholders) == 1:
            return "Sorry, this information is not available yet."
    if re.search(r'\{(\w+)\}', answer_str):
        logger.warning(f"Unresolved placeholders remain after replacement attempt: {answer_str}")
        answer_str = re.sub(r'\{(\w+)\}', "a specific detail", answer_str)
        if "a specific detail" in answer_str and "Sorry" not in answer_str:
            return "Sorry, I couldn't retrieve all the specific details for this answer. " + answer_str
        return "Sorry, I couldn't retrieve all the specific details for this answer. Some information has been generalized."
    return answer_str
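# Illustrative behaviour of replace_placeholders_in_answer (a sketch; 'name' and 'plan'
# are made-up examples of placeholders a personal_qa.csv answer might contain):
#
#   replace_placeholders_in_answer("Hello {name}, your plan is {plan}.",
#                                  {"name": "Jane", "plan": "Premium"})
#   # -> "Hello Jane, your plan is Premium."
#
#   replace_placeholders_in_answer("Hello {name}.", None)
#   # -> "To get this specific information, please ensure you are logged in or have provided your user ID."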
# --- NEW User Login Endpoint ---
def user_login():
    if user_df is None:
        return jsonify({"error": "User authentication is not available."}), 503
    data = request.json
    email = data.get('email', '').lower().strip()
    password = data.get('password')
    if not email or not password:
        return jsonify({"error": "Email and password are required."}), 400
    user_record = user_df[user_df['email'] == email]
    if not user_record.empty:
        user_data = user_record.iloc[0]
        # Compare password as string to avoid type issues
        if str(user_data['password']) == str(password):
            # Return user data but exclude password
            response_data = user_data.to_dict()
            del response_data['password']
            return jsonify(response_data), 200
    return jsonify({"error": "Invalid credentials"}), 401
# --- Main Chat Endpoint ---
def get_answer_hybrid():
    global rag_system
    data = request.json
    user_query = data.get('query', '')
    user_id = data.get('user_id')
    session_id = data.get('session_id')
    if not user_query:
        return jsonify({'error': 'No query provided'}), 400
    if not session_id:
        return jsonify({'error': 'session_id is required'}), 400
    personal_db_data = personal_data_monitor.get_data(user_id) if user_id else None
    conf_greet, q_greet, a_greet, img_greet, _ = embedding_manager.find_best_answers(user_query, 'greetings', top_n=1)
    conf_pers, q_pers, a_pers, img_pers, _ = embedding_manager.find_best_answers(user_query, 'personal', top_n=RELATED_QUESTIONS_TO_SHOW)
    conf_gen, q_gen, a_gen, img_gen, _ = embedding_manager.find_best_answers(user_query, 'general', top_n=RELATED_QUESTIONS_TO_SHOW)
    all_csv_candidate_answers = []
    if conf_greet and conf_greet[0] >= HIGH_CONFIDENCE_THRESHOLD:
        all_csv_candidate_answers.append({'question': q_greet[0], 'answer': a_greet[0], 'image': img_greet[0] if img_greet else None, 'confidence': conf_greet[0], 'source_type': 'greetings'})
    if conf_pers:
        for c, q, a, img in zip(conf_pers, q_pers, a_pers, img_pers):
            processed_a = replace_placeholders_in_answer(a, personal_db_data)
            if not ("Sorry, this information is not available yet" in processed_a or "To get this specific information" in processed_a):
                all_csv_candidate_answers.append({'question': q, 'answer': processed_a, 'image': img, 'confidence': c, 'source_type': 'personal'})
    if conf_gen:
        for c, q, a, img in zip(conf_gen, q_gen, a_gen, img_gen):
            if not (pd.isna(a) or str(a).strip() == '' or str(a).lower() == 'nan'):
                all_csv_candidate_answers.append({'question': q, 'answer': str(a), 'image': img, 'confidence': c, 'source_type': 'general'})
    all_csv_candidate_answers.sort(key=lambda x: x['confidence'], reverse=True)
    related_questions_list = []
    if all_csv_candidate_answers:
        best_csv_match = all_csv_candidate_answers[0]
        is_direct_csv_answer = False
        source_name = ""
        if best_csv_match['source_type'] == 'greetings' and best_csv_match['confidence'] >= HIGH_CONFIDENCE_THRESHOLD:
            source_name = 'greetings_qa'; is_direct_csv_answer = True
        elif best_csv_match['source_type'] == 'personal' and best_csv_match['confidence'] >= DB_QA_CONFIDENCE:
            source_name = 'personal_qa'; is_direct_csv_answer = True
        elif best_csv_match['source_type'] == 'general' and best_csv_match['confidence'] >= GENERAL_QA_CONFIDENCE:
            source_name = 'general_qa'; is_direct_csv_answer = True
        if is_direct_csv_answer:
            response_data = {'query': user_query, 'answer': best_csv_match['answer'], 'confidence': best_csv_match['confidence'], 'original_question': best_csv_match['question'], 'source': source_name}
            if best_csv_match['image']:
                response_data['image_url'] = url_for('static', filename=best_csv_match['image'], _external=True)
            for i, cand_q in enumerate(all_csv_candidate_answers):
                if i == 0:
                    continue
                if cand_q['source_type'] != 'greetings':
                    related_questions_list.append({'question': cand_q['question'], 'answer': cand_q['answer'], 'match': cand_q['confidence']})
                    if len(related_questions_list) >= RELATED_QUESTIONS_TO_SHOW:
                        break
            response_data['related_questions'] = related_questions_list
            store_chat_history(session_id, user_id, user_query, response_data)
            return jsonify(response_data)
    if rag_system and rag_system.retriever:
        try:
            logger.info(f"Attempting FAISS RAG query for: {user_query[:50]}...")
            rag_result = rag_system.query(user_query)
            rag_answer = rag_result.get("answer")
            rag_sources_details = rag_result.get("cited_source_details")
            if rag_answer and \
               "based on the provided excerpts, i cannot answer" not in rag_answer.lower() and \
               "based on the available documents, i could not find relevant information" not in rag_answer.lower() and \
               "error:" not in rag_answer.lower() and \
               "i could not find relevant information" not in rag_answer.lower() and \
               "please provide a valid question" not in rag_answer.lower():
                logger.info(f"FAISS RAG system provided an answer: {rag_answer[:100]}...")
                if not related_questions_list:
                    for cand_q in all_csv_candidate_answers:
                        if cand_q['source_type'] != 'greetings':
                            related_questions_list.append({'question': cand_q['question'], 'answer': cand_q['answer'], 'match': cand_q['confidence']})
                            if len(related_questions_list) >= RELATED_QUESTIONS_TO_SHOW:
                                break
                response_data = {
                    'query': user_query,
                    'answer': rag_answer,
                    'confidence': 85,
                    'source': 'document_rag_faiss',
                    'related_questions': related_questions_list,
                    'document_sources_details': rag_sources_details
                }
                store_chat_history(session_id, user_id, user_query, response_data)
                return jsonify(response_data)
            else:
                logger.info(f"FAISS RAG system could not answer or returned an error/no info/invalid query. RAG Answer: '{rag_answer}'. Proceeding to general Groq.")
        except Exception as e:
            logger.error(f"Error during FAISS RAG system query: {e}", exc_info=True)
| logger.info(f"No high-confidence CSV or FAISS RAG answer for '{user_query[:50]}...'. Proceeding to general Groq fallback.") | |
| qa_context_for_groq_str = get_qa_context_for_groq(all_csv_candidate_answers) | |
| chat_history_messages_for_groq = get_formatted_chat_history(session_id) | |
| groq_context = { | |
| 'current_query': user_query, | |
| 'chat_history': chat_history_messages_for_groq, | |
| 'qa_related_info': qa_context_for_groq_str, | |
| 'document_related_info': "" | |
| } | |
| try: | |
| groq_answer = get_groq_fallback_response(groq_context) | |
| if groq_answer and \ | |
| "Sorry, this information is not available yet" not in groq_answer and \ | |
| "I'm currently experiencing a technical difficulty" not in groq_answer and \ | |
| "I specialize in topics related to AMO Green Energy." not in groq_answer: | |
| if not related_questions_list: | |
| for cand_q in all_csv_candidate_answers: | |
| if cand_q['source_type'] != 'greetings': | |
| related_questions_list.append({'question': cand_q['question'], 'answer': cand_q['answer'], 'match': cand_q['confidence']}) | |
| if len(related_questions_list) >= RELATED_QUESTIONS_TO_SHOW: break | |
| response_data = { | |
| 'query': user_query, 'answer': groq_answer, | |
| 'confidence': 75, | |
| 'source': 'groq_general_fallback', | |
| 'related_questions': related_questions_list, | |
| 'document_sources_details': [] | |
| } | |
| store_chat_history(session_id, user_id, user_query, response_data) | |
| return jsonify(response_data) | |
| except Exception as e: | |
| logger.error(f"General Groq fallback pipeline error: {e}", exc_info=True) | |
| if not related_questions_list: | |
| for cand_q in all_csv_candidate_answers: | |
| if cand_q['source_type'] != 'greetings': | |
| related_questions_list.append({'question': cand_q['question'], 'answer': cand_q['answer'], 'match': cand_q['confidence']}) | |
| if len(related_questions_list) >= RELATED_QUESTIONS_TO_SHOW: break | |
| fallback_message = ( | |
| "For the most current and specific details on your query, particularly regarding product specifications or pricing, " | |
| "please contact AMO Green Energy Limited directly. Our team is ready to assist you.\n\n" | |
| "Contact Information:\n" | |
| "Email: [email protected]\n" | |
| "Phone: +880 1781-469951\n" | |
| "Website: ge-bd.com" | |
| ) | |
| response_data = { | |
| 'query': user_query, 'answer': fallback_message, 'confidence': 0, | |
| 'source': 'fallback', 'related_questions': related_questions_list | |
| } | |
| store_chat_history(session_id, user_id, user_query, response_data) | |
| return jsonify(response_data) | |
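# Illustrative request body for the hybrid chat handler above (a sketch; the route path
# is not shown in this listing and the ids are placeholders):
#
#   POST {"query": "What products do you offer?", "session_id": "<uuid from the session endpoint>", "user_id": 1}
#   The response carries 'answer', 'confidence', 'source' (greetings_qa / personal_qa /
#   general_qa / document_rag_faiss / groq_general_fallback / fallback) and 'related_questions'.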
# --- Admin and Utility Routes ---
def index_route():
    template_to_render = 'chat-bot.html'
    # CHANGED: Check in templates folder
    template_path = os.path.join(app.root_path, 'templates', template_to_render)
    if not os.path.exists(template_path):
        logger.error(f"Template '{template_to_render}' not found at {template_path}")
        return f"Chatbot interface not found at {template_path}. Please ensure 'templates/chat-bot.html' exists.", 404
    logger.info(f"Serving template: {template_to_render}")
    return render_template(template_to_render)
def verify_admin_session():
    """
    Verifies if the current user (from frontend session) is an admin.
    No HTTP Basic Auth needed - uses the user data from frontend.
    """
    data = request.json
    user_email = data.get('email', '').lower().strip()
    if not user_email:
        return jsonify({"is_admin": False, "error": "Email required"}), 400
    if user_df is None:
        return jsonify({"is_admin": False, "error": "User data not available"}), 503
    user_record = user_df[user_df['email'] == user_email]
    if not user_record.empty:
        user_data = user_record.iloc[0]
        is_admin = user_data['role'] == 'admin'
        return jsonify({"is_admin": is_admin}), 200
    return jsonify({"is_admin": False}), 200
def admin_login():
    """
    This endpoint is solely for verifying admin credentials via the decorator.
    If credentials are valid, it returns 200 OK.
    If not, the decorator returns 401 Unauthorized.
    """
    return jsonify({"status": "success", "message": "Authentication successful"}), 200
def get_faiss_rag_status():
    global rag_system
    if not rag_system:
        return jsonify({"error": "FAISS RAG system not initialized."}), 500
    try:
        status = {
            "status": "Initialized" if rag_system.retriever else "Initialized (Retriever not ready)",
            "index_storage_dir": rag_system.index_storage_dir,
            "embedding_model": rag_system.embedding_model_name,
            "groq_model": rag_system.groq_model_name,
            "retriever_k": rag_system.retriever.final_k if rag_system.retriever else "N/A",
            "processed_source_files": rag_system.processed_source_files,
            "index_type": "FAISS",
            "index_loaded_or_built": rag_system.vector_store is not None
        }
        if rag_system.vector_store and hasattr(rag_system.vector_store, 'index') and rag_system.vector_store.index:
            try:
                status["num_vectors_in_index"] = rag_system.vector_store.index.ntotal
            except Exception:
                status["num_vectors_in_index"] = "N/A (Could not get count)"
        else:
            status["num_vectors_in_index"] = "N/A (Vector store or index not available)"
        return jsonify(status)
    except Exception as e:
        logger.error(f"Error getting FAISS RAG status: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
def rebuild_faiss_index_route():
    global rag_system
    logger.info("Admin request to rebuild FAISS RAG index received. Starting two-step process.")
    data = request.json or {}
    source_dir_override = data.get('source_directory')
    source_dir_to_use = source_dir_override if source_dir_override else RAG_SOURCES_DIR
    if source_dir_override and not os.path.isdir(source_dir_override):
        return jsonify({"error": f"Custom source directory '{source_dir_override}' not found on the server."}), 400
    logger.info(f"Using source directory: {source_dir_to_use}")
    logger.info("Step 1: Running chunker.py to pre-process source documents.")
    chunker_script_path = os.path.join(_APP_BASE_DIR, 'chunker.py')
    chunked_json_output_path = os.path.join(RAG_STORAGE_PARENT_DIR, RAG_CHUNKED_SOURCES_FILENAME)
    os.makedirs(TEXT_EXTRACTIONS_DIR, exist_ok=True)
    if not os.path.exists(chunker_script_path):
        logger.error(f"Chunker script not found at '{chunker_script_path}'. Aborting rebuild.")
        return jsonify({"error": "chunker.py not found. Cannot proceed with rebuild."}), 500
    chunk_size = os.getenv("RAG_CHUNK_SIZE", "1000")
    chunk_overlap = os.getenv("RAG_CHUNK_OVERLAP", "150")
    command = [
        sys.executable,
        chunker_script_path,
        '--sources-dir', source_dir_to_use,
        '--output-file', chunked_json_output_path,
        '--text-output-dir', TEXT_EXTRACTIONS_DIR,
        '--chunk-size', chunk_size,
        '--chunk-overlap', chunk_overlap
    ]
    try:
        process = subprocess.run(command, capture_output=True, text=True, check=True)
        logger.info("Chunker script executed successfully.")
        logger.info(f"Chunker stdout:\n{process.stdout}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Chunker script failed with exit code {e.returncode}.")
        logger.error(f"Chunker stderr:\n{e.stderr}")
        return jsonify({"error": "Step 1 (Chunking) failed.", "details": e.stderr}), 500
    except Exception as e:
        logger.error(f"An unexpected error occurred while running the chunker script: {e}", exc_info=True)
        return jsonify({"error": f"An unexpected error occurred during the chunking step: {str(e)}"}), 500
| logger.info("Step 2: Rebuilding FAISS index from the newly generated chunks.") | |
| try: | |
| new_rag_system_instance = initialize_and_get_rag_system(force_rebuild=True, source_dir_override=source_dir_override) | |
| if new_rag_system_instance and new_rag_system_instance.vector_store: | |
| rag_system = new_rag_system_instance | |
| logger.info("FAISS RAG index rebuild completed and new RAG system instance is active.") | |
| updated_status_response = get_faiss_rag_status() | |
| return jsonify({"message": "FAISS RAG index rebuild completed.", "status": updated_status_response.get_json()}), 200 | |
| else: | |
| logger.error("FAISS RAG index rebuild failed during the indexing phase.") | |
| return jsonify({"error": "Step 2 (Indexing) failed. Check logs."}), 500 | |
| except Exception as e: | |
| logger.error(f"Error during admin FAISS index rebuild (indexing phase): {e}", exc_info=True) | |
| return jsonify({"error": f"Failed to rebuild index during indexing phase: {str(e)}"}), 500 | |
def update_faiss_index_route():
    global rag_system
    logger.info("Admin request to update FAISS RAG index with new files received.")
    if not rag_system or not rag_system.vector_store:
        return jsonify({"error": "RAG system not initialized or index not loaded. Cannot perform update."}), 503
    data = request.json or {}
    source_dir_override = data.get('source_directory')
    source_dir_to_use = source_dir_override if source_dir_override else RAG_SOURCES_DIR
    max_files_to_process = data.get('max_new_files')
    if source_dir_override and not os.path.isdir(source_dir_override):
        return jsonify({"error": f"Custom source directory '{source_dir_override}' not found on the server."}), 400
    logger.info(f"Checking for new files in: {source_dir_to_use}")
    if max_files_to_process:
        logger.info(f"Will process a maximum of {max_files_to_process} new files this session.")
    try:
        update_result = rag_system.update_index_with_new_files(
            source_folder_path=source_dir_to_use,
            max_files_to_process=max_files_to_process
        )
        logger.info(f"Index update process finished with status: {update_result.get('status')}")
        return jsonify(update_result), 200
    except Exception as e:
        logger.error(f"Error during admin FAISS index update: {e}", exc_info=True)
        return jsonify({"error": f"Failed to update index: {str(e)}"}), 500
def get_personal_db_status():
    try:
        status_info = {
            'personal_data_csv_monitor_status': 'running',
            'file_exists': os.path.exists(personal_data_monitor.database_path),
            'data_loaded': personal_data_monitor.df is not None,
            'last_update': None
        }
        if status_info['file_exists']:
            status_info['last_update'] = datetime.fromtimestamp(os.path.getmtime(personal_data_monitor.database_path)).isoformat()
        return jsonify(status_info)
    except Exception as e:
        return jsonify({'status': 'error', 'error': str(e)}), 500
def download_report():
    try:
        if not os.path.exists(CHAT_LOG_FILE) or os.path.getsize(CHAT_LOG_FILE) == 0:
            return jsonify({'error': 'No chat history available.'}), 404
        return send_file(CHAT_LOG_FILE, mimetype='text/csv', as_attachment=True, download_name=f'chat_history_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv')
    except Exception as e:
        logger.error(f"Error downloading report: {e}", exc_info=True)
        return jsonify({'error': 'Failed to generate report'}), 500
def create_session_route():
    try:
        session_id = str(generate_uuid())
        logger.info(f"New session created: {session_id}")
        return jsonify({'status': 'success', 'session_id': session_id}), 200
    except Exception as e:
        logger.error(f"Session creation error: {e}", exc_info=True)
        return jsonify({'status': 'error', 'message': str(e)}), 500

def get_version_route():
    return jsonify({'version': '3.9.1-CSV-Auth-Persistent-History'}), 200
def clear_session_history_route():
    session_id = request.json.get('session_id')
    if not session_id:
        return jsonify({'status': 'error', 'message': 'session_id is required'}), 400
    # MODIFIED: Use the new, correct method instead of the old one
    history_manager.clear_history(session_id)
    logger.info(f"Chat history cleared for session: {session_id}")
    return jsonify({'status': 'success', 'message': 'History cleared'})
def get_chat_history_route():
    session_id = request.args.get('session_id')
    limit = request.args.get('limit', default=10, type=int)
    if not session_id:
        return jsonify({"error": "session_id is required"}), 400
    history = history_manager.get_history(session_id, limit=limit)
    structured_history = []
    for i in range(0, len(history), 2):
        if i + 1 < len(history):
            user_msg = history[i]
            bot_msg = history[i + 1]
            structured_history.append({
                "query": user_msg.get('content'),
                "response": {"answer": bot_msg.get('content')}
            })
    return jsonify({"history": structured_history})
def retrieve_raw_chunks():
    global rag_system
    if not rag_system or not rag_system.retriever:
        return jsonify({"error": "RAG system not initialized or retriever not available."}), 503
    data = request.json
    query = data.get('query')
    if not query:
        return jsonify({"error": "A 'query' is required."}), 400
    # Get optional parameters from the request, with defaults from the RAG system's current configuration
    use_reranker = data.get('use_reranker', rag_system.retriever.reranker is not None)
    initial_fetch_k = data.get('initial_fetch_k', rag_system.retriever.initial_fetch_k)
    final_k = data.get('final_k', rag_system.retriever.final_k)
    # Store the original retriever settings so they can be restored afterwards and leave no lasting changes
    original_reranker = rag_system.retriever.reranker
    original_initial_k = rag_system.retriever.initial_fetch_k
    original_final_k = rag_system.retriever.final_k
    try:
        # Temporarily modify retriever settings for this specific query
        rag_system.retriever.reranker = original_reranker if use_reranker else None
        rag_system.retriever.initial_fetch_k = int(initial_fetch_k)
        rag_system.retriever.final_k = int(final_k)
        logger.info(f"Performing raw chunk retrieval for query: '{query[:50]}...'")
        logger.info(f"Temporary Settings: use_reranker={use_reranker}, initial_fetch_k={initial_fetch_k}, final_k={final_k}")
        # Directly call the retriever to get the relevant documents
        retrieved_docs = rag_system.retriever.get_relevant_documents(query)
        # Format the results into a JSON-serializable list
        results = []
        for doc in retrieved_docs:
            results.append({
                "page_content": doc.page_content,
                "metadata": doc.metadata
            })
        return jsonify({
            "query": query,
            "retrieved_chunks": results,
            "chunk_count": len(results)
        })
    except Exception as e:
        logger.error(f"Error during raw chunk retrieval: {e}", exc_info=True)
        return jsonify({"error": f"An error occurred during retrieval: {str(e)}"}), 500
    finally:
        # Restore the original retriever settings to prevent side effects
        rag_system.retriever.reranker = original_reranker
        rag_system.retriever.initial_fetch_k = original_initial_k
        rag_system.retriever.final_k = original_final_k
        logger.info("Retriever settings have been restored to their original values.")
# --- App Cleanup and Startup ---
def cleanup_application():
    if personal_data_monitor:
        personal_data_monitor.stop()
    logger.info("Application cleanup finished.")

atexit.register(cleanup_application)
def load_qa_data_on_startup():
    global embedding_manager
    try:
        general_qa_path = os.path.join(RAG_SOURCES_DIR, 'general_qa.csv')
        personal_qa_path = os.path.join(RAG_SOURCES_DIR, 'personal_qa.csv')
        greetings_qa_path = os.path.join(RAG_SOURCES_DIR, 'greetings.csv')
        general_qa_df = pd.DataFrame(columns=['Question', 'Answer', 'Image'])
        personal_qa_df = pd.DataFrame(columns=['Question', 'Answer', 'Image'])
        greetings_qa_df = pd.DataFrame(columns=['Question', 'Answer', 'Image'])
        if os.path.exists(general_qa_path):
            try:
                general_qa_df = pd.read_csv(general_qa_path, encoding='cp1252')
            except Exception as e_csv:
                logger.error(f"Error reading general_qa.csv: {e_csv}")
        else:
            logger.warning(f"Optional file 'general_qa.csv' not found in '{RAG_SOURCES_DIR}'.")
        if os.path.exists(personal_qa_path):
            try:
                personal_qa_df = pd.read_csv(personal_qa_path, encoding='cp1252')
            except Exception as e_csv:
                logger.error(f"Error reading personal_qa.csv: {e_csv}")
        else:
            logger.warning(f"Optional file 'personal_qa.csv' not found in '{RAG_SOURCES_DIR}'.")
        if os.path.exists(greetings_qa_path):
            try:
                greetings_qa_df = pd.read_csv(greetings_qa_path, encoding='cp1252')
            except Exception as e_csv:
                logger.error(f"Error reading greetings.csv: {e_csv}")
        else:
            logger.warning(f"Optional file 'greetings.csv' not found in '{RAG_SOURCES_DIR}'.")
        dataframes_to_process = {
            "general": general_qa_df,
            "personal": personal_qa_df,
            "greetings": greetings_qa_df
        }
        for df_name, df_val in dataframes_to_process.items():
            for col in ['Question', 'Answer', 'Image']:
                if col not in df_val.columns:
                    df_val[col] = None
                    if col != 'Image':
                        logger.warning(f"'{col}' column missing in {df_name} data. Added empty column.")
            if 'Question' in df_val.columns and not df_val['Question'].isnull().all():
                df_val['Question'] = df_val['Question'].astype(str).apply(normalize_text)
            elif 'Question' in df_val.columns:
                df_val['Question'] = df_val['Question'].astype(str)
            if 'Answer' in df_val.columns and not df_val['Answer'].isnull().all():
                df_val['Answer'] = df_val['Answer'].astype(str).apply(normalize_text)
            elif 'Answer' in df_val.columns:
                df_val['Answer'] = df_val['Answer'].astype(str)
        embedding_manager.update_embeddings(
            dataframes_to_process["general"],
            dataframes_to_process["personal"],
            dataframes_to_process["greetings"]
        )
        logger.info("CSV QA data loaded and embeddings initialized.")
    except Exception as e:
        logger.critical(f"CRITICAL: Error loading or processing QA data: {e}. Semantic QA may not function.", exc_info=True)
if __name__ == '__main__':
    # CHANGED: Create necessary folders including assets and templates
    for folder_path in [os.path.join(_APP_BASE_DIR, 'templates'),
                        os.path.join(_APP_BASE_DIR, 'static'),
                        os.path.join(_APP_BASE_DIR, 'assets'),  # ADDED
                        TEXT_EXTRACTIONS_DIR]:
        os.makedirs(folder_path, exist_ok=True)

    # --- NEW: Download users.csv from GDrive if enabled ---
    if GDRIVE_USERS_CSV_ENABLED:
        logger.info("[GDRIVE_USERS_DOWNLOAD] Google Drive users.csv download is ENABLED.")
        if GDRIVE_USERS_CSV_ID_OR_URL:
            users_csv_target_path = os.path.join(_APP_BASE_DIR, 'assets', 'users.csv')
            logger.info(f"[GDRIVE_USERS_DOWNLOAD] Attempting to download users.csv to: {users_csv_target_path}")
            download_successful = download_gdrive_file(GDRIVE_USERS_CSV_ID_OR_URL, users_csv_target_path)
            if download_successful:
                logger.info("[GDRIVE_USERS_DOWNLOAD] Successfully downloaded users.csv.")
            else:
                logger.error("[GDRIVE_USERS_DOWNLOAD] Failed to download users.csv from Google Drive. Will use existing file or fallback.")
        else:
            logger.warning("[GDRIVE_USERS_DOWNLOAD] GDRIVE_USERS_CSV_ENABLED is True, but GDRIVE_USERS_CSV_URL is not set.")
    else:
        logger.info("[GDRIVE_USERS_DOWNLOAD] Google Drive users.csv download is DISABLED.")

    # Load users from CSV at startup (will use the downloaded file if successful)
    load_users_from_csv()
    load_qa_data_on_startup()
    initialize_chat_log()

    # MODIFIED: Download pre-built FAISS index from GDrive if enabled
    if GDRIVE_INDEX_ENABLED:
        logger.info("[GDRIVE_INDEX_DOWNLOAD] Google Drive index download is ENABLED.")
        if GDRIVE_INDEX_ID_OR_URL:
            logger.info(f"[GDRIVE_INDEX_DOWNLOAD] Attempting to download and extract index from: {GDRIVE_INDEX_ID_OR_URL}")
            # The root directory is the target for extraction, so 'faiss_storage' lands correctly
            download_successful = download_and_unzip_gdrive_file(GDRIVE_INDEX_ID_OR_URL, _APP_BASE_DIR)
            if download_successful:
                logger.info("[GDRIVE_INDEX_DOWNLOAD] Successfully downloaded and extracted FAISS index.")
            else:
                logger.error("[GDRIVE_INDEX_DOWNLOAD] Failed to download FAISS index from Google Drive. RAG system might build a new one if sources exist.")
        else:
            logger.warning("[GDRIVE_INDEX_DOWNLOAD] GDRIVE_INDEX_ENABLED is True, but GDRIVE_INDEX_URL is not set.")
    else:
        logger.info("[GDRIVE_INDEX_DOWNLOAD] Google Drive index download is DISABLED.")

    logger.info("Attempting to initialize RAG system from new modules...")
    rag_system = initialize_and_get_rag_system()
    if rag_system:
        logger.info("RAG system initialized successfully via new modules.")
    else:
        logger.warning("RAG system failed to initialize. Document RAG functionality will be unavailable.")

    logger.info(f"Flask application starting with Hybrid RAG (CSV + Dynamic FAISS) on {FLASK_APP_HOST}:{FLASK_APP_PORT} Debug: {FLASK_DEBUG_MODE}...")
    if not FLASK_DEBUG_MODE:
        werkzeug_log = logging.getLogger('werkzeug')
        werkzeug_log.setLevel(logging.ERROR)
    app.run(host=FLASK_APP_HOST, port=FLASK_APP_PORT, debug=FLASK_DEBUG_MODE)