import html  # used to escape document text in the content preview
import os
import re
from datetime import datetime

import streamlit as st
from werkzeug.utils import secure_filename

from src.gpp import GPP, GPPConfig
from src.qa import AnswerGenerator

# The upstream AnswerGenerator may not accept conversation context directly,
# so we wrap it and fold the recent chat history into the prompt instead.
class ContextAwareAnswerGenerator:
    """Wrapper around AnswerGenerator that includes conversation context."""

    def __init__(self, chunks):
        self.chunks = chunks
        self.original_generator = AnswerGenerator(chunks)

    def answer(self, question, conversation_context=None):
        """
        Generate an answer, optionally conditioned on the conversation so far.

        Args:
            question: Current question.
            conversation_context: List of previous chat messages ({"role": ..., "content": ...}).

        Returns:
            answer, supporting_chunks
        """
        # With no usable history, fall back to the original generator directly.
        if conversation_context is None or len(conversation_context) <= 1:
            return self.original_generator.answer(question)

        # Otherwise, enhance the question by summarizing the previous exchanges.
        context_prompt = "Based on our conversation so far:\n"

        # Include only the last few messages so the prompt does not grow too large.
        max_history = min(len(conversation_context) - 1, 4)  # at most 4 prior messages
        start = max(0, len(conversation_context) - max_history - 1)
        for i in range(start, len(conversation_context) - 1, 2):
            if i + 1 < len(conversation_context):
                user_q = conversation_context[i]["content"]
                assistant_a = conversation_context[i + 1]["content"]
                context_prompt += f"You were asked: '{user_q}'\n"
                context_prompt += f"You answered: '{assistant_a}'\n"

        context_prompt += f"\nNow answer this follow-up question: {question}"

        # Use the enhanced prompt with the original generator.
        return self.original_generator.answer(context_prompt)

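# Illustrative sketch (not part of the app flow): given a chat history like
#   [{"role": "user", "content": "What is the report about?"},
#    {"role": "assistant", "content": "It summarizes Q3 sales."},
#    {"role": "user", "content": "Which region grew fastest?"}]
# and question="Which region grew fastest?", the wrapper above would call the original
# generator with a prompt roughly like:
#   Based on our conversation so far:
#   You were asked: 'What is the report about?'
#   You answered: 'It summarizes Q3 sales.'
#
#   Now answer this follow-up question: Which region grew fastest?
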
# --- Page Configuration ---
st.set_page_config(
    page_title="Document Intelligence Q&A",
    page_icon="📄",
    layout="wide"
)

# --- Session State Initialization ---
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []  # List of {role: 'user'/'assistant', content: str}
if 'parsed' not in st.session_state:
    st.session_state.parsed = None
if "selected_chunks" not in st.session_state:
    st.session_state.selected_chunks = []
if "conversation_context" not in st.session_state:
    st.session_state.conversation_context = []

# --- Custom CSS for styling ---
st.markdown(
    """
<style>
/* Global Styles */
body {
    background-color: #fafafa;
    font-family: 'Helvetica Neue', sans-serif;
}
/* Header Styles */
.main-header {
    margin-bottom: 2rem;
}
/* Card Styles */
.card {
    background: white;
    border-radius: 8px;
    padding: 20px;
    margin-bottom: 20px;
    box-shadow: 0 1px 3px rgba(0,0,0,0.12), 0 1px 2px rgba(0,0,0,0.24);
}
/* Button Styles */
.stButton>button {
    background-color: #4361ee;
    color: white;
    border-radius: 4px;
    border: none;
    padding: 8px 16px;
    font-weight: 500;
}
.stButton>button:hover {
    background-color: #3a56d4;
}
/* Input Styles */
.stTextInput>div>div>input {
    border-radius: 4px;
    border: 1px solid #e0e0e0;
}
/* Code Block Styles */
pre {
    background-color: #f5f5f5;
    padding: 12px;
    border-radius: 4px;
    font-size: 14px;
}
/* Hide Streamlit footer */
footer {
    display: none;
}
/* Sidebar Styles */
.css-18e3th9 {
    padding-top: 1rem;
}
/* Expander styles */
.streamlit-expanderHeader {
    font-size: 1rem;
    font-weight: 500;
}
/* Chat Interface Styles */
.chat-container {
    display: flex;
    flex-direction: column;
    gap: 12px;
    margin-top: 20px;
    margin-bottom: 20px;
}
.chat-message {
    display: flex;
    margin-bottom: 10px;
}
.user-message {
    justify-content: flex-end;
}
.assistant-message {
    justify-content: flex-start;
}
.message-content {
    padding: 12px 16px;
    border-radius: 18px;
    max-width: 80%;
    overflow-wrap: break-word;
}
.user-message .message-content {
    background-color: #4361ee;
    color: white;
    border-bottom-right-radius: 4px;
}
.assistant-message .message-content {
    background-color: #f0f2f6;
    color: #1e1e1e;
    border-bottom-left-radius: 4px;
}
.message-content p {
    margin: 0;
    padding: 0;
}
/* Empty chat placeholder style */
.empty-chat-placeholder {
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: center;
    height: 300px;
    background-color: #f8f9fa;
    border-radius: 8px;
    margin-bottom: 20px;
    text-align: center;
    color: #6c757d;
}
.empty-chat-icon {
    font-size: 40px;
    margin-bottom: 16px;
    color: #adb5bd;
}
/* Message typing indicator */
.typing-indicator {
    display: flex;
    align-items: center;
    justify-content: flex-start;
    margin-top: 8px;
}
.typing-indicator span {
    height: 8px;
    width: 8px;
    background-color: #4361ee;
    border-radius: 50%;
    margin: 0 2px;
    display: inline-block;
    opacity: 0.7;
}
.typing-indicator span:nth-child(1) {
    animation: pulse 1s infinite;
}
.typing-indicator span:nth-child(2) {
    animation: pulse 1s infinite 0.2s;
}
.typing-indicator span:nth-child(3) {
    animation: pulse 1s infinite 0.4s;
}
@keyframes pulse {
    0% { transform: scale(1); opacity: 0.7; }
    50% { transform: scale(1.2); opacity: 1; }
    100% { transform: scale(1); opacity: 0.7; }
}
/* Spinner */
.stSpinner > div > div {
    border-top-color: #4361ee !important;
}
/* Info box */
.stAlert {
    border-radius: 8px;
}
</style>
""", unsafe_allow_html=True
)

# --- Left Sidebar: Instructions & Upload ---
with st.sidebar:
    # App info section
    st.image("https://img.icons8.com/ios-filled/50/4A90E2/document.png", width=40)
    st.title("Document Intelligence")
    st.caption(f"Last updated: {datetime.now().strftime('%Y-%m-%d')}")

    with st.expander("How It Works", expanded=True):
        st.markdown(
            """
1. **Upload PDF**: Select and parse your document
2. **Ask Questions**: Type your query about the document
3. **Get Answers**: AI analyzes and responds with insights
4. **View Evidence**: See supporting chunks in the right sidebar
"""
        )

    st.markdown("---")

    # Upload section
    st.subheader("Upload Document")
    uploaded_file = st.file_uploader("Select a PDF", type=["pdf"], help="Upload a PDF file to analyze")
    if uploaded_file:
        try:
            filename = secure_filename(uploaded_file.name)
            if not re.match(r'^[\w\-. ]+$', filename):
                st.error("Invalid file name. Please rename your file.")
            else:
                col1, col2 = st.columns(2)
                with col1:
                    if st.button("Parse PDF", use_container_width=True, key="parse_button"):
                        output_dir = os.path.join("./parsed", filename)
                        os.makedirs(output_dir, exist_ok=True)
                        pdf_path = os.path.join(output_dir, filename)
                        with open(pdf_path, "wb") as f:
                            f.write(uploaded_file.getbuffer())
                        with st.spinner("Parsing document..."):
                            try:
                                gpp = GPP(GPPConfig())
                                parsed = gpp.run(pdf_path, output_dir)
                                st.session_state.parsed = parsed
                                st.session_state.chat_history = []  # Reset chat when a new document is parsed
                                st.session_state.conversation_context = []  # Reset conversation context
                                st.session_state.selected_chunks = []  # Reset selected chunks
                                st.success("Document parsed successfully!")
                            except Exception as e:
                                st.error(f"Parsing failed: {str(e)}")
                                st.session_state.parsed = None
                with col2:
                    if st.button("Clear", use_container_width=True, key="clear_button"):
                        st.session_state.parsed = None
                        st.session_state.selected_chunks = []
                        st.session_state.chat_history = []
                        st.session_state.conversation_context = []
                        st.experimental_rerun()
        except Exception as e:
            st.error(f"Upload error: {str(e)}")
    # Display document preview if parsed
    if st.session_state.parsed:
        st.markdown("---")
        st.subheader("Document Preview")
        parsed = st.session_state.parsed

        # Layout PDF
        layout_pdf = parsed.get("layout_pdf")
        if layout_pdf and os.path.exists(layout_pdf):
            with st.expander("View Layout PDF", expanded=False):
                st.markdown(f"[Open in new tab]({layout_pdf})")

        # Content preview
        md_path = parsed.get("md_path")
        if md_path and os.path.exists(md_path):
            try:
                with open(md_path, 'r', encoding='utf-8') as md_file:
                    md_text = md_file.read()
                with st.expander("Content Preview", expanded=False):
                    # Escape the text so raw HTML in the document cannot break out of the <pre> block
                    preview = html.escape(md_text[:3000]) + ('...' if len(md_text) > 3000 else '')
                    st.markdown(
                        f"<pre style='font-size:12px;max-height:300px;overflow-y:auto'>{preview}</pre>",
                        unsafe_allow_html=True,
                    )
            except Exception as e:
                st.warning(f"Could not preview content: {str(e)}")

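# NOTE (assumption): the rest of this app reads st.session_state.parsed as a dict with
# roughly the keys sketched below. The real schema is defined by GPP.run() in src.gpp
# and may differ; this is only a reference for the lookups used further down.
#   parsed = {
#       "chunks": [...],            # document chunks handed to AnswerGenerator
#       "layout_pdf": "layout.pdf", # path to the rendered layout PDF
#       "md_path": "doc.md",        # path to the extracted markdown text
#       "images_dir": "images/",    # directory containing extracted figure images
#   }
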
# --- Main Content Area ---
# Create a two-column layout for main content
main_col, evidence_col = st.columns([3, 1])

with main_col:
    st.markdown("<div class='main-header'>", unsafe_allow_html=True)
    st.title("Document Q&A")
    st.markdown("</div>", unsafe_allow_html=True)

    if not st.session_state.parsed:
        st.info("👈 Please upload and parse a document to begin asking questions.")
    else:
        # Q&A Section with chat-like interface
        st.markdown("<div class='card'>", unsafe_allow_html=True)
        question = st.text_input(
            "Ask a question about your document:",
            key="question_input",
            placeholder="E.g., 'What are the key findings?' or 'Summarize the data'",
        )
        col_btn1, col_btn2 = st.columns([4, 1])
        with col_btn1:
            submit_button = st.button("Get Answer", use_container_width=True)
        with col_btn2:
            clear_chat = st.button("Clear Chat", use_container_width=True)

        # Initialize chat history (safeguard; also done at the top of the script)
        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []

        # Clear chat when the button is pressed
        if clear_chat:
            st.session_state.chat_history = []
            st.session_state.conversation_context = []
            st.session_state.selected_chunks = []
            st.experimental_rerun()
        if submit_button and question:
            with st.spinner("Analyzing document and generating answer..."):
                try:
                    # Add the user question to the chat history
                    st.session_state.chat_history.append({"role": "user", "content": question})

                    # Generate an answer using the conversation context
                    generator = ContextAwareAnswerGenerator(st.session_state.parsed['chunks'])
                    answer, supporting_chunks = generator.answer(
                        question, conversation_context=st.session_state.chat_history
                    )

                    # Add the assistant response to the chat history
                    st.session_state.chat_history.append({"role": "assistant", "content": answer})

                    # Store supporting chunks in session state for the right sidebar
                    st.session_state.selected_chunks = supporting_chunks
                except Exception as e:
                    st.error(f"Failed to generate answer: {str(e)}")
                    st.session_state.selected_chunks = []
        # Display chat history
        st.markdown("<div class='chat-container'>", unsafe_allow_html=True)
        if not st.session_state.chat_history:
            # Show empty chat state with icon
            st.markdown("""
<div class='empty-chat-placeholder'>
    <div class='empty-chat-icon'>💬</div>
    <p>Ask questions about your document to start a conversation</p>
</div>
""", unsafe_allow_html=True)
        else:
            for message in st.session_state.chat_history:
                if message["role"] == "user":
                    st.markdown(f"""
<div class='chat-message user-message'>
    <div class='message-content'>
        <p>{message["content"]}</p>
    </div>
</div>
""", unsafe_allow_html=True)
                else:
                    st.markdown(f"""
<div class='chat-message assistant-message'>
    <div class='message-content'>
        <p>{message["content"]}</p>
    </div>
</div>
""", unsafe_allow_html=True)

        st.markdown("</div>", unsafe_allow_html=True)
        st.markdown("</div>", unsafe_allow_html=True)

# --- Supporting Evidence in the right column ---
with evidence_col:
    if st.session_state.parsed:
        st.markdown("### Supporting Evidence")
        if not st.session_state.selected_chunks:
            st.info("Evidence chunks will appear here after you ask a question.")
        else:
            for idx, chunk in enumerate(st.session_state.selected_chunks):
                with st.expander(f"Evidence #{idx + 1}", expanded=True):
                    st.markdown(f"**Type:** {chunk['type'].capitalize()}")
                    st.markdown(chunk.get('narration', 'No narration available'))

                    # Display table if available
                    if 'table_structure' in chunk:
                        st.write("**Table Data:**")
                        st.dataframe(chunk['table_structure'], use_container_width=True)

                    # Display images if available
                    for blk in chunk.get('blocks', []):
                        if blk.get('type') == 'img_path' and 'images_dir' in st.session_state.parsed:
                            img_path = os.path.join(st.session_state.parsed['images_dir'], blk.get('img_path', ''))
                            if os.path.exists(img_path):
                                st.image(img_path, use_column_width=True)

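# NOTE (assumption): the evidence rendering above treats each supporting chunk as a dict
# shaped roughly like this hypothetical sketch; the real schema comes from src.gpp / src.qa
# and may differ:
#   {
#       "type": "table",                 # e.g. "text", "table", "figure"
#       "narration": "Short summary of the chunk",
#       "table_structure": table_data,   # optional, anything st.dataframe accepts
#       "blocks": [{"type": "img_path", "img_path": "fig_1.png"}],  # optional image refs
#   }
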
# -- Error handling wrapper --
def handle_error(func):
    """Run func and surface any unexpected exception in the UI."""
    try:
        func()
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)}")
        st.info("Please refresh the page and try again.")


# Note: as written, this call is only a placeholder; the page logic above has already run
# by this point, so nothing is actually wrapped. See the sketch below for one way to use it.
handle_error(lambda: None)
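# A minimal sketch (assumption, not part of the original app): to actually guard the page
# with handle_error, the layout code above would be moved into a function, e.g. a
# hypothetical render_app(), and the script would end with:
#
#     def render_app():
#         ...  # sidebar, main column, and evidence column code from above
#
#     handle_error(render_app)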