Spaces:
Running
Running
# Core Python imports | |
import os | |
import subprocess | |
import logging | |
from logging.handlers import RotatingFileHandler | |
import time | |
from pathlib import Path | |
from typing import Dict, List, Optional, Any, Union, Tuple | |
from dataclasses import dataclass | |
from collections import OrderedDict, namedtuple | |
import tempfile | |
import shutil | |
import hashlib | |
import pickle | |
import threading | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import warnings | |
import io | |
import re | |
import requests | |
# Third-party imports | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
import pytesseract | |
import easyocr | |
import torch | |
import fitz # PyMuPDF | |
from tqdm import tqdm | |
import gradio as gr | |
from hazm import Normalizer, word_tokenize | |
from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel | |
import psutil | |
import tensorflow as tf | |
# Configuration and setup
# Ignore every Python warning for the lifetime of the process.
warnings.filterwarnings(action='ignore')
# Set TF_CPP_MIN_LOG_LEVEL and raise TensorFlow's Python logger to ERROR.
# NOTE(review): TensorFlow is already imported when this runs — confirm the
# environment variable still takes effect when it is set this late.
os.environ.update({'TF_CPP_MIN_LOG_LEVEL': '3'})
tf.get_logger().setLevel('ERROR')
# Constants
TESSERACT_CMD = '/usr/bin/tesseract'
TESSDATA_PREFIX_DEFAULT = '/usr/share/tesseract-ocr/4.00/tessdata'
# Writable per-user directory (``~`` is expanded once, at import time).
TESSDATA_LOCAL = os.path.expanduser('~/.tessdata')

# Keyword -> emoji used by the console log formatter.
STATUS_EMOJIS = {
    'START': '🟦',
    'SUCCESS': '✅',
    'FAILURE': '❌',
    'LOADING': '⏳',
    'PROCESSING': '🔄',
    'WARNING': '⚠️',
    'MEMORY': '💾',
}

# Data Structures
# Immutable record describing the outcome of one OCR run.
OCRResult = namedtuple(
    'OCRResult',
    (
        'text', 'numbers', 'confidence', 'model_name', 'processing_time',
        'image_quality', 'detected_language', 'word_count', 'char_count',
        'preprocessing_info', 'error_rate',
    ),
)
# Logging Configuration | |
class CustomFormatter(logging.Formatter):
    """Console formatter that colours lines by level and decorates the message.

    Each message gets a ``[Mem: N%]`` suffix (``psutil.virtual_memory().percent``)
    and, for every ``STATUS_EMOJIS`` keyword found in it, that emoji as a prefix.
    """

    COLORS = {
        'grey': "\x1b[38;21m",
        'blue': "\x1b[38;5;39m",
        'yellow': "\x1b[38;5;226m",
        'red': "\x1b[38;5;196m",
        'bold_red': "\x1b[31;1m",
        'reset': "\x1b[0m"
    }

    def __init__(self, fmt):
        """Prepare one coloured format string and formatter per standard level.

        Args:
            fmt: ``logging`` format string shared by all levels.
        """
        super().__init__(fmt)
        self.fmt = fmt
        reset = self.COLORS['reset']
        self.FORMATS = {
            logging.DEBUG: self.COLORS['grey'] + self.fmt + reset,
            logging.INFO: self.COLORS['blue'] + self.fmt + reset,
            logging.WARNING: self.COLORS['yellow'] + self.fmt + reset,
            logging.ERROR: self.COLORS['red'] + self.fmt + reset,
            logging.CRITICAL: self.COLORS['bold_red'] + self.fmt + reset
        }
        # Built once here instead of on every format() call.
        self._formatters = {
            level: logging.Formatter(level_fmt)
            for level, level_fmt in self.FORMATS.items()
        }

    def format(self, record):
        """Return the coloured, decorated text for *record*.

        The decoration is applied to a copy of the record, so other handlers
        attached to the same logger still see the original message.
        """
        # Resolve %-style args first: the suffix below contains a literal '%'
        # that must never take part in %-formatting.
        message = record.getMessage()
        memory_usage = psutil.virtual_memory().percent
        message = f"{message} [Mem: {memory_usage:.1f}%]"
        for status, emoji in STATUS_EMOJIS.items():
            if status in message:
                message = f"{emoji} {message}"
        decorated = logging.makeLogRecord(record.__dict__)
        decorated.msg = message
        decorated.args = None
        formatter = self._formatters.get(record.levelno)
        if formatter is None:
            # Non-standard level: same fallback as before (default format).
            formatter = logging.Formatter(self.FORMATS.get(record.levelno))
        return formatter.format(decorated)
# Main OCR System Class | |
class OCRSystem: | |
"""Main OCR system class handling all OCR operations""" | |
def __init__(self):
    """Run the three setup routines, create the normalizer and set default options."""
    # Logging goes first: setup_models writes to self.logger.
    for initialise in (self.setup_logging, self.setup_cache, self.setup_models):
        initialise()
    self.normalizer = Normalizer()
    defaults = {
        'cache_enabled': True,
        'preprocessing_enabled': True,
        'confidence_threshold': 0.7,
        # Image enhancement switches read by enhance_for_persian.
        'resize': True,
        'resize_scale': 200,  # percent of the original size
        'enhance_contrast': True,
        'reduce_noise': True,
        'extract_images': True,
        'sharpen': True,
        'deskew': True,
        'optimize_for_ocr': True,
        'max_workers': 4,
    }
    self.default_settings = defaults
