from typing import List, Dict, Any
from collections import defaultdict
import numpy as np


def reciprocal_rank_fusion(semantic_results: List[Dict], bm25_results: List[Dict], k: int = 60) -> List[Dict]:
    """
    Fuse two ranked lists using Reciprocal Rank Fusion (RRF).

    Each result contributes 1 / (k + rank) for every list it appears in
    (ranks start at 1); contributions are summed per chunk id.

    Args:
        semantic_results: Ranked result dicts, best first. Each is identified
            by its "chunk_id" key, falling back to "id" (then "").
        bm25_results: Ranked result dicts, best first, identified the same way.
        k: RRF smoothing constant added to every rank.

    Returns:
        List of {"result": <dict>, "rrf_score": <float>} sorted by descending
        RRF score. For a chunk present in both lists, "result" is the dict
        from semantic_results (the first one seen). Ties keep first-seen order.
    """
    scores = defaultdict(float)
    first_seen = {}

    # Semantic list is processed first so its payload wins for shared ids.
    for ranked in (semantic_results, bm25_results):
        for rank, result in enumerate(ranked, start=1):
            chunk_id = result.get("chunk_id") or result.get("id", "")
            scores[chunk_id] += 1.0 / (k + rank)
            first_seen.setdefault(chunk_id, result)

    # sorted() is stable, so equal scores stay in insertion order.
    ordered = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    return [{"result": first_seen[cid], "rrf_score": score} for cid, score in ordered]


def hybrid_retrieve(
    query: str,
    indexes: Dict[str, Any],
    n_semantic: int = 20,
    n_bm25: int = 20,
    alpha: float = 0.5,
    beta: float = 0.5,
) -> List[Dict[str, Any]]:
    """
    Run semantic retrieval, then BM25 retrieval, and fuse both rankings via RRF.

    Args:
        query: User query
        indexes: Dict from load_indexes(). Keys read here: "collection"
            (queried with query_embeddings/n_results/include), "embedder"
            (its encode() is called), "bm25" (its get_scores() is called),
            and the index-aligned sequences "bm25_texts", "bm25_metas",
            "bm25_ids".
        n_semantic: Number of semantic results to retrieve
        n_bm25: Maximum number of BM25 results to retrieve
        alpha: Weight for semantic (not used in RRF, but kept for API consistency)
        beta: Weight for BM25 (not used in RRF, but kept for API consistency)

    Returns:
        List of dicts sorted by descending RRF score, each with keys
        "chunk_id", "text", "metadata", "rrf_score", "semantic_score" and
        "bm25_score". A per-retriever score is None only when that retriever
        did not return the chunk.
    """
    collection = indexes["collection"]
    embedder = indexes["embedder"]
    bm25 = indexes["bm25"]
    bm25_texts = indexes["bm25_texts"]
    bm25_metas = indexes["bm25_metas"]
    bm25_ids = indexes["bm25_ids"]

    # Semantic retrieval
    query_embedding = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True)[0]
    semantic_results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=n_semantic,
        include=["documents", "metadatas", "distances"]
    )

    semantic_candidates = []
    if semantic_results["ids"] and len(semantic_results["ids"][0]) > 0:
        documents = semantic_results["documents"][0]
        metadatas = semantic_results["metadatas"][0]
        distances = semantic_results["distances"][0]
        for i, doc_id in enumerate(semantic_results["ids"][0]):
            semantic_candidates.append({
                "chunk_id": doc_id,
                "text": documents[i],
                "metadata": metadatas[i],
                "distance": distances[i],
                # NOTE(review): 1 - distance is a similarity only if the
                # collection uses a cosine-style distance — confirm.
                "score": 1.0 - distances[i],
            })

    # BM25 retrieval
    # NOTE(review): presumably the BM25 index was built with the same
    # lowercase/whitespace tokenization — verify against the indexer.
    query_tokens = [w.lower() for w in query.split()]
    bm25_scores = bm25.get_scores(query_tokens)
    top_bm25_indices = np.argsort(bm25_scores)[::-1][:n_bm25]

    bm25_candidates = []
    for idx in top_bm25_indices:
        if bm25_scores[idx] > 0:  # Only include results with positive scores
            bm25_candidates.append({
                "chunk_id": bm25_ids[idx],
                "text": bm25_texts[idx],
                "metadata": bm25_metas[idx],
                "bm25_score": float(bm25_scores[idx]),
            })

    # Fuse using RRF
    fused_results = reciprocal_rank_fusion(semantic_candidates, bm25_candidates)

    # RRF keeps only one payload per chunk (the semantic one when both
    # retrievers return it), so reading "bm25_score" off that payload would
    # lose it. Look scores up per retriever instead; setdefault keeps the
    # best-ranked entry should an id ever repeat within one list.
    semantic_score_by_id = {}
    for cand in semantic_candidates:
        semantic_score_by_id.setdefault(cand["chunk_id"], cand["score"])
    bm25_score_by_id = {}
    for cand in bm25_candidates:
        bm25_score_by_id.setdefault(cand["chunk_id"], cand["bm25_score"])

    # Format output
    output = []
    for item in fused_results:
        result = item["result"]
        chunk_id = result.get("chunk_id")
        output.append({
            "chunk_id": chunk_id,
            "text": result.get("text"),
            "metadata": result.get("metadata"),
            "rrf_score": item["rrf_score"],
            "semantic_score": semantic_score_by_id.get(chunk_id),
            "bm25_score": bm25_score_by_id.get(chunk_id),
        })

    return output