"""RAG chatbot that routes queries between preset documents and a Pinecone-backed RAG pipeline."""

import os
import time
from getpass import getpass

import numpy as np
import torch
import torch.nn.functional as F
import yaml
from docx import Document
from dotenv import load_dotenv
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_pinecone import PineconeVectorStore
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
from ragatouille import RAGPretrainedModel
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel

from data_ingester import ChatbotDataIngester
from data_query import ChatbotDataQuery, generate_openai_response
from PRESET_QUERIES import Queries, Query_Doc_Map

load_dotenv()


class RAGChatbot:
    def __init__(self, pinecone_api_key=None, index_name="test-index", config_path="../config.yml"):
        """
        Initialize the RAGChatbot.
        Handles embeddings, vector store, data ingestion, and query.
        """
        self.pinecone_api_key = pinecone_api_key or os.getenv("PINECONE_API_KEY")  # or getpass("Enter your Pinecone API key: ")
        self.index_name = index_name
        self.embeddings = self.initialize_embeddings()
        # Embedding dimensionality, probed with a dummy query; used when creating the index.
        self.dimensions = len(self.embeddings.embed_query("Hello World!"))
        self.vector_store = self.initialize_vector_store()
        self.data_ingester = ChatbotDataIngester(vector_store=self.vector_store, embeddings=self.embeddings)
        self.data_query = ChatbotDataQuery(vector_store=self.vector_store)
        self.reranker = self.initialize_reranker()
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        self.client = OpenAI(api_key=self.openai_api_key)

    def load_config(self, config_path):
        """
        Load the configuration file (config.yml).
        Note: config_path is accepted by __init__ but not currently used.
        """
        with open(config_path, 'r') as file:
            return yaml.safe_load(file)

    def initialize_embeddings(self):
        """
        Initialize the BGE embedding model (on GPU when available), with
        normalized embeddings so cosine similarity is well-defined.
        """
        model_name = "BAAI/bge-large-en-v1.5"
        model_kwargs = {"device": "cuda" if torch.cuda.is_available() else "cpu"}
        encode_kwargs = {"normalize_embeddings": True}
        return HuggingFaceBgeEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs,
        )

    def initialize_reranker(self):
        """
        Initialize the ColBERTv2 reranker.
        """
        return RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

    def initialize_vector_store(self):
        """
        Initialize the Pinecone vector store, creating the index if it does not exist.
        """
        pc = Pinecone(api_key=self.pinecone_api_key)
        existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
        if self.index_name not in existing_indexes:
            pc.create_index(
                name=self.index_name,
                dimension=self.dimensions,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Block until the freshly created index is ready to serve requests.
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)
        return PineconeVectorStore(index=pc.Index(self.index_name), embedding=self.embeddings)

    def ingest_data(self, dir_path, empty=False):
        """
        Ingest data from a directory using the ChatbotDataIngester.
        """
        self.data_ingester.load_and_ingest(dir_path, empty_db=empty)

    def __route(self, query_text):
        """
        Route the query: return a document-grounded answer when the query closely
        matches a preset query, a canned self-introduction when it contains a
        phrase from QUESTIONS.txt, or (None, None) to fall back to the RAG pipeline.
""" query_text = query_text.lower() with open('QUESTIONS.txt', 'r') as f: phrases = f.read().splitlines() if phrases and not any(phrase.lower() in query_text for phrase in phrases): def cosine_similarity_calc(vec1, vec2): vec1 = np.array(vec1).reshape(1, -1) vec2 = np.array(vec2).reshape(1, -1) return cosine_similarity(vec1, vec2)[0][0] def get_embeddings(client, text): response = client.embeddings.create( input=text, model="text-embedding-3-large" ) return response.data[0].embedding # Generate embeddings for the incoming query query_embedding = get_embeddings(self.client, query_text) best_match = None highest_similarity = 0 for main_query, similar_queries in Queries.items(): for query in similar_queries: query = query.lower() preset_embedding = get_embeddings(self.client, query) similarity_score = cosine_similarity_calc(query_embedding, preset_embedding) if similarity_score > highest_similarity: highestquery_text_similarity = similarity_score best_match = main_query if highest_similarity >= 0.75: print(f'Response from RAG routing: query_text: {query_text} - best_match query: {best_match} - Doc: {Query_Doc_Map[best_match][0]} - similarity: {highest_similarity}') response, file_path = self.__generate_response_from_file(query_text, Query_Doc_Map[best_match][0]) return response, file_path else: return None, None else: response = '''Hello! My name is Wagner, inspired by the character from Goethe’s Faust. In the play, Wagner is Faust’s loyal assistant, supporting his intellectual pursuits, but in a more concentrated way. Similarly, my task is to assist with Daniel Rangel\'s research in artificial intelligence and marketing. I’m well-versed in Daniel’s publications, his ongoing research, CV, and academic achievements, and my mission is to provide precise, well-structured information about his academic career. While I may not have lofty aspirations like transforming the world, I’m committed to representing Daniel’s work within a defined scope. I aim to assist with inquiries regarding Daniel’s research, teaching, and professional path, and I might even share personal insights if treated with respect.''' return response, 'None' def __generate_response_from_file(self, query_text, file_path): """ Generate response from a file. """ def read_docx(file_path): doc = Document(file_path) full_text = [] for paragraph in doc.paragraphs: full_text.append(paragraph.text) return '\n'.join(full_text) file_content = read_docx(os.path.join('./Data', file_path)) system_prompt = ''' You are an intelligent assistant designed to provide clear, accurate, and helpful responses. Focus on understanding user intent, give concise answers, and offer step-by-step solutions when necessary. Be friendly, professional, and avoid unnecessary information.\n''' input_prompt = f'Query: {query_text}\nContext: {file_content}' response = generate_openai_response(input_prompt, system_prompt) return response.split('\n')[1], os.path.join('./Data', file_path) def query_chatbot(self, query_text, k=1, rerank=False, past_messages=[]): #, fetch_k=2, lambda_mult=0.5 """ Query the chatbot using the provided query text and optional search parameters. 
""" if past_messages: past_messages = "\n".join([f"{message['role']}: {message['content']}" for message in past_messages]) query_text = f"Past Chat History:\n{past_messages}\n{query_text}" route_response, file_path = self.__route(query_text) if route_response == None: if rerank: response, context_docs = self.data_query.query( query_text=query_text, k=k, reranker=self.reranker ) else: response = self.data_query.query( query_text=query_text, k=k, ) return response, context_docs else: return route_response, file_path