# Setup Methods | |
def setup_logging(self): | |
"""Initialize logging system""" | |
self.logger = logging.getLogger(__name__) | |
self.logger.setLevel(logging.DEBUG) | |
os.makedirs('logs', exist_ok=True) | |
file_handler = RotatingFileHandler( | |
'logs/ocr.log', | |
maxBytes=10*1024*1024, | |
backupCount=5, | |
encoding='utf-8' | |
) | |
file_handler.setFormatter( | |
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
) | |
console_handler = logging.StreamHandler() | |
console_handler.setFormatter( | |
CustomFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
) | |
self.logger.addHandler(file_handler) | |
self.logger.addHandler(console_handler) | |
def setup_cache(self): | |
"""Initialize caching system""" | |
self.cache_data = OrderedDict() | |
self.cache_max_size = 1000 | |
self.cache_lock = threading.Lock() | |
self.cache_dir = Path('cache') | |
self.cache_dir.mkdir(exist_ok=True) | |
def setup_models(self): | |
"""Initialize OCR models configuration based on available hardware""" | |
self.models = {} | |
self.model_performance = { | |
'microsoft_trocr': {'success': 0, 'fail': 0}, | |
'pretrained_model': {'success': 0, 'fail': 0}, | |
'mT5_OCR_fa': {'success': 0, 'fail': 0}, | |
'LayoutLMv3_fa': {'success': 0, 'fail': 0}, | |
'easyocr': {'success': 0, 'fail': 0}, | |
'tesseract': {'success': 0, 'fail': 0}, | |
'persian_ocr': {'success': 0, 'fail': 0} | |
} | |
self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
self.max_workers = min(4, os.cpu_count() or 1) | |
self.logger.info(f"Using device: {self.device}, Max workers: {self.max_workers}") | |
self.model_configs = { | |
'microsoft_trocr': { | |
'name': "microsoft/trocr-base-printed", | |
'type': "transformer", | |
'threshold': 0.85, | |
'device': self.device | |
}, | |
'pretrained_model': { | |
'name': "beheshti-ai/TrOCR-fa", | |
'type': "transformer", | |
'threshold': 0.8, | |
'device': self.device | |
}, | |
'mT5_OCR_fa': { | |
'name': "aleemeconomist/mT5-OCR-fa", | |
'type': "image-to-text", | |
'threshold': 0.7, | |
'device': self.device | |
}, | |
'LayoutLMv3_fa': { | |
'name': "SoheilStar/LayoutLMv3-fa", | |
'type': "document-question-answering", | |
'threshold': 0.7, | |
'device': self.device | |
}, | |
'persian_ocr': { | |
'name': "Persian-OCR", | |
'type': "custom", | |
'threshold': 0.75, | |
'device': self.device | |
} | |
} | |
self.model_priority = [ | |
'microsoft_trocr', 'pretrained_model', 'mT5_OCR_fa', 'LayoutLMv3_fa', | |
'easyocr', 'tesseract', 'persian_ocr' | |
] if self.device == "cuda" else [ | |
'microsoft_trocr', 'tesseract', 'easyocr', 'pretrained_model', | |
'mT5_OCR_fa', 'LayoutLMv3_fa', 'persian_ocr' | |
] | |
self.model_lock = threading.Lock() | |
def download_tessdata(self, languages=('eng', 'fas')):
    """Download missing Tesseract ``.traineddata`` files into ``TESSDATA_LOCAL``.

    Args:
        languages: Iterable of Tesseract language codes to make available.

    Returns:
        True when every requested language file is present afterwards;
        False when the directory cannot be created or any download fails.
    """
    tessdata_dir = TESSDATA_LOCAL
    try:
        os.makedirs(tessdata_dir, exist_ok=True)
    except OSError as e:
        self.logger.warning(f"WARNING: Cannot create {tessdata_dir}: {str(e)}. Tesseract may fail without language data.")
        return False
    base_url = "https://github.com/tesseract-ocr/tessdata_best/raw/main/"
    all_available = True
    for lang in languages:
        file_path = os.path.join(tessdata_dir, f"{lang}.traineddata")
        if os.path.exists(file_path):
            self.logger.debug(f"{lang}.traineddata already exists at {file_path}")
            continue
        self.logger.info(f"LOADING: Downloading {lang}.traineddata to {file_path}")
        # Stream into a side file and rename on success, so an interrupted
        # download can never be mistaken for a complete one on the next run.
        partial_path = f"{file_path}.part"
        try:
            with requests.get(f"{base_url}{lang}.traineddata", stream=True, timeout=60) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(partial_path, file_path)
            self.logger.info(f"SUCCESS: Downloaded {lang}.traineddata")
        except Exception as e:
            all_available = False
            self.logger.error(f"FAILURE: Could not download {lang}.traineddata: {str(e)}")
            try:
                os.remove(partial_path)
            except OSError:
                # Nothing was written, or it is already gone.
                pass
    return all_available
def setup_system_dependencies(self):
    """Install missing apt packages, fetch language data and configure pytesseract.

    Returns:
        True when every step completed; False as soon as a command fails or
        any other error occurs (the error is logged).
    """
    self.logger.info("START: Checking and installing system dependencies")
    try:
        subprocess.run(['apt-get', 'update'], check=True)
        dependencies = [
            'tesseract-ocr', 'libopencv-dev',
            'fontconfig', 'libgl1-mesa-glx', 'libglib2.0-0',
            'libsm6', 'libxext6', 'libxrender-dev', 'libqt5gui5'
        ]
        for dep in dependencies:
            try:
                result = subprocess.run(['dpkg', '-l', dep],
                                        capture_output=True,
                                        text=True)
                # `dpkg -l` prints one row per package; the first column is
                # "ii" for an installed one. Compare that column, not the
                # whole line, otherwise the package always looks missing.
                installed = result.returncode == 0 and any(
                    line.split(None, 1)[:1] == ['ii']
                    for line in result.stdout.splitlines()
                )
                if not installed:
                    self.logger.info(f"PROCESSING: Installing {dep} (not found)")
                    subprocess.run(['apt-get', 'install', '-y', dep], check=True)
                    self.logger.info(f"SUCCESS: Installed {dep}")
                else:
                    self.logger.info(f"PROCESSING: {dep} is already installed")
            except subprocess.CalledProcessError as e:
                self.logger.error(f"FAILURE: Failed to install {dep}: {str(e)}")
                return False
        # Download Tesseract language data; fall back to the system directory.
        tessdata_downloaded = self.download_tessdata(['eng', 'fas'])
        tessdata_prefix = TESSDATA_LOCAL if tessdata_downloaded else TESSDATA_PREFIX_DEFAULT
        os.environ['TESSDATA_PREFIX'] = tessdata_prefix
        pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD
        version = subprocess.check_output([TESSERACT_CMD, '--version'])
        self.logger.info(f"SUCCESS: Tesseract installed: {version.decode()[:50]}")
        if not os.path.exists(os.path.join(tessdata_prefix, 'eng.traineddata')):
            self.logger.warning(f"WARNING: Tesseract language data not found in {tessdata_prefix}, functionality may be limited")
        self.logger.info("SUCCESS: System dependencies setup completed")
        return True
    except subprocess.CalledProcessError as e:
        self.logger.error(f"FAILURE: Command execution error: {str(e)}")
        return False
    except Exception as e:
        self.logger.error(f"FAILURE: System dependency setup error: {str(e)}")
        return False
