import gradio as gr
import torch
import unicodedata
import re
import numpy as np
from pathlib import Path
from transformers import AutoTokenizer, AutoModel
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.preprocessing import normalize as sk_normalize
import chromadb
import joblib
import pickle
import scipy.sparse
import textwrap
import os
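
# --- Configuration -----------------------------------------------------------
# Model, ChromaDB location, collection name and local asset directory for this
# Space, plus the public GCS bucket used to serve the source PDFs. ALPHA_BASE /
# ALPHA_LONGQ control how semantic and lexical scores are blended in
# hybrid_search_gradio below.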
MODEL_NAME = "nlpaueb/bert-base-greek-uncased-v1"
DB_DIR = Path("./chroma_db_ChatbotVol107")
COL_NAME = "collection_chatbotvol107"
ASSETS_DIR = Path("./assets_ChatbotVol107")

GCS_BUCKET_NAME = "chatbotthesisihu"
GCS_PUBLIC_URL_PREFIX = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}/"

CHUNK_SIZE = 512
ALPHA_BASE = 0.50
ALPHA_LONGQ = 0.65
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Running ChatbotVol107 on device: {DEVICE}")
print(f"Using model: {MODEL_NAME}")
print(f"ChromaDB path: {DB_DIR}")
print(f"Assets path: {ASSETS_DIR}")
print(f"Collection name: {COL_NAME}")
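
# --- Text preprocessing ------------------------------------------------------
# Queries and chunks are lower-cased, stripped of accents and a few Greek stop
# words, and reduced to Latin/Greek letters, digits and spaces before any
# lexical or exact matching.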
def strip_acc(s: str) -> str:
    return ''.join(ch for ch in unicodedata.normalize('NFD', s)
                   if not unicodedata.combining(ch))


STOP = {"σχετικο", "σχετικα", "με", "και"}


def preprocess(txt: str) -> str:
    txt = strip_acc(txt.lower())
    txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt)
    txt = re.sub(r"\s+", " ", txt).strip()
    return " ".join(w for w in txt.split() if w not in STOP)
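
# --- Dense embeddings --------------------------------------------------------
# Texts are embedded with the [CLS] vector of the Greek BERT model and
# L2-normalised, so cosine similarity reduces to a dot product.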
def cls_embed(texts, tok, model):
    out = []
    enc = tok(texts, padding=True, truncation=True,
              max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        model_output = model(**enc)
        last_hidden_state = model_output.last_hidden_state

    cls_embedding = last_hidden_state[:, 0, :]
    cls_normalized = torch.nn.functional.normalize(cls_embedding, p=2, dim=1)
    out.append(cls_normalized.cpu())
    return torch.cat(out).numpy()
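
# --- Load model and pre-computed retrieval artefacts --------------------------
# The tokenizer/model, the fitted TF-IDF vectorizers with their sparse matrices,
# the pickled chunk data and the persistent ChromaDB collection are loaded once
# at start-up; any failure here re-raises and aborts the app.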
print(f"⏳ Loading Model ({MODEL_NAME}) and Tokenizer...")
|
|
try:
|
|
tok = AutoTokenizer.from_pretrained(MODEL_NAME)
|
|
model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval()
|
|
print("✓ Model and tokenizer loaded.")
|
|
except Exception as e:
|
|
print(f"CRITICAL ERROR loading model/tokenizer: {e}")
|
|
raise
|
|
|
|
print(f"⏳ Loading TF-IDF vectorizers and SPARSE matrices from {ASSETS_DIR}...")
|
|
try:
|
|
char_vec = joblib.load(ASSETS_DIR / "char_vectorizer.joblib")
|
|
word_vec = joblib.load(ASSETS_DIR / "word_vectorizer.joblib")
|
|
X_char = scipy.sparse.load_npz(ASSETS_DIR / "X_char_sparse.npz")
|
|
X_word = scipy.sparse.load_npz(ASSETS_DIR / "X_word_sparse.npz")
|
|
print("✓ TF-IDF components loaded.")
|
|
except Exception as e:
|
|
print(f"CRITICAL ERROR loading TF-IDF components from {ASSETS_DIR}: {e}")
|
|
raise
|
|
|
|
print(f"⏳ Loading chunk data (pre_chunks, raw_chunks, ids, metas) from {ASSETS_DIR}...")
|
|
try:
|
|
with open(ASSETS_DIR / "pre_chunks.pkl", "rb") as f:
|
|
pre_chunks = pickle.load(f)
|
|
with open(ASSETS_DIR / "raw_chunks.pkl", "rb") as f:
|
|
raw_chunks = pickle.load(f)
|
|
with open(ASSETS_DIR / "ids.pkl", "rb") as f:
|
|
ids = pickle.load(f)
|
|
with open(ASSETS_DIR / "metas.pkl", "rb") as f:
|
|
metas = pickle.load(f)
|
|
print(f"✓ Chunk data loaded. Total chunks from ids: {len(ids):,}")
|
|
if not all([pre_chunks, raw_chunks, ids, metas]):
|
|
print("WARNING: One or more chunk data lists are empty!")
|
|
except Exception as e:
|
|
print(f"CRITICAL ERROR loading chunk data from {ASSETS_DIR}: {e}")
|
|
raise
|
|
|
|
print(f"⏳ Connecting to ChromaDB at {DB_DIR}...")
|
|
try:
|
|
client = chromadb.PersistentClient(path=str(DB_DIR.resolve()))
|
|
col = client.get_collection(COL_NAME)
|
|
print(f"✓ Connected to ChromaDB. Collection '{COL_NAME}' count: {col.count()}")
|
|
if col.count() == 0:
|
|
print(f"WARNING: ChromaDB collection '{COL_NAME}' is empty or not found correctly at {DB_DIR}!")
|
|
except Exception as e:
|
|
print(f"CRITICAL ERROR connecting to ChromaDB or getting collection: {e}")
|
|
print(f"Attempted DB path for PersistentClient: {str(DB_DIR.resolve())}")
|
|
raise
|
|
|
|
|
|
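
# --- Hybrid search -----------------------------------------------------------
# Each candidate chunk is scored by blending dense and lexical evidence:
#     lexical = 0.85 * char_tfidf_cosine + 0.15 * word_tfidf_cosine
#     score   = alpha * semantic_cosine + (1 - alpha) * lexical
# with alpha = ALPHA_LONGQ for queries longer than 30 words, else ALPHA_BASE.
# Chunks whose preprocessed text contains the whole preprocessed query are
# pinned to a score of 1.0, and results are deduplicated per parent document.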
def hybrid_search_gradio(query, k=5):
    if not query.strip():
        return "Παρακαλώ εισάγετε μια ερώτηση."

    if not ids:
        return "Σφάλμα: Τα δεδομένα αναζήτησης (ids) δεν έχουν φορτωθεί. Επικοινωνήστε με τον διαχειριστή."

    q_pre = preprocess(query)
    words = q_pre.split()
    alpha = ALPHA_LONGQ if len(words) > 30 else ALPHA_BASE

    exact_ids_set = {ids[i] for i, t in enumerate(pre_chunks) if q_pre in t}

    q_emb_np = cls_embed([q_pre], tok, model)
    q_emb_list = q_emb_np.tolist()

    try:
        sem_results = col.query(
            query_embeddings=q_emb_list,
            n_results=min(k * 30, len(ids)),
            include=["distances", "metadatas"]
        )
    except Exception as e:
        print(f"ERROR during ChromaDB query: {e}")
        return "Σφάλμα κατά την σημασιολογική αναζήτηση."

    sem_sims = {doc_id: 1 - dist for doc_id, dist in zip(sem_results["ids"][0], sem_results["distances"][0])}
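
    # Lexical similarities: cosine of the query against the pre-computed
    # character- and word-level TF-IDF matrices (rows are L2-normalised, so a
    # sparse dot product suffices).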
    q_char_sparse = char_vec.transform([q_pre])
    q_char_normalized = sk_normalize(q_char_sparse)
    char_sim_scores = (q_char_normalized @ X_char.T).toarray().flatten()

    q_word_sparse = word_vec.transform([q_pre])
    q_word_normalized = sk_normalize(q_word_sparse)
    word_sim_scores = (q_word_normalized @ X_word.T).toarray().flatten()

    lex_sims = {}
    for idx, (c_score, w_score) in enumerate(zip(char_sim_scores, word_sim_scores)):
        if c_score > 0 or w_score > 0:
            if idx < len(ids):
                lex_sims[ids[idx]] = 0.85 * c_score + 0.15 * w_score
            else:
                print(f"Warning: Lexical score index {idx} out of bounds for ids list (len: {len(ids)}).")

    all_chunk_ids_set = set(sem_sims.keys()) | set(lex_sims.keys()) | exact_ids_set
    scored = []
    for chunk_id_key in all_chunk_ids_set:
        s = alpha * sem_sims.get(chunk_id_key, 0.0) + \
            (1 - alpha) * lex_sims.get(chunk_id_key, 0.0)
        if chunk_id_key in exact_ids_set:
            s = 1.0
        scored.append((chunk_id_key, s))

    scored.sort(key=lambda x: x[1], reverse=True)
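
    # Assemble the top-k results, keeping only the best chunk per parent
    # document and linking to the source PDF on Google Cloud Storage when the
    # metadata URL points to a .pdf file.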
    hits_output = []
    seen_doc_main_ids = set()
    for chunk_id_val, score_val in scored:
        try:
            idx_in_lists = ids.index(chunk_id_val)
        except ValueError:
            print(f"Warning: chunk_id '{chunk_id_val}' from search results not found in global 'ids' list. Skipping.")
            continue

        doc_meta = metas[idx_in_lists]
        doc_main_id = doc_meta['id']

        if doc_main_id in seen_doc_main_ids:
            continue

        original_url_from_meta = doc_meta.get('url', '#')

        pdf_gcs_url = "#"
        pdf_filename_display = "N/A"

        if original_url_from_meta and original_url_from_meta != '#':
            pdf_filename_extracted = os.path.basename(original_url_from_meta)

            if pdf_filename_extracted and pdf_filename_extracted.lower().endswith(".pdf"):
                pdf_gcs_url = f"{GCS_PUBLIC_URL_PREFIX}{pdf_filename_extracted}"
                pdf_filename_display = pdf_filename_extracted
            elif pdf_filename_extracted:
                pdf_filename_display = "Source is not a PDF"
            else:
                pdf_filename_display = "No source URL"
        else:
            pdf_filename_display = "No source URL"

        hits_output.append({
            "score": score_val,
            "title": doc_meta.get('title', 'N/A'),
            "snippet": raw_chunks[idx_in_lists][:500] + " ...",
            "original_url_meta": original_url_from_meta,
            "pdf_serve_url": pdf_gcs_url,
            "pdf_filename_display": pdf_filename_display
        })
        seen_doc_main_ids.add(doc_main_id)
        if len(hits_output) >= k:
            break

    if not hits_output:
        return "Δεν βρέθηκαν σχετικά αποτελέσματα."

    output_md = f"Βρέθηκαν **{len(hits_output)}** σχετικά αποτελέσματα:\n\n"
    for hit in hits_output:
        output_md += f"### {hit['title']} (Score: {hit['score']:.3f})\n"
        snippet_wrapped = textwrap.fill(hit['snippet'].replace("\n", " "), width=100)
        output_md += f"**Απόσπασμα:** {snippet_wrapped}\n"

        if hit['pdf_serve_url'] and hit['pdf_serve_url'] != '#':
            output_md += f"**Πηγή (PDF):** <a href='{hit['pdf_serve_url']}' target='_blank'>{hit['pdf_filename_display']}</a>\n"
        elif hit['original_url_meta'] and hit['original_url_meta'] != '#':
            output_md += f"**Πηγή (αρχικό από metadata):** [{hit['original_url_meta']}]({hit['original_url_meta']})\n"
        output_md += "---\n"

    return output_md
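
# --- ChromaDB sanity check ----------------------------------------------------
# Writes and queries a throw-away collection in a separate directory to verify
# that ChromaDB can persist data on this host; failures are logged but do not
# stop the app.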
print(">>> Ξεκινά έλεγχος 'Sanity Check' της ChromaDB στο Hugging Face Spaces <<<")
|
|
try:
|
|
|
|
sanity_db_path_str = "./chroma_db_sanity_check_on_hf"
|
|
sanity_db_path = Path(sanity_db_path_str)
|
|
|
|
|
|
if sanity_db_path.exists():
|
|
import shutil
|
|
print(f"--- Sanity Check: Deleting existing test DB at {sanity_db_path_str}")
|
|
shutil.rmtree(sanity_db_path_str)
|
|
|
|
sanity_db_path.mkdir(parents=True, exist_ok=True)
|
|
print(f"--- Sanity Check: Created directory for test DB at {sanity_db_path_str}")
|
|
|
|
sanity_client = chromadb.PersistentClient(path=str(sanity_db_path.resolve()))
|
|
sanity_collection_name = "my_sanity_test_collection"
|
|
|
|
|
|
try:
|
|
print(f"--- Sanity Check: Attempting to delete old sanity collection '{sanity_collection_name}' if it exists...")
|
|
sanity_client.delete_collection(name=sanity_collection_name)
|
|
print(f"--- Sanity Check: Old sanity collection '{sanity_collection_name}' deleted.")
|
|
except Exception as e_delete_coll:
|
|
print(f"--- Sanity Check: Could not delete old sanity collection (maybe it didn't exist): {e_delete_coll}")
|
|
pass
|
|
|
|
print(f"--- Sanity Check: Creating/getting new sanity collection '{sanity_collection_name}'...")
|
|
sanity_col = sanity_client.get_or_create_collection(name=sanity_collection_name)
|
|
print(f"--- Sanity Check: Sanity collection '{sanity_collection_name}' created/retrieved. Initial count: {sanity_col.count()}")
|
|
|
|
|
|
dummy_texts = ["αυτό είναι ένα πολύ απλό δοκιμαστικό κείμενο για έλεγχο"]
|
|
|
|
dummy_embeddings = cls_embed(dummy_texts, tok, model)
|
|
dummy_ids = ["sanity_test_id_001"]
|
|
dummy_metadatas = [{"source": "internal_sanity_test"}]
|
|
|
|
print(f"--- Sanity Check: Adding 1 item to sanity collection...")
|
|
sanity_col.add(
|
|
embeddings=dummy_embeddings.tolist(),
|
|
documents=dummy_texts,
|
|
ids=dummy_ids,
|
|
metadatas=dummy_metadatas
|
|
)
|
|
print(f"--- Sanity Check: Added 1 item. New count in sanity collection: {sanity_col.count()}")
|
|
|
|
|
|
print(f"--- Sanity Check: Querying sanity collection...")
|
|
query_results = sanity_col.query(
|
|
query_embeddings=dummy_embeddings.tolist(),
|
|
n_results=1,
|
|
include=["metadatas", "documents", "distances"]
|
|
)
|
|
print(f"--- Sanity Check: Sanity query successful. Result IDs: {query_results['ids']}")
|
|
print(">>> Έλεγχος 'Sanity Check' της ChromaDB ΟΛΟΚΛΗΡΩΘΗΚΕ ΕΠΙΤΥΧΩΣ στο HF Spaces! <<<")
|
|
|
|
except Exception as e_sanity:
|
|
print(f"!!! Έλεγχος 'Sanity Check' της ChromaDB ΑΠΕΤΥΧΕ στο HF Spaces: {e_sanity}")
|
|
print(f"!!! Πλήρες σφάλμα: {type(e_sanity).__name__}: {str(e_sanity)}")
|
|
print("--------------------------------------------------------------------")
|
|
|
|
|
|
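
# --- Gradio UI ----------------------------------------------------------------
# A single Textbox feeds the `query` argument of hybrid_search_gradio, while
# `k` keeps its default of 5 results. Answers are rendered as Markdown with
# links to the source PDFs.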
print("🚀 Launching Gradio Interface for ChatbotVol107...")
|
|
iface = gr.Interface(
|
|
fn=hybrid_search_gradio,
|
|
inputs=gr.Textbox(lines=3, placeholder="Γράψε την ερώτησή σου εδώ...", label=f"Ερώτηση προς τον βοηθό (Μοντέλο: {MODEL_NAME.split('/')[-1]}):"),
|
|
outputs=gr.Markdown(label="Απαντήσεις από τα έγγραφα:", rtl=False, sanitize_html=False),
|
|
title=f"🏛️ Ελληνικό Chatbot Υβριδικής Αναζήτησης (ChatbotVol107 - {MODEL_NAME.split('/')[-1]})",
|
|
description=(f"Πληκτρολογήστε την ερώτησή σας για αναζήτηση. Χρησιμοποιεί το μοντέλο: {MODEL_NAME}.\n"
|
|
"Τα PDF ανοίγουν από εξωτερική πηγή (Google Cloud Storage) σε νέα καρτέλα."),
|
|
allow_flagging="never",
|
|
examples=[
|
|
["Ποια είναι τα μέτρα για τον κορονοϊό;", 5],
|
|
["Πληροφορίες για άδεια ειδικού σκοπού", 3],
|
|
["Τι προβλέπεται για τις μετακινήσεις εκτός νομού;", 5]
|
|
],
|
|
)
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
|
|
|
|
iface.launch(allowed_paths=["static_pdfs"]) |