```python import os import tempfile import logging import traceback from pathlib import Path from typing import Dict, Any, List from datetime import datetime from fastapi import FastAPI, File, UploadFile, HTTPException, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from enhanced_legal_scraper import EnhancedLegalScraper, LegalDocument, IRANIAN_LEGAL_SOURCES try: import fitz # PyMuPDF from PIL import Image PDF_AVAILABLE = True logger.info("✅ PDF processing libraries loaded") except ImportError as e: PDF_AVAILABLE = False logger.warning(f"⚠️ PDF libraries not available: {e}") try: from transformers import TrOCRProcessor, VisionEncoderDecoderModel import torch ML_AVAILABLE = True logger.info("✅ ML libraries loaded") except ImportError as e: ML_AVAILABLE = False logger.warning(f"⚠️ ML libraries not available: {e}") # Create log directory log_dir = '/app/logs' os.makedirs(log_dir, exist_ok=True) # Configure logging logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO").upper(), format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(log_dir, 'legal_dashboard.log')), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class OCRResponse(BaseModel): success: bool text: str method: str metadata: Dict[str, Any] class SystemStatus(BaseModel): status: str services: Dict[str, Any] timestamp: str class SearchRequest(BaseModel): query: str search_type: str = "هوشمند" doc_filter: str = "همه" class LegalDashboardAPI: def __init__(self): self.scraper = EnhancedLegalScraper(delay=1.5) self.ocr_service = OCRService() class OCRService: def __init__(self): self.model = None self.processor = None self.model_loaded = False if ML_AVAILABLE and os.getenv("ENVIRONMENT") != "huggingface_free": self._load_model() def _load_model(self): try: logger.info("Loading TrOCR model...") model_name = "microsoft/trocr-base-printed" self.processor = TrOCRProcessor.from_pretrained(model_name, cache_dir="/app/cache") self.model = VisionEncoderDecoderModel.from_pretrained(model_name, cache_dir="/app/cache") self.model_loaded = True logger.info("✅ TrOCR model loaded successfully") except Exception as e: logger.warning(f"❌ Failed to load TrOCR model: {e}. OCR will use basic processing.") self.model_loaded = False async def extract_text_from_pdf(self, file_path: str) -> OCRResponse: if not PDF_AVAILABLE: return OCRResponse(success=False, text="", method="error", metadata={"error": "PDF processing not available"}) try: doc = fitz.open(file_path) pages_text = [] total_chars = 0 total_pages = doc.page_count for page_num in range(min(total_pages, 10)): page = doc[page_num] text = page.get_text() pages_text.append(text) total_chars += len(text) doc.close() full_text = "\n\n--- Page Break ---\n\n".join(pages_text) return OCRResponse( success=True, text=full_text, method="PyMuPDF", metadata={ "pages_processed": len(pages_text), "total_pages": total_pages, "total_characters": total_chars, "file_size_kb": os.path.getsize(file_path) / 1024 } ) except Exception as e: logger.error(f"PDF processing error: {e}") return OCRResponse(success=False, text="", method="error", metadata={"error": str(e)}) async def extract_text_from_image(self, file_path: str) -> OCRResponse: try: image = Image.open(file_path) if self.model_loaded and self.processor and self.model: pixel_values = self.processor(images=image, return_tensors="pt").pixel_values generated_ids = self.model.generate(pixel_values) generated_text = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0] return OCRResponse( success=True, text=generated_text, method="TrOCR", metadata={ "image_size": image.size, "image_mode": image.mode, "model": "microsoft/trocr-base-printed" } ) else: return OCRResponse( success=True, text=f"Image processed: {image.size} pixels, {image.mode} mode\nTrOCR model not loaded - text extraction limited", method="Basic", metadata={ "image_size": image.size, "image_mode": image.mode, "note": "TrOCR model not available" } ) except Exception as e: logger.error(f"Image processing error: {e}") return OCRResponse(success=False, text="", method="error", metadata={"error": str(e)}) app = FastAPI( title="Legal Dashboard API", description="Advanced Legal Document Processing System with OCR and NLP", version="2.0.0", docs_url="/api/docs", redoc_url="/api/redoc" ) app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:7860", "http://127.0.0.1:7860", "*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) legal_api = LegalDashboardAPI() @app.on_event("startup") async def startup_event(): if ML_AVAILABLE and os.getenv("ENVIRONMENT") != "huggingface_free": legal_api.ocr_service._load_model() @app.get("/health") async def health_check(): return { "status": "healthy", "message": "Legal Dashboard is running", "timestamp": datetime.now().isoformat(), "services": { "pdf_processing": PDF_AVAILABLE, "ml_models": ML_AVAILABLE, "ocr_model_loaded": legal_api.ocr_service.model_loaded, "scraper": bool(legal_api.scraper) } } @app.get("/system/status", response_model=SystemStatus) async def get_system_status(): return SystemStatus( status="healthy", services={ "pdf_processing": { "available": PDF_AVAILABLE, "status": "✅ Available" if PDF_AVAILABLE else "❌ Not Available" }, "ml_models": { "available": ML_AVAILABLE, "status": "✅ Available" if ML_AVAILABLE else "❌ Not Available" }, "ocr_model": { "loaded": legal_api.ocr_service.model_loaded, "status": "✅ Loaded" if legal_api.ocr_service.model_loaded else "⏳ Loading..." if ML_AVAILABLE else "❌ Not Available" }, "scraper": { "available": bool(legal_api.scraper), "status": "✅ Available" if legal_api.scraper else "❌ Not Available" } }, timestamp=datetime.now().isoformat() ) @app.post("/api/ocr/extract-pdf", response_model=OCRResponse) async def extract_pdf_text(file: UploadFile = File(...)): if not file.filename.lower().endswith('.pdf'): raise HTTPException(status_code=400, detail="File must be a PDF") temp_path = None try: temp_dir = Path("/app/uploads") temp_dir.mkdir(exist_ok=True) temp_path = temp_dir / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}" with temp_path.open("wb") as f: f.write(await file.read()) return await legal_api.ocr_service.extract_text_from_pdf(str(temp_path)) finally: if temp_path and temp_path.exists(): temp_path.unlink() @app.post("/api/ocr/extract-image", response_model=OCRResponse) async def extract_image_text(file: UploadFile = File(...)): allowed_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff'] if not any(file.filename.lower().endswith(ext) for ext in allowed_extensions): raise HTTPException(status_code=400, detail="File must be an image (JPG, PNG, BMP, TIFF)") temp_path = None try: temp_dir = Path("/app/uploads") temp_dir.mkdir(exist_ok=True) temp_path = temp_dir / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}" with temp_path.open("wb") as f: f.write(await file.read()) return await legal_api.ocr_service.extract_text_from_image(str(temp_path)) finally: if temp_path and temp_path.exists(): temp_path.unlink() @app.post("/api/scrape") async def scrape_documents(max_docs: int = 20): try: documents = legal_api.scraper.scrape_real_sources(max_docs=max_docs) for doc in documents: legal_api.scraper.save_document(doc) return { "success": True, "documents_processed": len(documents), "documents": [doc.__dict__ for doc in documents] } except Exception as e: logger.error(f"Scrape failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/search", response_model=List[Dict]) async def search_documents(request: SearchRequest): try: filter_map = { 'همه': None, 'قوانین': 'law', 'اخبار': 'news', 'آرا': 'ruling', 'آیین‌نامه': 'regulation', 'عمومی': 'general' } doc_type = filter_map.get(request.doc_filter) if request.search_type == "هوشمند": results = legal_api.scraper.search_with_similarity(request.query, limit=20) else: results = legal_api.scraper._text_search(request.query, limit=20) if doc_type: results = [r for r in results if r['document_type'] == doc_type] return results except Exception as e: logger.error(f"Search failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/statistics") async def get_statistics(): try: return legal_api.scraper.get_enhanced_statistics() except Exception as e: logger.error(f"Statistics failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.error(f"Global exception: {exc}") logger.error(traceback.format_exc()) return JSONResponse( status_code=500, content={ "error": "Internal server error", "message": str(exc), "path": str(request.url) } ) if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False, log_level="info") ```