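# ImageOnline RAG chatbot.
# Answers questions about ImageOnline Web Solutions via Retrieval-Augmented
# Generation: a persisted Chroma vector store supplies context, Llama-3
# (via Together AI) generates the answer, and Gradio provides the chat UI.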
import os
import time
import zipfile
from datetime import datetime

import chromadb
import gradio as gr
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_together import ChatTogether
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
# Check if chroma_store exists; unzip the archive if not
if not os.path.exists("chroma_store"):
    print("📁 chroma_store folder not found. Attempting to unzip...")
    try:
        with zipfile.ZipFile("chroma_store.zip", "r") as zip_ref:
            zip_ref.extractall("chroma_store")
        print("✅ Successfully extracted chroma_store.zip.")
    except Exception as e:
        print(f"❌ Failed to unzip chroma_store.zip: {e}")
else:
    print("✅ chroma_store folder already exists. Skipping unzip.")
# Initialize ChromaDB client
chroma_client = chromadb.PersistentClient(path="./chroma_store")

# Vector store and retriever
embedding_function = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vectorstore = Chroma(
    client=chroma_client,
    collection_name="imageonline_chunks",
    embedding_function=embedding_function
)
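# Top-3 similarity search, filtered on metadata to chunks tagged site == "imageonline"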
retriever = vectorstore.as_retriever(search_kwargs={"k": 3, "filter": {"site": "imageonline"}})
# Retrieval logic
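# Returns the top-ranked chunk as the context string, plus its section/source
# metadata so the UI can show a reference alongside the answer.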
def retrieve_with_metadata(query):
    docs = retriever.get_relevant_documents(query)
    if not docs:
        return {"context": "No relevant context found.", "references": []}
    top_doc = docs[0]
    return {
        "context": top_doc.page_content,
        "references": [{
            "section": top_doc.metadata.get("section", "Unknown"),
            "source": top_doc.metadata.get("source", "Unknown")
        }]
    }
# LLM setup
llm = ChatTogether(
    model="meta-llama/Llama-3-8b-chat-hf",
    temperature=0.3,
    max_tokens=1024,
    top_p=0.7,
    together_api_key="a36246d65d8290f43667350b364c5b6bb8562eb50a4b947eec5bd7e79f2dffc6"
)
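# NOTE: hardcoding an API key in source is risky; reading it from an environment
# variable (e.g. os.environ.get("TOGETHER_API_KEY")) is generally preferable.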
# Prompt template
prompt = ChatPromptTemplate.from_template("""
You are an expert assistant for ImageOnline Web Solutions.
Answer the user's query based ONLY on the following context:
{context}
Query: {question}
""")
rag_chain = (
    {
        "context": lambda x: retrieve_with_metadata(x)["context"],
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)
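# Example usage (assuming the vector store is populated):
#   answer = rag_chain.invoke("What services does ImageOnline offer?")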
def get_references(query):
    return retrieve_with_metadata(query)["references"]
# Gradio UI
def chat_interface(message, history):
    history = history or []

    # 🕒 Timestamp for user
    timestamp = datetime.now().strftime("%H:%M:%S")
    user_msg = f"🧑 **You**\n{message}\n\n<span style='font-size: 0.8em; color: gray;'>⏱️ {timestamp}</span>"

    # ⏳ Show typing indicator
    bot_msg = "⏳ _Bot is typing..._"
    history.append((user_msg, bot_msg))

    try:
        # 💬 Optional: simulate typing delay (cosmetic only)
        time.sleep(0.5)

        # RAG response generation
        answer = rag_chain.invoke(message)
        references = get_references(message)

        if references:
            ref = references[0]
            ref_string = f"\n\n📖 **Reference:**\nSection: {ref['section']}\nURL: {ref['source']}"
        else:
            ref_string = "\n\n📖 **Reference:**\n_None available_"

        full_response = answer + ref_string

        # 🕒 Timestamp for bot
        timestamp_bot = datetime.now().strftime("%H:%M:%S")
        bot_response = f"🤖 **Bot**\n{full_response}\n\n<span style='font-size: 0.8em; color: gray;'>⏱️ {timestamp_bot}</span>"

        # Replace typing placeholder
        history[-1] = (user_msg, bot_response)
    except Exception as e:
        timestamp_bot = datetime.now().strftime("%H:%M:%S")
        error_msg = f"🤖 **Bot**\n⚠️ {str(e)}\n\n<span style='font-size: 0.8em; color: gray;'>⏱️ {timestamp_bot}</span>"
        history[-1] = (user_msg, error_msg)

    return history, history, ""  # third value clears the input box
def launch_gradio():
    with gr.Blocks() as demo:
        gr.Markdown("# 💬 ImageOnline RAG Chatbot")
        gr.Markdown("Ask about Website Designing, Web Development, App Development, About Us, Testimonials etc.")

        chatbot = gr.Chatbot()
        state = gr.State([])

        with gr.Row():
            msg = gr.Textbox(
                placeholder="Ask your question here...",
                show_label=False,
                scale=8
            )
            send_btn = gr.Button("📨 Send", scale=1)

        # 🔁 Trigger chat and clear input
        msg.submit(chat_interface, inputs=[msg, state], outputs=[chatbot, state, msg])
        send_btn.click(chat_interface, inputs=[msg, state], outputs=[chatbot, state, msg])

        with gr.Row():
            clear_btn = gr.Button("🧹 Clear Chat")
            clear_btn.click(fn=lambda: ([], []), outputs=[chatbot, state])

    return demo
demo = launch_gradio()
demo.launch()