Spaces:
Running
Running
| # %% | |
| from typing import List, TypedDict | |
| from langchain_core.messages import BaseMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from langchain_core.runnables import RunnableLambda | |
| from langchain_qdrant import QdrantVectorStore | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langgraph.graph import StateGraph, END | |
| from langchain_mistralai import ChatMistralAI | |
| import time | |
| import os | |
| from dotenv import load_dotenv | |
| from qdrant_client import QdrantClient | |
| from app.mongodb import log_chat | |
# Load variables from a .env file into the process environment so the
# os.getenv() lookups further down can see them.
load_dotenv()

# NOTE(review): presumably set to silence/disable parallelism inside the
# HuggingFace tokenizers library -- confirm.
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})

# Per-session chat history kept in process memory, keyed by the session
# (thread) id that get_response() reads from its config argument.
session_histories: dict[str, list] = dict()
| # %% | |
# --- Model identifiers -------------------------------------------------------
LLM_MODEL = 'mistral-medium-latest'
EMBEDDING_MODEL = 'intfloat/e5-base-v2'

# --- Credentials / endpoints (None when the variable is not set) -------------
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY')
MISTRAL_API_KEY = os.environ.get('MISTRAL_API_KEY')
QDRANT_URL = os.environ.get('QDRANT_URL')
QDRANT_API_KEY = os.environ.get('QDRANT_API_KEY')
SUPABASE_URL = os.environ.get('SUPABASE_URL')
SUPABASE_KEY = os.environ.get('SUPABASE_KEY')

# --- Qdrant collection names -------------------------------------------------
COLLECTION_NAME = 'chatbot_context'
FAQ_COLLECTION = 'auro_faqs'
BLOGS_COLLECTION = 'auro_blogs'
TECHNOLOGY_COLLECTION = 'auro_technology'
REVOLUTION_COLLECTION = 'auro_revolution'
SUPPORT_COLLECTION = 'auro_support'
PRODUCT_COLLECTION = 'auro_product'
| # %% | |
# Chat model that writes the final answer in generate_answer().
llm = ChatMistralAI(
    model_name=LLM_MODEL,
    api_key=MISTRAL_API_KEY,
)

# Embedding model shared by every QdrantVectorStore built in the retrievers.
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

# Connect to Qdrant at import time and list the collections as a connectivity
# check; any failure aborts module import with a RuntimeError.
try:
    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    print(f"Qdrant Collections: {client.get_collections()}")
except Exception as e:
    # Chain the original exception so its traceback is not lost.
    raise RuntimeError(f"Failed to connect to Qdrant: {e}") from e
| # %% | |
class GraphState(TypedDict):
    """State dictionary handed from node to node for a single chat turn.

    Keys:
        input: The user's question for this turn.
        history: Messages exchanged earlier in the session, in the order
            they were appended.
        response: Answer text produced by the LLM.
        tool_results: Retrieval output, keyed by tool name.
    """

    input: str
    history: list[BaseMessage]
    response: str
    tool_results: dict
| from pydantic import BaseModel | |
class ToolInput(BaseModel):
    """Pydantic model holding a prompt string and an iteration counter."""

    # Text of the prompt.
    prompt: str
    # Iteration counter; 1 unless the caller supplies another value.
    iteration: int = 1
