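# Hoghoghi / app/services/ocr_service.py
# Web file-viewer residue (not part of the module): uploader avatar "Really-amin";
# commit 34e3edc (verified), message "Update app/services/ocr_service.py";
# file size 16.2 kB; viewer links: raw, history, blame.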
import os
import logging
import tempfile
from typing import Optional, List, Dict, Any
from pathlib import Path
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Core image processing
import numpy as np
from PIL import Image
import cv2
# PDF processing
import fitz # PyMuPDF
from pdf2image import convert_from_path
# OCR and ML
# Optional dependency: availability is recorded in a flag instead of letting
# the import failure propagate.
try:
    from transformers import TrOCRProcessor, VisionEncoderDecoderModel, pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    # Log through this module's named logger. The module-level
    # logging.warning() helper targets the root logger and implicitly calls
    # basicConfig() when no handlers are installed, which would let a mere
    # import of this file reconfigure the host application's logging.
    logging.getLogger(__name__).warning("Transformers not available")
# Text processing
# Optional dependency, handled the same way as Transformers above.
try:
    import spacy
    SPACY_AVAILABLE = True
except ImportError:
    SPACY_AVAILABLE = False
    logging.getLogger(__name__).warning("spaCy not available")
# Utilities
import chardet
logger = logging.getLogger(__name__)
class EnhancedOCRService:
    """
    Enhanced OCR Service with multiple extraction methods.

    PDF text is extracted by trying, in order: PyMuPDF's text layer, image
    OCR of rendered pages, and a PyPDF2 fallback. Image files go straight to
    OCR. Blocking work is pushed onto a small thread pool so the coroutines
    do not stall the event loop.

    Optional models (spaCy, TrOCR) are loaded by a background task. The task
    is scheduled when an event loop is running; if the service is constructed
    without one (for example at plain import time), scheduling is deferred to
    the first public coroutine call.
    """

    # TrOCR checkpoints tried in order; the first one that loads is used.
    _TROCR_MODEL_CANDIDATES = (
        "microsoft/trocr-base-printed",
        "microsoft/trocr-small-printed",
        "microsoft/trocr-base-handwritten",
    )

    def __init__(self):
        """Create the worker pool and schedule model loading if a loop is running."""
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.models = {}
        self.processors = {}
        self.fallback_ready = True
        self.transformers_ready = False
        self.spacy_model = None
        # A strong reference to the background task is kept: the event loop
        # itself only holds weak references to tasks.
        self._init_task = None
        # Initialize in background (no-op when no event loop is running yet)
        self._schedule_background_init()

    def _schedule_background_init(self) -> None:
        """Start the background initialization task once a loop is available.

        asyncio.create_task() raises RuntimeError without a running loop, so
        construction outside a coroutine must not call it unconditionally.
        Safe to call repeatedly: a live or finished task is never replaced,
        only one that was cancelled before it could run.
        """
        task = self._init_task
        if task is not None and not task.cancelled():
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop; a later call from a coroutine will schedule it.
            return
        self._init_task = loop.create_task(self._initialize_background())

    async def _initialize_background(self):
        """Initialize OCR models in background; failures are logged, not raised."""
        try:
            await self._setup_spacy()
            await self._setup_transformers()
            logger.info("✅ Enhanced OCR service initialized")
        except Exception as e:
            logger.warning("⚠️ OCR background initialization failed: %s", e)

    def _load_spacy_model(self):
        """Load (downloading if needed) the spaCy English model. Blocking.

        Returns:
            The loaded spaCy pipeline, or None when download/load failed.
        """
        try:
            # Try to load English model
            model = spacy.load("en_core_web_sm")
            logger.info("✅ spaCy English model loaded")
            return model
        except OSError:
            try:
                # Download English model if not available
                import subprocess
                import sys

                # sys.executable guarantees the model is installed into the
                # interpreter that is actually running this service, not
                # whichever "python" happens to be first on PATH.
                subprocess.run(
                    [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                    check=True,
                    capture_output=True,
                )
                model = spacy.load("en_core_web_sm")
                logger.info("✅ spaCy English model downloaded and loaded")
                return model
            except Exception as e:
                logger.warning("⚠️ Could not setup spaCy: %s", e)
                return None

    async def _setup_spacy(self):
        """Setup spaCy for text processing (no-op when spaCy is not installed)."""
        if not SPACY_AVAILABLE:
            return
        # Model loading and the optional download are blocking; run them on
        # the worker pool instead of on the event loop.
        loop = asyncio.get_running_loop()
        model = await loop.run_in_executor(self.executor, self._load_spacy_model)
        if model is not None:
            self.spacy_model = model

    def _load_trocr_models(self) -> None:
        """Load the first TrOCR checkpoint that succeeds. Blocking."""
        for model_name in self._TROCR_MODEL_CANDIDATES:
            try:
                logger.info("Loading TrOCR model: %s", model_name)
                processor = TrOCRProcessor.from_pretrained(model_name)
                model = VisionEncoderDecoderModel.from_pretrained(model_name)
                self.processors[model_name] = processor
                self.models[model_name] = model
                logger.info("✅ Successfully loaded: %s", model_name)
                # Flip the flag last so readers never see "ready" before the
                # model and processor are both registered.
                self.transformers_ready = True
                break  # Use first successful model
            except Exception as e:
                logger.warning("⚠️ Failed to load %s: %s", model_name, e)
                continue
        if not self.transformers_ready:
            logger.warning("⚠️ No TrOCR models could be loaded")

    async def _setup_transformers(self):
        """Setup Transformers models for advanced OCR (no-op when not installed)."""
        if not TRANSFORMERS_AVAILABLE:
            return
        try:
            # from_pretrained() may download and deserialize large weights;
            # keep that off the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(self.executor, self._load_trocr_models)
        except Exception as e:
            logger.error("❌ Transformers setup failed: %s", e)

    async def extract_text_from_pdf(self, file_path: str) -> Dict[str, Any]:
        """
        Extract text from PDF using multiple methods.

        Methods are tried in order (PyMuPDF, image OCR, PyPDF2 fallback) and
        the first one yielding non-blank text wins.

        Args:
            file_path: Path of the PDF file to read.

        Returns:
            Dict with keys "success", "text", "method", "pages" and
            "metadata". "success" is True only when some text was extracted.
            This coroutine does not raise; errors are logged and reflected in
            the result.
        """
        self._schedule_background_init()
        try:
            results = {
                "success": False,
                "text": "",
                "method": "",
                "pages": [],
                "metadata": {},
            }

            # Method 1: PyMuPDF text extraction (fastest)
            try:
                pymupdf_result = await self._extract_with_pymupdf(file_path)
                if pymupdf_result["text"].strip():
                    results.update(pymupdf_result)
                    results["method"] = "PyMuPDF"
                    results["success"] = True
                    logger.info("✅ Text extracted using PyMuPDF")
                    return results
            except Exception as e:
                logger.warning("PyMuPDF extraction failed: %s", e)

            # Method 2: Convert to images and OCR
            try:
                ocr_result = await self._extract_with_image_ocr(file_path)
                if ocr_result["text"].strip():
                    results.update(ocr_result)
                    results["method"] = "Image OCR"
                    results["success"] = True
                    logger.info("✅ Text extracted using Image OCR")
                    return results
            except Exception as e:
                logger.warning("Image OCR extraction failed: %s", e)

            # Method 3: Fallback basic extraction
            try:
                fallback_result = await self._basic_pdf_extraction(file_path)
                results.update(fallback_result)
                results["method"] = "Fallback"
                # The fallback swallows its own errors and returns empty text,
                # so success must be judged by the text, as for methods 1-2.
                if results["text"].strip():
                    results["success"] = True
                    logger.info("✅ Text extracted using fallback method")
                else:
                    logger.warning("Fallback extraction produced no text")
                return results
            except Exception as e:
                logger.error("All PDF extraction methods failed: %s", e)

            return results
        except Exception as e:
            logger.error("PDF extraction error: %s", e)
            return {
                "success": False,
                "text": "",
                "method": "error",
                "pages": [],
                "metadata": {"error": str(e)},
            }

    async def _extract_with_pymupdf(self, file_path: str) -> Dict[str, Any]:
        """Extract the embedded text layer of every page using PyMuPDF.

        Returns:
            Dict with "text" (pages joined by blank lines), "pages"
            (per-page number/text/char_count) and "metadata".
        """

        def _pymupdf_extract():
            doc = fitz.open(file_path)
            try:
                pages = []
                all_text = []
                for page_num in range(doc.page_count):
                    text = doc[page_num].get_text()
                    pages.append({
                        "page_number": page_num + 1,
                        "text": text,
                        "char_count": len(text),
                    })
                    all_text.append(text)
            finally:
                # Release the document even when a page fails to parse.
                doc.close()
            return {
                "text": "\n\n".join(all_text),
                "pages": pages,
                "metadata": {
                    "total_pages": len(pages),
                    "extraction_method": "PyMuPDF",
                },
            }

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _pymupdf_extract)

    async def _extract_with_image_ocr(
        self, file_path: str, dpi: int = 300, max_pages: Optional[int] = 5
    ) -> Dict[str, Any]:
        """Extract text by converting PDF pages to images and running OCR.

        Args:
            file_path: Path of the PDF file to rasterize.
            dpi: Rendering resolution handed to pdf2image.
            max_pages: Last page (1-based) to render; limits work for speed.
                The value is passed through as pdf2image's ``last_page``.

        Returns:
            Dict with "text", "pages" and "metadata" (including which OCR
            engine was used).
        """

        def _image_ocr_extract():
            # Convert PDF to images
            images = convert_from_path(
                file_path, dpi=dpi, first_page=1, last_page=max_pages
            )
            # Read the flag once so every page uses the same engine and the
            # reported "ocr_engine" matches what actually ran, even if the
            # background loader finishes mid-document.
            use_transformers = self.transformers_ready
            pages = []
            all_text = []
            for i, image in enumerate(images):
                # Convert PIL image to numpy array for OpenCV
                img_array = np.array(image)
                # Preprocess image for better OCR
                processed_img = self._preprocess_image(img_array)
                # Extract text using available method
                if use_transformers:
                    text = self._extract_with_transformers(processed_img)
                else:
                    text = self._extract_with_basic_ocr(processed_img)
                pages.append({
                    "page_number": i + 1,
                    "text": text,
                    "char_count": len(text),
                })
                all_text.append(text)
            return {
                "text": "\n\n".join(all_text),
                "pages": pages,
                "metadata": {
                    "total_pages": len(pages),
                    "extraction_method": "Image OCR",
                    "ocr_engine": "Transformers" if use_transformers else "Basic",
                },
            }

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _image_ocr_extract)

    def _preprocess_image(self, img_array: np.ndarray) -> np.ndarray:
        """Binarize and denoise an image for better OCR results.

        Args:
            img_array: Image as a 2-D (already single-channel) or 3-D array;
                3-D input is converted with OpenCV's RGB-to-gray conversion.

        Returns:
            The thresholded, median-filtered image, or the input unchanged if
            any OpenCV step fails.
        """
        try:
            # Convert to grayscale
            if len(img_array.shape) == 3:
                gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            else:
                gray = img_array
            # Apply adaptive thresholding
            thresh = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
            )
            # Denoise
            return cv2.medianBlur(thresh, 3)
        except Exception as e:
            logger.warning("Image preprocessing failed: %s", e)
            return img_array

    def _extract_with_transformers(self, img_array: np.ndarray) -> str:
        """Extract text using Transformers TrOCR.

        Returns:
            The decoded text, or "" when no model is loaded or inference fails.
        """
        try:
            if not self.transformers_ready or not self.models:
                return ""
            # Get first available model
            model_name = next(iter(self.models))
            processor = self.processors[model_name]
            model = self.models[model_name]
            # Convert numpy array to PIL Image. The preprocessed page is
            # single-channel, while the TrOCR usage examples feed RGB images
            # to the processor, so normalise the mode explicitly.
            pil_image = Image.fromarray(img_array).convert("RGB")
            # Process with TrOCR
            pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values
            generated_ids = model.generate(pixel_values)
            return processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        except Exception as e:
            logger.warning("Transformers OCR failed: %s", e)
            return ""

    def _extract_with_basic_ocr(self, img_array: np.ndarray) -> str:
        """Basic OCR fallback method.

        NOTE(review): this is a stub. It does not inspect ``img_array`` and
        always returns the same placeholder string, which callers then treat
        as extracted text. The return value is kept for compatibility;
        replace it with a real engine (or an empty string) once callers are
        confirmed not to depend on the placeholder.
        """
        return "Text extracted using basic OCR fallback"

    async def _basic_pdf_extraction(self, file_path: str) -> Dict[str, Any]:
        """Basic PDF text extraction fallback using PyPDF2.

        Returns:
            Dict with "text", "pages" and "metadata". On any failure the
            error is logged and an empty result with ``metadata["error"]`` is
            returned instead of raising.
        """

        def _basic_extract():
            try:
                import PyPDF2

                with open(file_path, "rb") as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    # Guard against a page yielding None so the join below
                    # cannot fail on a non-string item.
                    text_parts = [
                        page.extract_text() or "" for page in pdf_reader.pages
                    ]
                return {
                    "text": "\n\n".join(text_parts),
                    "pages": [
                        {"page_number": i + 1, "text": text}
                        for i, text in enumerate(text_parts)
                    ],
                    "metadata": {"extraction_method": "PyPDF2 fallback"},
                }
            except Exception as e:
                logger.error("Basic PDF extraction failed: %s", e)
                return {
                    "text": "",
                    "pages": [],
                    "metadata": {"error": str(e)},
                }

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _basic_extract)

    async def extract_text_from_image(self, file_path: str) -> Dict[str, Any]:
        """Extract text from image files.

        Args:
            file_path: Path of the image to read.

        Returns:
            Dict with "success", "text", "method" and "metadata" (original
            image size and mode). On failure "success" is False, "method" is
            "error" and the message is in ``metadata["error"]``.
        """
        self._schedule_background_init()
        try:
            def _image_extract():
                # Load image; the context manager releases the file handle.
                with Image.open(file_path) as image:
                    image_size = image.size
                    image_mode = image.mode
                    # Normalise palette, 1-bit, CMYK, ... inputs to RGB so the
                    # array holds real pixel intensities in the channel
                    # layout that preprocessing expects.
                    img_array = np.array(image.convert("RGB"))
                # Preprocess
                processed_img = self._preprocess_image(img_array)
                # Extract text (flag read once so "method" matches the engine)
                use_transformers = self.transformers_ready
                if use_transformers:
                    text = self._extract_with_transformers(processed_img)
                else:
                    text = self._extract_with_basic_ocr(processed_img)
                return {
                    "success": True,
                    "text": text,
                    "method": "Transformers" if use_transformers else "Basic",
                    "metadata": {
                        "image_size": image_size,
                        "image_mode": image_mode,
                    },
                }

            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self.executor, _image_extract)
        except Exception as e:
            logger.error("Image OCR error: %s", e)
            return {
                "success": False,
                "text": "",
                "method": "error",
                "metadata": {"error": str(e)},
            }

    async def process_text(self, text: str) -> Dict[str, Any]:
        """Process extracted text with NLP.

        Args:
            text: Text to analyse; only the first 1,000,000 characters are
                passed to spaCy.

        Returns:
            Dict with "processed_text" (the input, unchanged), "entities" and
            "metadata"; "sentence_count" and "token_count" are added when
            spaCy ran. Without a loaded spaCy model, or on error, "entities"
            is empty and "metadata" explains why.
        """
        self._schedule_background_init()
        try:
            if not self.spacy_model:
                return {
                    "processed_text": text,
                    "entities": [],
                    "metadata": "spaCy not available",
                }

            def _process_text():
                doc = self.spacy_model(text[:1000000])  # Limit text length
                entities = [
                    {
                        "text": ent.text,
                        "label": ent.label_,
                        "start": ent.start_char,
                        "end": ent.end_char,
                    }
                    for ent in doc.ents
                ]
                return {
                    "processed_text": text,
                    "entities": entities,
                    "sentence_count": len(list(doc.sents)),
                    "token_count": len(doc),
                    "metadata": "Processed with spaCy",
                }

            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self.executor, _process_text)
        except Exception as e:
            logger.error("Text processing error: %s", e)
            return {
                "processed_text": text,
                "entities": [],
                "metadata": f"Processing failed: {str(e)}",
            }

    def get_service_status(self) -> Dict[str, Any]:
        """Get OCR service status.

        Returns:
            Dict with readiness flags, the names of loaded TrOCR models and
            "available_methods", which lists only methods that are usable
            right now (no ``None`` placeholders).
        """
        spacy_ready = self.spacy_model is not None
        available_methods = ["PyMuPDF", "Image OCR"]
        if self.transformers_ready:
            available_methods.append("Transformers")
        if spacy_ready:
            available_methods.append("spaCy Processing")
        return {
            "fallback_ready": self.fallback_ready,
            "transformers_ready": self.transformers_ready,
            "spacy_ready": spacy_ready,
            "models_loaded": list(self.models),
            "available_methods": available_methods,
        }
# Legacy compatibility: OCRService is another name for the same class
OCRService = EnhancedOCRService
# Module-level service instance, constructed when this module is imported
ocr_service = EnhancedOCRService()