# Progress Handling Utility | |
def _update_progress(self, step: int, total_steps: int, progress=None): | |
"""Safely update progress with robust error handling""" | |
if progress is None: | |
return | |
try: | |
progress_value = step / total_steps | |
self.logger.debug(f"Updating progress: {progress_value:.3f}") | |
progress(progress_value) | |
except Exception as e: | |
self.logger.warning(f"WARNING Progress update failed: {str(e)}") | |
# Model Loading | |
def load_model(self, model_name: str, progress=None): | |
"""Load a specific OCR model with detailed debugging""" | |
with self.model_lock: | |
if model_name in self.models: | |
self.logger.debug(f"Model {model_name} already loaded") | |
return True | |
self.logger.info(f"START Loading model: {model_name}") | |
try: | |
config = self.model_configs.get(model_name) | |
if config: | |
self.logger.debug(f"Config for {model_name}: {config}") | |
if config['type'] == "image-to-text": | |
self.logger.debug(f"Loading image-to-text pipeline for {model_name}") | |
self.models[model_name] = pipeline( | |
config['type'], | |
model=config['name'], | |
device=config['device'] | |
) | |
elif config['type'] == "document-question-answering": | |
self.logger.debug(f"Loading document-question-answering pipeline for {model_name}") | |
self.models[model_name] = pipeline( | |
config['type'], | |
model=config['name'], | |
device=config['device'] | |
) | |
elif config['type'] == "transformer": | |
self.logger.debug(f"Loading transformer for {model_name}") | |
self._update_progress(1, 3, progress) | |
processor = TrOCRProcessor.from_pretrained(config['name']) | |
self.logger.debug(f"Processor loaded for {model_name}") | |
self._update_progress(2, 3, progress) | |
model = VisionEncoderDecoderModel.from_pretrained(config['name']) | |
self.logger.debug(f"Model loaded for {model_name}") | |
self.models[model_name] = { | |
'processor': processor, | |
'model': model, | |
'device': config['device'] | |
} | |
elif config['type'] == "custom" and model_name == "persian_ocr": | |
self.logger.debug(f"Setting custom model {model_name}") | |
self.models[model_name] = True | |
elif model_name == "easyocr": | |
self.logger.debug(f"Loading EasyOCR for {model_name}") | |
easyocr_cache_dir = os.path.expanduser('~/.EasyOCR') | |
if not os.path.exists(easyocr_cache_dir): | |
self.logger.info("LOADING: EasyOCR models not found, downloading now...") | |
else: | |
self.logger.debug("EasyOCR model cache found, skipping download") | |
self._update_progress(1, 2, progress) | |
self.models[model_name] = easyocr.Reader( | |
['fa', 'en'], | |
gpu=(self.device == "cuda"), | |
download_enabled=True | |
) | |
if self.device != "cuda": | |
self.logger.warning("WARNING: EasyOCR running on CPU, consider GPU for faster processing") | |
elif model_name == "tesseract": | |
self.logger.debug(f"Configuring Tesseract for {model_name}") | |
tessdata_prefix = os.environ.get('TESSDATA_PREFIX', TESSDATA_LOCAL) | |
if not os.path.exists(os.path.join(tessdata_prefix, 'eng.traineddata')): | |
self.logger.error(f"FAILURE: eng.traineddata not found in {tessdata_prefix}, Tesseract may fail") | |
else: | |
self.logger.debug(f"Tesseract language data found at {tessdata_prefix}") | |
pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD | |
self.models[model_name] = True | |
self.logger.info(f"SUCCESS Model {model_name} loaded") | |
self.model_performance[model_name]['success'] += 1 | |
self._update_progress(3 if config and config['type'] == "transformer" else 2, | |
3 if config and config['type'] == "transformer" else 2, | |
progress) | |
return True | |
except Exception as e: | |
self.logger.error(f"FAILURE Error loading {model_name}: {str(e)}") | |
self.model_performance[model_name]['fail'] += 1 | |
self._update_progress(3 if config and config['type'] == "transformer" else 2, | |
3 if config and config['type'] == "transformer" else 2, | |
progress) | |
return False | |
# Image Processing | |
def process_single_image(self, image_path: str, settings: Dict, progress=None) -> OCRResult:
    """Read one image from disk, enhance it and run OCR on the result.

    Returns the OCR result with ``preprocessing_info`` filled in, or the
    empty result when reading, enhancement or recognition fails.
    """
    self.logger.debug(f"Processing single image: {image_path}")
    try:
        image = cv2.imread(image_path)
        unreadable = (
            image is None
            or not isinstance(image, np.ndarray)
            or image.size == 0
            or len(image.shape) < 2
        )
        if unreadable:
            self.logger.error(f"FAILURE Unable to read or invalid image: {image_path}")
            return self._create_empty_result()

        enhanced_image, preprocessing_info = self.enhance_for_persian(image, settings, progress)
        if enhanced_image is None:
            self.logger.error(f"FAILURE Image enhancement failed: {image_path}")
            return self._create_empty_result()

        self.logger.debug(f"Calling process_image with enhanced image shape: {enhanced_image.shape}")
        result = self.process_image(enhanced_image, progress)
        self.logger.debug(f"process_image returned: {result}")
        if not result:
            return self._create_empty_result()
        return result._replace(preprocessing_info=preprocessing_info)
    except Exception as e:
        self.logger.error(f"FAILURE Error processing image: {str(e)}")
        return self._create_empty_result()