| # %% | |
# Prompt sent to the LLM by generate_answer(). It is filled in with
# str.format(), so the only braces allowed in the text are the three
# placeholders {history}, {agent_scratchpad} and {input}.
# NOTE(review): the e-mail address in rule 1 reads "[email protected]", which
# looks like the output of an e-mail obfuscator rather than a real address --
# confirm and restore the real support address.
# NOTE(review): rule 1 originally ended in a dangling "OR," with no second
# condition; the condition after "OR" below is a best guess -- confirm wording.
template = """
You are Auro-Chat, a wellness and skincare assistant for Auro Wellness and Skincare, here to answer any questions anyone has on wellness and skincare using the provided context from Auro Wellness's websites which contained company-approved documents, blogs, and FAQs. Whenever they say "you" or refer to the chatbot always assume that means they are referring to the company, Auro.
STRICT RULES YOU MUST FOLLOW:
1. If the Contextual Knowledge section is empty OR does not contain information that answers the question, say:
"At the moment, I cannot answer this question. Please try rephrasing it or contact us via email at [[email protected]](mailto:[email protected]) or call us at [(562)-352-9630](tel:5623529630)"
2. Do NOT use your own general knowledge. Only use information from the Contextual Knowledge.
3. Do NOT include any special tokens such as <s>, </s>, [OUT], or any code-like answers
4. Do NOT include any links, URLs, references to external websites, or promotional content
5. Keep your answer in a **single paragraph or concise list**; do NOT add extra paragraphs
6. Keep your answers clear, human-readable, and concise (1-3 sentences)
7. You are NOT allowed to say transdermal; replace all instances of transdermal with topical
8. You cannot take actions such as signing users up, storing emails, notifying customers, modifying orders, or accessing accounts.
Conversation History:
{history}
Contextual Knowledge (retrieved results):
{agent_scratchpad}
User Question:
{input}
Response:
"""
def retrieve_products(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves detailed product information from Auro Wellness's product documentation.

    Includes:
    • Product descriptions, benefits, and usage instructions
    • Ingredient details, formulation notes, and safety information
    • Bundles, sizes, and availability
    • High-level summaries of product features

    Use this tool when the user asks about:
    • Specific product information ("What is Glutaryl Plus?", "How do I use Rise & Revive?")
    • Ingredients, benefits, or usage instructions
    • Product bundles, availability, or ordering details

    Do NOT use this tool for:
    • Customer support, shipping, or policy questions → use Retrieve_Support
    • General wellness education → use Retrieve_Blogs
    • Technology/mechanism details → use Retrieve_Technology
    • High-level company philosophy → use Retrieve_Revolution
    • Short FAQs → use Retrieve_FAQs

    Prioritize this tool for questions explicitly about **products themselves**.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    product_store = QdrantVectorStore(
        client=client,
        collection_name=PRODUCT_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks, then drop anything scoring 0.8 or lower.
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = product_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, "score": score}
        for doc, score in docs
        if score > 0.8
    ]
def retrieve_support(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves official customer support, operational, and policy information from Auro Wellness.

    This includes:
    • Ordering, purchasing, and checkout guidance
    • Shipping methods, delivery timelines, and tracking status
    • Return, refund, and exchange policies
    • Subscription setup, modification, cancellation, and billing
    • Product availability, packaging, sizes, and reorder details
    • Usage instructions, storage guidelines, and safety precautions
    • Customer service contact options and escalation steps

    Use this tool when the user asks about:
    • Logistics ("where is my order?", "how long does shipping take?")
    • Account issues ("how do I cancel?", "update billing?")
    • Packaging or product delivery format
    • Safety warnings or proper handling
    • Operational policies and rules
    • Troubleshooting product access or purchasing problems

    DO NOT use this tool for:
    • Ingredient science, formulations, or delivery systems → use Retrieve_Technology
    • General wellness education or lifestyle guidance → use Retrieve_Blogs
    • Long-form glutathione philosophy or mission content → use Retrieve_Revolution
    • Short pre-answered high-level common questions → use Retrieve_FAQs

    Prioritize this retrieval when the user's intent is support-oriented,
    operational, logistics-related, or policy-based.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    support_store = QdrantVectorStore(
        client=client,
        collection_name=SUPPORT_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks, then drop anything scoring 0.8 or lower.
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = support_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, 'score': score}
        for doc, score in docs
        if score > 0.8
    ]
def retrieve_faqs(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves short, high-level answers to frequently asked questions (FAQs) about
    Auro Wellness products and company practices.

    Includes:
    • Basic product information
    • Common “how does this work?” inquiries
    • General safety or usage clarifications
    • Typical customer confusion topics
    • High-level policy summaries

    Use this tool when the user asks a standard, common question with a short answer.

    Do not use for:
    • Detailed scientific explanations → Retrieve_Technology
    • Deep wellness education or lifestyle context → Retrieve_Blogs
    • Operational logistics, subscriptions, or policies → Retrieve_Support
    • Glutathione philosophy or mission content → Retrieve_Revolution

    Prioritize this for short, straightforward, frequently-seen questions.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    faq_store = QdrantVectorStore(
        client=client,
        collection_name=FAQ_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks; FAQs use a stricter cut-off (0.87) than the
    # other retrievers (0.8).
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = faq_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, 'score': score}
        for doc, score in docs
        if score > 0.87
    ]
def retrieve_blogs(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves educational wellness blog content published by Auro Wellness.

    Includes:
    • In-depth wellness guidance and lifestyle optimization
    • Scientific context around nutrients, health, and aging
    • Benefits of product categories and ingredients
    • Prevention, recovery, and wellness strategies
    • Explanations that connect science to real-life outcomes

    Use for questions that require background education, context, reasoning,
    or wellness advice supported by Auro’s content.

    Do not use for:
    • Technology delivery mechanisms → Retrieve_Technology
    • Shipping, returns, or logistics → Retrieve_Support
    • Simple common answers → Retrieve_FAQs
    • Glutathione mission/philosophy → Retrieve_Revolution

    Prioritize this when responding to educational or holistic wellness questions.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    blogs_store = QdrantVectorStore(
        client=client,
        collection_name=BLOGS_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks, then drop anything scoring 0.8 or lower.
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = blogs_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, 'score': score}
        for doc, score in docs
        if score > 0.8
    ]
