Spaces:
Sleeping
Sleeping
feat: update config.yaml and chatbot logic to enhance budget handling and improve message formatting
ab998fb
| import os | |
| from dotenv import load_dotenv | |
| from .ConnectorStrategy import ConnectorStrategy | |
| from pinecone import Pinecone, ServerlessSpec | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_pinecone import PineconeVectorStore | |
| from langchain_core.documents import Document | |
| import unicodedata | |
| import time | |
class PineconeConnector(ConnectorStrategy):
    """Connector that stores and retrieves document chunks in a Pinecone index.

    Configuration comes from the environment (loaded via .env):
    PINECONE_API_KEY, PINECONE_INDEX_NAME and PINECONE_NAMESPACE. The
    serverless index is created on first use if it does not already exist.
    """

    def __init__(self):
        load_dotenv()
        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        self.index_name = os.environ.get("PINECONE_INDEX_NAME")
        self.namespace = os.environ.get("PINECONE_NAMESPACE")
        pc = Pinecone(api_key=pinecone_api_key)
        existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
        if self.index_name not in existing_indexes:
            # dimension=3072 — presumably matches the embedding model used by
            # callers (e.g. OpenAI text-embedding-3-large); TODO confirm.
            pc.create_index(
                name=self.index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Block until the freshly created index is ready to accept writes.
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)
        self.index = pc.Index(self.index_name)

    def getDocs(self):
        """Return the distinct document names present in the namespace.

        Vector ids follow the "<docname>_<chunk_index>" convention used by
        addDoc, so the document name is everything before the last underscore.
        First-seen order is preserved.
        """
        docs_names = []
        seen = set()  # O(1) membership check instead of scanning the list
        for ids in self.index.list(namespace=self.namespace):
            for vector_id in ids:  # renamed: don't shadow the `id` builtin
                name_doc = "_".join(vector_id.split("_")[:-1])
                if name_doc not in seen:
                    seen.add(name_doc)
                    docs_names.append(name_doc)
        return docs_names

    def addDoc(self, filename, text_chunks, embedding):
        """Embed and upsert the chunks of one document into the namespace.

        Args:
            filename: original file name; kept verbatim in metadata.
            text_chunks: iterable of text chunks to embed and store.
            embedding: embedding model passed to PineconeVectorStore.

        Returns:
            {"filename_id": <sanitized base name>} on success, False on any
            failure (falsy return kept for backward compatibility).
        """
        try:
            vector_store = PineconeVectorStore(
                index=self.index, embedding=embedding, namespace=self.namespace
            )
            # Strip the extension and map separator characters to underscores.
            file_name = (
                filename.split(".")[0]
                .replace(" ", "_")
                .replace("-", "_")
                .replace(".", "_")
                .replace("/", "_")
                .replace("\\", "_")
                .strip()
            )
            # Loop-invariant: sanitize once, not per chunk. Computing this
            # before the loop also makes the empty-chunks case return the
            # success dict instead of failing with a NameError on the
            # `return` line below.
            clean_filename = remove_non_standard_ascii(file_name)
            documents = []
            uuids = []
            for i, chunk in enumerate(text_chunks):
                uuid = f"{clean_filename}_{i}"
                documents.append(
                    Document(
                        page_content=chunk,
                        metadata={"filename": filename, "chunk_id": uuid},
                    )
                )
                uuids.append(uuid)
            vector_store.add_documents(documents=documents, ids=uuids)
            return {"filename_id": clean_filename}
        except Exception as e:
            # Best-effort by design: report and return a falsy value so
            # callers can test the result. TODO(review): switch to logging.
            print(e)
            return False

    def retriever(self, query, embedding):
        """Return up to 5 chunks similar to `query` with score >= 0.6."""
        vector_store = PineconeVectorStore(
            index=self.index, embedding=embedding, namespace=self.namespace
        )
        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 5, "score_threshold": 0.6},
        )
        return retriever.invoke(query)
def remove_non_standard_ascii(input_string: str) -> str:
    """Strip accents and drop characters outside a safe ASCII subset.

    The input is NFKD-normalized so accented letters decompose into a base
    letter plus a combining mark; only ASCII letters, digits, underscores
    and the punctuation " .,!?" survive, which removes the combining marks
    (and any other non-ASCII codepoints).

    Fix: the underscore is now kept. addDoc deliberately converts
    separators to underscores immediately before calling this function,
    and the previous allowed set dropped them, collapsing "my_file" into
    "myfile". Ids stay parseable by getDocs, which splits on the last "_".
    """
    normalized_string = unicodedata.normalize('NFKD', input_string)
    allowed_punctuation = ' .,!?_'
    return ''.join(
        char
        for char in normalized_string
        if 'a' <= char <= 'z'
        or 'A' <= char <= 'Z'
        or char.isdigit()
        or char in allowed_punctuation
    )