import gradio as gr import torch import unicodedata import re import numpy as np from pathlib import Path from transformers import AutoTokenizer, AutoModel from sklearn.feature_extraction.text import HashingVectorizer from sklearn.preprocessing import normalize as sk_normalize import chromadb import joblib import pickle import scipy.sparse import textwrap import os import json # Για το διάβασμα του JSON κατά το setup import tqdm.auto as tq # Για progress bars κατά το setup # --------------------------- CONFIG για ChatbotVol107 ----------------------------------- # --- Ρυθμίσεις Μοντέλου και Βάσης Δεδομένων --- MODEL_NAME = "nlpaueb/bert-base-greek-uncased-v1" PERSISTENT_STORAGE_ROOT = Path("/data") # Για Hugging Face Spaces Persistent Storage DB_DIR_APP = PERSISTENT_STORAGE_ROOT / "chroma_db_ChatbotVol107" COL_NAME = "collection_chatbotvol107" ASSETS_DIR_APP = PERSISTENT_STORAGE_ROOT / "assets_ChatbotVol107" DATA_PATH_FOR_SETUP = "./dataset14.json" # --- Ρυθμίσεις για Google Cloud Storage για τα PDF links --- GCS_BUCKET_NAME = "chatbotthesisihu" # Το δικό σας GCS Bucket Name GCS_PUBLIC_URL_PREFIX = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}/" # ------------------------------------------------------------- # --- Παράμετροι Αναζήτησης και Μοντέλου --- CHUNK_SIZE = 512 CHUNK_OVERLAP = 40 BATCH_EMB = 32 # Για τη δημιουργία των embeddings κατά το setup ALPHA_BASE = 0.2 # Βέλτιστη τιμή alpha που βρήκατε ALPHA_LONGQ = 0.35# Βέλτιστη τιμή alpha για μεγάλα queries που βρήκατε DEVICE = "cuda" if torch.cuda.is_available() else "cpu" print(f"Running ChatbotVol107 on device: {DEVICE}") print(f"Using model: {MODEL_NAME}") # === ΛΟΓΙΚΗ ΔΗΜΙΟΥΡΓΙΑΣ ΒΑΣΗΣ ΚΑΙ ASSETS (Αν δεν υπάρχουν) === def setup_database_and_assets(): print("Checking if database and assets need to be created...") # Έλεγχος ύπαρξης βασικών αρχείων για να αποφασιστεί αν το setup χρειάζεται # Ο έλεγχος col.count() run_setup = True if DB_DIR_APP.exists() and ASSETS_DIR_APP.exists() and (ASSETS_DIR_APP / "ids.pkl").exists(): try: client_check = chromadb.PersistentClient(path=str(DB_DIR_APP.resolve())) collection_check = client_check.get_collection(name=COL_NAME) if collection_check.count() > 0: print("✓ Database and assets appear to exist and collection is populated. Skipping setup.") run_setup = False else: print("Collection exists but is empty. Proceeding with setup.") if DB_DIR_APP.exists(): # Καθαρισμός αν η βάση υπάρχει αλλά είναι ελλιπής/άδεια import shutil print(f"Attempting to clean up existing empty/corrupt DB directory: {DB_DIR_APP}") shutil.rmtree(DB_DIR_APP) except Exception as e_check: # Π.χ. η συλλογή δεν υπάρχει print(f"Database or collection check failed (Error: {e_check}). Proceeding with setup.") if DB_DIR_APP.exists(): # Καθαρισμός αν η βάση φαίνεται κατεστραμμένη import shutil print(f"Attempting to clean up existing corrupt DB directory: {DB_DIR_APP}") shutil.rmtree(DB_DIR_APP) if not run_setup: return True # Το setup δεν χρειάζεται print(f"!Database/Assets not found or incomplete. Starting setup process.") print(f"This will take a very long time, especially on the first run !") ASSETS_DIR_APP.mkdir(parents=True, exist_ok=True) DB_DIR_APP.mkdir(parents=True, exist_ok=True) # --- Helper συναρτήσεις για το setup (τοπικές σε αυτή τη συνάρτηση) --- def _strip_acc_setup(s:str)->str: return ''.join(ch for ch in unicodedata.normalize('NFD', s) if not unicodedata.combining(ch)) _STOP_SETUP = {"σχετικο","σχετικά","με","και"} def _preprocess_setup(txt:str)->str: txt = _strip_acc_setup(txt.lower()) txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt) txt = re.sub(r"\s+", " ", txt).strip() return " ".join(w for w in txt.split() if w not in _STOP_SETUP) def _chunk_text_setup(text, tokenizer_setup): token_ids = tokenizer_setup.encode(text, add_special_tokens=False) if len(token_ids) <= (CHUNK_SIZE - 2): return [text] ids_with_special_tokens = tokenizer_setup(text, truncation=False, padding=False)["input_ids"] effective_chunk_size = CHUNK_SIZE step = effective_chunk_size - CHUNK_OVERLAP chunks = [] for i in range(0, len(ids_with_special_tokens), step): current_chunk_ids = ids_with_special_tokens[i:i+effective_chunk_size] if not current_chunk_ids: break if len(chunks) > 0 and len(current_chunk_ids) < CHUNK_OVERLAP: if len(ids_with_special_tokens) - i < effective_chunk_size: pass else: break decoded_chunk = tokenizer_setup.decode(current_chunk_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True).strip() if decoded_chunk: chunks.append(decoded_chunk) return chunks if chunks else [text] def _cls_embed_setup(texts, tokenizer_setup, model_setup, bs=BATCH_EMB): out_embeddings = [] for i in tq.tqdm(range(0, len(texts), bs), desc="Embedding texts for DB setup"): enc = tokenizer_setup(texts[i:i+bs], padding=True, truncation=True, max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE) with torch.no_grad(): model_output = model_setup(**enc) last_hidden_state = model_output.last_hidden_state cls_embedding = last_hidden_state[:, 0, :] cls_normalized = torch.nn.functional.normalize(cls_embedding, p=2, dim=1) out_embeddings.append(cls_normalized.cpu()) return torch.cat(out_embeddings).numpy() # --- Κύρια Λογική του Setup --- print(f"⏳ (Setup) Loading Model ({MODEL_NAME}) and Tokenizer...") tokenizer_setup = AutoTokenizer.from_pretrained(MODEL_NAME) model_setup = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval() print("✓ (Setup) Model and Tokenizer loaded.") print(f"⏳ (Setup) Reading & chunking JSON data from {DATA_PATH_FOR_SETUP}...") if not Path(DATA_PATH_FOR_SETUP).exists(): print(f"!!! CRITICAL SETUP ERROR: Dataset file {DATA_PATH_FOR_SETUP} not found in the Space repo! Please upload it.") return False with open(DATA_PATH_FOR_SETUP, encoding="utf-8") as f: docs_json = json.load(f) raw_chunks_setup, pre_chunks_setup, metas_setup, ids_list_setup = [], [], [], [] for d_setup in tq.tqdm(docs_json, desc="(Setup) Processing documents"): doc_text = d_setup.get("text") if not doc_text: continue chunked_doc_texts = _chunk_text_setup(doc_text, tokenizer_setup) if not chunked_doc_texts: continue for idx, chunk in enumerate(chunked_doc_texts): if not chunk.strip(): continue raw_chunks_setup.append(chunk) pre_chunks_setup.append(_preprocess_setup(chunk)) metas_setup.append({"id": d_setup["id"], "title": d_setup["title"], "url": d_setup["url"], "chunk_num": idx+1, "total_chunks": len(chunked_doc_texts)}) ids_list_setup.append(f'{d_setup["id"]}_c{idx+1}') print(f" → (Setup) Total chunks created: {len(raw_chunks_setup):,}") if not raw_chunks_setup: print("!!! CRITICAL SETUP ERROR: No chunks were created from the dataset.") return False print("⏳ (Setup) Building lexical matrices (TF-IDF)...") char_vec_setup = HashingVectorizer(analyzer="char_wb", ngram_range=(2,5), n_features=2**20, norm=None, alternate_sign=False, binary=True) word_vec_setup = HashingVectorizer(analyzer="word", ngram_range=(1,2), n_features=2**19, norm=None, alternate_sign=False, binary=True) X_char_setup = sk_normalize(char_vec_setup.fit_transform(pre_chunks_setup)) X_word_setup = sk_normalize(word_vec_setup.fit_transform(pre_chunks_setup)) print("✓ (Setup) Lexical matrices built.") print(f"⏳ (Setup) Setting up ChromaDB client at {DB_DIR_APP}...") client_setup = chromadb.PersistentClient(path=str(DB_DIR_APP.resolve())) print(f" → (Setup) Creating collection: {COL_NAME}") try: # Προσπάθεια διαγραφής αν υπάρχει για σίγουρη νέα δημιουργία client_setup.delete_collection(COL_NAME) except: pass col_setup = client_setup.get_or_create_collection(COL_NAME, metadata={"hnsw:space":"cosine"}) print("⏳ (Setup) Encoding chunks and streaming to ChromaDB...") for start_idx in tq.tqdm(range(0, len(pre_chunks_setup), BATCH_EMB), desc="(Setup) Adding to ChromaDB"): end_idx = min(start_idx + BATCH_EMB, len(pre_chunks_setup)) batch_pre_chunks = pre_chunks_setup[start_idx:end_idx] batch_ids = ids_list_setup[start_idx:end_idx] batch_metadatas = metas_setup[start_idx:end_idx] if not batch_pre_chunks: continue batch_embeddings = _cls_embed_setup(batch_pre_chunks, tokenizer_setup, model_setup, bs=BATCH_EMB) col_setup.add(embeddings=batch_embeddings.tolist(), documents=batch_pre_chunks, metadatas=batch_metadatas, ids=batch_ids) final_count = col_setup.count() print(f"✓ (Setup) Index built and stored in ChromaDB. Final count: {final_count}") if final_count != len(ids_list_setup): print(f"!!! WARNING (Setup): Mismatch after setup! Expected {len(ids_list_setup)} items, got {final_count}") # return False # Αποφασίζουμε αν αυτό είναι κρίσιμο σφάλμα ή απλή προειδοποίηση print(f"💾 (Setup) Saving assets to {ASSETS_DIR_APP}...") joblib.dump(char_vec_setup, ASSETS_DIR_APP / "char_vectorizer.joblib") joblib.dump(word_vec_setup, ASSETS_DIR_APP / "word_vectorizer.joblib") scipy.sparse.save_npz(ASSETS_DIR_APP / "X_char_sparse.npz", X_char_setup) scipy.sparse.save_npz(ASSETS_DIR_APP / "X_word_sparse.npz", X_word_setup) with open(ASSETS_DIR_APP / "pre_chunks.pkl", "wb") as f: pickle.dump(pre_chunks_setup, f) with open(ASSETS_DIR_APP / "raw_chunks.pkl", "wb") as f: pickle.dump(raw_chunks_setup, f) with open(ASSETS_DIR_APP / "ids.pkl", "wb") as f: pickle.dump(ids_list_setup, f) with open(ASSETS_DIR_APP / "metas.pkl", "wb") as f: pickle.dump(metas_setup, f) print("✓ (Setup) Assets saved.") del tokenizer_setup, model_setup, docs_json, raw_chunks_setup, pre_chunks_setup, metas_setup, ids_list_setup del char_vec_setup, word_vec_setup, X_char_setup, X_word_setup, client_setup, col_setup if DEVICE == "cuda": torch.cuda.empty_cache() print("🎉 (Setup) Database and assets creation process complete!") return True # ================================================================== setup_successful = setup_database_and_assets() # ----------------------- PRE-/POST HELPERS (για την εφαρμογή Gradio) ---------------------------- def strip_acc(s: str) -> str: return ''.join(ch for ch in unicodedata.normalize('NFD', s) if not unicodedata.combining(ch)) STOP = {"σχετικο", "σχετικα", "με", "και"} def preprocess(txt: str) -> str: txt = strip_acc(txt.lower()) txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt) txt = re.sub(r"\s+", " ", txt).strip() return " ".join(w for w in txt.split() if w not in STOP) # cls_embed για την εφαρμογή Gradio (ένα query κάθε φορά) def cls_embed(texts, tokenizer_app, model_app): out = [] enc = tokenizer_app(texts, padding=True, truncation=True, max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE) with torch.no_grad(): model_output = model_app(**enc) last_hidden_state = model_output.last_hidden_state cls_embedding = last_hidden_state[:, 0, :] cls_normalized = torch.nn.functional.normalize(cls_embedding, p=2, dim=1) out.append(cls_normalized.cpu()) return torch.cat(out).numpy() # ---------------------- LOAD MODELS & DATA (Για την εφαρμογή Gradio) -------------------- tok = None model = None char_vec = None word_vec = None X_char = None X_word = None pre_chunks = None raw_chunks = None ids = None metas = None col = None if setup_successful: print(f"⏳ Loading Model ({MODEL_NAME}) and Tokenizer for Gradio App...") try: tok = AutoTokenizer.from_pretrained(MODEL_NAME) model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval() print("✓ Model and tokenizer loaded for Gradio App.") except Exception as e: print(f"CRITICAL ERROR loading model/tokenizer for Gradio App: {e}") setup_successful = False if setup_successful: print(f"⏳ Loading TF-IDF/Assets from {ASSETS_DIR_APP} for Gradio App...") try: char_vec = joblib.load(ASSETS_DIR_APP / "char_vectorizer.joblib") word_vec = joblib.load(ASSETS_DIR_APP / "word_vectorizer.joblib") X_char = scipy.sparse.load_npz(ASSETS_DIR_APP / "X_char_sparse.npz") X_word = scipy.sparse.load_npz(ASSETS_DIR_APP / "X_word_sparse.npz") with open(ASSETS_DIR_APP / "pre_chunks.pkl", "rb") as f: pre_chunks = pickle.load(f) with open(ASSETS_DIR_APP / "raw_chunks.pkl", "rb") as f: raw_chunks = pickle.load(f) with open(ASSETS_DIR_APP / "ids.pkl", "rb") as f: ids = pickle.load(f) with open(ASSETS_DIR_APP / "metas.pkl", "rb") as f: metas = pickle.load(f) print("✓ TF-IDF/Assets loaded for Gradio App.") except Exception as e: print(f"CRITICAL ERROR loading TF-IDF/Assets for Gradio App: {e}") setup_successful = False if setup_successful: print(f"⏳ Connecting to ChromaDB at {DB_DIR_APP} for Gradio App...") try: client = chromadb.PersistentClient(path=str(DB_DIR_APP.resolve())) col = client.get_collection(COL_NAME) # Αν δεν υπάρχει μετά το setup, εδώ θα γίνει σφάλμα. print(f"✓ Connected to ChromaDB. Collection '{COL_NAME}' count: {col.count()}") if col.count() == 0 and len(ids) > 0: # Αν υπάρχουν ids αλλά η βάση είναι άδεια print(f"!!! CRITICAL WARNING: ChromaDB collection '{COL_NAME}' is EMPTY at {DB_DIR_APP} but assets were loaded. Setup might have failed to populate DB correctly.") setup_successful = False except Exception as e: print(f"CRITICAL ERROR connecting to ChromaDB or getting collection for Gradio App: {e}") setup_successful = False else: print("!!! Setup process failed or was skipped. Gradio app will not function correctly. !!!") # ---------------------- HYBRID SEARCH (Κύρια Λογική) --- def hybrid_search_gradio(query, k=5): if not setup_successful or not ids or not col or not model or not tok: return "Σφάλμα: Η εφαρμογή δεν αρχικοποιήθηκε σωστά. Τα δεδομένα ή το μοντέλο δεν φορτώθηκαν. Ελέγξτε τα logs εκκίνησης." if not query.strip(): return "Παρακαλώ εισάγετε μια ερώτηση." q_pre = preprocess(query) words = q_pre.split() alpha = ALPHA_LONGQ if len(words) > 30 else ALPHA_BASE exact_ids_set = {ids[i] for i, t in enumerate(pre_chunks) if q_pre in t} q_emb_np = cls_embed([q_pre], tok, model) q_emb_list = q_emb_np.tolist() try: sem_results = col.query(query_embeddings=q_emb_list, n_results=min(k * 30, len(ids)), include=["distances"]) except Exception as e: # Εκτύπωση του σφάλματος στα logs του server για διάγνωση print(f"ERROR during ChromaDB query in hybrid_search_gradio: {type(e).__name__}: {e}") return "Σφάλμα κατά την σημασιολογική αναζήτηση. Επικοινωνήστε με τον διαχειριστή." sem_sims = {doc_id: 1 - dist for doc_id, dist in zip(sem_results["ids"][0], sem_results["distances"][0])} q_char_sparse = char_vec.transform([q_pre]) q_char_normalized = sk_normalize(q_char_sparse) char_sim_scores = (q_char_normalized @ X_char.T).toarray().flatten() q_word_sparse = word_vec.transform([q_pre]) q_word_normalized = sk_normalize(q_word_sparse) word_sim_scores = (q_word_normalized @ X_word.T).toarray().flatten() lex_sims = {} for idx, (c_score, w_score) in enumerate(zip(char_sim_scores, word_sim_scores)): if c_score > 0 or w_score > 0: if idx < len(ids): lex_sims[ids[idx]] = 0.85 * c_score + 0.15 * w_score else: print(f"Warning (hybrid_search): Lexical score index {idx} out of bounds for ids list (len: {len(ids)}).") all_chunk_ids_set = set(sem_sims.keys()) | set(lex_sims.keys()) | exact_ids_set scored = [] for chunk_id_key in all_chunk_ids_set: s = alpha * sem_sims.get(chunk_id_key, 0.0) + (1 - alpha) * lex_sims.get(chunk_id_key, 0.0) if chunk_id_key in exact_ids_set: s = 1.0 scored.append((chunk_id_key, s)) scored.sort(key=lambda x: x[1], reverse=True) hits_output = [] seen_doc_main_ids = set() for chunk_id_val, score_val in scored: try: idx_in_lists = ids.index(chunk_id_val) except ValueError: print(f"Warning (hybrid_search): chunk_id '{chunk_id_val}' not found in loaded ids. Skipping."); continue doc_meta = metas[idx_in_lists] doc_main_id = doc_meta['id'] if doc_main_id in seen_doc_main_ids: continue original_url_from_meta = doc_meta.get('url', '#') pdf_gcs_url = "#" pdf_filename_display = "N/A" if original_url_from_meta and original_url_from_meta != '#': pdf_filename_extracted = os.path.basename(original_url_from_meta) if pdf_filename_extracted and pdf_filename_extracted.lower().endswith(".pdf"): pdf_gcs_url = f"{GCS_PUBLIC_URL_PREFIX}{pdf_filename_extracted}" pdf_filename_display = pdf_filename_extracted elif pdf_filename_extracted: pdf_filename_display = "Source is not a PDF" # else: pdf_filename_display = "No source URL" # This case is covered by initialization # else: pdf_filename_display = "No source URL" # This case is covered by initialization hits_output.append({ "score": score_val, "title": doc_meta.get('title', 'N/A'), "snippet": raw_chunks[idx_in_lists][:500] + " ...", "original_url_meta": original_url_from_meta, "pdf_serve_url": pdf_gcs_url, "pdf_filename_display": pdf_filename_display }) seen_doc_main_ids.add(doc_main_id) if len(hits_output) >= k: break if not hits_output: return "Δεν βρέθηκαν σχετικά αποτελέσματα." output_md = f"Βρέθηκαν **{len(hits_output)}** σχετικά αποτελέσματα:\n\n" for hit in hits_output: output_md += f"### {hit['title']} (Score: {hit['score']:.3f})\n" snippet_wrapped = textwrap.fill(hit['snippet'].replace("\n", " "), width=100) output_md += f"**Απόσπασμα:** {snippet_wrapped}\n" if hit['pdf_serve_url'] and hit['pdf_serve_url'] != '#': output_md += f"**Πηγή (PDF):** {hit['pdf_filename_display']}\n" elif hit['original_url_meta'] and hit['original_url_meta'] != '#': output_md += f"**Πηγή (αρχικό από metadata):** [{hit['original_url_meta']}]({hit['original_url_meta']})\n" output_md += "---\n" return output_md # ---------------------- GRADIO INTERFACE ----------------------------------- print("🚀 Launching Gradio Interface for Meltemi...") iface = gr.Interface( fn=hybrid_search_gradio, inputs=gr.Textbox(lines=3, placeholder="Γράψε την ερώτησή σου εδώ...", label=f"Ερώτηση προς τον βοηθό (Μοντέλο: {MODEL_NAME.split('/')[-1]}):"), outputs=gr.Markdown(label="Απαντήσεις από τα έγγραφα:", rtl=False, sanitize_html=False), title=f"🏛️ Ελληνικό Chatbot Υβριδικής Αναζήτησης (Meltemi - {MODEL_NAME.split('/')[-1]})", description=(f"Πληκτρολογήστε την ερώτησή σας για αναζήτηση. Χρησιμοποιεί το μοντέλο: {MODEL_NAME}.\n" "Τα PDF ανοίγουν από Google Cloud Storage σε νέα καρτέλα."), allow_flagging="never", examples=[ ["Τεχνολογίας τροφίμων;", 5], ["Αμπελουργίας και της οινολογίας", 3], ["Ποιες θέσεις αφορούν διδάσκοντες μερικής απασχόλησης στο Τμήμα Νοσηλευτικής του Πανεπιστημίου Ιωαννίνων;", 5] ], ) if __name__ == '__main__': # Το allowed_paths δεν είναι απαραίτητο αν δεν εξυπηρετούνται άλλα τοπικά στατικά αρχεία. iface.launch()