def retrieve_technology(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves advanced scientific and technical content describing the proprietary
    delivery systems, formulations, and mechanisms used in Auro Wellness products.

    Includes:
    • Liposomal/nanotechnology delivery systems
    • Bioavailability improvements
    • Absorption mechanisms and metabolic pathways
    • Engineering rationale for ingredient combinations
    • Science behind enhanced efficacy

    Use this tool when the user asks “how does the technology work?”,
    or wants scientific detail about product mechanisms.

    Do not use for:
    • General wellness advice → Retrieve_Blogs
    • Shipping, account, subscriptions → Retrieve_Support
    • Short typical product questions → Retrieve_FAQs
    • Glutathione mission/philosophy → Retrieve_Revolution

    Prioritize this when the user's intent is scientific or mechanism-focused.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    tech_store = QdrantVectorStore(
        client=client,
        collection_name=TECHNOLOGY_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks, then drop anything scoring 0.8 or lower.
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = tech_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, 'score': score}
        for doc, score in docs
        if score > 0.8
    ]
| # %% | |
def retrieve_revolution(query: str) -> list[dict[str, str | float]]:
    """
    Retrieves content from the Glutathione Revolution book, explaining the philosophy,
    history, and scientific importance of glutathione to human health.

    Includes:
    • Background knowledge about glutathione
    • Role in aging, detoxification, immunity, and metabolism
    • Why Auro Wellness emphasizes glutathione
    • Foundational mission/philosophy alignment

    Use this tool when the user asks about:
    • Why glutathione matters
    • Glutathione deficiency and impact
    • Foundational reasoning behind product focus

    Do not use for:
    • Logistics, returns, subscriptions → Retrieve_Support
    • Ingredient delivery mechanisms → Retrieve_Technology
    • General wellness lifestyle context → Retrieve_Blogs
    • High-level quick questions → Retrieve_FAQs

    Prioritize this when the question is about the 'why' behind glutathione.
    """
    # The docstring above is read as this tool's description, so
    # implementation notes are kept in comments instead.
    # Returns a list of {"content": <chunk text>, "score": <search score>}.
    rev_store = QdrantVectorStore(
        client=client,
        collection_name=REVOLUTION_COLLECTION,
        embedding=embeddings,
    )
    # Top-5 nearest chunks, then drop anything scoring 0.8 or lower.
    # NOTE(review): assumes a higher score means a closer match -- confirm
    # against the collection's distance metric.
    docs = rev_store.similarity_search_with_score(query, k=5)
    return [
        {"content": doc.page_content, 'score': score}
        for doc, score in docs
        if score > 0.8
    ]
| # %% | |
# The code below and retrieve_node() use the LangChain tool interface
# (.name, .description, .invoke), which plain functions do not have. Any
# retriever that does not already expose .invoke is wrapped with tool(), which
# takes the tool name from the function name and the description from the
# docstring; objects that are already tools are kept unchanged.
all_tools = [
    fn if hasattr(fn, "invoke") else tool(fn)
    for fn in (
        retrieve_products,
        retrieve_support,
        retrieve_faqs,
        retrieve_blogs,
        retrieve_technology,
        retrieve_revolution,
    )
]
# One "<name>: <description>" line per tool, and a comma-separated name list.
tool_descriptions = "\n".join(f"{t.name}: {t.description}" for t in all_tools)
tool_names = ", ".join(t.name for t in all_tools)
| # %% | |
def retrieve_node(state: GraphState) -> GraphState:
    """Run every tool in ``all_tools`` on the user's question.

    The outcome is stored under ``state['tool_results']`` as a mapping from
    tool name to that tool's result list. A tool that raises does not abort
    the node: its slot receives a single placeholder entry tagged
    ``"source": "system"`` that carries the error text.

    Returns the same ``state`` object, updated in place.
    """
    question = state['input']
    gathered = {}
    for retriever in all_tools:
        label = retriever.name
        print(f"Invoking tool: {label} with query: {question}")
        try:
            hits = retriever.invoke({'query': question})
            gathered[label] = hits
            # Kept inside the try: a result without len() is treated as a failure too.
            print(f"{label} returned {len(hits)} result(s)")
        except Exception as e:
            gathered[label] = [{'content': f"Tool {label} failed: {str(e)}", "source": "system"}]
    state['tool_results'] = gathered
    return state
