import gradio as gr import torch import unicodedata import re import numpy as np from pathlib import Path from transformers import AutoTokenizer, AutoModel from sklearn.feature_extraction.text import HashingVectorizer from sklearn.preprocessing import normalize as sk_normalize import chromadb import joblib import pickle import scipy.sparse import textwrap import os # --------------------------- CONFIG για ChatbotVol107 ----------------------------------- # --- Ρυθμίσεις Μοντέλου και Βάσης Δεδομένων --- MODEL_NAME = "nlpaueb/bert-base-greek-uncased-v1" DB_DIR = Path("./chroma_db_ChatbotVol107") # Τοπική διαδρομή για τη βάση που κατεβάσατε COL_NAME = "collection_chatbotvol107" # Πρέπει να ταιριάζει με το Colab ASSETS_DIR = Path("./assets_ChatbotVol107") # Τοπική διαδρομή για τα assets που κατεβάσατε # --------------------------------------------- # --- Ρυθμίσεις για Google Cloud Storage (παραμένουν ίδιες) --- GCS_BUCKET_NAME = "chatbotthesisihu" # Το όνομα του GCS bucket σας (βεβαιωθείτε ότι είναι σωστό) GCS_PUBLIC_URL_PREFIX = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}/" # ------------------------------------------------------------- CHUNK_SIZE = 512 # Από το Colab config, για συνέπεια στο cls_embed ALPHA_BASE = 0.50 # Από το Colab config ALPHA_LONGQ = 0.65 # Από το Colab config DEVICE = "cuda" if torch.cuda.is_available() else "cpu" print(f"Running ChatbotVol107 on device: {DEVICE}") print(f"Using model: {MODEL_NAME}") print(f"ChromaDB path: {DB_DIR}") print(f"Assets path: {ASSETS_DIR}") print(f"Collection name: {COL_NAME}") # ----------------------- PRE-/POST HELPERS ---------------------------- def strip_acc(s: str) -> str: return ''.join(ch for ch in unicodedata.normalize('NFD', s) if not unicodedata.combining(ch)) STOP = {"σχετικο", "σχετικα", "με", "και"} def preprocess(txt: str) -> str: txt = strip_acc(txt.lower()) txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt) txt = re.sub(r"\s+", " ", txt).strip() return " ".join(w for w in txt.split() if w not in STOP) # --- cls_embed ΠΡΕΠΕΙ ΝΑ ΕΙΝΑΙ ΙΔΙΑ ΜΕ ΤΟΥ COLAB (για ένα query) --- def cls_embed(texts, tok, model): # texts είναι μια λίστα με ένα string (το query) out = [] enc = tok(texts, padding=True, truncation=True, max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE) with torch.no_grad(): model_output = model(**enc) last_hidden_state = model_output.last_hidden_state # Παίρνουμε το embedding του [CLS] token cls_embedding = last_hidden_state[:, 0, :] cls_normalized = torch.nn.functional.normalize(cls_embedding, p=2, dim=1) out.append(cls_normalized.cpu()) return torch.cat(out).numpy() # ---------------------------------------------------- # ---------------------- LOAD MODELS & DATA (Μία φορά κατά την εκκίνηση) -------------------- print(f"⏳ Loading Model ({MODEL_NAME}) and Tokenizer...") try: tok = AutoTokenizer.from_pretrained(MODEL_NAME) model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval() print("✓ Model and tokenizer loaded.") except Exception as e: print(f"CRITICAL ERROR loading model/tokenizer: {e}") raise print(f"⏳ Loading TF-IDF vectorizers and SPARSE matrices from {ASSETS_DIR}...") try: char_vec = joblib.load(ASSETS_DIR / "char_vectorizer.joblib") word_vec = joblib.load(ASSETS_DIR / "word_vectorizer.joblib") X_char = scipy.sparse.load_npz(ASSETS_DIR / "X_char_sparse.npz") X_word = scipy.sparse.load_npz(ASSETS_DIR / "X_word_sparse.npz") print("✓ TF-IDF components loaded.") except Exception as e: print(f"CRITICAL ERROR loading TF-IDF components from {ASSETS_DIR}: {e}") raise print(f"⏳ Loading chunk data (pre_chunks, raw_chunks, ids, metas) from {ASSETS_DIR}...") try: with open(ASSETS_DIR / "pre_chunks.pkl", "rb") as f: pre_chunks = pickle.load(f) with open(ASSETS_DIR / "raw_chunks.pkl", "rb") as f: raw_chunks = pickle.load(f) with open(ASSETS_DIR / "ids.pkl", "rb") as f: ids = pickle.load(f) with open(ASSETS_DIR / "metas.pkl", "rb") as f: metas = pickle.load(f) print(f"✓ Chunk data loaded. Total chunks from ids: {len(ids):,}") if not all([pre_chunks, raw_chunks, ids, metas]): print("WARNING: One or more chunk data lists are empty!") except Exception as e: print(f"CRITICAL ERROR loading chunk data from {ASSETS_DIR}: {e}") raise print(f"⏳ Connecting to ChromaDB at {DB_DIR}...") try: client = chromadb.PersistentClient(path=str(DB_DIR.resolve())) col = client.get_collection(COL_NAME) print(f"✓ Connected to ChromaDB. Collection '{COL_NAME}' count: {col.count()}") if col.count() == 0: print(f"WARNING: ChromaDB collection '{COL_NAME}' is empty or not found correctly at {DB_DIR}!") except Exception as e: print(f"CRITICAL ERROR connecting to ChromaDB or getting collection: {e}") print(f"Attempted DB path for PersistentClient: {str(DB_DIR.resolve())}") raise # ---------------------- HYBRID SEARCH (Κύρια Λογική) --- def hybrid_search_gradio(query, k=5): if not query.strip(): return "Παρακαλώ εισάγετε μια ερώτηση." if not ids: return "Σφάλμα: Τα δεδομένα αναζήτησης (ids) δεν έχουν φορτωθεί. Επικοινωνήστε με τον διαχειριστή." q_pre = preprocess(query) words = q_pre.split() alpha = ALPHA_LONGQ if len(words) > 30 else ALPHA_BASE exact_ids_set = {ids[i] for i, t in enumerate(pre_chunks) if q_pre in t} q_emb_np = cls_embed([q_pre], tok, model) q_emb_list = q_emb_np.tolist() try: sem_results = col.query( query_embeddings=q_emb_list, n_results=min(k * 30, len(ids)), include=["distances", "metadatas"] ) except Exception as e: print(f"ERROR during ChromaDB query: {e}") return "Σφάλμα κατά την σημασιολογική αναζήτηση." sem_sims = {doc_id: 1 - dist for doc_id, dist in zip(sem_results["ids"][0], sem_results["distances"][0])} q_char_sparse = char_vec.transform([q_pre]) q_char_normalized = sk_normalize(q_char_sparse) char_sim_scores = (q_char_normalized @ X_char.T).toarray().flatten() q_word_sparse = word_vec.transform([q_pre]) q_word_normalized = sk_normalize(q_word_sparse) word_sim_scores = (q_word_normalized @ X_word.T).toarray().flatten() lex_sims = {} for idx, (c_score, w_score) in enumerate(zip(char_sim_scores, word_sim_scores)): if c_score > 0 or w_score > 0: if idx < len(ids): lex_sims[ids[idx]] = 0.85 * c_score + 0.15 * w_score else: print(f"Warning: Lexical score index {idx} out of bounds for ids list (len: {len(ids)}).") all_chunk_ids_set = set(sem_sims.keys()) | set(lex_sims.keys()) | exact_ids_set scored = [] for chunk_id_key in all_chunk_ids_set: s = alpha * sem_sims.get(chunk_id_key, 0.0) + \ (1 - alpha) * lex_sims.get(chunk_id_key, 0.0) if chunk_id_key in exact_ids_set: s = 1.0 scored.append((chunk_id_key, s)) scored.sort(key=lambda x: x[1], reverse=True) hits_output = [] seen_doc_main_ids = set() for chunk_id_val, score_val in scored: try: idx_in_lists = ids.index(chunk_id_val) except ValueError: print(f"Warning: chunk_id '{chunk_id_val}' from search results not found in global 'ids' list. Skipping.") continue doc_meta = metas[idx_in_lists] doc_main_id = doc_meta['id'] if doc_main_id in seen_doc_main_ids: continue original_url_from_meta = doc_meta.get('url', '#') pdf_gcs_url = "#" pdf_filename_display = "N/A" if original_url_from_meta and original_url_from_meta != '#': pdf_filename_extracted = os.path.basename(original_url_from_meta) if pdf_filename_extracted and pdf_filename_extracted.lower().endswith(".pdf"): pdf_gcs_url = f"{GCS_PUBLIC_URL_PREFIX}{pdf_filename_extracted}" pdf_filename_display = pdf_filename_extracted elif pdf_filename_extracted: pdf_filename_display = "Source is not a PDF" else: pdf_filename_display = "No source URL" else: pdf_filename_display = "No source URL" hits_output.append({ "score": score_val, "title": doc_meta.get('title', 'N/A'), "snippet": raw_chunks[idx_in_lists][:500] + " ...", "original_url_meta": original_url_from_meta, "pdf_serve_url": pdf_gcs_url, "pdf_filename_display": pdf_filename_display }) seen_doc_main_ids.add(doc_main_id) if len(hits_output) >= k: break if not hits_output: return "Δεν βρέθηκαν σχετικά αποτελέσματα." output_md = f"Βρέθηκαν **{len(hits_output)}** σχετικά αποτελέσματα:\n\n" for hit in hits_output: output_md += f"### {hit['title']} (Score: {hit['score']:.3f})\n" snippet_wrapped = textwrap.fill(hit['snippet'].replace("\n", " "), width=100) output_md += f"**Απόσπασμα:** {snippet_wrapped}\n" if hit['pdf_serve_url'] and hit['pdf_serve_url'] != '#': output_md += f"**Πηγή (PDF):** {hit['pdf_filename_display']}\n" elif hit['original_url_meta'] and hit['original_url_meta'] != '#': output_md += f"**Πηγή (αρχικό από metadata):** [{hit['original_url_meta']}]({hit['original_url_meta']})\n" output_md += "---\n" return output_md print(">>> Ξεκινά έλεγχος 'Sanity Check' της ChromaDB στο Hugging Face Spaces <<<") try: # Ορίζουμε μια νέα, προσωρινή διαδρομή για τη δοκιμαστική βάση sanity_db_path_str = "./chroma_db_sanity_check_on_hf" sanity_db_path = Path(sanity_db_path_str) # Διαγραφή τυχόν προηγούμενης δοκιμαστικής βάσης για καθαρή εκκίνηση if sanity_db_path.exists(): import shutil print(f"--- Sanity Check: Deleting existing test DB at {sanity_db_path_str}") shutil.rmtree(sanity_db_path_str) sanity_db_path.mkdir(parents=True, exist_ok=True) print(f"--- Sanity Check: Created directory for test DB at {sanity_db_path_str}") sanity_client = chromadb.PersistentClient(path=str(sanity_db_path.resolve())) sanity_collection_name = "my_sanity_test_collection" # Προσπάθεια διαγραφής της συλλογής αν υπάρχει από προηγούμενη εκτέλεση try: print(f"--- Sanity Check: Attempting to delete old sanity collection '{sanity_collection_name}' if it exists...") sanity_client.delete_collection(name=sanity_collection_name) print(f"--- Sanity Check: Old sanity collection '{sanity_collection_name}' deleted.") except Exception as e_delete_coll: print(f"--- Sanity Check: Could not delete old sanity collection (maybe it didn't exist): {e_delete_coll}") pass print(f"--- Sanity Check: Creating/getting new sanity collection '{sanity_collection_name}'...") sanity_col = sanity_client.get_or_create_collection(name=sanity_collection_name) print(f"--- Sanity Check: Sanity collection '{sanity_collection_name}' created/retrieved. Initial count: {sanity_col.count()}") # Προσθήκη ενός δοκιμαστικού αντικειμένου dummy_texts = ["αυτό είναι ένα πολύ απλό δοκιμαστικό κείμενο για έλεγχο"] # Χρησιμοποιούμε την υπάρχουσα cls_embed και τα φορτωμένα tok, model dummy_embeddings = cls_embed(dummy_texts, tok, model) dummy_ids = ["sanity_test_id_001"] dummy_metadatas = [{"source": "internal_sanity_test"}] print(f"--- Sanity Check: Adding 1 item to sanity collection...") sanity_col.add( embeddings=dummy_embeddings.tolist(), documents=dummy_texts, # Προαιρετικά, μπορείτε να αποθηκεύσετε και τα documents ids=dummy_ids, metadatas=dummy_metadatas ) print(f"--- Sanity Check: Added 1 item. New count in sanity collection: {sanity_col.count()}") # Εκτέλεση query στο δοκιμαστικό αντικείμενο print(f"--- Sanity Check: Querying sanity collection...") query_results = sanity_col.query( query_embeddings=dummy_embeddings.tolist(), # Query με το ίδιο το embedding n_results=1, include=["metadatas", "documents", "distances"] ) print(f"--- Sanity Check: Sanity query successful. Result IDs: {query_results['ids']}") print(">>> Έλεγχος 'Sanity Check' της ChromaDB ΟΛΟΚΛΗΡΩΘΗΚΕ ΕΠΙΤΥΧΩΣ στο HF Spaces! <<<") except Exception as e_sanity: print(f"!!! Έλεγχος 'Sanity Check' της ChromaDB ΑΠΕΤΥΧΕ στο HF Spaces: {e_sanity}") print(f"!!! Πλήρες σφάλμα: {type(e_sanity).__name__}: {str(e_sanity)}") print("--------------------------------------------------------------------") # ---------------------- GRADIO INTERFACE ----------------------------------- print("🚀 Launching Gradio Interface for ChatbotVol107...") iface = gr.Interface( fn=hybrid_search_gradio, inputs=gr.Textbox(lines=3, placeholder="Γράψε την ερώτησή σου εδώ...", label=f"Ερώτηση προς τον βοηθό (Μοντέλο: {MODEL_NAME.split('/')[-1]}):"), outputs=gr.Markdown(label="Απαντήσεις από τα έγγραφα:", rtl=False, sanitize_html=False), title=f"🏛️ Ελληνικό Chatbot Υβριδικής Αναζήτησης (ChatbotVol107 - {MODEL_NAME.split('/')[-1]})", description=(f"Πληκτρολογήστε την ερώτησή σας για αναζήτηση. Χρησιμοποιεί το μοντέλο: {MODEL_NAME}.\n" "Τα PDF ανοίγουν από εξωτερική πηγή (Google Cloud Storage) σε νέα καρτέλα."), allow_flagging="never", examples=[ ["Ποια είναι τα μέτρα για τον κορονοϊό;", 5], ["Πληροφορίες για άδεια ειδικού σκοπού", 3], ["Τι προβλέπεται για τις μετακινήσεις εκτός νομού;", 5] ], ) if __name__ == '__main__': # Αφού τα PDF εξυπηρετούνται από το GCS, το allowed_paths μπορεί να μην είναι απαραίτητο # εκτός αν έχετε άλλα τοπικά στατικά αρχεία (π.χ. εικόνες για το UI) που θέλετε να εξυπηρετήσετε. # Αν δεν έχετε, μπορείτε να το αφαιρέσετε ή να βάλετε κενή λίστα: iface.launch(allowed_paths=[]) # Για τώρα, το αφήνουμε όπως ήταν, σε περίπτωση που χρειάζεται για caching ή άλλα assets. iface.launch(allowed_paths=["static_pdfs"]) # Η STATIC_PDF_DIR_NAME ήταν "static_pdfs"