import os
from pathlib import Path
from typing import List, Optional

from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

try:
    from semantic_text_splitter import TextSplitter as SemanticTextSplitter  # type: ignore
    _HAS_SEMANTIC = True
except ImportError:  # graceful fallback if package missing
    _HAS_SEMANTIC = False

from .embeddings import get_embedding_model
from .config import (
    FAISS_INDEX_PATH,
    HF_DATASET_REPO_ID,
    HF_DATASET_REPO_TYPE,
    FAISS_INDEX_FILES,
)


def _ensure_local_faiss_from_hub(index_dir: str) -> bool:
    """Download FAISS index files from the Hugging Face Hub dataset repo if missing.

    Returns True if the files are present (downloaded or already existed),
    False otherwise.
    """
    target = Path(index_dir)
    target.mkdir(parents=True, exist_ok=True)
    faiss_name, pkl_name = FAISS_INDEX_FILES
    faiss_path = target / faiss_name
    pkl_path = target / pkl_name
    if faiss_path.exists() and pkl_path.exists():
        return True

    try:
        from huggingface_hub import hf_hub_download, list_repo_files

        def _download_pair(faiss_fname: str, meta_fname: str) -> bool:
            try:
                hf_hub_download(
                    repo_id=HF_DATASET_REPO_ID,
                    repo_type=HF_DATASET_REPO_TYPE,
                    filename=faiss_fname,
                    local_dir=str(target),
                    local_dir_use_symlinks=False,
                )
                hf_hub_download(
                    repo_id=HF_DATASET_REPO_ID,
                    repo_type=HF_DATASET_REPO_TYPE,
                    filename=meta_fname,
                    local_dir=str(target),
                    local_dir_use_symlinks=False,
                )
                return (target / faiss_fname).exists() and (target / meta_fname).exists()
            except Exception:
                return False

        # First, try the names configured in FAISS_INDEX_FILES.
        if _download_pair(faiss_name, pkl_name):
            return True

        # Fallback: auto-discover a FAISS/pickle pair by listing repository files.
        try:
            files = list_repo_files(repo_id=HF_DATASET_REPO_ID, repo_type=HF_DATASET_REPO_TYPE)
        except Exception as e:
            print(f"[FAISS download] list_repo_files failed for {HF_DATASET_REPO_ID}: {e}")
            files = []

        faiss_candidates = [f for f in files if f.lower().endswith(".faiss")]
        meta_candidates = [
            f for f in files if f.lower().endswith(".pkl") or f.lower().endswith(".pickle")
        ]
        if faiss_candidates and meta_candidates:
            # Take the first candidate of each kind.
            cand_faiss = faiss_candidates[0]
            cand_meta = meta_candidates[0]
            if _download_pair(cand_faiss, cand_meta):
                return True

        print(
            f"[FAISS download] Could not find/download FAISS pair in {HF_DATASET_REPO_ID}. "
            f"Looked for {faiss_name} and {pkl_name}, candidates: {faiss_candidates} / {meta_candidates}"
        )
        return False
    except Exception as e:
        print(f"[FAISS download] Could not fetch from Hub ({HF_DATASET_REPO_ID}): {e}")
        return False


def _semantic_chunk_documents(
    documents: List[Document],
    chunk_size: int,
    chunk_overlap: int,
) -> List[Document]:
    # Newer versions of semantic_text_splitter expose a factory; fall back to direct init.
    # Any signature mismatch raised here is caught by _chunk_documents, which then
    # reverts to the recursive splitter.
    if hasattr(SemanticTextSplitter, "from_tiktoken_encoder"):
        splitter = SemanticTextSplitter.from_tiktoken_encoder(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    else:
        # Try the simple init signature.
        splitter = SemanticTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    semantic_chunks: List[Document] = []
    for d in documents:
        try:
            parts = splitter.chunks(d.page_content)
        except AttributeError:
            # Fallback: naive sentence-ish split.
            parts = d.page_content.split(". ")
        for part in parts:
            cleaned = part.strip()
            if cleaned:
                semantic_chunks.append(
                    Document(page_content=cleaned, metadata=d.metadata)
                )
    return semantic_chunks


def _chunk_documents(
    documents: List[Document],
    method: str = "recursive",
    chunk_size: int = 1000,
    chunk_overlap: int = 120,
) -> List[Document]:
    if method == "semantic" and _HAS_SEMANTIC:
        try:
            return _semantic_chunk_documents(documents, chunk_size, chunk_overlap)
        except Exception as e:
            print(f"[semantic chunking fallback] {e}; reverting to recursive splitter.")

    # Fallback / default: recursive character splitting.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
    )
    return splitter.split_documents(documents)


def build_or_load_vectorstore(
    documents: List[Document],
    force_rebuild: bool = False,
    chunk_method: str = "recursive",  # or "semantic"
    chunk_size: int = 1000,
    chunk_overlap: int = 120,
):
    # Ensure a local index exists (download from the Hub if needed).
    if not os.path.exists(FAISS_INDEX_PATH):
        fetched = _ensure_local_faiss_from_hub(FAISS_INDEX_PATH)
        if fetched:
            print(f"Downloaded FAISS index from Hub into {FAISS_INDEX_PATH}")

    if os.path.exists(FAISS_INDEX_PATH) and not force_rebuild:
        print(f"Loading existing FAISS index from {FAISS_INDEX_PATH}...")
        try:
            vectorstore = FAISS.load_local(
                FAISS_INDEX_PATH,
                get_embedding_model(),
                allow_dangerous_deserialization=True,
            )
            print("Vector store loaded successfully.")
            return vectorstore
        except Exception as e:
            print(f"Failed to load FAISS index due to: {e}")
            if not documents:
                raise RuntimeError(
                    "Existing FAISS index is incompatible with current libraries and no documents were "
                    "provided to rebuild it. Delete 'faiss_index' and rebuild, or pass documents to rebuild."
                ) from e
            print("Rebuilding FAISS index from provided documents...")

    print(f"Building FAISS index (force_rebuild={force_rebuild}, method={chunk_method})...")
    splits = _chunk_documents(
        documents,
        method=chunk_method,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    print(f"Split {len(documents)} docs into {len(splits)} chunks (method={chunk_method}).")
    vectorstore = FAISS.from_documents(splits, get_embedding_model())
    vectorstore.save_local(FAISS_INDEX_PATH)
    print(f"Vector store created and saved to {FAISS_INDEX_PATH}")
    return vectorstore


def build_filtered_retriever(vectorstore, primary_category: Optional[str] = None, k: int = 3):
    base = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": k})
    if not primary_category:
        return base

    # Simple wrapper applying post-filtering by metadata; could be replaced by a
    # VectorStore-specific filter if supported. Capture the original method first
    # so the wrapper does not call itself after the monkey patch.
    original_get_relevant_documents = base.get_relevant_documents

    def _get_relevant_documents(query):
        docs = original_get_relevant_documents(query)
        return [d for d in docs if d.metadata.get("primary_category") == primary_category]

    base.get_relevant_documents = _get_relevant_documents  # monkey patch
    return base
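

if __name__ == "__main__":
    # Minimal usage sketch (an illustrative addition, not part of the project's
    # pipeline): run with `python -m <package>.<this_module>` so the relative
    # imports above resolve. The sample document and its metadata below are
    # hypothetical; real callers pass documents loaded elsewhere in the project.
    sample_docs = [
        Document(
            page_content="Retrieval-augmented generation pairs a retriever with a language model.",
            metadata={"primary_category": "cs.CL"},
        )
    ]
    vs = build_or_load_vectorstore(sample_docs, chunk_method="recursive")
    retriever = build_filtered_retriever(vs, primary_category="cs.CL", k=3)
    for doc in retriever.get_relevant_documents("What is retrieval-augmented generation?"):
        print(doc.metadata, doc.page_content[:80])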