import os
from io import BytesIO
from urllib.parse import urljoin, urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader
from langchain.prompts import PromptTemplate
from langchain_chroma import Chroma
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings

# Simple session management
class SessionManager:
    def __init__(self):
        self.sessions = {}

    def get_or_create_session(self, session_id):
        if session_id not in self.sessions:
            self.sessions[session_id] = []
        return self.sessions[session_id]

    def add_interaction(self, session_id, user_message, ai_response):
        session = self.get_or_create_session(session_id)
        session.append({"user": user_message, "ai": ai_response})

    def get_history(self, session_id, max_turns=5):
        session = self.get_or_create_session(session_id)
        recent_history = session[-max_turns:] if len(session) > max_turns else session
        history_text = ""
        for interaction in recent_history:
            history_text += f"User: {interaction['user']}\n"
            history_text += f"Assistant: {interaction['ai']}\n\n"
        return history_text.strip()

# Initialize session manager
session_manager = SessionManager()
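
# Usage sketch for the session store (illustrative only; "demo" is a made-up
# session ID, not one the app generates):
#   session_manager.add_interaction("demo", "Hello", "Hi! How can I help?")
#   session_manager.get_history("demo")
#   -> "User: Hello\nAssistant: Hi! How can I help?"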

groq_api_key = os.environ.get('GBV')
embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")

def scrape_websites(base_urls):
    try:
        visited_links = set()  # To avoid revisiting the same link
        content_by_url = {}    # Store content from each URL

        for base_url in base_urls:
            if not base_url.strip():
                continue  # Skip empty or invalid URLs

            print(f"Scraping base URL: {base_url}")
            html_content = fetch_page_content(base_url)
            if html_content:
                cleaned_content = clean_body_content(html_content)
                content_by_url[base_url] = cleaned_content
                visited_links.add(base_url)

                # Extract and process all internal links
                soup = BeautifulSoup(html_content, "html.parser")
                links = extract_internal_links(base_url, soup)

                for link in links:
                    if link not in visited_links:
                        print(f"Scraping link: {link}")
                        page_content = fetch_page_content(link)
                        if page_content:
                            cleaned_content = clean_body_content(page_content)
                            content_by_url[link] = cleaned_content
                            visited_links.add(link)

                        # If the link is a PDF file, extract its content
                        if link.lower().endswith('.pdf'):
                            print(f"Extracting PDF content from: {link}")
                            pdf_content = extract_pdf_text(link)
                            if pdf_content:
                                content_by_url[link] = pdf_content

        return content_by_url

    except Exception as e:
        print(f"Error during scraping: {e}")
        return {}

def fetch_page_content(url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None

def extract_internal_links(base_url, soup):
    links = set()
    for anchor in soup.find_all("a", href=True):
        href = anchor["href"]
        full_url = urljoin(base_url, href)
        if is_internal_link(base_url, full_url):
            links.add(full_url)
    return links

def is_internal_link(base_url, link_url):
    base_netloc = urlparse(base_url).netloc
    link_netloc = urlparse(link_url).netloc
    return base_netloc == link_netloc
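
# For example, with the site this app actually scrapes:
#   is_internal_link("https://haguruka.org.rw/", "https://haguruka.org.rw/about")  -> True
#   is_internal_link("https://haguruka.org.rw/", "https://example.com/page")       -> False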

def extract_pdf_text(pdf_url):
    try:
        response = requests.get(pdf_url, timeout=10)
        response.raise_for_status()

        with BytesIO(response.content) as file:
            reader = PdfReader(file)
            pdf_text = ""
            for page in reader.pages:
                # extract_text() can return None for pages with no text layer
                pdf_text += page.extract_text() or ""

        return pdf_text if pdf_text else None

    except requests.exceptions.RequestException as e:
        print(f"Error fetching PDF {pdf_url}: {e}")
        return None
    except Exception as e:
        print(f"Error reading PDF {pdf_url}: {e}")
        return None

def clean_body_content(html_content):
    soup = BeautifulSoup(html_content, "html.parser")
    for script_or_style in soup(["script", "style"]):
        script_or_style.extract()
    cleaned_content = soup.get_text(separator="\n")
    cleaned_content = "\n".join(
        line.strip() for line in cleaned_content.splitlines() if line.strip()
    )
    return cleaned_content
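
# A quick illustration with made-up markup:
#   clean_body_content("<p>Help line</p><script>track()</script>")  -> "Help line"
# Script and style tags are dropped, and blank lines are collapsed.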

if __name__ == "__main__":
    website = ["https://haguruka.org.rw/"]
    all_content = scrape_websites(website)

    temp_list = []
    for url, content in all_content.items():
        temp_list.append((url, content))

    processed_texts = []
    for element in temp_list:
        if isinstance(element, tuple):
            url, content = element
            processed_texts.append(f"url: {url}, content: {content}")
        elif isinstance(element, str):
            processed_texts.append(element)
        else:
            processed_texts.append(str(element))

    def chunk_string(s, chunk_size=1000):
        return [s[i:i + chunk_size] for i in range(0, len(s), chunk_size)]

    chunked_texts = []
    for text in processed_texts:
        chunked_texts.extend(chunk_string(text))
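
    # Illustrative check of the chunker (not executed by the app):
    #   chunk_string("abcdefgh", 3)  -> ["abc", "def", "gh"]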

    vectorstore = Chroma(
        collection_name="GBVR_Dataset",
        embedding_function=embed_model,
        persist_directory="./",
    )
    vectorstore.add_texts(chunked_texts)

# Prompt template that includes conversation history
template = ("""
You are a friendly, intelligent, and conversational AI assistant designed to provide accurate, engaging, and human-like responses based on the given context. Your goal is to extract relevant details from the provided context: {context} and assist the user effectively. Follow these guidelines:

1. **Warm & Natural Interaction**
   - If the user greets you (e.g., "Hello," "Hi," "Good morning"), respond warmly and acknowledge them.
   - Example responses:
     - "😊 Good morning! How can I assist you today?"
     - "Hello! What can I do for you? 🚀"

2. **Precise Information Extraction**
   - Provide only the relevant details from the given context: {context}.
   - Do not generate extra content or assumptions beyond the provided information.

3. **Conversational & Engaging Tone**
   - Keep responses friendly, natural, and engaging.
   - Use occasional emojis (e.g., 😊, 🚀) to make interactions more lively.

4. **Awareness of Real-Time Context**
   - If necessary, acknowledge the current date and time to show awareness of real-world updates.

5. **Handling Missing Information**
   - If no relevant information exists in the context, respond politely:
     - "I don't have that information at the moment, but I'm happy to help with something else! 😊"

6. **Personalized Interaction**
   - Use the conversation history to provide more personalized and contextually relevant responses.
   - Previous conversation history: {conversation_history}

7. **Direct, Concise Responses**
   - If the user requests specific data, provide only the requested details without unnecessary explanations unless asked.

8. **Extracting Relevant Links**
   - If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly.
   - Example response:
     - "Here is the link you requested: [URL]"

**Context:** {context}
**User's Question:** {question}
**Your Response:**
""")

rag_prompt = PromptTemplate.from_template(template)

retriever = vectorstore.as_retriever()

llm = ChatGroq(model="llama-3.3-70b-versatile", api_key=groq_api_key)

# Define the RAG chain with session history
def rag_chain(question, session_id="default"):
    # Get conversation history if available
    conversation_history = session_manager.get_history(session_id)

    # Get context from the retriever
    context_docs = retriever.invoke(question)
    context = "\n".join(doc.page_content for doc in context_docs)

    # Create the prompt with history
    prompt = rag_prompt.format(
        context=context,
        question=question,
        conversation_history=conversation_history
    )

    # Generate the response
    response = llm.invoke(prompt).content

    # Store the interaction
    session_manager.add_interaction(session_id, question, response)

    return response
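
# Usage sketch (illustrative; the question and session ID are made up):
#   answer = rag_chain("Where can I report an incident?", session_id="demo")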

# Define the RAG memory stream function
def rag_memory_stream(message, history):
    # Derive a session ID from the first user message in the history, if one exists
    session_id = None
    for msg in history:
        if msg[0]:  # If there's a user message
            # Use a hash of the first message's opening characters as a simple session ID
            session_id = hash(msg[0][:20])
            break

    # Default session ID if the history is empty
    if session_id is None:
        session_id = "default_session"

    # Process the message and get the response
    response = rag_chain(message, str(session_id))

    # Stream the response word by word
    partial_text = ""
    for word in response.split(' '):
        partial_text += word + " "
        yield partial_text.strip()
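
# gr.ChatInterface treats a generator function as a streaming handler: each
# yielded string replaces the message shown so far, which is what produces the
# word-by-word typing effect above.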

# Interface title
title = "GBVR Chatbot"

# Custom CSS for styling the interface
custom_css = """
body {
    font-family: "Arial", sans-serif;
}
.gradio-container {
    font-family: "Times New Roman", serif;
}
.gr-button {
    background-color: #007bff;  /* Blue button */
    color: white;
    border: none;
    border-radius: 5px;
    font-size: 16px;
    padding: 10px 20px;
    cursor: pointer;
}
.gr-textbox:focus, .gr-button:focus {
    outline: none;  /* Remove the focus outline for a cleaner look */
}

/* Styling for the welcome message shown as the interface description */
.gradio-description {
    font-size: 30px;
    font-family: "Arial", sans-serif;
    text-align: center;
    padding: 20px;
}
"""

# Generate a short welcome message using the LLM
def generate_welcome_message():
    welcome_prompt = """
    Generate a short, simple welcome message for a chatbot about Gender-Based Violence resources in Rwanda.
    Keep it under 3 sentences, and use simple language.
    Make it warm and supportive but direct and easy to read.
    """
    # Get the welcome message from the LLM
    return llm.invoke(welcome_prompt).content

# Create the welcome message once at startup
welcome_msg = generate_welcome_message()

# Create the Chat Interface with the welcome message as its description
demo = gr.ChatInterface(
    fn=rag_memory_stream,
    title=title,
    fill_height=True,
    theme="soft",
    css=custom_css,  # Apply the custom CSS
    description=welcome_msg,
)

# Launch the app
if __name__ == "__main__":
    demo.launch(share=True, inbrowser=True, debug=True)