Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
from dotenv import load_dotenv | |
from langsmith import traceable | |
from datetime import datetime | |
from typing import List, Dict, Optional | |
from app.chat import initialize_session_state, display_chat_history | |
from app.data_loader import get_data, list_all_files, load_docs | |
from app.document_processor import process_documents, save_vector_store_to_supabase, load_vector_store_from_supabase | |
from app.prompts import sahabat_prompt | |
from app.db import supabase | |
from langchain_community.llms import Replicate | |
from langchain.memory import ConversationBufferMemory | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain_community.document_transformers import LongContextReorder | |
load_dotenv() | |
# --------------------------------------------------------- | |
# ⚡️ CONFIG | |
# --------------------------------------------------------- | |
BUCKET_NAME = "pnp-bot-storage-archive" | |
VECTOR_STORE_PREFIX = "vector_store" | |
# --------------------------------------------------------- | |
# ⚡️ UTILITY | |
# --------------------------------------------------------- | |
def get_latest_data_timestamp_from_files(bucket_name: str) -> float: | |
"""Get the latest timestamp from files in a Supabase storage bucket.""" | |
files = list_all_files(bucket_name) | |
latest_time = 0.0 | |
for file in files: | |
iso_time = file.get("updated_at") or file.get("created_at") | |
if iso_time: | |
try: | |
timestamp = datetime.fromisoformat(iso_time.replace('Z', '+00:00')).timestamp() | |
latest_time = max(latest_time, timestamp) | |
except Exception as e: | |
print(f"Gagal parsing waktu dari {file.get('name')}: {e}") | |
return latest_time | |
def get_supabase_vector_store_timestamp() -> Optional[str]: | |
"""Get the latest timestamp of vector store files in the Supabase storage.""" | |
try: | |
response = supabase.storage.from_(BUCKET_NAME).list() | |
timestamps = [] | |
for file in response: | |
if file["name"].startswith(VECTOR_STORE_PREFIX) and ( | |
file["name"].endswith(".faiss") or file["name"].endswith(".pkl") | |
): | |
timestamps.append(file["updated_at"]) | |
if len(timestamps) >= 2: | |
return max(timestamps) | |
return None | |
except Exception as e: | |
print(f"Error getting Supabase timestamp: {e}") | |
return None | |
def vector_store_is_outdated() -> bool: | |
"""Check if vector store needs to be updated based on files in Supabase storage.""" | |
supabase_timestamp = get_supabase_vector_store_timestamp() | |
if supabase_timestamp is None: | |
return True | |
supabase_time = datetime.fromisoformat(supabase_timestamp.replace("Z", "+00:00")).timestamp() | |
data_time = get_latest_data_timestamp_from_files("pnp-bot-storage") | |
return data_time > supabase_time | |
def reorder_embedding(docs): | |
"""Reorder documents for long context retrieval.""" | |
reordering = LongContextReorder() | |
return reordering.transform_documents(docs) | |
# --------------------------------------------------------- | |
# ⚡️ RAG CHAIN | |
# --------------------------------------------------------- | |
def create_conversational_chain(vector_store): | |
"""Create a Conversational Retrieval Chain for RAG.""" | |
llm = Replicate( | |
model="fauzi3007/sahabat-ai-replicate:c3fc398f441379bd3fb6a4498950f9302aa75b7a95e76978a689ceb5c4b4bf09", | |
model_kwargs={"temperature": 0.1, "top_p": 0.9, "max_new_tokens": 10000} | |
) | |
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key="answer") | |
chain = ConversationalRetrievalChain.from_llm( | |
llm, | |
retriever=vector_store.as_retriever(search_kwargs={"k": 6}), | |
combine_docs_chain_kwargs={"prompt": sahabat_prompt}, | |
return_source_documents=True, | |
memory=memory, | |
) | |
return chain | |
def get_rag_chain(vector_store): | |
"""Return a Conversational Retrieval Chain for external use.""" | |
return create_conversational_chain(vector_store) | |
# --------------------------------------------------------- | |
# ⚡️ MAIN FUNCTION | |
# --------------------------------------------------------- | |
def main(): | |
initialize_session_state() | |
st.set_page_config( | |
page_title="PNP-Bot", | |
page_icon="logo-pnp.ico", | |
) | |
vector_store = None | |
if len(st.session_state["history"]) == 0: | |
if vector_store_is_outdated(): | |
with st.spinner("Memuat dan memproses dokumen..."): | |
get_data() | |
docs = load_docs() | |
if len(docs) > 0: | |
reordered_docs = reorder_embedding(docs) | |
vector_store = process_documents(reordered_docs) | |
with st.spinner("Mengunggah vector store ke Supabase..."): | |
success = save_vector_store_to_supabase(vector_store, supabase, BUCKET_NAME, VECTOR_STORE_PREFIX) | |
if success: | |
print("✅ Vector store berhasil diunggah ke Supabase!") | |
else: | |
print("❌ Gagal mengunggah vector store ke Supabase.") | |
else: | |
print("⚠️ Folder 'data/' kosong. Chatbot tetap bisa digunakan, tetapi tanpa konteks dokumen.") | |
vector_store = None | |
else: | |
with st.spinner("Memuat vector store dari Supabase..."): | |
vector_store = load_vector_store_from_supabase(supabase, BUCKET_NAME, VECTOR_STORE_PREFIX) | |
if vector_store: | |
print("✅ Vector store berhasil dimuat dari Supabase!") | |
else: | |
print("❌ Gagal memuat vector store dari Supabase.") | |
else: | |
vector_store = st.session_state.get("vector_store") or load_vector_store_from_supabase(supabase, BUCKET_NAME, VECTOR_STORE_PREFIX) | |
st.session_state["vector_store"] = vector_store | |
if st.session_state["vector_store"] is not None: | |
chain = create_conversational_chain(st.session_state["vector_store"]) | |
display_chat_history(chain) | |
if __name__ == "__main__": | |
main() | |