import os
import re
import gradio as gr
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
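# NOTE: this assumes langchain, langchain-community, transformers,
# sentence-transformers, faiss-cpu and gradio are installed
# (e.g. pinned in the Space's requirements.txt).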
# --- Step 1: Load and clean .txt files ---
DATA_DIR = "knowledge_base"

docs = []
for fname in os.listdir(DATA_DIR):
    file_path = os.path.join(DATA_DIR, fname)
    if os.path.isfile(file_path) and fname.endswith(".txt"):
        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()
        # Normalise non-breaking spaces, then collapse repeated newlines and spaces
        cleaned_text = text.replace('\xa0', ' ')
        cleaned_text = re.sub(r'\n+', '\n', cleaned_text)
        cleaned_text = re.sub(r' +', ' ', cleaned_text).strip()
        docs.append({"page": fname, "text": cleaned_text})
# --- Step 2: Split text into chunks ---
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

texts = []
metadatas = []
for doc in docs:
    chunks = splitter.split_text(doc["text"])
    for i, chunk in enumerate(chunks):
        texts.append(chunk)
        metadatas.append({"source": doc["page"], "chunk": i})
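# Each chunk carries its source file and position so answers can be traced back,
# e.g. {"source": "accommodation.txt", "chunk": 0} (illustrative filename).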
# --- Step 3: Wrap chunks in Document objects ---
documents = [
    Document(page_content=text, metadata=metadata)
    for text, metadata in zip(texts, metadatas)
]
# --- Step 4: Load the embedding model ---
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
# --- Step 5: Build the FAISS index ---
vectordb = FAISS.from_documents(documents, embedding_model)
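# Optional (a sketch, not used below): recent langchain-community versions let you
# persist the index so it isn't rebuilt on every restart, assuming the process can
# write to the working directory:
#   vectordb.save_local("faiss_index")
#   vectordb = FAISS.load_local("faiss_index", embedding_model,
#                               allow_dangerous_deserialization=True)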
# --- Step 6: Load the LLM ---
model_id = "tiiuae/Falcon3-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
    max_new_tokens=200,
    do_sample=True,
    temperature=1.0,
)
llm = HuggingFacePipeline(pipeline=pipe)
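# CPU inference is assumed here. On GPU hardware you could pass device=0 to
# pipeline(), or device_map="auto" to from_pretrained (the latter needs the
# accelerate package installed).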
# --- Step 7: Set up memory and the QA chain ---
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

custom_prompt = PromptTemplate.from_template("""
You are a helpful assistant at the University of Hertfordshire. Use the context below to answer the question clearly and factually.
If the answer is not in the context, say you don't know.

Context:
{context}

Question:
{question}

Answer:
""")

qa_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=vectordb.as_retriever(search_kwargs={"k": 3}),
    memory=memory,
    chain_type="stuff",
    combine_docs_chain_kwargs={"prompt": custom_prompt},
)
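# The chain first condenses the new question plus chat_history into a standalone
# query, then "stuffs" the k=3 retrieved chunks into the prompt above.
# Quick smoke test (illustrative question):
#   print(qa_chain.invoke({"question": "How do I register?"})["answer"])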
# --- Step 8: Define the chatbot logic ---
def chat(message, history):
    result = qa_chain.invoke({"question": message})
    response = result.get("answer", "")
    # The pipeline echoes the prompt, so keep only the text after the final "Answer:"
    response = response.split("Answer:")[-1].replace("<|assistant|>", "").strip()
    return response
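# Alternative (untested here): passing return_full_text=False to pipeline() above
# stops the prompt being echoed, which would make the split on "Answer:" unnecessary.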
# University of Hertfordshire logo shown at the top of the page
UH_LOGO = "images/UH.png"

# --- Step 9: Build the Gradio UI ---
sample_questions = [
    "How do I register as a new student?",
    "Where can I find accommodation?",
    "Can I renew my tenancy agreement?",
    "What do I do on my first day?",
]
with gr.Blocks() as demo:
    gr.Image(UH_LOGO, show_label=False, container=False, scale=1)
    gr.Markdown("## ASK Herts Students Help Chatbot 🤖")

    chatbot = gr.Chatbot()
    txt = gr.Textbox(placeholder="Ask me anything about university life...", label="Your question")
    submit = gr.Button("Submit")

    gr.Markdown("#### 💡 Sample Questions:")
    with gr.Row():
        for q in sample_questions:
            # Default-argument binding so each button fills in its own question
            gr.Button(q).click(lambda q=q: gr.update(value=q), outputs=[txt])
    def respond(message, history):
        answer = chat(message, history)
        history.append((message, answer))
        return "", history

    submit.click(respond, [txt, chatbot], [txt, chatbot])
    txt.submit(respond, [txt, chatbot], [txt, chatbot])

demo.launch()
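# launch() with no arguments picks up the host/port a Space provides; when running
# locally you can pass e.g. demo.launch(server_name="0.0.0.0", server_port=7860)
# to expose the app on your network.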