def process_image(self, image: np.ndarray, progress=None) -> Optional[OCRResult]: | |
"""Process an image using available OCR models""" | |
start_time = time.time() | |
self.logger.debug("Starting process_image") | |
self._update_progress(0, len(self.model_priority) + 1, progress) | |
if image is None or not isinstance(image, np.ndarray) or image.size == 0 or len(image.shape) < 2: | |
self.logger.error("FAILURE Input image is invalid or empty") | |
raise ValueError("Invalid or empty input image") | |
self.logger.debug(f"Processing image with shape: {image.shape}") | |
sorted_models = sorted( | |
self.model_priority, | |
key=lambda x: self.model_performance[x]['success'] / (self.model_performance[x]['fail'] + 1), | |
reverse=True | |
) | |
self.logger.debug(f"Sorted models: {sorted_models}") | |
for i, model_name in enumerate(sorted_models): | |
try: | |
self.logger.debug(f"Attempting to load model: {model_name}") | |
if not self.load_model(model_name, progress): | |
self.logger.warning(f"WARNING Failed to load model: {model_name}") | |
continue | |
self._update_progress(i + 1, len(sorted_models) + 1, progress) | |
self.logger.debug(f"Processing with model: {model_name}") | |
result = self._process_with_model(image, model_name) | |
self.logger.debug(f"Model {model_name} result: {result}") | |
if result and result.get('text', '').strip(): | |
processing_time = time.time() - start_time | |
self.logger.debug(f"Formatting result for {model_name}") | |
ocr_result = self._format_result( | |
result['text'], | |
result.get('confidence', 0.5), | |
model_name, | |
processing_time | |
) | |
self.logger.debug(f"Formatted OCR result: {ocr_result}") | |
threshold = self.model_configs.get(model_name, {}).get('threshold', 0.5) | |
if ocr_result.confidence >= threshold: | |
self.logger.info(f"SUCCESS Model {model_name} succeeded") | |
self._update_progress(len(sorted_models) + 1, len(sorted_models) + 1, progress) | |
return ocr_result | |
except Exception as e: | |
self.logger.warning(f"WARNING Model {model_name} failed: {str(e)}") | |
continue | |
self.logger.warning("WARNING No model succeeded") | |
self._update_progress(len(sorted_models) + 1, len(sorted_models) + 1, progress) | |
return None | |
# Model-Specific Processing | |
def _process_with_model(self, image: np.ndarray, model_name: str) -> Dict: | |
"""Process image with a specific model""" | |
if image is None or not isinstance(image, np.ndarray) or image.size == 0: | |
self.logger.error(f"FAILURE Invalid image for {model_name}") | |
return {'text': '', 'confidence': 0} | |
if model_name in self.model_configs: | |
config = self.model_configs[model_name] | |
if config['type'] == "transformer": | |
return self._process_transformer_model(image, model_name) | |
elif config['type'] in ["image-to-text", "document-question-answering"]: | |
return self._process_pipeline_model(image, model_name) | |
elif model_name == 'persian_ocr': | |
return self._process_persian_ocr(image) | |
elif model_name == 'easyocr': | |
return self._process_easyocr(image) | |
elif model_name == 'tesseract': | |
return self._process_tesseract(image) | |
return {'text': '', 'confidence': 0} | |
def _process_transformer_model(self, image: np.ndarray, model_name: str) -> Dict: | |
"""Process image using transformer-based model (e.g., Microsoft TrOCR)""" | |
try: | |
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) | |
model_data = self.models[model_name] | |
processor = model_data['processor'] | |
model = model_data['model'] | |
device = model_data['device'] | |
pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values.to(device) | |
generated_ids = model.generate(pixel_values) | |
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
return { | |
'text': generated_text, | |
'confidence': self.model_configs[model_name]['threshold'] | |
} | |
except Exception as e: | |
self.logger.error(f"FAILURE Transformer model processing failed: {str(e)}") | |
return {'text': '', 'confidence': 0} | |
def _process_pipeline_model(self, image: np.ndarray, model_name: str) -> Dict: | |
"""Process image using pipeline model""" | |
try: | |
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) | |
model = self.models[model_name] | |
if self.model_configs[model_name]['type'] == "image-to-text": | |
result = model(pil_image)[0] | |
return { | |
'text': result['generated_text'], | |
'confidence': self.model_configs[model_name]['threshold'] | |
} | |
else: # document-question-answering | |
result = model(pil_image) | |
return { | |
'text': result['answer'], | |
'confidence': self.model_configs[model_name]['threshold'] | |
} | |
except Exception as e: | |
self.logger.error(f"FAILURE Pipeline model processing failed: {str(e)}") | |
return {'text': '', 'confidence': 0} | |
def _process_easyocr(self, image: np.ndarray) -> Dict: | |
"""Process image using EasyOCR""" | |
try: | |
results = self.models['easyocr'].readtext(image) | |
if not results: | |
return {'text': '', 'confidence': 0} | |
texts = [text for _, text, _ in results] | |
confidence_sum = sum(conf for _, _, conf in results) | |
confidence_avg = confidence_sum / len(results) if results else 0 | |
return { | |
'text': ' '.join(texts), | |
'confidence': confidence_avg | |
} | |
except Exception as e: | |
self.logger.error(f"FAILURE EasyOCR processing failed: {str(e)}") | |
return {'text': '', 'confidence': 0} | |
def _process_tesseract(self, image: np.ndarray) -> Dict: | |
"""Process image using Tesseract""" | |
try: | |
text = pytesseract.image_to_string( | |
image, | |
config='--oem 3 --psm 6 -l fas+eng' | |
) | |
return {'text': text, 'confidence': 0.5} | |
except Exception as e: | |
self.logger.error(f"FAILURE Tesseract processing failed: {str(e)}") | |
return {'text': '', 'confidence': 0} | |
def _process_persian_ocr(self, image: np.ndarray) -> Dict: | |
"""Process image using Persian OCR""" | |
try: | |
if image is None or not isinstance(image, np.ndarray) or image.size == 0: | |
return {'text': '', 'confidence': 0} | |
text = self.persian_ocr_main(image, langs="fa", mode="tn") | |
return {'text': text, 'confidence': 0.75} | |
except Exception as e: | |
self.logger.error(f"FAILURE Persian-OCR processing failed: {str(e)}") | |
return {'text': '', 'confidence': 0} | |
# Result Formatting | |
def _format_result(self, text: str, confidence: float, model_name: str, processing_time: float) -> OCRResult:
    """Normalise and tokenise *text*, split numeric tokens off, and build an OCRResult.

    ``text`` in the result is the list of non-numeric tokens, ``numbers`` the
    list of numeric ones. On any error the empty result is returned.
    """
    try:
        tokens = word_tokenize(self.normalizer.normalize(text))
        persian_nums = '۰۱۲۳۴۵۶۷۸۹'
        # NOTE(review): matched from the start of the token only, so a token
        # that merely begins with a digit also counts as a number.
        number_re = re.compile(f'^[0-9{persian_nums}]+([\\.,،٫][0-9{persian_nums}]+)?')
        numbers = []
        text_list = []
        for token in tokens:
            if number_re.match(token):
                numbers.append(token)
            else:
                text_list.append(token)
        return OCRResult(
            text=text_list,
            numbers=numbers,
            confidence=confidence,
            model_name=model_name,
            processing_time=processing_time,
            image_quality=self._assess_quality(text_list),
            detected_language=self._detect_language(text_list),
            word_count=len(text_list),
            char_count=sum(map(len, text_list)),
            preprocessing_info={},
            error_rate=self._estimate_error_rate(text_list, confidence)
        )
    except Exception as e:
        self.logger.error(f"FAILURE Formatting result failed: {str(e)}")
        return self._create_empty_result()
