import os
import uuid
from concurrent.futures import ThreadPoolExecutor

import chromadb

from chunker import llama_index_sentence_splitter
from scrape import get_url_content
from web_search import google

client = chromadb.Client()
COLLECTION_NAME = os.getenv("RAG_INDEX", "default_index").strip() or "default_index"


async def search(query: str, top_k: int = 5) -> list:
    """
    Search the ChromaDB collection for documents similar to the query.

    Arguments:
        query (str): The search query.
        top_k (int): The number of top results to return.

    Returns:
        list: A list of dictionaries containing the search results,
        including documents and metadata.
    """
    print("Searching ChromaDB collection for documents similar to the query.")
    if not query:
        raise ValueError("Query cannot be empty.")

    # Augment the index with fresh web results before querying it.
    web_search = await google(q=f"{query} -filetype:pdf -site:youtube.com", results=2)
    _index_links([result["link"] for result in web_search["organic"]])

    results = _search_k(query, top_k)
    print(f"Found {len(results['documents'])} documents matching the query.")
    return [
        {
            "content": doc,
            "distance": distance,
            "metadata": metadata,
        }
        for doc, metadata, distance in zip(
            results["documents"], results["metadatas"], results["distances"]
        )
    ]


def _index_links(links: list) -> int:
    """
    Index a list of URLs by adding their content to the ChromaDB collection.

    Arguments:
        links (list): A list of URLs to index.

    Returns:
        int: The total number of chunks added to the collection.
    """
    print("Indexing multiple URLs:", links)
    # Fetch and index the URLs concurrently; each future returns a chunk count.
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(_index_url, link) for link in links]
        total_chunks = sum(future.result() for future in futures)
    print(f"Total chunks indexed from {len(links)} URLs: {total_chunks}")
    return total_chunks


def _url_exists(url: str) -> bool:
    """
    Check if a URL is already indexed in the ChromaDB collection.

    Arguments:
        url (str): The URL to check.

    Returns:
        bool: True if the URL is indexed, False otherwise.
    """
    print("Checking if URL exists in the collection:", url)
    collection = _get_collection()
    # A single match on the "source" metadata field confirms the URL is indexed.
    exists = len(collection.get(
        where={"source": url},
        limit=1,
        include=["documents"],
    ).get("documents", [])) > 0
    print(f"URL {url} exists: {exists}")
    return exists


def _index_url(url: str) -> int:
    """
    Index a URL by adding its content to the ChromaDB collection.

    Arguments:
        url (str): The URL to index.

    Returns:
        int: The total number of chunks added to the collection.
    """
    print("Indexing URL", url)
    if _url_exists(url):
        print(f"URL {url} is already indexed. Skipping indexing.")
        return 0

    document = get_url_content(url)
    if not document:
        print("No content found at the provided URL.")
        return 0

    total_chunks = _add_document_to_collection(document, url)
    print(f"Indexed {total_chunks} chunks from URL: {url}")
    return total_chunks


def _get_collection() -> "chromadb.Collection":
    """
    Get the collection from the ChromaDB client.

    :return: The collection object.
    """
    collection = client.get_or_create_collection(COLLECTION_NAME)
    print(f"Using collection: {COLLECTION_NAME} with {collection.count()} indexed chunks")
    return collection


def _add_document_to_collection(document: str, source: str) -> int:
    """
    Add a document to the ChromaDB collection.

    Args:
        document (str): The content of the document to be added.
        source (str): The source URI of the document.

    Returns:
        int: The number of chunks upserted into the collection.
    """
    collection = _get_collection()
    document_chunks = llama_index_sentence_splitter(
        documents=[document],
        document_ids=[source],
    )
    if not document_chunks:
        print("No document chunks were created. Please check the input document.")
        return 0

    # Each chunk gets a random ID; the source URL and chunk index go in the metadata.
    collection.upsert(
        ids=[uuid.uuid4().hex for _ in document_chunks],
        documents=[chunk["content"] for chunk in document_chunks],
        metadatas=[
            {"source": source, "chunk_id": i}
            for i in range(len(document_chunks))
        ],
    )
    return len(document_chunks)


def _search_k(query: str, k: int = 5) -> dict:
    """
    Search the ChromaDB collection for the top k documents matching the query.

    Arguments:
        query (str): The search query.
        k (int): The number of top results to return.

    Returns:
        dict: A dictionary containing the search results, including documents and metadata.
    """
    collection = _get_collection()
    results = collection.query(
        query_texts=[query],
        n_results=k,
        include=["documents", "metadatas", "distances"],
    )
    if not results or not results.get("documents"):
        print("No results found for the query.")
        return {"documents": [], "metadatas": [], "distances": []}

    # Chroma returns one result list per query text; unwrap the single query.
    return {
        "documents": results["documents"][0],
        "metadatas": results["metadatas"][0],
        "distances": results["distances"][0],
    }