import httpx # An asynchronous HTTP client. import os # To handle file paths and create directories. import asyncio # To run synchronous libraries in an async environment. from urllib.parse import unquote, urlparse # To get the filename from the URL. import uuid # To generate unique filenames if needed. from pydantic import HttpUrl from langchain_pymupdf4llm import PyMuPDF4LLMLoader import json import re from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.schema import Document import os import time import fitz # PyMuPDF import httpx import tempfile import asyncio import concurrent.futures from typing import Optional from pathlib import Path import os import argparse from typing import Optional from urllib.parse import urlparse, unquote from llama_index.readers.file import PyMuPDFReader from llama_index.core import Document # Make sure Document is imported if used elsewhere from pydantic import HttpUrl # Assuming pydantic is installed for HttpUrl type hint from concurrent.futures import ThreadPoolExecutor, as_completed # Ensure required libraries are installed. # You can install them using: # pip install llama_cloud_services pydantic python-dotenv from llama_cloud_services import LlamaExtract from pydantic import BaseModel, Field from dotenv import load_dotenv import tempfile from pathlib import Path from llama_index.readers.file import PDFReader from llama_index.readers.file import PyMuPDFReader import PyPDF2 # Global variable for the extractor agent llama_extract_agent = None class Insurance(BaseModel): """ A Pydantic model to define the data schema for extraction. The description helps guide the AI model. """ headings: str = Field(description="An array of headings") class Insurance(BaseModel): headings: str = Field(description="An array of headings") def initialize_llama_extract_agent(): global llama_extract_agent if llama_extract_agent is None: print("Initializing LlamaExtract client and getting agent...") try: extractor = LlamaExtract() llama_extract_agent = extractor.get_agent(name="insurance-parser") print("LlamaExtract agent initialized.") except Exception as e: print(f"Error initializing LlamaExtract agent: {e}") llama_extract_agent = None # Ensure it's None if there was an error def extract_schema_from_file(file_path: str) -> Optional[Insurance]: if not os.path.exists(file_path): print(f"āŒ Error: The file '{file_path}' was not found.") return None if llama_extract_agent is None: print("LlamaExtract agent not initialized. Attempting to initialize now.") initialize_llama_extract_agent() if llama_extract_agent is None: print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.") return None print(f"šŸš€ Sending '{file_path}' to LlamaCloud for schema extraction...") try: result = llama_extract_agent.extract(file_path) if result and result.data: print("āœ… Extraction successful!") return result.data else: print("āš ļø Extraction did not return any data.") return None except Exception as e: print(f"\nāŒ An error occurred during the API call: {e}") print("Please check your API key, network connection, and file format.") return None def process_pdf_chunk(chunk_path: str) -> str: """ Worker function for the ProcessPoolExecutor. It loads a single PDF chunk using PyMuPDF4LLMLoader and returns the extracted markdown content, prepending the original page number. Args: chunk_path: The file path to a temporary PDF chunk. Returns: A string containing the markdown content for the chunk. """ try: # Load the document chunk using LangChain's loader loader = PyMuPDF4LLMLoader(chunk_path) documents = loader.load() page_contents = [] for doc in documents: # Reconstruct the original page number from the filename for context original_page_in_doc = int(doc.metadata.get('page', -1)) base_name = os.path.basename(chunk_path) # Filename format is "chunk_START-END.pdf" start_page_of_chunk = int(base_name.split('_')[1].split('-')[0]) # The final page number is the start of the chunk + the page within the chunk doc actual_page_num = start_page_of_chunk + original_page_in_doc page_contents.append(f"\n## Page {actual_page_num}\n{doc.page_content.strip()}") return "".join(page_contents) except Exception as e: # Return an error message if a chunk fails to process return f"Error processing chunk {os.path.basename(chunk_path)}: {e}" def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str: """ Splits a local PDF into chunks, processes them in parallel using multiple CPU cores, and then combines the results. This method is ideal for very large documents and runs entirely locally. Args: pdf_path: The file path to the local PDF. output_path: (Optional) The file path to save the output Markdown. chunk_size: The number of pages to include in each parallel chunk. Returns: A string containing the full markdown content of the PDF. """ start_time = time.monotonic() if not os.path.exists(pdf_path): raise FileNotFoundError(f"āŒ The file '{pdf_path}' was not found.") temp_dir = "temp_pdf_chunks" os.makedirs(temp_dir, exist_ok=True) chunk_paths = [] markdown_text = "" try: # Step 1: Split the source PDF into smaller chunk files print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...") doc = fitz.open(pdf_path) num_pages = len(doc) for i in range(0, num_pages, chunk_size): chunk_start = i chunk_end = min(i + chunk_size, num_pages) # Create a new blank PDF and insert pages from the source with fitz.open() as chunk_doc: chunk_doc.insert_pdf(doc, from_page=chunk_start, to_page=chunk_end - 1) # Naming convention includes page numbers for sorting chunk_path = os.path.join(temp_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf") chunk_doc.save(chunk_path) chunk_paths.append(chunk_path) doc.close() print(f"āœ… Successfully created {len(chunk_paths)} PDF chunks.") # Step 2: Process the chunks in parallel print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...") results = {} with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor: # Map each future to its corresponding chunk path future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths} for future in concurrent.futures.as_completed(future_to_chunk): chunk_path = future_to_chunk[future] try: # Store the result from the completed future results[chunk_path] = future.result() except Exception as e: results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}" # Step 3: Combine results in the correct order print("Combining processed chunks...") # Sort results based on the original chunk path list to maintain order sorted_results = [results[path] for path in chunk_paths] markdown_text = "\n\n".join(sorted_results) # Step 4: Save to file if an output path is provided if output_path: with open(output_path, "w", encoding="utf-8") as f: f.write(markdown_text) print(f"āœ… Markdown saved to: {output_path}") finally: # Step 5: Clean up temporary chunk files and directory # print("Cleaning up temporary files...") for path in chunk_paths: if os.path.exists(path): os.remove(path) if os.path.exists(temp_dir): try: if not os.listdir(temp_dir): os.rmdir(temp_dir) except OSError as e: print(f"Could not remove temp directory '{temp_dir}': {e}") end_time = time.monotonic() print(f"ā±ļø Total processing time: {end_time - start_time:.2f} seconds") return markdown_text async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str: """ High-level async function to process a PDF from a URL or local path. It downloads the file if a URL is provided, then uses the local parallel processor to convert it to markdown. Args: source: A local file path or a public URL to a PDF file. output_path: (Optional) The file path to save the output Markdown. chunk_size: The number of pages per chunk for parallel processing. Returns: A string containing the full markdown content of the PDF. """ # Check if the source is a URL is_url = source.lower().startswith('http://') or source.lower().startswith('https://') if is_url: print(f"ā¬‡ļø Downloading from URL: {source}") temp_pdf_file_path = None try: # Download the file asynchronously async with httpx.AsyncClient() as client: response = await client.get(source, timeout=60.0, follow_redirects=True) response.raise_for_status() # Save the downloaded content to a temporary file with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file: temp_file.write(response.content) temp_pdf_file_path = temp_file.name print(f"āœ… Download complete. Saved to temporary file: {temp_pdf_file_path}") # The parallel function is synchronous, so we run it in an executor # to avoid blocking the asyncio event loop. loop = asyncio.get_running_loop() markdown_content = await loop.run_in_executor( None, # Use the default thread pool executor pdf_to_markdown_parallel, temp_pdf_file_path, output_path, chunk_size ) return markdown_content finally: # Clean up the temporary file after processing if temp_pdf_file_path and os.path.exists(temp_pdf_file_path): os.remove(temp_pdf_file_path) print(f"šŸ—‘ļø Cleaned up temporary file: {temp_pdf_file_path}") else: # If it's a local path, process it directly print(f"āš™ļø Processing local file: {source}") loop = asyncio.get_running_loop() markdown_content = await loop.run_in_executor( None, pdf_to_markdown_parallel, source, output_path, chunk_size ) return markdown_content # Define the batch size for parallel processing BATCH_SIZE = 25 def process_page_batch(documents_batch: list[Document]) -> str: """ Helper function to extract content from a batch of LlamaIndex Document objects and join them into a single string. """ return "\n\n".join([d.get_content() for d in documents_batch]) async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str: """ Asynchronously downloads a document, saves it to a local directory, and then parses it using PyMuPDFReader from LlamaIndex. The parsing of page content is parallelized using a ThreadPoolExecutor with a specified batch size. Args: doc_url: The Pydantic-validated URL of the document to process. Returns: A single string containing the document's extracted text. """ print(f"Initiating download from: {doc_url}") try: # Create the local storage directory if it doesn't exist. LOCAL_STORAGE_DIR = "data/" os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True) async with httpx.AsyncClient() as client: response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True) response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) doc_bytes = response.content print("Download successful.") # --- Logic to determine the local filename --- parsed_path = urlparse(str(doc_url)).path filename = unquote(os.path.basename(parsed_path)) if not filename: # If the URL doesn't provide a filename, create a generic one. # Ensure it's a PDF if we're using PyMuPDFReader for PDF parsing. filename = "downloaded_document.pdf" local_file_path = Path(os.path.join(LOCAL_STORAGE_DIR, filename)) # Save the downloaded document to the local file. with open(local_file_path, "wb") as f: f.write(doc_bytes) print(f"Document saved locally at: {local_file_path}") print("Parsing document with PyMuPDFReader...") # PyMuPDFReader parsing logic: loads the entire document and returns a list of Document objects (one per page) loader = PyMuPDFReader() docs0 = loader.load_data(file_path=Path(local_file_path)) # Measure time for parallel doc_text creation start_time_conversion = time.perf_counter() all_extracted_texts = [] # Use ThreadPoolExecutor for parallel processing of page batches # The max_workers argument can be adjusted based on available CPU cores with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor: futures = [] # Divide docs0 (list of pages) into batches of BATCH_SIZE for i in range(0, len(docs0), BATCH_SIZE): batch = docs0[i:i + BATCH_SIZE] # Submit each batch to the executor for processing futures.append(executor.submit(process_page_batch, batch)) # Collect results as they complete # as_completed returns futures as they finish, maintaining responsiveness for future in as_completed(futures): all_extracted_texts.append(future.result()) # Join all extracted texts from the batches into a single string doc_text = "\n\n".join(all_extracted_texts) end_time_conversion = time.perf_counter() elapsed_time_conversion = end_time_conversion - start_time_conversion print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.") # The extracted text is now in 'doc_text' if doc_text: print(f"Parsing complete. Extracted {len(doc_text)} characters.") # The local file is NOT deleted, as per original code. return doc_text else: raise ValueError("PyMuPDFReader did not extract any content.") except httpx.HTTPStatusError as e: print(f"Error downloading document: HTTP status error: {e}") raise except Exception as e: print(f"An unexpected error occurred during processing: {e}") raise