def _estimate_error_rate(self, text_list: List[str], confidence: float) -> float: | |
"""Estimate error rate based on text characteristics and confidence""" | |
if not text_list: | |
return 1.0 | |
avg_word_length = sum(len(w) for w in text_list) / len(text_list) | |
return max(0.0, min(1.0, 1.0 - confidence + (3 - avg_word_length) / 10)) | |
def _assess_quality(self, text_list: List[str]) -> str: | |
"""Assess the quality of extracted text""" | |
if not text_list: | |
return "Low" | |
avg_word_length = sum(len(w) for w in text_list) / len(text_list) | |
word_count = len(text_list) | |
if word_count > 50 and avg_word_length > 3: | |
return "High" | |
elif word_count > 20 and avg_word_length > 2: | |
return "Medium" | |
else: | |
return "Low" | |
def _detect_language(self, text_list: List[str]) -> str: | |
"""Detect the dominant language in the text""" | |
if not text_list: | |
return "Unknown" | |
persian_pattern = re.compile(r'[\u0600-\u06FF]') | |
english_pattern = re.compile(r'[a-zA-Z]') | |
persian_chars = sum(1 for word in text_list | |
for _ in persian_pattern.finditer(word)) | |
english_chars = sum(1 for word in text_list | |
for _ in english_pattern.finditer(word)) | |
if persian_chars > english_chars: | |
return "Persian" | |
elif english_chars > persian_chars: | |
return "English" | |
else: | |
return "Mixed" | |
# Persian OCR Specific | |
def persian_ocr_main(self, image: np.ndarray, langs="fa", mode="tn") -> str: | |
"""Main Persian OCR processing function""" | |
if image is None or not isinstance(image, np.ndarray) or image.size == 0: | |
self.logger.error("FAILURE Invalid image for Persian OCR") | |
return "" | |
try: | |
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_input: | |
temp_input_path = temp_input.name | |
cv2.imwrite(temp_input_path, image) | |
with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as temp_output: | |
temp_output_path = temp_output.name | |
im = Image.open(temp_input_path) | |
length_x, width_y = im.size | |
factor = float(1024.0 / length_x) | |
size = int(factor * length_x), int(factor * width_y) | |
image_resize = im.resize(size, Image.Resampling.LANCZOS) | |
image_resize.save(f"{temp_input_path}_Upscaled.png", dpi=(300, 300)) | |
img = cv2.imread(f"{temp_input_path}_Upscaled.png") | |
if img is None: | |
self.logger.error("FAILURE Failed to read upscaled image") | |
return "" | |
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
if gray is None: | |
self.logger.error("FAILURE Failed to convert to grayscale") | |
return "" | |
if langs == "fa": | |
if mode == "t": | |
custom_config = r'-l fas --psm 6 -c tessedit_char_blacklist="۰١۲۳۴۵۶۷۸۹«»1234567890#"' | |
elif mode == "tn": | |
custom_config = r'-l fas --psm 6 -c tessedit_char_whitelist="آابپتثجچحخدذرزژسشصضطظعغفقکگلمنوهی ۰١۲۳۴۵۶۷۸۹.?!,،:;/"' | |
elif mode == "table": | |
custom_config = r'-l fas --psm 6 -c tessedit_char_whitelist="آابپتثجچحخدذرزژسشصضطظعغفقکگلمنوهی۰١۲۳۴۵۶۷۸۹"' | |
elif langs == "en": | |
custom_config = r'-l eng --psm 6' | |
elif langs == "faen": | |
custom_config = r'-l fas+eng --psm 6' | |
else: | |
raise ValueError("Invalid language option") | |
text = pytesseract.image_to_string(gray, config=custom_config) | |
with io.open(temp_output_path, 'w', encoding='utf8') as f: | |
f.write(text) | |
return text | |
except Exception as e: | |
self.logger.error(f"FAILURE Persian OCR failed: {str(e)}") | |
return "" | |
finally: | |
for file in [temp_input_path, f"{temp_input_path}_Upscaled.png", temp_output_path]: | |
try: | |
os.remove(file) | |
except Exception: | |
pass | |
# Image Enhancement | |
def enhance_for_persian(self, image: np.ndarray, settings: Dict[str, Any], progress=None) -> Tuple[Optional[np.ndarray], Dict]: | |
"""Enhance image for Persian text recognition with robust validation and debugging""" | |
info = {} | |
if image is None or not isinstance(image, np.ndarray) or image.size == 0 or len(image.shape) < 2: | |
self.logger.error("FAILURE Invalid input image for enhancement") | |
return None, {} | |
self.logger.debug(f"Enhancing image with shape: {image.shape}") | |
try: | |
processed = image.copy() | |
if processed is None or not isinstance(processed, np.ndarray) or processed.size == 0: | |
self.logger.error("FAILURE Failed to create image copy") | |
return None, info | |
self.logger.debug(f"Initial processed shape: {processed.shape}") | |
step = 0 | |
total_steps = 7 | |
# Step 1: Convert to grayscale | |
if len(processed.shape) == 3: | |
self.logger.debug("Starting grayscale conversion") | |
try: | |
processed = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY) | |
if processed is None: | |
self.logger.error("FAILURE Grayscale conversion returned None") | |
return None, info | |
if len(processed.shape) != 2: | |
self.logger.error(f"FAILURE Grayscale conversion produced invalid shape: {processed.shape}") | |
return None, info | |
self.logger.debug(f"After grayscale shape: {processed.shape}") | |
info['grayscale'] = True | |
except Exception as e: | |
self.logger.error(f"FAILURE Grayscale conversion failed: {str(e)}") | |
return None, info | |
else: | |
self.logger.debug("Image already grayscale, skipping conversion") | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 2: Resize | |
if settings.get('resize'): | |
scale_percent = settings.get('resize_scale', 200) | |
if scale_percent != 100: | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for resize") | |
return None, info | |
self.logger.debug(f"Starting resize with scale {scale_percent}%") | |
h, w = processed.shape[:2] | |
new_w = int(w * scale_percent / 100) | |
new_h = int(h * scale_percent / 100) | |
if new_w <= 0 or new_h <= 0: | |
self.logger.error(f"FAILURE Invalid resize dimensions: {new_w}x{new_h}") | |
return None, info | |
try: | |
processed = cv2.resize(processed, (new_w, new_h), interpolation=cv2.INTER_CUBIC) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Resize operation returned invalid result") | |
return None, info | |
self.logger.debug(f"After resize shape: {processed.shape}") | |
info['resized'] = f"{scale_percent}%" | |
except Exception as e: | |
self.logger.error(f"FAILURE Resize failed: {str(e)}") | |
return None, info | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 3: Contrast Enhancement | |
if settings.get('enhance_contrast'): | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for contrast enhancement") | |
return None, info | |
self.logger.debug("Starting contrast enhancement") | |
try: | |
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) | |
processed = clahe.apply(processed) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Contrast enhancement returned invalid result") | |
return None, info | |
self.logger.debug(f"After contrast enhancement shape: {processed.shape}") | |
info['contrast_enhanced'] = True | |
except Exception as e: | |
self.logger.error(f"FAILURE Contrast enhancement failed: {str(e)}") | |
return None, info | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 4: Noise Reduction | |
if settings.get('reduce_noise'): | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for noise reduction") | |
return None, info | |
self.logger.debug("Starting noise reduction") | |
try: | |
processed = cv2.bilateralFilter(processed, 9, 75, 75) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Noise reduction returned invalid result") | |
return None, info | |
self.logger.debug(f"After noise reduction shape: {processed.shape}") | |
info['noise_reduced'] = True | |
except Exception as e: | |
self.logger.error(f"FAILURE Noise reduction failed: {str(e)}") | |
return None, info | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 5: Sharpening | |
if settings.get('sharpen'): | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for sharpening") | |
return None, info | |
self.logger.debug("Starting sharpening") | |
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) | |
try: | |
processed = cv2.filter2D(processed, -1, kernel) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Sharpening returned invalid result") | |
return None, info | |
self.logger.debug(f"After sharpening shape: {processed.shape}") | |
info['sharpened'] = True | |
except Exception as e: | |
self.logger.error(f"FAILURE Sharpening failed: {str(e)}") | |
return None, info | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 6: Deskew | |
if settings.get('deskew'): | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for deskew") | |
return None, info | |
self.logger.debug("Starting deskew") | |
coords = np.column_stack(np.where(processed > 0)) | |
self.logger.debug(f"Coords shape: {coords.shape}") | |
if coords.size >= 5: | |
try: | |
rect = cv2.minAreaRect(coords) | |
self.logger.debug(f"Rect: {rect}") | |
if not (isinstance(rect, tuple) and len(rect) == 3 and | |
isinstance(rect[0], tuple) and isinstance(rect[1], tuple) and isinstance(rect[2], (int, float))): | |
self.logger.warning("WARNING Invalid rect structure from minAreaRect") | |
else: | |
angle = rect[2] | |
if angle < -45: | |
angle = 90 + angle | |
h, w = processed.shape[:2] | |
center = (w // 2, h // 2) | |
M = cv2.getRotationMatrix2D(center, angle, 1.0) | |
processed = cv2.warpAffine( | |
processed, M, (w, h), | |
flags=cv2.INTER_CUBIC, | |
borderMode=cv2.BORDER_REPLICATE | |
) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Deskew operation returned invalid result") | |
return None, info | |
self.logger.debug(f"After deskew shape: {processed.shape}") | |
info['deskewed'] = f"angle: {angle:.2f}" | |
except Exception as e: | |
self.logger.warning(f"WARNING Deskew failed: {str(e)}") | |
else: | |
self.logger.warning("WARNING Not enough points for deskewing (coords.size < 5)") | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
# Step 7: Thresholding | |
if settings.get('threshold'): | |
if len(processed.shape) < 2: | |
self.logger.error("FAILURE Invalid shape for thresholding") | |
return None, info | |
self.logger.debug("Starting thresholding") | |
try: | |
processed = cv2.adaptiveThreshold( | |
processed, 255, | |
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
cv2.THRESH_BINARY, 11, 2 | |
) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Thresholding returned invalid result") | |
return None, info | |
self.logger.debug(f"After thresholding shape: {processed.shape}") | |
info['thresholded'] = True | |
except Exception as e: | |
self.logger.error(f"FAILURE Thresholding failed: {str(e)}") | |
return None, info | |
step += 1 | |
self._update_progress(step, total_steps, progress) | |
if processed is None or len(processed.shape) < 2: | |
self.logger.error("FAILURE Final image has invalid shape") | |
return None, info | |
self.logger.debug(f"Final processed shape: {processed.shape}") | |
return processed, info | |
except Exception as e: | |
self.logger.error(f"FAILURE Image enhancement error: {str(e)}") | |
self._update_progress(total_steps, total_steps, progress) | |
return None, {} | |
# PDF Processing | |
def process_pdf(self, pdf_path: str, settings: Optional[Dict] = None, progress=None) -> List[OCRResult]:
    """Process a PDF document and extract text from all pages.

    The PDF is first passed through ``optimize_pdf_document``, then rendered
    to images by ``process_pdf_document``; every non-None image is OCR'd by
    ``process_single_image`` on a thread pool.

    Args:
        pdf_path: Path to the PDF file; must exist and end with ``.pdf``.
        settings: Optional overrides merged on top of ``self.default_settings``.
        progress: Optional progress callback forwarded to the helpers.

    Returns:
        The OCR results that contain text, in the same order as the images
        were extracted. A single-element list holding an empty result is
        returned when the input is invalid, nothing could be extracted, or
        an unexpected error occurs.
    """
    self.logger.info(f"START Processing PDF: {pdf_path}")
    current_settings = self.default_settings.copy()
    if settings:
        current_settings.update(settings)

    if not pdf_path or not os.path.exists(pdf_path):
        self.logger.error(f"FAILURE PDF file not found: {pdf_path}")
        return [self._create_empty_result()]
    if not pdf_path.lower().endswith('.pdf'):
        self.logger.error(f"FAILURE Input file is not a PDF: {pdf_path}")
        return [self._create_empty_result()]

    try:
        self.logger.debug(f"Calling optimize_pdf_document with path: {pdf_path}")
        optimized_pdf = self.optimize_pdf_document(pdf_path, current_settings, progress)
        self.logger.debug(f"optimize_pdf_document returned: {optimized_pdf}")
        if not optimized_pdf or not os.path.exists(optimized_pdf):
            self.logger.error(f"FAILURE Optimized PDF not generated: {optimized_pdf}")
            return [self._create_empty_result()]

        self.logger.debug(f"Calling process_pdf_document with path: {optimized_pdf}")
        images = self.process_pdf_document(optimized_pdf, current_settings, progress)
        if not images:
            self.logger.warning("WARNING No images extracted from PDF")
            return [self._create_empty_result()]
        self.logger.debug(f"process_pdf_document returned {len(images)} images")

        results = []
        with ThreadPoolExecutor(max_workers=current_settings.get('max_workers', 4)) as executor:
            futures = [
                executor.submit(self.process_single_image, img, current_settings, progress)
                for img in images if img is not None
            ]
            # Collect in submission order (not completion order) so the
            # extracted text keeps the page sequence of the document.
            for index, future in enumerate(futures, start=1):
                try:
                    result = future.result()
                except Exception as e:
                    # One unreadable page must not discard the whole document.
                    self.logger.error(f"FAILURE OCR failed for image {index}: {str(e)}")
                    continue
                self.logger.debug(f"Thread result: {result}")
                if result and result.text:
                    results.append(result)

        self.logger.info(f"SUCCESS Processed {len(results)} pages")
        return results if results else [self._create_empty_result()]
    except Exception as e:
        self.logger.error(f"FAILURE PDF processing failed: {str(e)}")
        return [self._create_empty_result()]
def _create_empty_result(self) -> OCRResult:
    """Build the placeholder OCRResult used when nothing could be extracted."""
    placeholder_fields = {
        'text': [],
        'numbers': [],
        'confidence': 0.0,
        'model_name': "None",
        'processing_time': 0.0,
        'image_quality': "Unknown",
        'detected_language': "Unknown",
        'word_count': 0,
        'char_count': 0,
        'preprocessing_info': {},
        'error_rate': 1.0,
    }
    return OCRResult(**placeholder_fields)
def process_pdf_document(self, pdf_path: str, settings: Dict[str, Any], progress=None) -> List[np.ndarray]:
    """Extract and process images from a PDF document.

    Pages are handed to ``_process_pdf_page`` in batches of
    ``settings['batch_size']`` (default 2) worker threads.

    Args:
        pdf_path: Path of the PDF to open.
        settings: Processing settings forwarded to ``_process_pdf_page``.
        progress: Optional progress callback forwarded to ``_process_pdf_page``.

    Returns:
        All images produced for the document, in page order. An empty list
        is returned if the document cannot be processed.
    """
    self.logger.info(f"START Processing PDF: {pdf_path}")
    all_images = []
    doc = None
    try:
        doc = fitz.open(pdf_path)
        total_pages = len(doc)
        # A missing, zero or negative batch size would make range() and the
        # executor raise; fall back to sane values instead.
        batch_size = max(1, int(settings.get('batch_size') or 2))
        with tqdm(total=total_pages, desc="📄 Processing PDF") as pbar:
            for start in range(0, total_pages, batch_size):
                batch = range(start, min(start + batch_size, total_pages))
                # NOTE(review): PyMuPDF documents are shared across worker
                # threads here; the PyMuPDF docs advise against multithreaded
                # use of one Document - confirm this is safe for this workload.
                with ThreadPoolExecutor(max_workers=batch_size) as executor:
                    futures = [
                        executor.submit(
                            self._process_pdf_page,
                            doc, page_num, settings, progress
                        )
                        for page_num in batch
                    ]
                    # Read results in submission order so images stay in
                    # page order regardless of which thread finishes first.
                    for future in futures:
                        result = future.result()
                        self.logger.debug(f"Page result: {result}")
                        if result and isinstance(result, list):
                            all_images.extend(result)
                        pbar.update(1)
        self.logger.info(f"SUCCESS Extracted {len(all_images)} images")
        return all_images
    except Exception as e:
        self.logger.error(f"FAILURE PDF processing failed: {str(e)}")
        return []
    finally:
        # Release the file handle held by the document on every exit path.
        if doc is not None:
            doc.close()
def _process_pdf_page(self, doc, page_num: int, settings: Dict[str, Any], progress=None) -> List[np.ndarray]:
    """Render one PDF page, enhance it, and gather its images.

    The page is rasterised with ``settings['scale_factor']`` (default 2) on
    both axes, enhanced via ``enhance_for_persian``, and, unless
    ``settings['extract_images']`` is falsy, followed by the page's embedded
    images. Returns an empty list when the page cannot be rendered or an
    error occurs.
    """
    page_label = page_num + 1
    collected = []
    try:
        zoom_x = settings.get('scale_factor', 2)
        zoom_y = settings.get('scale_factor', 2)
        page = doc.load_page(page_num)
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom_x, zoom_y))

        if not self._validate_pixmap(pix):
            self.logger.warning(f"WARNING Invalid pixmap for page {page_label}")
            return []

        page_image = self._pixmap_to_image(pix)
        if page_image is None:
            self.logger.warning(f"WARNING Failed to convert pixmap to image for page {page_label}")
            return []

        enhanced, _ = self.enhance_for_persian(page_image, settings, progress)
        if enhanced is not None:
            collected.append(enhanced)

        if settings.get('extract_images', True):
            collected.extend(self._extract_embedded_images(doc, page, page_num))

        if progress:
            progress(1.0)
        return collected
    except Exception as e:
        self.logger.error(f"FAILURE Page {page_label} processing failed: {str(e)}")
        if progress:
            progress(1.0)
        return []
