# gpt-oss-RAG / app-backup.py
# Author: openfree
# Commit e1df55d (verified): "Create app-backup.py"
import gradio as gr
import spaces
import os
from typing import List, Dict, Any, Optional, Tuple
import hashlib
from datetime import datetime
import numpy as np
from transformers import pipeline, TextIteratorStreamer
import torch
from threading import Thread
import re
# PDF processing library (PyMuPDF, optional dependency)
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
print("โš ๏ธ PyMuPDF not installed. Install with: pip install pymupdf")
try:
from sentence_transformers import SentenceTransformer
ST_AVAILABLE = True
except ImportError:
ST_AVAILABLE = False
print("โš ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers")
# Custom CSS handed to gr.Blocks(css=...).
# Classes defined here are referenced by the HTML status snippets built in the
# handlers below: .pdf-status (+ .pdf-success / .pdf-error / .pdf-info),
# .rag-context, and by elem_classes="main-container" on the layout groups.
# .thinking-section is defined but not referenced elsewhere in this file.
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
min-height: 100vh;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-container {
background: rgba(255, 255, 255, 0.98);
border-radius: 16px;
padding: 24px;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
border: 1px solid rgba(0, 0, 0, 0.05);
margin: 12px;
}
.pdf-status {
padding: 12px 16px;
border-radius: 12px;
margin: 12px 0;
font-size: 0.95rem;
font-weight: 500;
}
.pdf-success {
background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%);
border: 1px solid #b1dfbb;
color: #155724;
}
.pdf-error {
background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%);
border: 1px solid #f1aeb5;
color: #721c24;
}
.pdf-info {
background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%);
border: 1px solid #9ec5d8;
color: #0c5460;
}
.rag-context {
background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%);
border-left: 4px solid #f59e0b;
padding: 12px;
margin: 12px 0;
border-radius: 8px;
font-size: 0.9rem;
}
.thinking-section {
background: rgba(0, 0, 0, 0.02);
border: 1px solid rgba(0, 0, 0, 0.1);
border-radius: 8px;
padding: 12px;
margin: 8px 0;
}
"""
class SimpleTextSplitter:
    """Sentence-oriented text splitter.

    Text is cut on the literal separator ``". "`` and consecutive sentences are
    packed into chunks while the running length stays below ``chunk_size``.
    A single sentence longer than ``chunk_size`` is kept whole as its own chunk.
    """

    def __init__(self, chunk_size=800, chunk_overlap=100):
        """Store the splitting parameters.

        Args:
            chunk_size: Soft upper bound (in characters) for one chunk.
            chunk_overlap: Accepted for interface compatibility.
                NOTE: ``split_text`` does not apply any overlap between chunks.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` into a list of stripped, non-empty chunks.

        Args:
            text: The text to split. ``None``, empty, or whitespace-only input
                yields an empty list.

        Returns:
            The chunks in document order.
        """
        # Guard: without this, "" would be turned into the bogus chunk ".",
        # so callers checking `if not chunks` could never detect empty input.
        if not text or not text.strip():
            return []

        sentences = text.split('. ')
        last_index = len(sentences) - 1
        chunks = []
        current_chunk = ""

        for index, sentence in enumerate(sentences):
            # Restore the separator that split() removed. The final piece never
            # had one, so appending it there would fabricate or double a period.
            piece = sentence if index == last_index else sentence + ". "

            if len(current_chunk) + len(sentence) < self.chunk_size:
                current_chunk += piece
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = piece

        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        return chunks
class PDFRAGSystem:
    """In-memory PDF store used to build retrieval-augmented prompts.

    Extracted PDF text is split into chunks, optionally embedded with a
    sentence-transformers model, and kept in three dicts keyed by document id:
    ``documents`` (metadata), ``document_chunks`` (chunk texts) and
    ``embeddings_store`` (chunk embeddings; only filled when an embedder loaded).
    """

    def __init__(self):
        """Create empty stores and try to load the optional embedding model."""
        self.documents = {}
        self.document_chunks = {}
        self.embeddings_store = {}
        self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)
        # The embedding model is optional: without it, retrieval falls back to
        # returning the leading chunks of each document.
        self.embedder = None
        if ST_AVAILABLE:
            try:
                self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
                print("โœ… ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์„ฑ๊ณต")
            except Exception as e:
                print(f"โš ๏ธ ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์‹คํŒจ: {e}")

    def _doc_name(self, doc_id: str) -> str:
        """Return the stored file name for ``doc_id``, or the id itself if unknown."""
        metadata = self.documents.get(doc_id, {}).get("metadata", {})
        return metadata.get("file_name", doc_id)

    @staticmethod
    def _cosine_similarities(query_embedding, doc_embeddings):
        """Return the cosine similarity of the query against every embedding row.

        Rows (or a query) with zero norm get similarity 0.0 instead of NaN.
        """
        matrix = np.asarray(doc_embeddings, dtype=float)
        if matrix.ndim != 2 or matrix.shape[0] == 0:
            return np.zeros(0)
        query = np.asarray(query_embedding, dtype=float)
        norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
        dots = matrix @ query
        return np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract metadata and page text from a PDF file.

        Args:
            pdf_path: Path of the PDF on disk.

        Returns:
            ``{"metadata": {"title", "pages", "file_name"}, "full_text": str}``.
            When PyMuPDF is not installed, ``pages`` is 0 and ``full_text``
            holds an install hint instead of document text.

        Raises:
            Exception: If the PDF cannot be opened or read (original error chained).
        """
        file_name = os.path.basename(pdf_path)
        if not PDF_AVAILABLE:
            return {
                "metadata": {
                    "title": "PDF Reader Not Available",
                    "file_name": file_name,
                    "pages": 0
                },
                "full_text": "PDF ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด 'pip install pymupdf'๋ฅผ ์‹คํ–‰ํ•ด์ฃผ์„ธ์š”."
            }
        try:
            doc = fitz.open(pdf_path)
            try:
                metadata = {
                    # `or` (not a .get default): the "title" key may exist with an
                    # empty/None value, in which case the file name is used.
                    "title": (doc.metadata or {}).get("title") or file_name,
                    "pages": len(doc),
                    "file_name": file_name
                }
                page_texts = (page.get_text() for page in doc)
                text_content = [text for text in page_texts if text.strip()]
            finally:
                # Always release the document handle, even if a page fails to parse.
                doc.close()
            return {
                "metadata": metadata,
                "full_text": "\n\n".join(text_content)
            }
        except Exception as e:
            raise Exception(f"PDF ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜: {str(e)}") from e

    def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, (optionally) embed and register a PDF under ``doc_id``.

        Args:
            pdf_path: Path of the PDF on disk.
            doc_id: Key under which chunks, embeddings and metadata are stored.

        Returns:
            On success ``{"success": True, "doc_id", "chunks", "pages", "title"}``;
            on failure ``{"success": False, "error": str}``. Never raises.
        """
        try:
            pdf_data = self.extract_text_from_pdf(pdf_path)
            if not PDF_AVAILABLE:
                # Without PyMuPDF, "full_text" is only an install hint; indexing
                # it would present the hint to the model as document content.
                return {"success": False, "error": pdf_data["full_text"]}

            full_text = pdf_data["full_text"]
            chunks = self.text_splitter.split_text(full_text) if full_text.strip() else []
            if not chunks:
                print("Warning: No chunks created from PDF")
                return {"success": False, "error": "No text content found in PDF"}
            print(f"Created {len(chunks)} chunks from PDF")

            self.document_chunks[doc_id] = chunks
            # Drop embeddings from an earlier upload under the same id so a failed
            # re-embed cannot leave vectors that no longer match the chunks.
            self.embeddings_store.pop(doc_id, None)

            # Embeddings are optional; a failure here must not fail the upload.
            if self.embedder is not None:
                try:
                    print("Generating embeddings...")
                    embeddings = self.embedder.encode(chunks)
                    self.embeddings_store[doc_id] = embeddings
                    print(f"Generated {len(embeddings)} embeddings")
                except Exception as e:
                    print(f"Warning: Failed to generate embeddings: {e}")

            self.documents[doc_id] = {
                "metadata": pdf_data["metadata"],
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat()
            }
            # Debug: preview of the first chunk
            print(f"First chunk preview: {chunks[0][:200]}...")
            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": pdf_data["metadata"]["pages"],
                "title": pdf_data["metadata"]["title"]
            }
        except Exception as e:
            print(f"Error processing PDF: {e}")
            return {"success": False, "error": str(e)}

    def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
        """Return up to ``top_k`` chunks relevant to ``query`` from the given documents.

        Documents with stored embeddings are ranked by cosine similarity.
        A document without usable embeddings contributes its leading chunks
        instead (the decision is made per document). Similarity-ranked chunks
        are always placed ahead of such fallback chunks, because the fallback
        scores (1.0, 0.9, ...) are positional weights, not real similarities.

        Args:
            query: The user question.
            doc_ids: Ids of the documents to search; unknown ids are skipped.
            top_k: Maximum number of chunks to return.

        Returns:
            A list of ``{"content", "doc_name", "similarity"}`` dicts.
        """
        top_k = int(top_k)
        print(f"Searching chunks for query: '{query[:50]}...' in {len(doc_ids)} documents")
        if top_k <= 0:
            return []

        semantic_matches = []
        fallback_matches = []
        query_embedding = None  # computed lazily, once, and reused for every document

        for doc_id in doc_ids:
            if doc_id not in self.document_chunks:
                print(f"Warning: Document {doc_id} not found in chunks")
                continue
            chunks = self.document_chunks[doc_id]
            doc_name = self._doc_name(doc_id)
            print(f"Document {doc_id} has {len(chunks)} chunks")

            doc_matches = []
            if self.embedder is not None and doc_id in self.embeddings_store:
                try:
                    if query_embedding is None:
                        query_embedding = self.embedder.encode([query])[0]
                    similarities = self._cosine_similarities(
                        query_embedding, self.embeddings_store[doc_id]
                    )
                    # Highest similarity first, limited to top_k.
                    for idx in np.argsort(similarities)[::-1][:top_k]:
                        if idx < len(chunks):  # embeddings and chunks must stay aligned
                            score = float(similarities[idx])
                            doc_matches.append({
                                "content": chunks[idx],
                                "doc_name": doc_name,
                                "similarity": score
                            })
                            print(f"Added chunk {idx} with similarity: {score:.3f}")
                except Exception as e:
                    print(f"Error in embedding search: {e}")
                    doc_matches = []  # discard partial results; use the fallback below

            if doc_matches:
                semantic_matches.extend(doc_matches)
                continue

            # No embeddings (or the search failed) for THIS document:
            # take its first chunks, weighted by position.
            print(f"Falling back to simple chunk selection for {doc_id}")
            for i, chunk in enumerate(chunks[:top_k]):
                fallback_matches.append({
                    "content": chunk,
                    "doc_name": doc_name,
                    "similarity": 1.0 - (i * 0.1)
                })
                print(f"Added chunk {i} (fallback)")

        semantic_matches.sort(key=lambda x: x.get('similarity', 0), reverse=True)
        fallback_matches.sort(key=lambda x: x.get('similarity', 0), reverse=True)
        result = (semantic_matches + fallback_matches)[:top_k]
        print(f"Returning {len(result)} chunks")
        # Debug: preview of the best chunk
        if result:
            print(f"First chunk preview: {result[0]['content'][:100]}...")
        return result

    def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> Tuple[str, str]:
        """Build a context-augmented prompt for ``query``.

        Args:
            query: The user question.
            doc_ids: Ids of the documents to draw context from.
            top_k: Maximum number of chunks to include.

        Returns:
            ``(enhanced_query, context)``. When no chunk is available at all the
            original query is returned unchanged together with an empty context.
        """
        print(f"Creating RAG prompt for query: '{query[:50]}...' with docs: {doc_ids}")
        relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)

        if not relevant_chunks:
            print("No relevant chunks found - checking if documents exist")
            # Safety net: if a document exists but the search returned nothing,
            # use its first chunk so the model still sees some context.
            for doc_id in doc_ids:
                if self.document_chunks.get(doc_id):
                    print(f"Using first chunk from {doc_id} as fallback")
                    relevant_chunks = [{
                        "content": self.document_chunks[doc_id][0],
                        "doc_name": self._doc_name(doc_id),
                        "similarity": 0.5
                    }]
                    break
            if not relevant_chunks:
                print("No documents or chunks available")
                return query, ""

        print(f"Using {len(relevant_chunks)} chunks for context")
        context_parts = [
            "Based on the following document context, please answer the question below:",
            "=" * 40,
        ]
        for i, chunk in enumerate(relevant_chunks, 1):
            context_parts.append(f"\n[Document Reference {i} - {chunk['doc_name']}]")
            # Cap each chunk at 1000 characters to bound the prompt size.
            content = chunk['content'][:1000]
            context_parts.append(content)
            print(f"Added chunk {i} ({len(content)} chars) with similarity: {chunk.get('similarity', 0):.3f}")
        context_parts.append("\n" + "=" * 40)

        context = "\n".join(context_parts)
        enhanced_query = f"{context}\n\nQuestion: {query}\n\nAnswer based on the document context provided above:"
        print(f"Enhanced query length: {len(enhanced_query)} chars (original: {len(query)} chars)")
        return enhanced_query, context
# Initialize model and RAG system
# Both objects are created at import time and shared by every handler below.
model_id = "openai/gpt-oss-20b"
pipe = pipeline(
    "text-generation",
    model=model_id,
    torch_dtype="auto",
    device_map="auto",
)
rag_system = PDFRAGSystem()
# Global state for RAG
# Written by update_rag_settings() / clear_documents() and read by
# generate_response(). NOTE(review): these are module-level globals, so they
# appear to be shared by all sessions of the app — confirm this is intended.
rag_enabled = False  # whether the "Enable RAG" checkbox is on
selected_docs = []  # entries of the form "<doc_id>: <file_name>" from the document list
top_k_chunks = 3  # number of chunks requested per query
last_context = ""  # context string used for the most recent response ("" if none)
def format_conversation_history(chat_history):
    """Reduce chat history entries to plain ``{"role", "content"}`` messages.

    Args:
        chat_history: Iterable of dicts with ``"role"`` and ``"content"`` keys,
            or ``None`` for an empty history. ``content`` may be a string or a
            list of parts.

    Returns:
        A new list of ``{"role": ..., "content": ...}`` dicts. For list content,
        the ``"text"`` of the first part is used when that part is a dict with a
        ``"text"`` key; otherwise the list is stringified.
    """
    messages = []
    for item in chat_history or []:
        content = item["content"]
        if isinstance(content, list):
            first_part = content[0] if content else None
            # The isinstance check matters: for a str part, `"text" in part` is a
            # substring test and part["text"] would then raise TypeError.
            if isinstance(first_part, dict) and "text" in first_part:
                content = first_part["text"]
            else:
                content = str(content)
        messages.append({"role": item["role"], "content": content})
    return messages
# Text before the first occurrence of this marker is shown as the "thinking"
# section; text after it is shown as the final answer.
_FINAL_MARKER_RE = re.compile(r'assistantfinal', re.IGNORECASE)


def _split_thinking_and_final(raw_output):
    """Split the accumulated model output into ``(thinking, final)`` strings."""
    parts = _FINAL_MARKER_RE.split(raw_output, maxsplit=1)
    thinking = re.sub(r'^analysis\s*', '', parts[0]).strip()
    final = parts[1].strip() if len(parts) > 1 else ""
    return thinking, final


@spaces.GPU()
def generate_response(input_data, chat_history, max_new_tokens, system_prompt, temperature, top_p, top_k, repetition_penalty):
    """Stream a model response, optionally augmented with PDF context.

    Args:
        input_data: The user's message text.
        chat_history: Previous messages (dicts with "role" and "content").
        max_new_tokens: Generation length limit passed to the pipeline.
        system_prompt: Optional system message; omitted when empty.
        temperature, top_p, top_k, repetition_penalty: Sampling parameters
            passed through to the pipeline.

    Yields:
        The full formatted response so far (HTML/Markdown string): an optional
        RAG indicator, a collapsible thinking section, then the final answer.
    """
    global last_context, rag_enabled, selected_docs, top_k_chunks
    # Debug logging
    print(f"RAG Enabled: {rag_enabled}")
    print(f"Selected Docs: {selected_docs}")
    print(f"Available Docs: {list(rag_system.documents.keys())}")

    # Apply RAG if enabled
    context = ""
    if rag_enabled and selected_docs:
        # Document list entries look like "<doc_id>: <file_name>".
        doc_ids = [doc.split(":")[0] for doc in selected_docs]
        enhanced_input, context = rag_system.create_rag_prompt(input_data, doc_ids, top_k_chunks)
        actual_input = enhanced_input
        print(f"RAG Applied - Original: {len(input_data)} chars, Enhanced: {len(enhanced_input)} chars")
    else:
        actual_input = input_data
        print("RAG Not Applied")
    last_context = context

    # Prepare messages
    new_message = {"role": "user", "content": actual_input}
    system_message = [{"role": "system", "content": system_prompt}] if system_prompt else []
    processed_history = format_conversation_history(chat_history)
    messages = system_message + processed_history + [new_message]

    # Setup streaming: generation runs in a worker thread and feeds the streamer.
    streamer = TextIteratorStreamer(pipe.tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "streamer": streamer
    }
    thread = Thread(target=pipe, args=(messages,), kwargs=generation_kwargs)
    thread.start()

    # Decide the indicator once, from what was actually applied to THIS request,
    # rather than re-reading mutable globals while streaming.
    rag_indicator = ""
    if context:
        rag_indicator = "<div class='rag-context'>๐Ÿ“š RAG Context Applied</div>\n\n"

    # Process streaming output. The marker is searched in the accumulated text
    # (case-insensitively), so it is found even when it arrives split across
    # two streamed chunks or with different casing.
    raw_output = ""
    for chunk in streamer:
        raw_output += chunk
        clean_thinking, clean_final = _split_thinking_and_final(raw_output)
        formatted = f"{rag_indicator}<details open><summary>Click to view Thinking Process</summary>\n\n{clean_thinking}\n\n</details>\n\n{clean_final}"
        yield formatted

    # The streamer is exhausted, so generation is done; reap the worker thread.
    thread.join()
def upload_pdf(file):
    """Handle a PDF upload: index the file and refresh the document list.

    Args:
        file: The uploaded file as delivered by the ``gr.File`` component —
            either a path string or an object exposing the path as ``.name``;
            ``None`` when nothing is selected.

    Returns:
        A 2-tuple ``(status, document_list_update)`` for the status HTML and
        the document checkbox group. Errors are reported in the status HTML;
        this function does not raise.
    """
    # Local import: only needed here, to escape untrusted text placed into HTML.
    import html

    if file is None:
        return (
            gr.update(value="<div class='pdf-status pdf-info'>๐Ÿ“ ํŒŒ์ผ์„ ์„ ํƒํ•ด์ฃผ์„ธ์š”</div>"),
            gr.update(choices=[])
        )
    try:
        # Accept both a plain path string and an object carrying `.name`.
        file_path = getattr(file, "name", None) or str(file)

        # Use a short content hash as the document id, so re-uploading the same
        # file maps to the same entry.
        with open(file_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()[:8]
        doc_id = f"doc_{file_hash}"

        result = rag_system.process_and_store_pdf(file_path, doc_id)
        if not result["success"]:
            return (
                f"<div class='pdf-status pdf-error'>โŒ ์˜ค๋ฅ˜: {html.escape(str(result['error']))}</div>",
                gr.update()
            )

        # The title comes from the uploaded PDF's metadata (untrusted): escape it
        # before embedding it in the status HTML.
        safe_title = html.escape(str(result['title']))
        status_html = (
            "\n"
            '<div class="pdf-status pdf-success">\n'
            "โœ… PDF ์—…๋กœ๋“œ ์™„๋ฃŒ!<br>\n"
            f"๐Ÿ“„ {safe_title}<br>\n"
            f"๐Ÿ“‘ {result['pages']} ํŽ˜์ด์ง€ | ๐Ÿ” {result['chunks']} ์ฒญํฌ\n"
            "</div>\n"
        )
        # Rebuild the list from everything stored and select all of it.
        doc_choices = [
            f"{stored_id}: {info['metadata']['file_name']}"
            for stored_id, info in rag_system.documents.items()
        ]
        return (
            status_html,
            gr.update(choices=doc_choices, value=doc_choices)
        )
    except Exception as e:
        return (
            f"<div class='pdf-status pdf-error'>โŒ ์˜ค๋ฅ˜: {html.escape(str(e))}</div>",
            gr.update()
        )
def clear_documents():
    """Forget every stored document and reset the current selection.

    Returns:
        A 2-tuple of updates: the status HTML message and an emptied
        document checkbox group.
    """
    global selected_docs

    # Replace each store with a fresh dict.
    for store_name in ("documents", "document_chunks", "embeddings_store"):
        setattr(rag_system, store_name, {})
    selected_docs = []

    status_update = gr.update(value="<div class='pdf-status pdf-info'>๐Ÿ—‘๏ธ ๋ชจ๋“  ๋ฌธ์„œ๊ฐ€ ์‚ญ์ œ๋˜์—ˆ์Šต๋‹ˆ๋‹ค</div>")
    list_update = gr.update(choices=[], value=[])
    return (status_update, list_update)
def update_rag_settings(enable, docs, k):
    """Record the RAG controls in module state and refresh the status widgets.

    Args:
        enable: Value of the "Enable RAG" checkbox.
        docs: Currently selected document list entries (may be empty or None).
        k: Number of chunks to use per query.

    Returns:
        A 2-tuple of updates: the RAG status HTML and the context preview
        (hidden unless RAG is enabled with at least one document).
    """
    global rag_enabled, selected_docs, top_k_chunks
    rag_enabled = enable
    selected_docs = docs or []
    top_k_chunks = k
    # Debug logging
    print(f"RAG Settings Updated - Enabled: {rag_enabled}, Docs: {selected_docs}, Top-K: {top_k_chunks}")

    # RAG only counts as active when it is switched on AND documents are selected.
    is_active = enable and docs
    if is_active:
        status = "โœ… Enabled"
    else:
        status = "โญ• Disabled"
    status_update = gr.update(
        value=f"<div class='pdf-status pdf-info'>๐Ÿ” RAG: <strong>{status}</strong></div>"
    )

    if not is_active:
        return status_update, gr.update(value="", visible=False)

    # Show context preview if RAG is enabled
    preview = f"<div class='rag-context'>๐Ÿ“š Using {len(docs)} document(s) with {k} chunks per query</div>"
    return status_update, gr.update(value=preview, visible=True)
# Build the interface
# Layout: one row with two columns — RAG controls on the left (scale=1) and the
# chat interface on the right (scale=3) — followed by the event wiring.
# NOTE(review): the source had no indentation; the nesting below is reconstructed
# from the section comments — confirm against the deployed layout.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css, fill_height=True) as demo:
    gr.Markdown("# ๐Ÿš€ GPT-OSS-20B with PDF RAG System")
    gr.Markdown("Enhanced AI assistant with document-based context understanding")
    with gr.Row():
        # Left sidebar for RAG controls
        with gr.Column(scale=1):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("### ๐Ÿ“š Document RAG Settings")
                pdf_upload = gr.File(
                    label="Upload PDF",
                    file_types=[".pdf"],
                    type="filepath"
                )
                upload_status = gr.HTML(
                    value="<div class='pdf-status pdf-info'>๐Ÿ“ค Upload a PDF to enable document-based answers</div>"
                )
                # Choices are filled by upload_pdf() as "<doc_id>: <file_name>".
                document_list = gr.CheckboxGroup(
                    choices=[],
                    label="๐Ÿ“„ Select Documents",
                    info="Choose documents to use as context"
                )
                clear_btn = gr.Button("๐Ÿ—‘๏ธ Clear All Documents", size="sm", variant="secondary")
                enable_rag = gr.Checkbox(
                    label="โœจ Enable RAG",
                    value=False,
                    info="Use documents for context-aware responses"
                )
                top_k_slider = gr.Slider(
                    minimum=1,
                    maximum=5,
                    value=3,
                    step=1,
                    label="Context Chunks",
                    info="Number of document chunks to use"
                )
                # RAG status display
                rag_status = gr.HTML(
                    value="<div class='pdf-status pdf-info'>๐Ÿ” RAG: <strong>Disabled</strong></div>"
                )
                # Hidden until update_rag_settings() reports RAG as active.
                context_preview = gr.HTML(value="", visible=False)
        # Right side for chat interface
        with gr.Column(scale=3):
            with gr.Group(elem_classes="main-container"):
                # Create ChatInterface with custom function
                # The additional inputs are listed in the same order as the
                # parameters of generate_response() after (input_data, chat_history):
                # max_new_tokens, system_prompt, temperature, top_p, top_k,
                # repetition_penalty.
                chat_interface = gr.ChatInterface(
                    fn=generate_response,
                    additional_inputs=[
                        gr.Slider(label="Max new tokens", minimum=64, maximum=4096, step=1, value=2048),
                        gr.Textbox(
                            label="System Prompt",
                            value="You are a helpful assistant. Reasoning: medium",
                            lines=4,
                            placeholder="Change system prompt"
                        ),
                        gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, step=0.1, value=0.7),
                        gr.Slider(label="Top-p", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
                        gr.Slider(label="Top-k", minimum=1, maximum=100, step=1, value=50),
                        gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.0)
                    ],
                    examples=[
                        [{"text": "Explain Newton laws clearly and concisely"}],
                        [{"text": "Write a Python function to calculate the Fibonacci sequence"}],
                        [{"text": "What are the benefits of open weight AI models"}],
                    ],
                    cache_examples=False,
                    type="messages",
                    description="""Chat with GPT-OSS-20B. Upload PDFs to enhance responses with document context.
Click to view thinking process (default is on).""",
                    textbox=gr.Textbox(
                        label="Query Input",
                        placeholder="Type your prompt (RAG will be applied if enabled)"
                    ),
                    stop_btn="Stop Generation",
                    multimodal=False
                )
    # Event handlers
    pdf_upload.upload(
        fn=upload_pdf,
        inputs=[pdf_upload],
        outputs=[upload_status, document_list]
    )
    clear_btn.click(
        fn=clear_documents,
        outputs=[upload_status, document_list]
    )
    # Update RAG settings when changed
    # All three controls call the same handler with the same inputs, so the
    # module-level RAG state is rewritten whenever any one of them changes.
    enable_rag.change(
        fn=update_rag_settings,
        inputs=[enable_rag, document_list, top_k_slider],
        outputs=[rag_status, context_preview]
    )
    document_list.change(
        fn=update_rag_settings,
        inputs=[enable_rag, document_list, top_k_slider],
        outputs=[rag_status, context_preview]
    )
    top_k_slider.change(
        fn=update_rag_settings,
        inputs=[enable_rag, document_list, top_k_slider],
        outputs=[rag_status, context_preview]
    )
# Start the Gradio app only when this file is executed as a script.
# share=True is passed to launch(); per Gradio's docs this requests a public
# share link — NOTE(review): confirm it is wanted in the deployment environment.
if __name__ == "__main__":
    demo.launch(share=True)