# Core Python imports import os import subprocess import logging from logging.handlers import RotatingFileHandler import time from pathlib import Path from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass from collections import OrderedDict, namedtuple import tempfile import shutil import hashlib import pickle import threading from concurrent.futures import ThreadPoolExecutor, as_completed import warnings import io import re import requests # Third-party imports import numpy as np import cv2 from PIL import Image import pytesseract import easyocr import torch import fitz # PyMuPDF from tqdm import tqdm import gradio as gr from hazm import Normalizer, word_tokenize from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel import psutil import tensorflow as tf # Configuration and setup warnings.filterwarnings('ignore') os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' tf.get_logger().setLevel('ERROR') # Constants TESSERACT_CMD = '/usr/bin/tesseract' TESSDATA_PREFIX_DEFAULT = '/usr/share/tesseract-ocr/4.00/tessdata' TESSDATA_LOCAL = os.path.expanduser('~/.tessdata') # Writable user directory STATUS_EMOJIS = { 'START': '🟦', 'SUCCESS': '✅', 'FAILURE': '❌', 'LOADING': '⏳', 'PROCESSING': '🔄', 'WARNING': '⚠️', 'MEMORY': '💾' } # Data Structures OCRResult = namedtuple('OCRResult', [ 'text', 'numbers', 'confidence', 'model_name', 'processing_time', 'image_quality', 'detected_language', 'word_count', 'char_count', 'preprocessing_info', 'error_rate' ]) # Logging Configuration class CustomFormatter(logging.Formatter): """Custom formatter for logging with colors and emojis""" COLORS = { 'grey': "\x1b[38;21m", 'blue': "\x1b[38;5;39m", 'yellow': "\x1b[38;5;226m", 'red': "\x1b[38;5;196m", 'bold_red': "\x1b[31;1m", 'reset': "\x1b[0m" } def __init__(self, fmt): super().__init__() self.fmt = fmt self.FORMATS = { logging.DEBUG: self.COLORS['grey'] + self.fmt + self.COLORS['reset'], logging.INFO: self.COLORS['blue'] + self.fmt + self.COLORS['reset'], logging.WARNING: self.COLORS['yellow'] + self.fmt + self.COLORS['reset'], logging.ERROR: self.COLORS['red'] + self.fmt + self.COLORS['reset'], logging.CRITICAL: self.COLORS['bold_red'] + self.fmt + self.COLORS['reset'] } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) memory_usage = psutil.virtual_memory().percent record.msg = f"{record.msg} [Mem: {memory_usage:.1f}%]" for status, emoji in STATUS_EMOJIS.items(): if status in record.msg: record.msg = f"{emoji} {record.msg}" return formatter.format(record) # Main OCR System Class class OCRSystem: """Main OCR system class handling all OCR operations""" def __init__(self): self.setup_logging() self.setup_cache() self.setup_models() self.normalizer = Normalizer() self.default_settings = { 'cache_enabled': True, 'preprocessing_enabled': True, 'confidence_threshold': 0.7, 'resize': True, 'resize_scale': 200, 'enhance_contrast': True, 'reduce_noise': True, 'extract_images': True, 'sharpen': True, 'deskew': True, 'optimize_for_ocr': True, 'max_workers': 4 } # Setup Methods def setup_logging(self): """Initialize logging system""" self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) os.makedirs('logs', exist_ok=True) file_handler = RotatingFileHandler( 'logs/ocr.log', maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ) console_handler = logging.StreamHandler() console_handler.setFormatter( CustomFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def setup_cache(self): """Initialize caching system""" self.cache_data = OrderedDict() self.cache_max_size = 1000 self.cache_lock = threading.Lock() self.cache_dir = Path('cache') self.cache_dir.mkdir(exist_ok=True) def setup_models(self): """Initialize OCR models configuration based on available hardware""" self.models = {} self.model_performance = { 'microsoft_trocr': {'success': 0, 'fail': 0}, 'pretrained_model': {'success': 0, 'fail': 0}, 'mT5_OCR_fa': {'success': 0, 'fail': 0}, 'LayoutLMv3_fa': {'success': 0, 'fail': 0}, 'easyocr': {'success': 0, 'fail': 0}, 'tesseract': {'success': 0, 'fail': 0}, 'persian_ocr': {'success': 0, 'fail': 0} } self.device = "cuda" if torch.cuda.is_available() else "cpu" self.max_workers = min(4, os.cpu_count() or 1) self.logger.info(f"Using device: {self.device}, Max workers: {self.max_workers}") self.model_configs = { 'microsoft_trocr': { 'name': "microsoft/trocr-base-printed", 'type': "transformer", 'threshold': 0.85, 'device': self.device }, 'pretrained_model': { 'name': "beheshti-ai/TrOCR-fa", 'type': "transformer", 'threshold': 0.8, 'device': self.device }, 'mT5_OCR_fa': { 'name': "aleemeconomist/mT5-OCR-fa", 'type': "image-to-text", 'threshold': 0.7, 'device': self.device }, 'LayoutLMv3_fa': { 'name': "SoheilStar/LayoutLMv3-fa", 'type': "document-question-answering", 'threshold': 0.7, 'device': self.device }, 'persian_ocr': { 'name': "Persian-OCR", 'type': "custom", 'threshold': 0.75, 'device': self.device } } self.model_priority = [ 'microsoft_trocr', 'pretrained_model', 'mT5_OCR_fa', 'LayoutLMv3_fa', 'easyocr', 'tesseract', 'persian_ocr' ] if self.device == "cuda" else [ 'microsoft_trocr', 'tesseract', 'easyocr', 'pretrained_model', 'mT5_OCR_fa', 'LayoutLMv3_fa', 'persian_ocr' ] self.model_lock = threading.Lock() def download_tessdata(self, languages=['eng', 'fas']): """Download Tesseract language data if not present""" tessdata_dir = TESSDATA_LOCAL try: os.makedirs(tessdata_dir, exist_ok=True) except PermissionError as e: self.logger.warning(f"WARNING: Cannot create {tessdata_dir}: {str(e)}. Tesseract may fail without language data.") return False base_url = "https://github.com/tesseract-ocr/tessdata_best/raw/main/" # Updated URL for lang in languages: file_path = os.path.join(tessdata_dir, f"{lang}.traineddata") if not os.path.exists(file_path): self.logger.info(f"LOADING: Downloading {lang}.traineddata to {file_path}") try: response = requests.get(f"{base_url}{lang}.traineddata", stream=True) response.raise_for_status() with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) self.logger.info(f"SUCCESS: Downloaded {lang}.traineddata") except Exception as e: self.logger.error(f"FAILURE: Could not download {lang}.traineddata: {str(e)}") else: self.logger.debug(f"{lang}.traineddata already exists at {file_path}") return True def setup_system_dependencies(self): """Install and configure system dependencies only if not already installed""" self.logger.info("START: Checking and installing system dependencies") try: subprocess.run(['apt-get', 'update'], check=True) dependencies = [ 'tesseract-ocr', 'libopencv-dev', 'fontconfig', 'libgl1-mesa-glx', 'libglib2.0-0', 'libsm6', 'libxext6', 'libxrender-dev', 'libqt5gui5' ] for dep in dependencies: try: result = subprocess.run(['dpkg', '-l', dep], capture_output=True, text=True) if result.returncode != 0 or 'ii' not in result.stdout.splitlines()[5:]: self.logger.info(f"PROCESSING: Installing {dep} (not found)") subprocess.run(['apt-get', 'install', '-y', dep], check=True) self.logger.info(f"SUCCESS: Installed {dep}") else: self.logger.info(f"PROCESSING: {dep} is already installed") except subprocess.CalledProcessError as e: self.logger.error(f"FAILURE: Failed to install {dep}: {str(e)}") return False # Download Tesseract language data tessdata_downloaded = self.download_tessdata(['eng', 'fas']) tessdata_prefix = TESSDATA_LOCAL if tessdata_downloaded else TESSDATA_PREFIX_DEFAULT os.environ['TESSDATA_PREFIX'] = tessdata_prefix pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD version = subprocess.check_output([TESSERACT_CMD, '--version']) self.logger.info(f"SUCCESS: Tesseract installed: {version.decode()[:50]}") if not os.path.exists(os.path.join(tessdata_prefix, 'eng.traineddata')): self.logger.warning(f"WARNING: Tesseract language data not found in {tessdata_prefix}, functionality may be limited") self.logger.info("SUCCESS: System dependencies setup completed") return True except subprocess.CalledProcessError as e: self.logger.error(f"FAILURE: Command execution error: {str(e)}") return False except Exception as e: self.logger.error(f"FAILURE: System dependency setup error: {str(e)}") return False # Progress Handling Utility def _update_progress(self, step: int, total_steps: int, progress=None): """Safely update progress with robust error handling""" if progress is None: return try: progress_value = step / total_steps self.logger.debug(f"Updating progress: {progress_value:.3f}") progress(progress_value) except Exception as e: self.logger.warning(f"WARNING Progress update failed: {str(e)}") # Model Loading def load_model(self, model_name: str, progress=None): """Load a specific OCR model with detailed debugging""" with self.model_lock: if model_name in self.models: self.logger.debug(f"Model {model_name} already loaded") return True self.logger.info(f"START Loading model: {model_name}") try: config = self.model_configs.get(model_name) if config: self.logger.debug(f"Config for {model_name}: {config}") if config['type'] == "image-to-text": self.logger.debug(f"Loading image-to-text pipeline for {model_name}") self.models[model_name] = pipeline( config['type'], model=config['name'], device=config['device'] ) elif config['type'] == "document-question-answering": self.logger.debug(f"Loading document-question-answering pipeline for {model_name}") self.models[model_name] = pipeline( config['type'], model=config['name'], device=config['device'] ) elif config['type'] == "transformer": self.logger.debug(f"Loading transformer for {model_name}") self._update_progress(1, 3, progress) processor = TrOCRProcessor.from_pretrained(config['name']) self.logger.debug(f"Processor loaded for {model_name}") self._update_progress(2, 3, progress) model = VisionEncoderDecoderModel.from_pretrained(config['name']) self.logger.debug(f"Model loaded for {model_name}") self.models[model_name] = { 'processor': processor, 'model': model, 'device': config['device'] } elif config['type'] == "custom" and model_name == "persian_ocr": self.logger.debug(f"Setting custom model {model_name}") self.models[model_name] = True elif model_name == "easyocr": self.logger.debug(f"Loading EasyOCR for {model_name}") easyocr_cache_dir = os.path.expanduser('~/.EasyOCR') if not os.path.exists(easyocr_cache_dir): self.logger.info("LOADING: EasyOCR models not found, downloading now...") else: self.logger.debug("EasyOCR model cache found, skipping download") self._update_progress(1, 2, progress) self.models[model_name] = easyocr.Reader( ['fa', 'en'], gpu=(self.device == "cuda"), download_enabled=True ) if self.device != "cuda": self.logger.warning("WARNING: EasyOCR running on CPU, consider GPU for faster processing") elif model_name == "tesseract": self.logger.debug(f"Configuring Tesseract for {model_name}") tessdata_prefix = os.environ.get('TESSDATA_PREFIX', TESSDATA_LOCAL) if not os.path.exists(os.path.join(tessdata_prefix, 'eng.traineddata')): self.logger.error(f"FAILURE: eng.traineddata not found in {tessdata_prefix}, Tesseract may fail") else: self.logger.debug(f"Tesseract language data found at {tessdata_prefix}") pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD self.models[model_name] = True self.logger.info(f"SUCCESS Model {model_name} loaded") self.model_performance[model_name]['success'] += 1 self._update_progress(3 if config and config['type'] == "transformer" else 2, 3 if config and config['type'] == "transformer" else 2, progress) return True except Exception as e: self.logger.error(f"FAILURE Error loading {model_name}: {str(e)}") self.model_performance[model_name]['fail'] += 1 self._update_progress(3 if config and config['type'] == "transformer" else 2, 3 if config and config['type'] == "transformer" else 2, progress) return False # Image Processing def process_single_image(self, image_path: str, settings: Dict, progress=None) -> OCRResult: """Process a single image file""" self.logger.debug(f"Processing single image: {image_path}") try: image = cv2.imread(image_path) if image is None or not isinstance(image, np.ndarray) or image.size == 0 or len(image.shape) < 2: self.logger.error(f"FAILURE Unable to read or invalid image: {image_path}") return self._create_empty_result() enhanced_image, preprocessing_info = self.enhance_for_persian(image, settings, progress) if enhanced_image is None: self.logger.error(f"FAILURE Image enhancement failed: {image_path}") return self._create_empty_result() self.logger.debug(f"Calling process_image with enhanced image shape: {enhanced_image.shape}") result = self.process_image(enhanced_image, progress) self.logger.debug(f"process_image returned: {result}") if result: result = result._replace(preprocessing_info=preprocessing_info) return result return self._create_empty_result() except Exception as e: self.logger.error(f"FAILURE Error processing image: {str(e)}") return self._create_empty_result() def process_image(self, image: np.ndarray, progress=None) -> Optional[OCRResult]: """Process an image using available OCR models""" start_time = time.time() self.logger.debug("Starting process_image") self._update_progress(0, len(self.model_priority) + 1, progress) if image is None or not isinstance(image, np.ndarray) or image.size == 0 or len(image.shape) < 2: self.logger.error("FAILURE Input image is invalid or empty") raise ValueError("Invalid or empty input image") self.logger.debug(f"Processing image with shape: {image.shape}") sorted_models = sorted( self.model_priority, key=lambda x: self.model_performance[x]['success'] / (self.model_performance[x]['fail'] + 1), reverse=True ) self.logger.debug(f"Sorted models: {sorted_models}") for i, model_name in enumerate(sorted_models): try: self.logger.debug(f"Attempting to load model: {model_name}") if not self.load_model(model_name, progress): self.logger.warning(f"WARNING Failed to load model: {model_name}") continue self._update_progress(i + 1, len(sorted_models) + 1, progress) self.logger.debug(f"Processing with model: {model_name}") result = self._process_with_model(image, model_name) self.logger.debug(f"Model {model_name} result: {result}") if result and result.get('text', '').strip(): processing_time = time.time() - start_time self.logger.debug(f"Formatting result for {model_name}") ocr_result = self._format_result( result['text'], result.get('confidence', 0.5), model_name, processing_time ) self.logger.debug(f"Formatted OCR result: {ocr_result}") threshold = self.model_configs.get(model_name, {}).get('threshold', 0.5) if ocr_result.confidence >= threshold: self.logger.info(f"SUCCESS Model {model_name} succeeded") self._update_progress(len(sorted_models) + 1, len(sorted_models) + 1, progress) return ocr_result except Exception as e: self.logger.warning(f"WARNING Model {model_name} failed: {str(e)}") continue self.logger.warning("WARNING No model succeeded") self._update_progress(len(sorted_models) + 1, len(sorted_models) + 1, progress) return None # Model-Specific Processing def _process_with_model(self, image: np.ndarray, model_name: str) -> Dict: """Process image with a specific model""" if image is None or not isinstance(image, np.ndarray) or image.size == 0: self.logger.error(f"FAILURE Invalid image for {model_name}") return {'text': '', 'confidence': 0} if model_name in self.model_configs: config = self.model_configs[model_name] if config['type'] == "transformer": return self._process_transformer_model(image, model_name) elif config['type'] in ["image-to-text", "document-question-answering"]: return self._process_pipeline_model(image, model_name) elif model_name == 'persian_ocr': return self._process_persian_ocr(image) elif model_name == 'easyocr': return self._process_easyocr(image) elif model_name == 'tesseract': return self._process_tesseract(image) return {'text': '', 'confidence': 0} def _process_transformer_model(self, image: np.ndarray, model_name: str) -> Dict: """Process image using transformer-based model (e.g., Microsoft TrOCR)""" try: pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) model_data = self.models[model_name] processor = model_data['processor'] model = model_data['model'] device = model_data['device'] pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values.to(device) generated_ids = model.generate(pixel_values) generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0] return { 'text': generated_text, 'confidence': self.model_configs[model_name]['threshold'] } except Exception as e: self.logger.error(f"FAILURE Transformer model processing failed: {str(e)}") return {'text': '', 'confidence': 0} def _process_pipeline_model(self, image: np.ndarray, model_name: str) -> Dict: """Process image using pipeline model""" try: pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) model = self.models[model_name] if self.model_configs[model_name]['type'] == "image-to-text": result = model(pil_image)[0] return { 'text': result['generated_text'], 'confidence': self.model_configs[model_name]['threshold'] } else: # document-question-answering result = model(pil_image) return { 'text': result['answer'], 'confidence': self.model_configs[model_name]['threshold'] } except Exception as e: self.logger.error(f"FAILURE Pipeline model processing failed: {str(e)}") return {'text': '', 'confidence': 0} def _process_easyocr(self, image: np.ndarray) -> Dict: """Process image using EasyOCR""" try: results = self.models['easyocr'].readtext(image) if not results: return {'text': '', 'confidence': 0} texts = [text for _, text, _ in results] confidence_sum = sum(conf for _, _, conf in results) confidence_avg = confidence_sum / len(results) if results else 0 return { 'text': ' '.join(texts), 'confidence': confidence_avg } except Exception as e: self.logger.error(f"FAILURE EasyOCR processing failed: {str(e)}") return {'text': '', 'confidence': 0} def _process_tesseract(self, image: np.ndarray) -> Dict: """Process image using Tesseract""" try: text = pytesseract.image_to_string( image, config='--oem 3 --psm 6 -l fas+eng' ) return {'text': text, 'confidence': 0.5} except Exception as e: self.logger.error(f"FAILURE Tesseract processing failed: {str(e)}") return {'text': '', 'confidence': 0} def _process_persian_ocr(self, image: np.ndarray) -> Dict: """Process image using Persian OCR""" try: if image is None or not isinstance(image, np.ndarray) or image.size == 0: return {'text': '', 'confidence': 0} text = self.persian_ocr_main(image, langs="fa", mode="tn") return {'text': text, 'confidence': 0.75} except Exception as e: self.logger.error(f"FAILURE Persian-OCR processing failed: {str(e)}") return {'text': '', 'confidence': 0} # Result Formatting def _format_result(self, text: str, confidence: float, model_name: str, processing_time: float) -> OCRResult: """Format OCR results into standardized output""" try: normalized_text = self.normalizer.normalize(text) words = word_tokenize(normalized_text) persian_nums = '۰۱۲۳۴۵۶۷۸۹' number_pattern = f'^[0-9{persian_nums}]+([\\.,،٫][0-9{persian_nums}]+)?' numbers = [w for w in words if re.match(number_pattern, w)] text_list = [w for w in words if not re.match(number_pattern, w)] return OCRResult( text=text_list, numbers=numbers, confidence=confidence, model_name=model_name, processing_time=processing_time, image_quality=self._assess_quality(text_list), detected_language=self._detect_language(text_list), word_count=len(text_list), char_count=sum(len(w) for w in text_list), preprocessing_info={}, error_rate=self._estimate_error_rate(text_list, confidence) ) except Exception as e: self.logger.error(f"FAILURE Formatting result failed: {str(e)}") return self._create_empty_result() def _estimate_error_rate(self, text_list: List[str], confidence: float) -> float: """Estimate error rate based on text characteristics and confidence""" if not text_list: return 1.0 avg_word_length = sum(len(w) for w in text_list) / len(text_list) return max(0.0, min(1.0, 1.0 - confidence + (3 - avg_word_length) / 10)) def _assess_quality(self, text_list: List[str]) -> str: """Assess the quality of extracted text""" if not text_list: return "Low" avg_word_length = sum(len(w) for w in text_list) / len(text_list) word_count = len(text_list) if word_count > 50 and avg_word_length > 3: return "High" elif word_count > 20 and avg_word_length > 2: return "Medium" else: return "Low" def _detect_language(self, text_list: List[str]) -> str: """Detect the dominant language in the text""" if not text_list: return "Unknown" persian_pattern = re.compile(r'[\u0600-\u06FF]') english_pattern = re.compile(r'[a-zA-Z]') persian_chars = sum(1 for word in text_list for _ in persian_pattern.finditer(word)) english_chars = sum(1 for word in text_list for _ in english_pattern.finditer(word)) if persian_chars > english_chars: return "Persian" elif english_chars > persian_chars: return "English" else: return "Mixed" # Persian OCR Specific def persian_ocr_main(self, image: np.ndarray, langs="fa", mode="tn") -> str: """Main Persian OCR processing function""" if image is None or not isinstance(image, np.ndarray) or image.size == 0: self.logger.error("FAILURE Invalid image for Persian OCR") return "" try: with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_input: temp_input_path = temp_input.name cv2.imwrite(temp_input_path, image) with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as temp_output: temp_output_path = temp_output.name im = Image.open(temp_input_path) length_x, width_y = im.size factor = float(1024.0 / length_x) size = int(factor * length_x), int(factor * width_y) image_resize = im.resize(size, Image.Resampling.LANCZOS) image_resize.save(f"{temp_input_path}_Upscaled.png", dpi=(300, 300)) img = cv2.imread(f"{temp_input_path}_Upscaled.png") if img is None: self.logger.error("FAILURE Failed to read upscaled image") return "" gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if gray is None: self.logger.error("FAILURE Failed to convert to grayscale") return "" if langs == "fa": if mode == "t": custom_config = r'-l fas --psm 6 -c tessedit_char_blacklist="۰١۲۳۴۵۶۷۸۹«»1234567890#"' elif mode == "tn": custom_config = r'-l fas --psm 6 -c tessedit_char_whitelist="آابپتثجچحخدذرزژسشصضطظعغفقکگلمنوهی ۰١۲۳۴۵۶۷۸۹.?!,،:;/"' elif mode == "table": custom_config = r'-l fas --psm 6 -c tessedit_char_whitelist="آابپتثجچحخدذرزژسشصضطظعغفقکگلمنوهی۰١۲۳۴۵۶۷۸۹"' elif langs == "en": custom_config = r'-l eng --psm 6' elif langs == "faen": custom_config = r'-l fas+eng --psm 6' else: raise ValueError("Invalid language option") text = pytesseract.image_to_string(gray, config=custom_config) with io.open(temp_output_path, 'w', encoding='utf8') as f: f.write(text) return text except Exception as e: self.logger.error(f"FAILURE Persian OCR failed: {str(e)}") return "" finally: for file in [temp_input_path, f"{temp_input_path}_Upscaled.png", temp_output_path]: try: os.remove(file) except Exception: pass # Image Enhancement def enhance_for_persian(self, image: np.ndarray, settings: Dict[str, Any], progress=None) -> Tuple[Optional[np.ndarray], Dict]: """Enhance image for Persian text recognition with robust validation and debugging""" info = {} if image is None or not isinstance(image, np.ndarray) or image.size == 0 or len(image.shape) < 2: self.logger.error("FAILURE Invalid input image for enhancement") return None, {} self.logger.debug(f"Enhancing image with shape: {image.shape}") try: processed = image.copy() if processed is None or not isinstance(processed, np.ndarray) or processed.size == 0: self.logger.error("FAILURE Failed to create image copy") return None, info self.logger.debug(f"Initial processed shape: {processed.shape}") step = 0 total_steps = 7 # Step 1: Convert to grayscale if len(processed.shape) == 3: self.logger.debug("Starting grayscale conversion") try: processed = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY) if processed is None: self.logger.error("FAILURE Grayscale conversion returned None") return None, info if len(processed.shape) != 2: self.logger.error(f"FAILURE Grayscale conversion produced invalid shape: {processed.shape}") return None, info self.logger.debug(f"After grayscale shape: {processed.shape}") info['grayscale'] = True except Exception as e: self.logger.error(f"FAILURE Grayscale conversion failed: {str(e)}") return None, info else: self.logger.debug("Image already grayscale, skipping conversion") step += 1 self._update_progress(step, total_steps, progress) # Step 2: Resize if settings.get('resize'): scale_percent = settings.get('resize_scale', 200) if scale_percent != 100: if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for resize") return None, info self.logger.debug(f"Starting resize with scale {scale_percent}%") h, w = processed.shape[:2] new_w = int(w * scale_percent / 100) new_h = int(h * scale_percent / 100) if new_w <= 0 or new_h <= 0: self.logger.error(f"FAILURE Invalid resize dimensions: {new_w}x{new_h}") return None, info try: processed = cv2.resize(processed, (new_w, new_h), interpolation=cv2.INTER_CUBIC) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Resize operation returned invalid result") return None, info self.logger.debug(f"After resize shape: {processed.shape}") info['resized'] = f"{scale_percent}%" except Exception as e: self.logger.error(f"FAILURE Resize failed: {str(e)}") return None, info step += 1 self._update_progress(step, total_steps, progress) # Step 3: Contrast Enhancement if settings.get('enhance_contrast'): if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for contrast enhancement") return None, info self.logger.debug("Starting contrast enhancement") try: clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) processed = clahe.apply(processed) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Contrast enhancement returned invalid result") return None, info self.logger.debug(f"After contrast enhancement shape: {processed.shape}") info['contrast_enhanced'] = True except Exception as e: self.logger.error(f"FAILURE Contrast enhancement failed: {str(e)}") return None, info step += 1 self._update_progress(step, total_steps, progress) # Step 4: Noise Reduction if settings.get('reduce_noise'): if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for noise reduction") return None, info self.logger.debug("Starting noise reduction") try: processed = cv2.bilateralFilter(processed, 9, 75, 75) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Noise reduction returned invalid result") return None, info self.logger.debug(f"After noise reduction shape: {processed.shape}") info['noise_reduced'] = True except Exception as e: self.logger.error(f"FAILURE Noise reduction failed: {str(e)}") return None, info step += 1 self._update_progress(step, total_steps, progress) # Step 5: Sharpening if settings.get('sharpen'): if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for sharpening") return None, info self.logger.debug("Starting sharpening") kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) try: processed = cv2.filter2D(processed, -1, kernel) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Sharpening returned invalid result") return None, info self.logger.debug(f"After sharpening shape: {processed.shape}") info['sharpened'] = True except Exception as e: self.logger.error(f"FAILURE Sharpening failed: {str(e)}") return None, info step += 1 self._update_progress(step, total_steps, progress) # Step 6: Deskew if settings.get('deskew'): if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for deskew") return None, info self.logger.debug("Starting deskew") coords = np.column_stack(np.where(processed > 0)) self.logger.debug(f"Coords shape: {coords.shape}") if coords.size >= 5: try: rect = cv2.minAreaRect(coords) self.logger.debug(f"Rect: {rect}") if not (isinstance(rect, tuple) and len(rect) == 3 and isinstance(rect[0], tuple) and isinstance(rect[1], tuple) and isinstance(rect[2], (int, float))): self.logger.warning("WARNING Invalid rect structure from minAreaRect") else: angle = rect[2] if angle < -45: angle = 90 + angle h, w = processed.shape[:2] center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, angle, 1.0) processed = cv2.warpAffine( processed, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE ) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Deskew operation returned invalid result") return None, info self.logger.debug(f"After deskew shape: {processed.shape}") info['deskewed'] = f"angle: {angle:.2f}" except Exception as e: self.logger.warning(f"WARNING Deskew failed: {str(e)}") else: self.logger.warning("WARNING Not enough points for deskewing (coords.size < 5)") step += 1 self._update_progress(step, total_steps, progress) # Step 7: Thresholding if settings.get('threshold'): if len(processed.shape) < 2: self.logger.error("FAILURE Invalid shape for thresholding") return None, info self.logger.debug("Starting thresholding") try: processed = cv2.adaptiveThreshold( processed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 ) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Thresholding returned invalid result") return None, info self.logger.debug(f"After thresholding shape: {processed.shape}") info['thresholded'] = True except Exception as e: self.logger.error(f"FAILURE Thresholding failed: {str(e)}") return None, info step += 1 self._update_progress(step, total_steps, progress) if processed is None or len(processed.shape) < 2: self.logger.error("FAILURE Final image has invalid shape") return None, info self.logger.debug(f"Final processed shape: {processed.shape}") return processed, info except Exception as e: self.logger.error(f"FAILURE Image enhancement error: {str(e)}") self._update_progress(total_steps, total_steps, progress) return None, {} # PDF Processing def process_pdf(self, pdf_path: str, settings: Optional[Dict] = None, progress=None) -> List[OCRResult]: """Process a PDF document and extract text from all pages""" self.logger.info(f"START Processing PDF: {pdf_path}") results = [] current_settings = self.default_settings.copy() if settings: current_settings.update(settings) if not pdf_path or not os.path.exists(pdf_path): self.logger.error(f"FAILURE PDF file not found: {pdf_path}") return [self._create_empty_result()] if not pdf_path.lower().endswith('.pdf'): self.logger.error(f"FAILURE Input file is not a PDF: {pdf_path}") return [self._create_empty_result()] try: self.logger.debug(f"Calling optimize_pdf_document with path: {pdf_path}") optimized_pdf = self.optimize_pdf_document(pdf_path, current_settings, progress) self.logger.debug(f"optimize_pdf_document returned: {optimized_pdf}") if not optimized_pdf or not os.path.exists(optimized_pdf): self.logger.error(f"FAILURE Optimized PDF not generated: {optimized_pdf}") return [self._create_empty_result()] self.logger.debug(f"Calling process_pdf_document with path: {optimized_pdf}") images = self.process_pdf_document(optimized_pdf, current_settings, progress) self.logger.debug(f"process_pdf_document returned {len(images)} images") if not images: self.logger.warning("WARNING No images extracted from PDF") return [self._create_empty_result()] with ThreadPoolExecutor(max_workers=current_settings.get('max_workers', 4)) as executor: futures = [ executor.submit(self.process_single_image, img, current_settings, progress) for img in images if img is not None ] for future in as_completed(futures): result = future.result() self.logger.debug(f"Thread result: {result}") if result and result.text: results.append(result) self.logger.info(f"SUCCESS Processed {len(results)} pages") return results if results else [self._create_empty_result()] except Exception as e: self.logger.error(f"FAILURE PDF processing failed: {str(e)}") return [self._create_empty_result()] def _create_empty_result(self) -> OCRResult: """Create an empty OCR result""" return OCRResult( text=[], numbers=[], confidence=0.0, model_name="None", processing_time=0.0, image_quality="Unknown", detected_language="Unknown", word_count=0, char_count=0, preprocessing_info={}, error_rate=1.0 ) def process_pdf_document(self, pdf_path: str, settings: Dict[str, Any], progress=None) -> List[np.ndarray]: """Extract and process images from PDF document""" self.logger.info(f"START Processing PDF: {pdf_path}") all_images = [] try: doc = fitz.open(pdf_path) total_pages = len(doc) batch_size = settings.get('batch_size', 2) batches = [range(i, min(i + batch_size, total_pages)) for i in range(0, total_pages, batch_size)] with tqdm(total=total_pages, desc="📄 Processing PDF") as pbar: for batch in batches: with ThreadPoolExecutor(max_workers=batch_size) as executor: futures = { executor.submit( self._process_pdf_page, doc, page_num, settings, progress ): page_num for page_num in batch } for future in as_completed(futures): result = future.result() self.logger.debug(f"Page result: {result}") if result and isinstance(result, list): all_images.extend(result) pbar.update(1) self.logger.info(f"SUCCESS Extracted {len(all_images)} images") return all_images except Exception as e: self.logger.error(f"FAILURE PDF processing failed: {str(e)}") return [] def _process_pdf_page(self, doc, page_num: int, settings: Dict[str, Any], progress=None) -> List[np.ndarray]: """Process a single PDF page""" images = [] try: page = doc.load_page(page_num) pix = page.get_pixmap(matrix=fitz.Matrix( settings.get('scale_factor', 2), settings.get('scale_factor', 2) )) if not self._validate_pixmap(pix): self.logger.warning(f"WARNING Invalid pixmap for page {page_num + 1}") return [] img = self._pixmap_to_image(pix) if img is None: self.logger.warning(f"WARNING Failed to convert pixmap to image for page {page_num + 1}") return [] processed_img, _ = self.enhance_for_persian(img, settings, progress) if processed_img is not None: images.append(processed_img) if settings.get('extract_images', True): embedded_images = self._extract_embedded_images(doc, page, page_num) images.extend(embedded_images) if progress: progress(1.0) return images except Exception as e: self.logger.error(f"FAILURE Page {page_num + 1} processing failed: {str(e)}") if progress: progress(1.0) return [] def _validate_pixmap(self, pix) -> bool: """Validate pixmap data with detailed logging""" if not hasattr(pix, 'n') or not hasattr(pix, 'width') or not hasattr(pix, 'height') or not hasattr(pix, 'samples'): self.logger.error("FAILURE Pixmap missing required attributes") return False if pix.n <= 0: self.logger.error("FAILURE Pixmap has invalid number of components") return False if pix.width <= 0 or pix.height <= 0: self.logger.error("FAILURE Pixmap has invalid dimensions") return False if pix.samples is None or len(pix.samples) == 0: self.logger.error("FAILURE Pixmap has no sample data") return False expected_size = pix.width * pix.height * pix.n if len(pix.samples) != expected_size: self.logger.error(f"FAILURE Pixmap sample size mismatch: expected {expected_size}, got {len(pix.samples)}") return False return True def _pixmap_to_image(self, pix) -> Optional[np.ndarray]: """Convert pixmap to numpy array with validation""" try: if not self._validate_pixmap(pix): return None img_data = np.frombuffer(pix.samples, dtype=np.uint8) expected_size = pix.width * pix.height * pix.n if img_data.size != expected_size: self.logger.error( f"FAILURE Pixmap data size mismatch: " f"expected {expected_size}, got {img_data.size}" ) return None reshaped = img_data.reshape(pix.height, pix.width, pix.n) if reshaped is None or len(reshaped.shape) < 2: self.logger.error("FAILURE Failed to reshape pixmap data") return None return reshaped except Exception as e: self.logger.error(f"FAILURE Pixmap conversion failed: {str(e)}") return None def _extract_embedded_images(self, doc, page, page_num: int) -> List[np.ndarray]: """Extract embedded images from PDF page""" images = [] for img_info in page.get_images(full=True): try: xref = img_info[0] base_image = fitz.Pixmap(doc, xref) if base_image.n >= 4: base_image = fitz.Pixmap(fitz.csRGB, base_image) if not self._validate_pixmap(base_image): continue img_array = np.frombuffer(base_image.samples, dtype=np.uint8).reshape( base_image.height, base_image.width, 3 if base_image.n >= 3 else 1 ) if img_array is None or len(img_array.shape) < 2: self.logger.warning(f"WARNING Failed to reshape embedded image on page {page_num + 1}") continue if img_array.shape[0] > 100 and img_array.shape[1] > 100: processed_img, _ = self.enhance_for_persian(img_array, {}) if processed_img is not None: images.append(processed_img) except Exception as e: self.logger.warning( f"WARNING Failed to process embedded image on page {page_num + 1}: {str(e)}" ) continue return images def optimize_pdf_document(self, pdf_path: str, settings: Dict[str, Any], progress=None) -> str: """Optimize PDF document for OCR processing""" self.logger.info(f"START Optimizing PDF: {pdf_path}") if not os.path.exists(pdf_path): self.logger.error(f"FAILURE PDF file not found: {pdf_path}") return pdf_path try: output_path = str(Path(f"optimized_{Path(pdf_path).name}")) doc = fitz.open(pdf_path) new_doc = fitz.open() total_pages = len(doc) for page_num in tqdm(range(total_pages), desc="📄 Optimizing PDF"): page = doc.load_page(page_num) pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) if not self._validate_pixmap(pix): continue img = self._pixmap_to_image(pix) if img is None: continue processed_img, _ = self.enhance_for_persian(img, settings, progress) if processed_img is None: continue img_path = Path(tempfile.mkdtemp()) / f"temp_page_{page_num}.jpg" cv2.imwrite(str(img_path), processed_img) temp_doc = fitz.open(str(img_path)) new_doc.insert_pdf(temp_doc) temp_doc.close() os.remove(img_path) new_doc.save(output_path) self.logger.info(f"SUCCESS PDF optimized: {output_path}") if progress: progress(1.0) return output_path except Exception as e: self.logger.error(f"FAILURE PDF optimization failed: {str(e)}") if progress: progress(1.0) return pdf_path # Gradio Interface def create_gradio_interface(self): """Create Gradio web interface""" def process_file(file, use_cache: bool, preprocessing: bool, confidence: float, scale: int, enhance_contrast: bool, reduce_noise: bool, extract_images: bool): """Handle file processing in Gradio interface""" if file is None: self.logger.error("FAILURE No file provided") return self._empty_interface_result("No file uploaded") settings = { 'cache_enabled': use_cache, 'preprocessing_enabled': preprocessing, 'confidence_threshold': confidence, 'resize': True, 'resize_scale': scale, 'enhance_contrast': enhance_contrast, 'reduce_noise': reduce_noise, 'extract_images': extract_images, 'sharpen': True, 'deskew': True, 'optimize_for_ocr': True } progress = gr.Progress(track_tqdm=True) try: if file.name.lower().endswith('.pdf'): return self._process_pdf_interface(file.name, settings, progress) else: return self._process_image_interface(file.name, settings, progress) except Exception as e: self.logger.error(f"FAILURE Interface error: {str(e)}") return self._empty_interface_result(str(e)) with gr.Blocks(title="Persian OCR System") as interface: gr.Markdown("# Advanced Persian OCR System") with gr.Row(): with gr.Column(): file_input = gr.File(label="Upload File (Image or PDF)") with gr.Accordion("Advanced Settings", open=False): use_cache = gr.Checkbox(label="Use Cache", value=True) preprocessing = gr.Checkbox(label="Enable Preprocessing", value=True) confidence = gr.Slider(0.1, 1.0, value=0.7, label="Confidence Threshold") scale = gr.Slider(100, 400, value=200, step=50, label="Image Scale (%)") enhance_contrast = gr.Checkbox(label="Enhance Contrast", value=True) reduce_noise = gr.Checkbox(label="Reduce Noise", value=True) extract_images = gr.Checkbox(label="Extract Images from PDF", value=True) submit_btn = gr.Button("Process Text") with gr.Column(): outputs = [ gr.Textbox(label="Extracted Text", lines=10), gr.Textbox(label="Extracted Numbers", lines=2), gr.Textbox(label="Confidence Level"), gr.Textbox(label="OCR Model Used"), gr.Textbox(label="Processing Time"), gr.Textbox(label="Image Quality"), gr.Textbox(label="Preprocessing Info", lines=5) ] submit_btn.click( fn=process_file, inputs=[ file_input, use_cache, preprocessing, confidence, scale, enhance_contrast, reduce_noise, extract_images ], outputs=outputs ) return interface def _process_pdf_interface(self, file_path: str, settings: Dict, progress) -> Tuple: """Process PDF file for interface""" results = self.process_pdf(file_path, settings, progress) full_text = "" numbers_combined = [] confidences = [] models_used = [] times = [] qualities = [] preprocess_infos = [] for res in results: full_text += "\n" + " ".join(res.text) numbers_combined.extend(res.numbers) confidences.append(f"{res.confidence:.2f}") models_used.append(res.model_name) times.append(f"{res.processing_time:.2f} seconds") qualities.append(res.image_quality) preprocess_infos.append( "\n".join([f"{k}: {v}" for k, v in res.preprocessing_info.items()]) ) combined_preprocess_info = ("\nPage-wise Preprocessing Info:\n" + "\n\n".join(preprocess_infos) if preprocess_infos else "") return ( full_text.strip(), ", ".join(numbers_combined), ", ".join(confidences), ", ".join(models_used), ", ".join(times), ", ".join(qualities), combined_preprocess_info ) def _process_image_interface(self, file_path: str, settings: Dict, progress) -> Tuple: """Process image file for interface""" result = self.process_single_image(file_path, settings, progress) if result and result.text: preprocess_info = "\n".join([f"{k}: {v}" for k, v in result.preprocessing_info.items()]) \ if result.preprocessing_info else "" return ( "\n".join(result.text), ", ".join(result.numbers), f"{result.confidence:.2f}", result.model_name, f"{result.processing_time:.2f} seconds", result.image_quality, preprocess_info ) return self._empty_interface_result("No text extracted") def _empty_interface_result(self, message: str) -> Tuple: """Create empty result for interface""" return ("", "", "0.0", "None", "0.0", "Unknown", message) # System Runner def run(self): """Run the OCR system""" try: self.logger.info("START Initializing system") self.setup_system_dependencies() os.makedirs('logs', exist_ok=True) os.makedirs('cache', exist_ok=True) interface = self.create_gradio_interface() interface.launch( share=True, debug=True, server_name="0.0.0.0", server_port=7860 ) except Exception as e: self.logger.error(f"FAILURE System initialization failed: {str(e)}") raise # Main Execution if __name__ == "__main__": ocr_system = OCRSystem() ocr_system.run()