# backendv1.py # VÉGLEGES, JAVÍTOTT VERZIÓ: Elastic Cloud és GitHub Secrets kompatibilis. # A RAG rendszer motorja: adatfeldolgozás, keresés, generálás és tanulás. # JAVÍTVA: A kategória-alapú szűrés ideiglenesen kikapcsolva a megbízhatóbb eredmények érdekében. import os import time import datetime import traceback import re from collections import defaultdict from elasticsearch import Elasticsearch, exceptions as es_exceptions import torch from sentence_transformers import SentenceTransformer from sentence_transformers.cross_encoder import CrossEncoder from spellchecker import SpellChecker from dotenv import load_dotenv import sys import nltk from concurrent.futures import ThreadPoolExecutor # Késleltetett importálás, hogy csak akkor legyen hiba, ha tényleg használjuk try: from together import Together TOGETHER_AVAILABLE = True except ImportError: TOGETHER_AVAILABLE = False # === ANSI Színkódok (konzol loggoláshoz) === GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' RESET = '\033[0m' BLUE = '\033[94m' CYAN = '\033[96m' MAGENTA = '\033[95m' # --- Konfiguráció --- # A hitelesítő adatok a környezeti változókból kerülnek beolvasásra. CONFIG = { "VECTOR_INDEX_NAMES": ["duna", "dunawebindexai"], "FEEDBACK_INDEX_NAME": "feedback_index", "ES_CLIENT_TIMEOUT": 90, "EMBEDDING_MODEL_NAME": 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2', "CROSS_ENCODER_MODEL_NAME": 'cross-encoder/mmarco-mMiniLMv2-L12-H384-v1', "TOGETHER_MODEL_NAME": "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", "QUERY_EXPANSION_MODEL": "mistralai/Mixtral-8x7B-Instruct-v0.1", "LLM_CLIENT_TIMEOUT": 120, "NUM_CONTEXT_RESULTS": 5, "RE_RANK_CANDIDATE_COUNT": 50, "RRF_RANK_CONSTANT": 60, "INITIAL_SEARCH_SIZE": 150, "KNN_NUM_CANDIDATES": 200, "MAX_GENERATION_TOKENS": 1024, "GENERATION_TEMPERATURE": 0.6, "USE_QUERY_EXPANSION": True, "SPELLCHECK_LANG": 'hu', "MAX_HISTORY_TURNS": 3 } # --- Segédfüggvények --- def correct_spellings(text, spell_checker_instance): if not spell_checker_instance: return text try: words = re.findall(r'\b\w+\b', text.lower()) misspelled = spell_checker_instance.unknown(words) if not misspelled: return text corrected_text = text for word in misspelled: correction = spell_checker_instance.correction(word) if correction and correction != word: corrected_text = re.sub(r'\b' + re.escape(word) + r'\b', corrected_text, flags=re.IGNORECASE) return corrected_text except Exception as e: print(f"{RED}Hiba a helyesírás javítása közben: {e}{RESET}") return text def get_query_category_with_llm(client, query): if not client: return 'egyéb' print(f" {CYAN}-> Lekérdezés kategorizálása LLM-mel...{RESET}") category_list = ['IT biztonsági szolgáltatások', 'szolgáltatások', 'hardver', 'szoftver', 'hírek', 'audiovizuális konferenciatechnika'] categories_text = ", ".join([f"'{cat}'" for cat in category_list]) prompt = f"""Adott egy felhasználói kérdés. Adj meg egyetlen, rövid kategóriát a következő listából, ami a legjobban jellemzi a kérdést. A válaszodban csak a kategória szerepeljen, más szöveg nélkül. Lehetséges kategóriák: {categories_text} Kérdés: '{query}' Kategória:""" messages = [{"role": "user", "content": prompt}] try: response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.1, max_tokens=30) if response and response.choices: category = response.choices[0].message.content.strip().replace("'", "").replace("`", "") for cat in category_list: if cat.lower() in category.lower(): print(f" {GREEN}-> A kérdés LLM által generált kategóriája: '{cat}'{RESET}") return cat.lower() return 'egyéb' except Exception as e: print(f"{RED}Hiba LLM kategorizáláskor: {e}{RESET}") return 'egyéb' def expand_or_rewrite_query(original_query, client): final_queries = [original_query] if not (CONFIG["USE_QUERY_EXPANSION"] and client): return final_queries print(f" {BLUE}-> Lekérdezés bővítése/átírása...{RESET}") prompt = f"Adott egy magyar nyelvű felhasználói kérdés: '{original_query}'. Generálj 2 db alternatív, releváns keresőkifejezést. A válaszodban csak ezeket add vissza, vesszővel (,) elválasztva, minden más szöveg nélkül." messages = [{"role": "user", "content": prompt}] try: response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.5, max_tokens=100) if response and response.choices: generated_text = response.choices[0].message.content.strip() alternatives = [q.strip().replace('"', '').replace("'", '').replace('.', '') for q in generated_text.split(',') if q.strip() and q.strip() != original_query] final_queries.extend(alternatives) print(f" {GREEN}-> Bővített lekérdezések: {final_queries}{RESET}") except Exception as e: print(f"{RED}Hiba a lekérdezés bővítése során: {e}{RESET}") return final_queries def run_separate_searches(es_client, query_text, embedding_model, expanded_queries, query_category=None): results = {'knn': {}, 'keyword': {}} es_client_with_timeout = es_client.options(request_timeout=CONFIG["ES_CLIENT_TIMEOUT"]) source_fields = ["text_content", "source_url", "summary", "category"] filters = [] ### JAVÍTÁS ### # A kategória-alapú szűrés ideiglenesen ki van kapcsolva, mert pontatlan # kategorizálás esetén drasztikusan rontja a találatok minőségét. # A keresés így a teljes adatbázisban fut, ami megbízhatóbb. # # if query_category and query_category != 'egyéb': # print(f" {MAGENTA}-> Kategória-alapú szűrés hozzáadása a kereséshez: '{query_category}'{RESET}") # filters.append({"match": {"category": query_category}}) def knn_search(index, query_vector): try: knn_query = {"field": "embedding", "query_vector": query_vector, "k": CONFIG["INITIAL_SEARCH_SIZE"], "num_candidates": CONFIG["KNN_NUM_CANDIDATES"], "filter": filters} response = es_client_with_timeout.search(index=index, knn=knn_query, _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) return index, response.get('hits', {}).get('hits', []) except Exception as e: print(f"{RED}Hiba kNN keresés során ({index}): {e}{RESET}") return index, [] def keyword_search(index, expanded_queries): try: should_clauses = [{"match": {"text_content": {"query": q, "operator": "OR", "fuzziness": "AUTO"}}} for q in expanded_queries] query_body = {"query": {"bool": {"should": should_clauses, "minimum_should_match": 1, "filter": filters}}} response = es_client_with_timeout.search(index=index, query=query_body['query'], _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) return index, response.get('hits', {}).get('hits', []) except Exception as e: print(f"{RED}Hiba kulcsszavas keresés során ({index}): {e}{RESET}") return index, [] query_vector = embedding_model.encode(query_text, normalize_embeddings=True).tolist() if embedding_model else None with ThreadPoolExecutor(max_workers=len(CONFIG["VECTOR_INDEX_NAMES"]) * 2) as executor: knn_futures = {executor.submit(knn_search, index, query_vector) for index in CONFIG["VECTOR_INDEX_NAMES"] if query_vector} keyword_futures = {executor.submit(keyword_search, index, expanded_queries) for index in CONFIG["VECTOR_INDEX_NAMES"]} for future in knn_futures: index, hits = future.result() results['knn'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] for future in keyword_futures: index, hits = future.result() results['keyword'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] total_knn_hits = sum(len(h) for h in results['knn'].values()) total_keyword_hits = sum(len(h) for h in results['keyword'].values()) print(f"{CYAN}Vektorkeresési találatok száma: {total_knn_hits}{RESET}") print(f"{CYAN}Kulcsszavas keresési találatok száma: {total_keyword_hits}{RESET}") return results def merge_results_rrf(search_results): rrf_scores = defaultdict(float) all_hits_data = {} for search_type in search_results: for index_name in search_results[search_type]: for rank, hit in search_results[search_type][index_name]: doc_id = hit['_id'] rrf_scores[doc_id] += 1.0 / (CONFIG["RRF_RANK_CONSTANT"] + rank) if doc_id not in all_hits_data: all_hits_data[doc_id] = hit combined_results = sorted([(doc_id, score, all_hits_data[doc_id]) for doc_id, score in rrf_scores.items()], key=lambda item: item[1], reverse=True) print(f"{CYAN}RRF által rangsorolt Top 5 pontszám: {[f'{score:.4f}' for doc_id, score, hit in combined_results[:5]]}{RESET}") return combined_results def retrieve_context_reranked(backend, query_text, confidence_threshold, fallback_message, query_category): expanded_queries = expand_or_rewrite_query(query_text, backend["llm_client"]) search_results = run_separate_searches(backend["es_client"], query_text, backend["embedding_model"], expanded_queries, query_category) merged_results = merge_results_rrf(search_results) if not merged_results: return fallback_message, [], None candidates_to_rerank = merged_results[:CONFIG["RE_RANK_CANDIDATE_COUNT"]] hits_data_for_reranking = [hit for _, _, hit in candidates_to_rerank] query_chunk_pairs = [[query_text, hit['_source'].get('summary', hit['_source'].get('text_content'))] for hit in hits_data_for_reranking if hit and '_source' in hit] ranked_by_ce = [] if backend["cross_encoder"] and query_chunk_pairs: ce_scores = backend["cross_encoder"].predict(query_chunk_pairs, show_progress_bar=False) ranked_by_ce = sorted(zip(ce_scores, hits_data_for_reranking), key=lambda x: x[0], reverse=True) print(f"{CYAN}Cross-Encoder pontszámok (Top 5):{RESET} {[f'{score:.4f}' for score, _ in ranked_by_ce[:5]]}") if not ranked_by_ce: return fallback_message, [], None top_score = float(ranked_by_ce[0][0]) if top_score < confidence_threshold: dynamic_fallback = (f"{fallback_message}\n\nA '{query_text}' kérdésre a legjobb találat megbízhatósági pontszáma ({top_score:.2f}) nem érte el a beállított küszöböt ({confidence_threshold:.2f}).") return dynamic_fallback, [], top_score final_hits_for_context = [hit for _, hit in ranked_by_ce[:CONFIG["NUM_CONTEXT_RESULTS"]]] context_parts = [hit['_source'].get('summary', hit['_source'].get('text_content')) for hit in final_hits_for_context] context_string = "\n\n---\n\n".join(context_parts) sources = [{"url": hit['_source'].get('source_url', '?'), "content": hit['_source'].get('text_content', 'N/A')} for hit in final_hits_for_context] return context_string, sources, top_score def generate_answer_with_history(client, model_name, messages, temperature): if not client: return "Hiba: Az AI kliens nincs inicializálva." try: response = client.chat.completions.create(model=model_name, messages=messages, temperature=temperature, max_tokens=CONFIG["MAX_GENERATION_TOKENS"], timeout=CONFIG["LLM_CLIENT_TIMEOUT"]) if response and response.choices: return response.choices[0].message.content.strip() return "Hiba: Nem érkezett érvényes válasz az AI modelltől." except Exception as e: print(f"{RED}Hiba a válasz generálásakor: {e}{RESET}") return "Hiba történt az AI modell hívásakor." def search_in_feedback_index(es_client, embedding_model, question, min_score=0.75): try: if not es_client.indices.exists(index=CONFIG["FEEDBACK_INDEX_NAME"]): return None, None embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() knn_query = {"field": "embedding", "query_vector": embedding, "k": 1, "num_candidates": 10} response = es_client.search(index=CONFIG["FEEDBACK_INDEX_NAME"], knn=knn_query, _source=["question_text", "correction_text"]) hits = response.get('hits', {}).get('hits', []) if hits and hits[0]['_score'] >= min_score: top_hit = hits[0]; source = top_hit['_source']; score = top_hit['_score'] if score > 0.98: return "direct_answer", source['correction_text'] instruction = f"Egy nagyon hasonló kérdésre ('{source['question_text']}') korábban a következő javítást/iránymutatást adtad: '{source['correction_text']}'. A válaszodat elsősorban ez alapján alkosd meg!" return "instruction", instruction except Exception: return None, None return None, None def index_feedback(es_client, embedding_model, question, correction): try: embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() doc = {"question_text": question, "correction_text": correction, "embedding": embedding, "timestamp": datetime.datetime.now()} es_client.index(index=CONFIG["FEEDBACK_INDEX_NAME"], document=doc) return True except Exception as e: print(f"{RED}Hiba a visszajelzés indexelése során: {e}{RESET}") return False def get_all_feedback(es_client, index_name): try: if not es_client.indices.exists(index=index_name): return [] response = es_client.search(index=index_name, query={"match_all": {}}, size=1000, sort=[{"timestamp": {"order": "desc"}}]) return response.get('hits', {}).get('hits', []) except Exception as e: print(f"{RED}Hiba a visszajelzések listázása során: {e}{RESET}") return [] def delete_feedback_by_id(es_client, index_name, doc_id): try: es_client.delete(index=index_name, id=doc_id) return True except Exception as e: print(f"{RED}Hiba a visszajelzés törlése során (ID: {doc_id}): {e}{RESET}") return False def update_feedback_comment(es_client, index_name, doc_id, new_comment): try: es_client.update(index=index_name, id=doc_id, doc={"correction_text": new_comment}) return True except Exception as e: print(f"{RED}Hiba a visszajelzés szerkesztése során (ID: {doc_id}): {e}{RESET}") return False def initialize_backend(): print("----- Backend Motor Inicializálása -----") load_dotenv() es_cloud_id = os.getenv("ES_CLOUD_ID") es_api_key = os.getenv("ES_API_KEY") together_api_key = os.getenv("TOGETHER_API_KEY") if not all([es_cloud_id, es_api_key, together_api_key]): print(f"{RED}Hiba: Hiányzó környezeti változók! Szükséges: ES_CLOUD_ID, ES_API_KEY, TOGETHER_API_KEY{RESET}") return None if not TOGETHER_AVAILABLE: print(f"{RED}Hiba: A 'together' csomag nincs telepítve.{RESET}") return None try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt', quiet=True) spell_checker = None try: spell_checker = SpellChecker(language=CONFIG["SPELLCHECK_LANG"]) custom_words = ["dunaelektronika", "kft", "outsourcing", "dell", "lenovo", "nis2", "szerver", "kliens", "hálózati", "hpe"] spell_checker.word_frequency.load_words(custom_words) except Exception as e: print(f"{RED}Helyesírás-ellenőrző hiba: {e}{RESET}") try: print(f"{CYAN}Elasticsearch kliens inicializálása...{RESET}") es_client = Elasticsearch(cloud_id=es_cloud_id, api_key=es_api_key, request_timeout=CONFIG["ES_CLIENT_TIMEOUT"]) if not es_client.ping(): raise ConnectionError("Elasticsearch ping sikertelen.") print(f"{GREEN}Elasticsearch kliens kész.{RESET}") print(f"{CYAN}AI modellek betöltése...{RESET}") device = 'cuda' if torch.cuda.is_available() else 'cpu' embedding_model = SentenceTransformer(CONFIG["EMBEDDING_MODEL_NAME"], device=device) cross_encoder = CrossEncoder(CONFIG["CROSS_ENCODER_MODEL_NAME"], device=device) llm_client = Together(api_key=together_api_key) print(f"{GREEN}AI modellek betöltve (eszköz: {device}).{RESET}") backend_objects = { "es_client": es_client, "embedding_model": embedding_model, "cross_encoder": cross_encoder, "llm_client": llm_client, "spell_checker": spell_checker } print(f"{GREEN}----- Backend Motor Készen Áll -----{RESET}") return backend_objects except Exception as e: print(f"{RED}Hiba a backend inicializálása során: {e}{RESET}") traceback.print_exc() return None def process_query(user_question, chat_history, backend, confidence_threshold, fallback_message): print(f"\n{BLUE}----- Új lekérdezés feldolgozása ----{RESET}") print(f"{BLUE}Kérdés: {user_question}{RESET}") corrected_question = correct_spellings(user_question, backend["spell_checker"]) print(f"{BLUE}Javított kérdés: {corrected_question}{RESET}") feedback_type, feedback_content = search_in_feedback_index(backend["es_client"], backend["embedding_model"], corrected_question) if feedback_type == "direct_answer": print(f"{GREEN}Direkt válasz a visszajelzési adatbázisból.{RESET}") return {"answer": feedback_content, "sources": [{"url": "Személyes visszajelzés alapján", "content": "Ez egy korábban megadott, pontosított válasz."}], "corrected_question": corrected_question, "confidence_score": 10.0} feedback_instructions = feedback_content if feedback_type == "instruction" else "" query_category = get_query_category_with_llm(backend["llm_client"], corrected_question) retrieved_context, sources, confidence_score = retrieve_context_reranked(backend, corrected_question, confidence_threshold, fallback_message, query_category) if not sources and not feedback_instructions: return {"answer": retrieved_context, "sources": [], "corrected_question": corrected_question, "confidence_score": confidence_score} system_prompt = f"""Te egy professzionális, segítőkész AI asszisztens vagy. A feladatod, hogy a KONTEXTUS-ból és a FEJLESZTŐI UTASÍTÁSOKBól származó információkat egyetlen, jól strukturált és ismétlés-mentes válasszá szintetizálld. {feedback_instructions} KRITIKUS SZABÁLY: Értékeld a kapott KONTEXTUS relevanciáját a felhasználó kérdéséhez képest. Ha egy kontextus-részlet nem kapcsolódik szorosan a kérdéshez, azt hagyd figyelmen kívül! FIGYELEM: Szigorúan csak a megadott KONTEXTUS-ra és a fejlesztői utasításokra támaszkodj. Ha a releváns információk alapján nem tudsz válaszolni, add ezt a választ: '{fallback_message}' KONTEXTUS: --- {retrieved_context if sources else "A tudásbázisban nem található releváns információ."} --- """ messages_for_llm = chat_history[-(CONFIG["MAX_HISTORY_TURNS"] * 2):] if chat_history else [] messages_for_llm.extend([{"role": "system", "content": system_prompt}, {"role": "user", "content": corrected_question}]) answer = generate_answer_with_history(backend["llm_client"], CONFIG["TOGETHER_MODEL_NAME"], messages_for_llm, CONFIG["GENERATION_TEMPERATURE"]) return {"answer": answer, "sources": sources, "corrected_question": corrected_question, "confidence_score": confidence_score}