def _validate_pixmap(self, pix) -> bool:
    """Check that a pixmap is usable, logging the first problem found.

    A pixmap passes when it exposes ``n``, ``width``, ``height`` and
    ``samples``, all three numbers are positive, and ``samples`` holds
    exactly ``width * height * n`` items.
    """
    required_attrs = ('n', 'width', 'height', 'samples')
    problem = None

    if any(not hasattr(pix, attr) for attr in required_attrs):
        problem = "FAILURE Pixmap missing required attributes"
    elif pix.n <= 0:
        problem = "FAILURE Pixmap has invalid number of components"
    elif pix.width <= 0 or pix.height <= 0:
        problem = "FAILURE Pixmap has invalid dimensions"
    elif pix.samples is None or len(pix.samples) == 0:
        problem = "FAILURE Pixmap has no sample data"
    else:
        expected_size = pix.width * pix.height * pix.n
        if len(pix.samples) != expected_size:
            problem = f"FAILURE Pixmap sample size mismatch: expected {expected_size}, got {len(pix.samples)}"

    if problem is not None:
        self.logger.error(problem)
        return False
    return True
def _pixmap_to_image(self, pix) -> Optional[np.ndarray]:
    """Convert a pixmap to a writable ``(height, width, n)`` uint8 array.

    Args:
        pix: Object exposing ``width``, ``height``, ``n`` and ``samples``.

    Returns:
        A new array owning its data, or ``None`` when the pixmap fails
        ``_validate_pixmap`` or the conversion raises.
    """
    try:
        # Validation already guarantees len(samples) == width * height * n,
        # so no second size check is needed before reshaping.
        if not self._validate_pixmap(pix):
            return None
        flat = np.frombuffer(pix.samples, dtype=np.uint8)
        # frombuffer over an immutable buffer yields a read-only view; copy
        # so later in-place image operations can write to the array.
        return flat.reshape(pix.height, pix.width, pix.n).copy()
    except Exception as e:
        self.logger.error(f"FAILURE Pixmap conversion failed: {str(e)}")
        return None