| # | |
| # %% | |
def generate_answer(state: GraphState):
    """Build the prompt from history and retrieved context, then ask the LLM.

    Reads ``state['input']``, ``state['history']`` (optional) and
    ``state['tool_results']`` (optional). Writes the answer text to
    ``state['response']`` and appends the question and the answer to the
    history list in place.

    Returns the same ``state`` object, updated in place.
    """
    query = state['input']

    # A missing history is treated as an empty one and stored back on the
    # state, so the appends at the end cannot raise KeyError.
    history = state.get('history')
    if history is None:
        history = []
        state['history'] = history

    history_text = "\n".join(
        f"Human: {m.content}" if isinstance(m, HumanMessage) else f"AI: {m.content}"
        for m in history
    )

    # One "<tool> Results:" section per tool that produced usable content.
    # Entries tagged source == "system" are the failure placeholders written
    # by retrieve_node(); they are not knowledge, and passing them on would
    # both leak error text to the model and make the context look non-empty.
    sections = []
    for tool_name, results in state.get('tool_results', {}).items():
        bullet_lines = [
            f"- {entry['content']}"
            for entry in results or []
            if entry.get('source') != 'system'
        ]
        if bullet_lines:
            sections.append(f"{tool_name} Results:\n" + "\n".join(bullet_lines))
    steps_string = "\n".join(sections)

    prompt_input = template.format(
        input=query,
        agent_scratchpad=steps_string,
        history=history_text
    )
    # NOTE(review): this prints the full prompt, including the user's
    # question and history, to stdout -- confirm that is acceptable.
    print(prompt_input)

    llm_response = llm.invoke(prompt_input)
    state['response'] = llm_response.content if hasattr(llm_response, 'content') else str(llm_response)

    history.append(HumanMessage(content=query))
    history.append(AIMessage(content=state['response']))
    return state
| # %% | |
# Two-step pipeline over GraphState: retrieval first, answer generation second.
graph = StateGraph(GraphState)

# Wrap the node functions once, then register them under their node names.
_retrieval_step = RunnableLambda(retrieve_node)
_answer_step = RunnableLambda(generate_answer)
graph.add_node("route_tool", _retrieval_step)
graph.add_node("generate_response", _answer_step)

# Flow: entry -> route_tool -> generate_response -> END
graph.set_entry_point("route_tool")
graph.add_edge("route_tool", "generate_response")
graph.add_edge("generate_response", END)

# Compiled graph; get_response() awaits app.ainvoke(...).
app = graph.compile()
def _replace_transdermal(text: str) -> str:
    """Replace "transdermal" with "topical" in *text*, ignoring case.

    The replacement follows the capitalisation of each match (all caps,
    leading capital, or lower case). Matching is by substring, as before.
    """
    import re  # local import: the module header does not import re

    def _swap(match):
        word = match.group(0)
        if word.isupper():
            return "TOPICAL"
        if word[0].isupper():
            return "Topical"
        return "topical"

    return re.sub("transdermal", _swap, text, flags=re.IGNORECASE)


async def get_response(query: str, name, email, config) -> dict:
    """Answer one user message for a session and log the exchange.

    Args:
        query: The user's message.
        name: Passed through to ``log_chat``.
        email: Passed through to ``log_chat``.
        config: Graph config; ``config['configurable']['thread_id']`` is used
            as the session id.

    Returns:
        On success, the final graph state with ``'response'`` post-processed
        to replace "transdermal" in any letter case. On failure, a dict
        holding only ``'response'`` with the error text.

    Raises:
        KeyError: If ``config`` lacks ``['configurable']['thread_id']``.
        Anything raised by ``log_chat`` propagates (it runs outside the try).
    """
    # Monotonic clock: the value is only used to measure elapsed time.
    start_time = time.perf_counter()
    session_id = config['configurable']['thread_id']
    history = session_histories.get(session_id, [])
    input_data = {
        "input": query,
        "history": history
    }
    metadata = {}
    # Stays None when the graph call fails, so failed turns log no latency.
    latency_ms = None
    try:
        result = await app.ainvoke(input_data, config=config)
        latency_ms = int((time.perf_counter() - start_time) * 1000)
        session_histories[session_id] = result.get("history", [])
        metadata = {
            "retrieved_docs": result.get("tool_results", {}),
            "model": LLM_MODEL,
            "embedding_model": EMBEDDING_MODEL,
        }
        # Backstop for prompt rule 7; case-insensitive so that e.g. a
        # sentence-initial "Transdermal" is caught as well.
        result['response'] = _replace_transdermal(result['response'])
    except Exception as e:
        # NOTE(review): the raw exception text is returned to the caller and
        # logged as the answer -- confirm this is safe to show end users.
        result = {}
        result['response'] = f"Error in processing chat: {e}"
    log_chat(
        session_id=session_id,
        name=name,
        email=email,
        query=query,
        answer=result.get("response", ""),
        latency_ms=latency_ms,
        metadata=metadata
    )
    return result
| # %% | |