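# NOTE: "Spaces: Sleeping" appears to be page-header residue from the Hugging Face Spaces file viewer; it is not part of the program.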
import os | |
import zipfile | |
import chromadb | |
import gradio as gr | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.runnables import RunnablePassthrough | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_together import ChatTogether | |
from langchain_community.vectorstores import Chroma | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
# Make sure the persisted Chroma store is on disk before anything tries to open it.
# Extraction is best-effort: a failure is printed, not raised, so start-up continues.
if os.path.exists("chroma_store"):
    print("β chroma_store folder already exists. Skipping unzip.")
else:
    print("π chroma_store folder not found. Attempting to unzip...")
    try:
        with zipfile.ZipFile("chroma_store.zip", "r") as zip_ref:
            zip_ref.extractall("chroma_store")
        print("β Successfully extracted chroma_store.zip.")
    except Exception as err:
        print(f"β Failed to unzip chroma_store.zip: {err}")
# ChromaDB setup: open the on-disk store and wrap one collection for LangChain.
chroma_client = chromadb.PersistentClient(path="./chroma_store")
embedding_function = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vectorstore = Chroma(
    collection_name="imageonline_chunks",
    client=chroma_client,
    embedding_function=embedding_function,
)

# Retriever setup: every search is issued with k=5 and a {"site": "imageonline"} filter.
_retriever_search_kwargs = {"k": 5, "filter": {"site": "imageonline"}}
retriever = vectorstore.as_retriever(search_kwargs=_retriever_search_kwargs)
# Retrieval logic: return the full concatenated context and a short list of references
def retrieve_with_metadata(query, k=5, max_refs=2):
    """Retrieve chunks for *query* and package them for the prompt and the UI.

    Args:
        query: The user's question, passed to the vector search unchanged.
        k: Number of chunks to request. It replaces the ``k`` configured on the
            module-level ``retriever``; that retriever's search type and other
            search settings (such as its metadata filter) are reused.
        max_refs: Upper bound on the number of unique references returned.
            Zero or a negative value yields an empty reference list.

    Returns:
        A dict with two keys:

        ``"context"``
            The ``page_content`` of every retrieved chunk joined by blank
            lines, or ``"No relevant context found."`` if nothing came back.
        ``"references"``
            ``{"section": ..., "source": ...}`` dicts in retrieval order,
            de-duplicated on the (section, source) pair; missing metadata
            values are reported as ``"Unknown"``.
    """
    # Derive a retriever that honours the caller's k. The module-level one is
    # pinned to its own k, which previously made this parameter a no-op.
    scoped_retriever = retriever.vectorstore.as_retriever(
        search_type=retriever.search_type,
        search_kwargs={**retriever.search_kwargs, "k": k},
    )
    # invoke() is the supported entry point; get_relevant_documents() is deprecated.
    docs = scoped_retriever.invoke(query)
    if not docs:
        return {
            "context": "No relevant context found.",
            "references": [],
        }

    # All retrieved chunks go to the LLM, regardless of how many references are shown.
    context = "\n\n".join(doc.page_content for doc in docs)

    seen = set()
    references = []
    for doc in docs:
        # Check the limit before appending so max_refs=0 really means "none".
        if len(references) >= max_refs:
            break
        source = doc.metadata.get("source", "Unknown")
        section = doc.metadata.get("section", "Unknown")
        key = (section, source)
        if key in seen:
            continue
        seen.add(key)
        references.append({"section": section, "source": source})

    return {
        "context": context,
        "references": references,
    }
# LLM initialization
# The Together API key is read from the environment (e.g. a Space secret) rather
# than being committed to the repository.
# NOTE: the key that was previously hard-coded here must be treated as leaked
# and revoked with the provider.
_together_api_key = os.environ.get("TOGETHER_API_KEY")
if not _together_api_key:
    # Fail at start-up with an actionable message instead of on the first chat request.
    raise RuntimeError(
        "TOGETHER_API_KEY is not set; define it as an environment variable "
        "(or Space secret) before starting the app."
    )

llm = ChatTogether(
    model="meta-llama/Llama-3-8b-chat-hf",
    temperature=0.3,
    max_tokens=1024,
    top_p=0.7,
    together_api_key=_together_api_key,
)
# Prompt template: tells the model to answer only from the supplied context and
# gives it a fixed fallback sentence for anything the context does not cover.
# Placeholders: {context} (retrieved text) and {question} (the user's query).
_PROMPT_TEMPLATE = """
You are a knowledgeable assistant for ImageOnline Pvt. Ltd. .
Answer the user's query using ONLY the following context extracted from our official website.
If the answer is not clearly present in the context, say "I couldn't find the information on the site."
--------------------
{context}
--------------------
Query: {question}
"""
prompt = ChatPromptTemplate.from_template(_PROMPT_TEMPLATE)
# RAG chain: question -> {context, question} -> prompt -> LLM -> plain string
def _context_for(question):
    """Return only the concatenated context text retrieved for *question*."""
    return retrieve_with_metadata(question)["context"]


# The incoming question is fanned out: once through retrieval, once untouched.
_chain_inputs = {
    "context": _context_for,
    "question": RunnablePassthrough(),
}
rag_chain = _chain_inputs | prompt | llm | StrOutputParser()
# References for display
def get_references(query):
    """Return just the reference list that ``retrieve_with_metadata`` builds for *query*."""
    retrieval = retrieve_with_metadata(query)
    return retrieval["references"]
from datetime import datetime | |
import time | |
import gradio as gr | |
# Chat function
def chat_interface(message, history):
    """Answer one user message and return the updated chat history.

    Args:
        message: Text submitted from the input box.
        history: Existing list of ``(user_markdown, bot_markdown)`` tuples, or
            ``None``/empty for a fresh conversation. The list passed in is
            left untouched; a copy is extended and returned.

    Returns:
        A 3-tuple ``(chatbot_value, state_value, textbox_value)``: the updated
        history twice (the same list object) followed by ``""`` so the caller
        can clear the input box. For a blank message the history is returned
        without a new entry and no retrieval or LLM call is made.
    """
    # Work on a copy so the caller's list is never mutated behind its back.
    history = list(history or [])

    # Nothing to ask: skip the retrieval + LLM round-trip entirely.
    if not message or not message.strip():
        return history, history, ""

    timestamp_user = datetime.now().strftime("%H:%M:%S")
    user_msg = f"π§ **You**\n{message}\n\n<span style='font-size: 0.8em; color: gray;'>β±οΈ {timestamp_user}</span>"

    # No "typing" placeholder or artificial delay: this handler returns a single
    # value, so an intermediate state would never be rendered anyway.
    try:
        answer = rag_chain.invoke(message)
        references = get_references(message)
        if references:
            ref_lines = "\n".join(f"{ref['section']} β {ref['source']}" for ref in references)
            ref_string = f"\n\nπ **Reference(s):**\n{ref_lines}"
        else:
            ref_string = "\n\nπ **Reference(s):**\n_None available_"
        bot_body = answer.strip() + ref_string
    except Exception as e:
        # UI boundary: report the failure inside the chat instead of breaking the handler.
        bot_body = f"β οΈ {str(e)}"

    timestamp_bot = datetime.now().strftime("%H:%M:%S")
    bot_msg = f"π€ **Bot**\n{bot_body}\n\n<span style='font-size: 0.8em; color: gray;'>β±οΈ {timestamp_bot}</span>"
    history.append((user_msg, bot_msg))
    return history, history, ""
# Gradio UI builder
def launch_gradio():
    """Assemble the chat UI and return the ``gr.Blocks`` app.

    Despite the name, nothing is served here: the caller is expected to call
    ``.launch()`` on the returned object.
    """
    # Orange theme for the buttons and the input box.
    custom_css = """
.gr-button {
background-color: orange !important;
color: white !important;
font-weight: bold;
border-radius: 6px !important;
border: 1px solid darkorange !important;
}
.gr-button:hover {
background-color: darkorange !important;
}
.gr-textbox textarea {
border: 2px solid orange !important;
border-radius: 6px !important;
padding: 0.75rem !important;
font-size: 1rem;
}
"""

    def _empty_chat():
        """Return fresh values for the chatbot display and the history state."""
        return [], []

    with gr.Blocks(css=custom_css) as demo:
        # Header and subtitle
        gr.Markdown("# π¬ ImageOnline RAG Chatbot")
        gr.Markdown("Welcome! Ask about Website Designing, Web Development, App Development, About Us, Digital Marketing etc.")

        chatbot = gr.Chatbot()
        state = gr.State([])

        # Input row: wide textbox next to a narrow send button.
        with gr.Row(equal_height=True):
            msg = gr.Textbox(
                placeholder="Ask your question here...",
                show_label=False,
                scale=9,
            )
            send_btn = gr.Button("π Send", scale=1)

        # Pressing Enter and clicking the button run the same handler.
        for register in (msg.submit, send_btn.click):
            register(chat_interface, inputs=[msg, state], outputs=[chatbot, state, msg])

        with gr.Row():
            clear_btn = gr.Button("π§Ή Clear Chat")
        clear_btn.click(fn=_empty_chat, outputs=[chatbot, state])

    return demo
# Build the app at import time so `demo` stays available as a module attribute,
# but only start the web server when this file is executed as a script.
demo = launch_gradio()

if __name__ == "__main__":
    demo.launch()