def _extract_embedded_images(self, doc, page, page_num: int) -> List[np.ndarray]:
    """Extract and enhance the images embedded in a PDF page.

    Each image listed by ``page.get_images(full=True)`` is loaded as a
    pixmap, normalised to gray or RGB without alpha, and, when larger than
    100x100 pixels, passed through ``enhance_for_persian`` with empty
    settings. Images that fail at any step are logged and skipped.

    Args:
        doc: The open PyMuPDF document the page belongs to.
        page: The page whose embedded images are extracted.
        page_num: Zero-based page index, used only for log messages.

    Returns:
        The enhanced images that were successfully processed.
    """
    images = []
    for img_info in page.get_images(full=True):
        try:
            xref = img_info[0]
            pix = fitz.Pixmap(doc, xref)
            # Only colour spaces with 4+ colour components (e.g. CMYK) need
            # converting; counting alpha here would misclassify RGBA.
            if pix.n - pix.alpha >= 4:
                pix = fitz.Pixmap(fitz.csRGB, pix)
            # Drop the alpha channel so the sample layout is plain gray/RGB.
            if pix.alpha and pix.n > 1:
                pix = fitz.Pixmap(pix, 0)
            if not self._validate_pixmap(pix):
                continue
            # Reshape with the pixmap's real component count; a hard-coded
            # 3-or-1 fails for any pixmap whose n is 2 or 4.
            img_array = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                pix.height, pix.width, pix.n
            )
            if img_array.shape[0] > 100 and img_array.shape[1] > 100:
                processed_img, _ = self.enhance_for_persian(img_array, {})
                if processed_img is not None:
                    images.append(processed_img)
        except Exception as e:
            self.logger.warning(
                f"WARNING Failed to process embedded image on page {page_num + 1}: {str(e)}"
            )
            continue
    return images
def optimize_pdf_document(self, pdf_path: str, settings: Dict[str, Any], progress=None) -> str:
    """Optimize a PDF document for OCR processing.

    Every page is rendered at 2x zoom, enhanced with ``enhance_for_persian``
    and written into a new PDF named ``optimized_<original name>`` in the
    current working directory. Pages that cannot be rendered or enhanced
    are skipped.

    Args:
        pdf_path: Path of the source PDF.
        settings: Settings forwarded to ``enhance_for_persian``.
        progress: Optional callback; called with ``1.0`` when finished.

    Returns:
        The path of the optimized PDF, or ``pdf_path`` unchanged when the
        source is missing or optimization fails.
    """
    self.logger.info(f"START Optimizing PDF: {pdf_path}")
    if not os.path.exists(pdf_path):
        self.logger.error(f"FAILURE PDF file not found: {pdf_path}")
        return pdf_path

    doc = None
    new_doc = None
    try:
        output_path = str(Path(f"optimized_{Path(pdf_path).name}"))
        doc = fitz.open(pdf_path)
        new_doc = fitz.open()
        total_pages = len(doc)
        # One scratch directory for all pages, removed automatically, instead
        # of a fresh mkdtemp() per page that is never deleted.
        with tempfile.TemporaryDirectory() as scratch_dir:
            for page_num in tqdm(range(total_pages), desc="📄 Optimizing PDF"):
                page = doc.load_page(page_num)
                pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
                if not self._validate_pixmap(pix):
                    continue
                img = self._pixmap_to_image(pix)
                if img is None:
                    continue
                processed_img, _ = self.enhance_for_persian(img, settings, progress)
                if processed_img is None:
                    continue
                img_path = Path(scratch_dir) / f"temp_page_{page_num}.jpg"
                # imwrite reports failure through its return value, not an exception.
                if not cv2.imwrite(str(img_path), processed_img):
                    self.logger.warning(f"WARNING Could not write image for page {page_num + 1}")
                    continue
                # insert_pdf needs a PDF source, so wrap the image in a
                # one-page PDF first.
                with fitz.open(str(img_path)) as image_doc:
                    pdf_bytes = image_doc.convert_to_pdf()
                with fitz.open("pdf", pdf_bytes) as page_pdf:
                    new_doc.insert_pdf(page_pdf)
        new_doc.save(output_path)
        self.logger.info(f"SUCCESS PDF optimized: {output_path}")
        if progress:
            progress(1.0)
        return output_path
    except Exception as e:
        self.logger.error(f"FAILURE PDF optimization failed: {str(e)}")
        if progress:
            progress(1.0)
        return pdf_path
    finally:
        # Close both documents on every exit path to release file handles.
        for handle in (new_doc, doc):
            if handle is not None:
                handle.close()
