|
|
import os |
|
|
import gradio as gr |
|
|
import faiss |
|
|
import pickle |
|
|
from PyPDF2 import PdfReader |
|
|
from sentence_transformers import SentenceTransformer |
|
|
from huggingface_hub import InferenceClient, HfApi |
|
|
import pdfplumber |
|
|
|
|
|
|
|
|
# Repository on the Hugging Face Hub that receives the uploaded index files.
HF_REPO_ID = "MoslemBot/kajibuku"

# Access token taken from the environment; this is None when HF_TOKEN is unset.
HF_API_TOKEN = os.environ.get("HF_TOKEN")

# Hub client shared by every upload performed in this module.
api = HfApi()
|
|
|
|
|
def upload_to_hub(local_path, remote_path):
    """Upload one local file to the configured Hugging Face Space repo.

    Args:
        local_path: Path of the file on disk to upload.
        remote_path: Destination path of the file inside the repository.

    Errors raised by ``api.upload_file`` are not caught here and propagate
    to the caller.
    """
    api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=remote_path,
        repo_id=HF_REPO_ID,
        repo_type="space",
        token=HF_API_TOKEN,
    )
    print(f"✅ Uploaded to Hub: {remote_path}")
|
|
|
|
|
|
|
|
# Sentence-embedding model used for both PDF chunks and user questions.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Inference client used for chat completions; token comes from HF_TOKEN.
llm = InferenceClient(token=os.environ.get("HF_TOKEN"))

# Root folder that holds one sub-folder per indexed PDF title.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
def _extract_pdf_text(pdf_path):
    """Return the text of every page of the PDF, each page followed by a newline."""
    with pdfplumber.open(pdf_path) as pdf:
        # extract_text() may give None/empty for a page without a text layer
        # (version dependent), so fall back to "" instead of crashing on `+`.
        return "".join((page.extract_text() or "") + "\n" for page in pdf.pages)


def _chunk_text(text, size=500):
    """Split *text* into consecutive pieces of at most *size* characters."""
    return [text[start:start + size] for start in range(0, len(text), size)]


def save_pdf(file, title):
    """Index an uploaded PDF under *title* and push the index to the Hub.

    The PDF text is extracted, cut into 500-character chunks, embedded with
    the module-level ``embedder`` and stored as ``index.faiss`` (FAISS L2
    index) plus ``chunks.pkl`` (pickled chunk list) in ``DATA_DIR/<title>``.
    Both files are then uploaded via ``upload_to_hub``.

    Args:
        file: The uploaded file. Either an object exposing the path as
            ``.name`` or a plain path string; ``None`` when nothing was
            uploaded.
        title: User-chosen title; surrounding whitespace is ignored. It is
            used as a folder name, so path separators are rejected.

    Returns:
        A status message for the UI (success or a validation problem).

    Raises:
        ValueError: If the embedder does not return a 2-D array.
        Any error from PDF parsing, FAISS, file I/O or the Hub upload
        propagates unchanged.
    """
    if file is None:
        return "❌ Please upload a PDF file."

    # Use one normalised title for the folder, the Hub path and the messages.
    clean_title = (title or "").strip()
    if not clean_title:
        return "❌ Please enter a title for the PDF."
    # The title becomes a directory name: refuse anything that could escape
    # DATA_DIR or create nested folders that list_titles() would not show.
    if clean_title in {".", ".."} or "/" in clean_title or "\\" in clean_title:
        return "❌ The title must not contain path separators."

    folder = os.path.join(DATA_DIR, clean_title)
    if os.path.exists(folder):
        return f"'{clean_title}' already exists. Use a different title."

    # Accept both file-like upload objects (.name) and plain path strings.
    pdf_path = getattr(file, "name", file)
    full_text = _extract_pdf_text(pdf_path)
    print(f"Extracted {len(full_text)} characters from PDF")
    if not full_text.strip():
        return "❌ No extractable text found in this PDF."

    chunks = _chunk_text(full_text)
    embeddings = embedder.encode(chunks)
    print("Embeddings shape:", embeddings.shape)
    if len(embeddings.shape) != 2:
        raise ValueError(f"Expected 2D embeddings, got shape {embeddings.shape}")

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    # Create the folder only once everything that can fail during processing
    # has succeeded; otherwise a failed attempt would block the title with a
    # misleading "already exists" on retry.
    os.makedirs(folder, exist_ok=True)
    index_path = os.path.join(folder, "index.faiss")
    chunks_path = os.path.join(folder, "chunks.pkl")
    faiss.write_index(index, index_path)
    with open(chunks_path, "wb") as f:
        pickle.dump(chunks, f)

    upload_to_hub(index_path, f"data/{clean_title}/index.faiss")
    upload_to_hub(chunks_path, f"data/{clean_title}/chunks.pkl")

    return f"✅ Saved and indexed '{clean_title}', and uploaded to Hub. Please reload (refresh) the page."
