Hoghoghi / app /main.py
Really-amin's picture
Upload 2 files
3c588e8 verified
raw
history blame
11.3 kB
```python
import os
import tempfile
import logging
import traceback
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from enhanced_legal_scraper import EnhancedLegalScraper, LegalDocument, IRANIAN_LEGAL_SOURCES
# --- Logging -----------------------------------------------------------------
# Logging is configured BEFORE the optional-dependency probes below, because
# those probes report their outcome through `logger`. (Previously `logger` was
# referenced before it was assigned, which raised NameError at import time.)
log_dir = '/app/logs'
_log_handlers = [logging.StreamHandler()]
_file_logging_error = None
try:
    os.makedirs(log_dir, exist_ok=True)
    # Keep the original handler order: file first, then console.
    _log_handlers.insert(
        0, logging.FileHandler(os.path.join(log_dir, 'legal_dashboard.log'))
    )
except OSError as e:
    # An unwritable log directory should not stop the API from starting;
    # fall back to console-only logging and report the reason below.
    _file_logging_error = e

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
if _file_logging_error is not None:
    logger.warning(f"⚠️ File logging disabled: {_file_logging_error}")

# --- Optional dependencies ---------------------------------------------------
# PyMuPDF and Pillow are probed together: PDF_AVAILABLE is True only when both
# `fitz` and `Image` were imported successfully.
try:
    import fitz  # PyMuPDF
    from PIL import Image
    PDF_AVAILABLE = True
    logger.info("✅ PDF processing libraries loaded")
except ImportError as e:
    PDF_AVAILABLE = False
    logger.warning(f"⚠️ PDF libraries not available: {e}")

# transformers and torch are probed together: ML_AVAILABLE is True only when
# both imports succeed.
try:
    from transformers import TrOCRProcessor, VisionEncoderDecoderModel
    import torch
    ML_AVAILABLE = True
    logger.info("✅ ML libraries loaded")
except ImportError as e:
    ML_AVAILABLE = False
    logger.warning(f"⚠️ ML libraries not available: {e}")
class OCRResponse(BaseModel):
    """Response body returned by the OCR extraction services and endpoints."""

    # True when extraction completed; the services in this file set it to
    # False when they catch an error or a required library is missing.
    success: bool
    # Extracted text; the error paths in this file return an empty string.
    text: str
    # Label of the extraction path taken. This file emits "PyMuPDF", "TrOCR",
    # "Basic" or "error".
    method: str
    # Free-form details: page/character counts for PDFs, image size/mode for
    # images, or {"error": <message>} on failure.
    metadata: Dict[str, Any]
class SystemStatus(BaseModel):
    """Response body of the /system/status endpoint."""

    # Overall status label; the endpoint in this file always sends "healthy".
    status: str
    # Per-service details keyed by service name (e.g. "pdf_processing",
    # "ml_models", "ocr_model", "scraper").
    services: Dict[str, Any]
    # ISO-8601 string produced with datetime.now().isoformat().
    timestamp: str
class SearchRequest(BaseModel):
    """Request body of the /api/search endpoint."""

    # Search text passed to the scraper's search methods.
    query: str
    # Search mode label. The default value selects the similarity search
    # branch in search_documents; any other value selects the text search.
    search_type: str = "هوشمند"
    # Document-type filter label, looked up in search_documents' filter map.
    # The default and any unknown label result in no filtering.
    doc_filter: str = "همه"
class LegalDashboardAPI:
    """Bundle the scraper and the OCR service used by the HTTP endpoints."""

    def __init__(self) -> None:
        # delay=1.5 is forwarded to the scraper as-is
        # (presumably seconds between requests — confirm in EnhancedLegalScraper).
        self.scraper = EnhancedLegalScraper(delay=1.5)
        # OCRService is defined later in this module; that works because the
        # name is resolved when __init__ runs, not when the class is defined.
        self.ocr_service = OCRService()
class OCRService:
    """Extract text from PDFs (PyMuPDF) and images (TrOCR when available).

    Both extraction methods never raise: every failure is logged and reported
    as an ``OCRResponse`` with ``success=False`` and ``method="error"``.
    """

    # Hugging Face model identifier and local cache used by _load_model.
    MODEL_NAME = "microsoft/trocr-base-printed"
    CACHE_DIR = "/app/cache"
    # Upper bound on the number of PDF pages read per request.
    MAX_PDF_PAGES = 10

    def __init__(self):
        """Create the service and load the model unless it is disabled.

        Loading is skipped when the ML libraries are missing or when the
        ENVIRONMENT variable equals "huggingface_free".
        """
        self.model = None
        self.processor = None
        self.model_loaded = False
        if ML_AVAILABLE and os.getenv("ENVIRONMENT") != "huggingface_free":
            self._load_model()

    def _load_model(self):
        """Load the TrOCR processor and model; a no-op once loaded.

        The early return makes repeated calls cheap, so a second call (for
        example from a startup hook) does not load the weights again.
        Failures are logged and leave ``model_loaded`` False.
        """
        if self.model_loaded:
            return
        try:
            logger.info("Loading TrOCR model...")
            processor = TrOCRProcessor.from_pretrained(
                self.MODEL_NAME, cache_dir=self.CACHE_DIR
            )
            model = VisionEncoderDecoderModel.from_pretrained(
                self.MODEL_NAME, cache_dir=self.CACHE_DIR
            )
        except Exception as e:
            logger.warning(f"❌ Failed to load TrOCR model: {e}. OCR will use basic processing.")
            self.model_loaded = False
        else:
            # Publish both objects together so the service is never left with
            # a processor but no model.
            self.processor = processor
            self.model = model
            self.model_loaded = True
            logger.info("✅ TrOCR model loaded successfully")

    async def extract_text_from_pdf(self, file_path: str) -> OCRResponse:
        """Return the embedded text of the first MAX_PDF_PAGES pages.

        Args:
            file_path: Path of the PDF file on disk.

        Returns:
            An OCRResponse with the page texts joined by a page-break marker
            and page/character/file-size metadata, or an error response.
        """
        if not PDF_AVAILABLE:
            return OCRResponse(success=False, text="", method="error", metadata={"error": "PDF processing not available"})
        try:
            doc = fitz.open(file_path)
            try:
                total_pages = doc.page_count
                pages_text = [
                    doc[page_num].get_text()
                    for page_num in range(min(total_pages, self.MAX_PDF_PAGES))
                ]
            finally:
                # Close the document even when a page fails to parse.
                doc.close()
            full_text = "\n\n--- Page Break ---\n\n".join(pages_text)
            return OCRResponse(
                success=True,
                text=full_text,
                method="PyMuPDF",
                metadata={
                    "pages_processed": len(pages_text),
                    "total_pages": total_pages,
                    "total_characters": sum(len(text) for text in pages_text),
                    "file_size_kb": os.path.getsize(file_path) / 1024
                }
            )
        except Exception as e:
            logger.error(f"PDF processing error: {e}")
            return OCRResponse(success=False, text="", method="error", metadata={"error": str(e)})

    async def extract_text_from_image(self, file_path: str) -> OCRResponse:
        """Run TrOCR on an image, or describe the image if no model is loaded.

        Args:
            file_path: Path of the image file on disk.

        Returns:
            An OCRResponse with method "TrOCR" (recognized text), "Basic"
            (image description only) or "error".
        """
        if not PDF_AVAILABLE:
            # Pillow is imported in the same optional block as PyMuPDF, so
            # `Image` only exists when PDF_AVAILABLE is True.
            return OCRResponse(success=False, text="", method="error", metadata={"error": "Image processing not available"})
        try:
            # The context manager releases the underlying file handle.
            with Image.open(file_path) as image:
                # Report the properties of the uploaded image, not of the
                # RGB copy fed to the model.
                image_size = image.size
                image_mode = image.mode
                if self.model_loaded and self.processor is not None and self.model is not None:
                    # The TrOCR usage examples convert to RGB first; palette,
                    # greyscale or RGBA inputs are normalized here.
                    rgb_image = image.convert("RGB")
                    pixel_values = self.processor(images=rgb_image, return_tensors="pt").pixel_values
                    generated_ids = self.model.generate(pixel_values)
                    generated_text = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
                    return OCRResponse(
                        success=True,
                        text=generated_text,
                        method="TrOCR",
                        metadata={
                            "image_size": image_size,
                            "image_mode": image_mode,
                            "model": self.MODEL_NAME
                        }
                    )
                return OCRResponse(
                    success=True,
                    text=f"Image processed: {image_size} pixels, {image_mode} mode\nTrOCR model not loaded - text extraction limited",
                    method="Basic",
                    metadata={
                        "image_size": image_size,
                        "image_mode": image_mode,
                        "note": "TrOCR model not available"
                    }
                )
        except Exception as e:
            logger.error(f"Image processing error: {e}")
            return OCRResponse(success=False, text="", method="error", metadata={"error": str(e)})
# FastAPI application; interactive docs are served under /api/docs and
# /api/redoc instead of the framework defaults.
app = FastAPI(
    title="Legal Dashboard API",
    description="Advanced Legal Document Processing System with OCR and NLP",
    version="2.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

# Allowed CORS origins. A deployment can override the list with the
# CORS_ALLOW_ORIGINS environment variable (comma-separated origins); when the
# variable is unset or empty the original hard-coded list is used unchanged.
#
# NOTE(review): the default list contains "*" together with
# allow_credentials=True. The "*" entry makes the two explicit origins
# redundant, and FastAPI's CORS documentation says origins must be listed
# explicitly when credentials are allowed. Set CORS_ALLOW_ORIGINS to the real
# front-end origin(s) in production.
_default_cors_origins = ["http://localhost:7860", "http://127.0.0.1:7860", "*"]
_configured_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_configured_cors_origins or _default_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single shared service container used by every endpoint below.
legal_api = LegalDashboardAPI()
@app.on_event("startup")
async def startup_event():
    """Make sure the OCR model is loaded when the application starts.

    The OCR service already tries to load the model in its constructor, so
    this hook only retries when that attempt did not succeed; it no longer
    loads the weights a second time. Loading stays disabled when the ML
    libraries are missing or ENVIRONMENT equals "huggingface_free".

    NOTE(review): newer FastAPI releases favour lifespan handlers over
    on_event — consider migrating.
    """
    ocr_service = legal_api.ocr_service
    if ocr_service.model_loaded:
        return
    if ML_AVAILABLE and os.getenv("ENVIRONMENT") != "huggingface_free":
        ocr_service._load_model()
@app.get("/health")
async def health_check():
    """Report liveness plus a flat map of service availability flags."""
    checked_at = datetime.now().isoformat()
    # One boolean per service; no nested detail on this endpoint.
    service_flags = {
        "pdf_processing": PDF_AVAILABLE,
        "ml_models": ML_AVAILABLE,
        "ocr_model_loaded": legal_api.ocr_service.model_loaded,
        "scraper": bool(legal_api.scraper),
    }
    payload = {
        "status": "healthy",
        "message": "Legal Dashboard is running",
        "timestamp": checked_at,
        "services": service_flags,
    }
    return payload
@app.get("/system/status", response_model=SystemStatus)
async def get_system_status():
    """Return a per-service status report with human-readable labels."""

    def availability(flag):
        # Shared shape for services that are simply present or absent.
        label = "✅ Available" if flag else "❌ Not Available"
        return {"available": flag, "status": label}

    # The OCR model has three states: loaded, libraries present but model not
    # loaded, and libraries missing.
    ocr_loaded = legal_api.ocr_service.model_loaded
    if ocr_loaded:
        ocr_label = "✅ Loaded"
    elif ML_AVAILABLE:
        ocr_label = "⏳ Loading..."
    else:
        ocr_label = "❌ Not Available"

    service_report = {
        "pdf_processing": availability(PDF_AVAILABLE),
        "ml_models": availability(ML_AVAILABLE),
        "ocr_model": {"loaded": ocr_loaded, "status": ocr_label},
        "scraper": availability(bool(legal_api.scraper)),
    }
    return SystemStatus(
        status="healthy",
        services=service_report,
        timestamp=datetime.now().isoformat(),
    )
@app.post("/api/ocr/extract-pdf", response_model=OCRResponse)
async def extract_pdf_text(file: UploadFile = File(...)):
    """Extract text from an uploaded PDF.

    The upload is written to a uniquely named temporary file under
    /app/uploads, handed to the OCR service, and removed afterwards whether
    or not extraction succeeded.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end in ".pdf".
    """
    # The filename is client-controlled and may be missing entirely.
    filename = file.filename or ""
    if not filename.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="File must be a PDF")
    temp_path = None
    try:
        temp_dir = Path("/app/uploads")
        temp_dir.mkdir(parents=True, exist_ok=True)
        # Use a server-generated name: the client filename never becomes part
        # of the on-disk path (no path separators to worry about) and two
        # uploads arriving in the same second cannot collide.
        with tempfile.NamedTemporaryFile(
            mode="wb", suffix=".pdf", dir=str(temp_dir), delete=False
        ) as f:
            temp_path = Path(f.name)
            f.write(await file.read())
        return await legal_api.ocr_service.extract_text_from_pdf(str(temp_path))
    finally:
        if temp_path is not None and temp_path.exists():
            temp_path.unlink()
@app.post("/api/ocr/extract-image", response_model=OCRResponse)
async def extract_image_text(file: UploadFile = File(...)):
    """Extract text from an uploaded image.

    The upload is written to a uniquely named temporary file under
    /app/uploads, handed to the OCR service, and removed afterwards whether
    or not extraction succeeded.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end in one of the allowed image extensions.
    """
    allowed_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']
    # The filename is client-controlled and may be missing entirely.
    filename = (file.filename or "").lower()
    matched_extension = next(
        (ext for ext in allowed_extensions if filename.endswith(ext)), None
    )
    if matched_extension is None:
        raise HTTPException(status_code=400, detail="File must be an image (JPG, PNG, BMP, TIFF)")
    temp_path = None
    try:
        temp_dir = Path("/app/uploads")
        temp_dir.mkdir(parents=True, exist_ok=True)
        # Use a server-generated name: the client filename never becomes part
        # of the on-disk path (no path separators to worry about) and two
        # uploads arriving in the same second cannot collide. Only the
        # validated extension is carried over.
        with tempfile.NamedTemporaryFile(
            mode="wb", suffix=matched_extension, dir=str(temp_dir), delete=False
        ) as f:
            temp_path = Path(f.name)
            f.write(await file.read())
        return await legal_api.ocr_service.extract_text_from_image(str(temp_path))
    finally:
        if temp_path is not None and temp_path.exists():
            temp_path.unlink()
@app.post("/api/scrape")
async def scrape_documents(max_docs: int = 20):
    """Scrape up to ``max_docs`` documents, save each one, and return them.

    Raises:
        HTTPException: 500 carrying the error text when scraping or saving
            fails.
    """
    try:
        scraped = legal_api.scraper.scrape_real_sources(max_docs=max_docs)
        # Persist every document before building the response.
        for item in scraped:
            legal_api.scraper.save_document(item)
        summary = {
            "success": True,
            "documents_processed": len(scraped),
            "documents": [item.__dict__ for item in scraped],
        }
    except Exception as err:
        logger.error(f"Scrape failed: {err}")
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return summary
@app.post("/api/search", response_model=List[Dict])
async def search_documents(request: SearchRequest):
    """Search stored documents and optionally filter them by document type.

    The search mode is chosen by ``request.search_type``; the filter label in
    ``request.doc_filter`` is translated to an internal document type, and
    labels without a mapping leave the results unfiltered.

    Raises:
        HTTPException: 500 carrying the error text when the search fails.
    """
    try:
        # Filter label -> internal document type (None means "no filter").
        label_to_type = {
            'همه': None,
            'قوانین': 'law',
            'اخبار': 'news',
            'آرا': 'ruling',
            'آیین‌نامه': 'regulation',
            'عمومی': 'general'
        }
        wanted_type = label_to_type.get(request.doc_filter)

        use_similarity = request.search_type == "هوشمند"
        if use_similarity:
            hits = legal_api.scraper.search_with_similarity(request.query, limit=20)
        else:
            hits = legal_api.scraper._text_search(request.query, limit=20)

        if not wanted_type:
            return hits
        return [hit for hit in hits if hit['document_type'] == wanted_type]
    except Exception as err:
        logger.error(f"Search failed: {err}")
        raise HTTPException(status_code=500, detail=str(err))
@app.get("/api/statistics")
async def get_statistics():
    """Return the scraper's statistics.

    Raises:
        HTTPException: 500 carrying the error text when the lookup fails.
    """
    try:
        stats = legal_api.scraper.get_enhanced_statistics()
    except Exception as err:
        logger.error(f"Statistics failed: {err}")
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return stats
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and answer with a JSON 500 response.

    The response body echoes the exception text and the requested URL.
    """
    # Two separate records: a one-line summary, then the traceback text.
    logger.error(f"Global exception: {exc}")
    logger.error(traceback.format_exc())

    error_body = {
        "error": "Internal server error",
        "message": str(exc),
        "path": str(request.url),
    }
    return JSONResponse(content=error_body, status_code=500)
# Script entry point: serve the app with uvicorn when this file is run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running as a script.
    import uvicorn
    # The app is given as the import string "main:app"; it listens on all
    # interfaces (0.0.0.0), port 8000, with auto-reload turned off.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False, log_level="info")
```