# Gradio Interface
def create_gradio_interface(self):
    """Create the Gradio web interface.

    Builds a ``gr.Blocks`` layout with a file upload, an "Advanced Settings"
    accordion and seven output text boxes, and wires the submit button to an
    inner handler that dispatches to ``_process_pdf_interface`` or
    ``_process_image_interface`` based on the file extension.

    Returns:
        The constructed ``gr.Blocks`` interface (not yet launched).
    """
    def process_file(file, use_cache: bool, preprocessing: bool,
                     confidence: float, scale: int, enhance_contrast: bool,
                     reduce_noise: bool, extract_images: bool,
                     progress=gr.Progress(track_tqdm=True)):
        """Handle file processing in Gradio interface"""
        # The progress tracker is declared as a default argument because
        # that is how Gradio attaches it to the running event.
        if file is None:
            self.logger.error("FAILURE No file provided")
            return self._empty_interface_result("No file uploaded")
        # Depending on the Gradio version/configuration the upload arrives
        # either as a path string or as an object exposing ``.name``.
        file_path = file if isinstance(file, str) else getattr(file, 'name', None)
        if not file_path:
            self.logger.error("FAILURE No file provided")
            return self._empty_interface_result("No file uploaded")
        settings = {
            'cache_enabled': use_cache,
            'preprocessing_enabled': preprocessing,
            'confidence_threshold': confidence,
            'resize': True,
            'resize_scale': scale,
            'enhance_contrast': enhance_contrast,
            'reduce_noise': reduce_noise,
            'extract_images': extract_images,
            'sharpen': True,
            'deskew': True,
            'optimize_for_ocr': True
        }
        try:
            if file_path.lower().endswith('.pdf'):
                return self._process_pdf_interface(file_path, settings, progress)
            return self._process_image_interface(file_path, settings, progress)
        except Exception as e:
            self.logger.error(f"FAILURE Interface error: {str(e)}")
            return self._empty_interface_result(str(e))

    with gr.Blocks(title="Persian OCR System") as interface:
        gr.Markdown("# Advanced Persian OCR System")
        with gr.Row():
            with gr.Column():
                file_input = gr.File(label="Upload File (Image or PDF)")
                with gr.Accordion("Advanced Settings", open=False):
                    use_cache = gr.Checkbox(label="Use Cache", value=True)
                    preprocessing = gr.Checkbox(label="Enable Preprocessing", value=True)
                    confidence = gr.Slider(0.1, 1.0, value=0.7, label="Confidence Threshold")
                    scale = gr.Slider(100, 400, value=200, step=50, label="Image Scale (%)")
                    enhance_contrast = gr.Checkbox(label="Enhance Contrast", value=True)
                    reduce_noise = gr.Checkbox(label="Reduce Noise", value=True)
                    extract_images = gr.Checkbox(label="Extract Images from PDF", value=True)
                submit_btn = gr.Button("Process Text")
            with gr.Column():
                # Order must match the tuples returned by the handlers.
                outputs = [
                    gr.Textbox(label="Extracted Text", lines=10),
                    gr.Textbox(label="Extracted Numbers", lines=2),
                    gr.Textbox(label="Confidence Level"),
                    gr.Textbox(label="OCR Model Used"),
                    gr.Textbox(label="Processing Time"),
                    gr.Textbox(label="Image Quality"),
                    gr.Textbox(label="Preprocessing Info", lines=5)
                ]
        submit_btn.click(
            fn=process_file,
            inputs=[
                file_input, use_cache, preprocessing,
                confidence, scale, enhance_contrast,
                reduce_noise, extract_images
            ],
            outputs=outputs
        )
    return interface
def _process_pdf_interface(self, file_path: str, settings: Dict, progress) -> Tuple:
    """Run ``process_pdf`` and flatten its results into the 7 interface fields.

    Page texts are joined by newlines; the other per-page values are joined
    with ``", "``; preprocessing details are listed under a page-wise header.
    """
    results = self.process_pdf(file_path, settings, progress)

    page_texts = [" ".join(res.text) for res in results]
    all_numbers = [number for res in results for number in res.numbers]
    confidence_labels = [f"{res.confidence:.2f}" for res in results]
    model_names = [res.model_name for res in results]
    time_labels = [f"{res.processing_time:.2f} seconds" for res in results]
    quality_labels = [res.image_quality for res in results]
    info_blocks = [
        "\n".join(f"{k}: {v}" for k, v in res.preprocessing_info.items())
        for res in results
    ]

    if info_blocks:
        info_summary = "\nPage-wise Preprocessing Info:\n" + "\n\n".join(info_blocks)
    else:
        info_summary = ""

    return (
        "\n".join(page_texts).strip(),
        ", ".join(all_numbers),
        ", ".join(confidence_labels),
        ", ".join(model_names),
        ", ".join(time_labels),
        ", ".join(quality_labels),
        info_summary,
    )
def _process_image_interface(self, file_path: str, settings: Dict, progress) -> Tuple:
    """Run ``process_single_image`` and format its result for the interface.

    Falls back to the empty interface result when no text was extracted.
    """
    result = self.process_single_image(file_path, settings, progress)
    if not (result and result.text):
        return self._empty_interface_result("No text extracted")

    info = result.preprocessing_info
    if info:
        info_lines = "\n".join(f"{k}: {v}" for k, v in info.items())
    else:
        info_lines = ""

    return (
        "\n".join(result.text),
        ", ".join(result.numbers),
        f"{result.confidence:.2f}",
        result.model_name,
        f"{result.processing_time:.2f} seconds",
        result.image_quality,
        info_lines,
    )
def _empty_interface_result(self, message: str) -> Tuple:
    """Return the 7-field interface tuple for a failed run, ending with *message*."""
    blank_fields = ("", "", "0.0", "None", "0.0", "Unknown")
    return blank_fields + (message,)
# System Runner
def run(self):
    """Set up dependencies and working directories, then launch the web UI.

    Any exception raised during start-up is logged and re-raised.
    """
    working_dirs = ('logs', 'cache')
    launch_options = {
        'share': True,
        'debug': True,
        'server_name': "0.0.0.0",
        'server_port': 7860,
    }
    try:
        self.logger.info("START Initializing system")
        self.setup_system_dependencies()
        for directory in working_dirs:
            os.makedirs(directory, exist_ok=True)
        web_ui = self.create_gradio_interface()
        web_ui.launch(**launch_options)
    except Exception as e:
        self.logger.error(f"FAILURE System initialization failed: {str(e)}")
        raise
# Main Execution
if __name__ == "__main__":
    # Script entry point: build the OCR system and start serving the UI.
    OCRSystem().run()