|
|
|
|
|
|
|
|
def list_titles():
    """Return the names of all sub-directories of DATA_DIR, sorted.

    Each sub-directory corresponds to one indexed PDF title. The directory
    is read once so the logged listing and the returned titles come from the
    same snapshot, and the result is sorted because ``os.listdir`` returns
    entries in arbitrary order.
    """
    entries = os.listdir(DATA_DIR)
    print(f"Listing in: {DATA_DIR} → {entries}")
    return sorted(
        name for name in entries if os.path.isdir(os.path.join(DATA_DIR, name))
    )
|
|
|
|
|
|
|
|
def ask_question(message, history, selected_titles):
    """Answer *message* from each selected PDF and combine the answers.

    For every title, the stored FAISS index and chunk list are loaded from
    ``DATA_DIR/<title>``, the three nearest chunks to the question are used
    as context, and the chat model is asked to answer from that context.

    Args:
        message: The user's question.
        history: Chat history passed in by the chat UI; not used here.
        selected_titles: Titles (sub-folders of DATA_DIR) to query.

    Returns:
        One ``**title**:`` section per PDF, or an error line for a title
        that failed. If no title is selected, a prompt to select one.
    """
    if not selected_titles:
        return "❌ Please select at least one PDF."

    sections = []
    query_vec = None  # embedded lazily, once, and reused for every title
    for title in selected_titles:
        folder = os.path.join(DATA_DIR, title)
        try:
            index = faiss.read_index(os.path.join(folder, "index.faiss"))
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only safe while chunks.pkl files are produced by this app.
            with open(os.path.join(folder, "chunks.pkl"), "rb") as f:
                chunks = pickle.load(f)

            if query_vec is None:
                query_vec = embedder.encode([message])

            _, neighbors = index.search(query_vec, k=3)
            # FAISS pads the result with -1 when the index holds fewer than
            # k vectors; chunks[-1] would silently inject the last chunk.
            context = "\n".join(
                chunks[i] for i in neighbors[0] if 0 <= i < len(chunks)
            )

            completion = llm.chat_completion(
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Answer based only on the given context."},
                    {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {message}"},
                ],
                model="deepseek-ai/DeepSeek-R1-0528",
                max_tokens=2048,
            )
            # content may be None; treat that as an empty answer.
            answer = completion.choices[0].message.content or ""

            sections.append(f"**{title}**:\n{answer.strip()}\n\n")
        except Exception as e:
            # Best effort per title: one failing PDF must not hide the
            # answers obtained from the others.
            sections.append(f"⚠️ Error with {title}: {e}\n\n")

    return "".join(sections).strip()
|
|
|
|
|
|
|
|
def _refresh_title_choices():
    """Return a component update that reloads the selectable PDF titles."""
    return gr.update(choices=list_titles())


# Two-tab UI: one tab to upload and index a PDF, one to chat with indexed PDFs.
with gr.Blocks() as demo:
    with gr.Tab("📤 Upload PDF"):
        file = gr.File(label="PDF File", file_types=[".pdf"])
        title = gr.Textbox(label="Title for PDF")
        upload_btn = gr.Button("Upload and Index")
        upload_status = gr.Textbox(label="Status")
        upload_btn.click(fn=save_pdf, inputs=[file, title], outputs=upload_status)

    with gr.Tab("💬 Chat with PDFs"):
        # Choices are computed once when the UI is built; the refresh button
        # below reloads them after new uploads.
        pdf_selector = gr.CheckboxGroup(label="Select PDFs", choices=list_titles())
        refresh_btn = gr.Button("🔄 Refresh PDF List")
        # Returning a bare list to a CheckboxGroup sets its *selected* values;
        # an explicit update is needed to replace the available choices.
        refresh_btn.click(fn=_refresh_title_choices, outputs=pdf_selector)
        # The selected titles are passed to ask_question as its third argument.
        chat = gr.ChatInterface(fn=ask_question, additional_inputs=[pdf_selector])

demo.launch()