""" ماژول یکپارچه سیستم OCR پارسی این کد شامل تنظیمات، ابزارهای کمکی، پردازش اسناد و تصاویر، و رابط کاربری Gradio با بهبودهای جدید است. طراحی شده برای اجرا روی CPU بدون نیاز به GPU. """ import os import re import gc import time import json import uuid import logging import hashlib import traceback import threading import subprocess import contextlib import signal import sys from datetime import datetime, timedelta from typing import List, Dict, Tuple, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache, wraps from difflib import SequenceMatcher from collections import deque from dataclasses import dataclass, field import cv2 import numpy as np import fitz # PyMuPDF import pytesseract import requests from pdf2image import convert_from_path from PIL import Image, ImageEnhance, ImageFilter import gradio as gr import multiprocessing import psutil import platform import socket import shutil import glob from pathlib import Path from colorama import Fore, Style, init # تنظیمات colorama init() # تنظیمات پیکربندی CONFIG = { "TESSDATA_LOCAL": os.environ.get("TESSDATA_PATH", "tessdata"), "TESSERACT_CMD": os.environ.get("TESSERACT_CMD", "/usr/bin/tesseract"), "CACHE_DIR": os.environ.get("CACHE_DIR", "cache"), "MODEL_DIR": os.environ.get("MODEL_DIR", "models"), "LOG_DIR": os.environ.get("LOG_DIR", "logs"), "TEMP_DIR": os.environ.get("TEMP_DIR", "temp"), "OUTPUT_DIR": os.environ.get("OUTPUT_DIR", "output"), "DATASET_DIR": os.environ.get("DATASET_DIR", "datasets"), "MAX_CACHE_SIZE_MB": int(os.environ.get("MAX_CACHE_SIZE_MB", "2048")), "MAX_WORKERS": min(multiprocessing.cpu_count(), 1), "MAX_MEMORY_PERCENT": float(os.environ.get("MAX_MEMORY_PERCENT", "99.5")), # آستانه حافظه "MAX_CPU_PERCENT": float(os.environ.get("MAX_CPU_PERCENT", "80.0")), "MIN_FREE_SPACE_BYTES": 1024**3, "DEFAULT_LANGUAGE": os.environ.get("DEFAULT_LANGUAGE", "fas"), "SUPPORTED_LANGUAGES": ["fas", "eng", "ara", "eng+fas", "fas+eng", "ara+fas"], "CONFIDENCE_THRESHOLD": float(os.environ.get("CONFIDENCE_THRESHOLD", "0.65")), "PAGE_SEGMENTATION_MODE": os.environ.get("PAGE_SEGMENTATION_MODE", "3"), "OCR_ENGINE_MODE": os.environ.get("OCR_ENGINE_MODE", "1"), "AZURE_API_KEY": os.environ.get("AZURE_API_KEY", ""), "AZURE_ENDPOINT": os.environ.get("AZURE_ENDPOINT", ""), "GOOGLE_APPLICATION_CREDENTIALS": os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""), "DEBUG_MODE": os.environ.get("DEBUG_MODE", "false").lower() == "true", "VERSION": "2.0.1", "UI_TITLE": "سیستم OCR پارسی" } # تنظیم لاگها با رنگیسازی logging.basicConfig(level=logging.INFO, format=f"{Fore.GREEN}%(asctime)s - %(name)s - %(levelname)s - {Fore.RESET}%(message)s") def log_with_check(message, success=True): """لاگ با تیک یا علامت خطا و جداسازی با خط چین""" separator = f"{Fore.YELLOW}---{Style.RESET_ALL}" check = f"{Fore.GREEN}✅{Style.RESET_ALL}" if success else f"{Fore.RED}❌{Style.RESET_ALL}" logging.info(f"{separator}\n{check} {message}\n{separator}") def init_directories(): """ایجاد دایرکتوریهای مورد نیاز""" directories = [ CONFIG["CACHE_DIR"], CONFIG["MODEL_DIR"], CONFIG["LOG_DIR"], CONFIG["TEMP_DIR"], CONFIG["OUTPUT_DIR"], CONFIG["DATASET_DIR"], CONFIG["TESSDATA_LOCAL"] ] for directory in directories: os.makedirs(directory, exist_ok=True) log_with_check("Directories initialized") def check_optional_libs(): """بررسی دسترسی به کتابخانههای اختیاری""" CONFIG["AZURE_OCR_AVAILABLE"] = False CONFIG["GOOGLE_OCR_AVAILABLE"] = False try: from azure.cognitiveservices.vision.computervision import ComputerVisionClient 
CONFIG["AZURE_OCR_AVAILABLE"] = True except ImportError: pass try: from google.cloud import vision CONFIG["GOOGLE_OCR_AVAILABLE"] = True except ImportError: pass log_with_check("Optional libraries checked") # دکوراتورها def timed(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) logging.debug(f"{func.__name__} took {time.time() - start_time:.2f} seconds") return result return wrapper @contextlib.contextmanager def temp_file(suffix=None): """مدیریت فایلهای موقت""" temp_path = os.path.join(CONFIG["TEMP_DIR"], f"temp_{uuid.uuid4()}{suffix or ''}") try: yield temp_path finally: if os.path.exists(temp_path): try: os.remove(temp_path) except: pass class Utils: @staticmethod def detect_language(text): if any('\u0600' <= c <= '\u06FF' for c in text): return 'fa' return 'en' @staticmethod def is_rtl_language(lang_code): return lang_code in ['fa', 'fas', 'ar', 'ara'] @staticmethod def get_file_extension(file_path): return os.path.splitext(file_path)[1].lower() @staticmethod def get_human_readable_size(size, decimal_places=2): for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024.0: break size /= 1024.0 return f"{size:.{decimal_places}f} {unit}" @staticmethod def retry(max_attempts=3, delay=1, exceptions=(Exception,)): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return func(*args, **kwargs) except exceptions as e: if attempt == max_attempts - 1: raise logging.warning(f"Retry {attempt+1}/{max_attempts} for {func.__name__} after error: {str(e)}") time.sleep(delay) return wrapper return decorator @dataclass class ResourceUsage: cpu_percent: float memory_percent: float gpu_memory_percent: Optional[float] = None timestamp: datetime = None @dataclass class LoadPrediction: expected_load: float confidence: float time_window: timedelta details: Dict @dataclass class ResourceMetrics: memory_usage: float cpu_usage: float disk_usage: float timestamp: datetime @dataclass class ScanRegion: content: str confidence: float language: str = 'unknown' direction: str = 'ltr' @dataclass class DocumentStructure: title: str language: str page_count: int toc: Dict sections: List[Dict] metadata: Dict attributes: Dict @dataclass class ProcessingConfig: use_preprocessing: bool = True use_caching: bool = True max_workers: int = CONFIG["MAX_WORKERS"] language: str = CONFIG["DEFAULT_LANGUAGE"] confidence_threshold: float = CONFIG["CONFIDENCE_THRESHOLD"] class CacheManager: def __init__(self, cache_dir=CONFIG["CACHE_DIR"], max_cache_size_mb=CONFIG["MAX_CACHE_SIZE_MB"]): self.cache_dir = cache_dir self.max_cache_size_mb = max_cache_size_mb self.cache_entries = {} self.access_history = deque(maxlen=1000) self.logger = logging.getLogger(__name__) self.lock = threading.Lock() os.makedirs(self.cache_dir, exist_ok=True) self._load_index() log_with_check("CacheManager initialized") @lru_cache(maxsize=1024) def _hash_key(self, key: str) -> str: return hashlib.md5(key.encode('utf-8')).hexdigest() def _cache_path(self, key_hash: str) -> str: return os.path.join(self.cache_dir, f"{key_hash}.cache") def _load_index(self): index_path = os.path.join(self.cache_dir, "index.json") try: if os.path.exists(index_path): with open(index_path, 'r', encoding='utf-8') as f: cache_data = json.load(f) self.cache_entries = cache_data.get('entries', {}) log_with_check(f"Loaded {len(self.cache_entries)} cache entries") except Exception as e: self.logger.error(f"Error loading cache index: {str(e)}") log_with_check("Failed to load cache 
index", False) def _save_index(self): index_path = os.path.join(self.cache_dir, "index.json") try: with open(index_path, 'w', encoding='utf-8') as f: json.dump({'entries': self.cache_entries}, f) log_with_check("Cache index saved") except Exception as e: self.logger.error(f"Error saving cache index: {str(e)}") log_with_check("Failed to save cache index", False) def get(self, key: str) -> Any: with self.lock: key_hash = self._hash_key(key) if key_hash not in self.cache_entries: return None cache_path = self._cache_path(key_hash) if not os.path.exists(cache_path): del self.cache_entries[key_hash] self._save_index() return None try: with open(cache_path, 'rb') as f: import pickle result = pickle.load(f) self.cache_entries[key_hash]['last_access'] = datetime.now().isoformat() self.access_history.append(key_hash) return result except Exception as e: self.logger.error(f"Error retrieving from cache: {str(e)}") return None def set(self, key: str, value: Any, expire_seconds: int = 86400) -> bool: with self.lock: if self._check_cache_size() > self.max_cache_size_mb: self._clean_cache() key_hash = self._hash_key(key) cache_path = self._cache_path(key_hash) try: with open(cache_path, 'wb') as f: import pickle pickle.dump(value, f) now = datetime.now() self.cache_entries[key_hash] = { 'key': key, 'created': now.isoformat(), 'last_access': now.isoformat(), 'expires': (now + timedelta(seconds=expire_seconds)).isoformat(), 'size': os.path.getsize(cache_path) } self.access_history.append(key_hash) self._save_index() return True except Exception as e: self.logger.error(f"Error storing in cache: {str(e)}") return False def _check_cache_size(self) -> float: total_size = sum(entry.get('size', 0) for entry in self.cache_entries.values()) return total_size / (1024 * 1024) def _clean_cache(self): with self.lock: now = datetime.now() expired_keys = [k for k, v in self.cache_entries.items() if datetime.fromisoformat(v['expires']) < now] for key in expired_keys: self._remove_item(key) if self._check_cache_size() > self.max_cache_size_mb * 0.8: access_counts = {key: self.access_history.count(key) for key in set(self.access_history)} to_remove = sorted( [k for k in self.cache_entries.keys() if k not in expired_keys], key=lambda k: access_counts.get(k, 0) ) for key in to_remove: self._remove_item(key) if self._check_cache_size() < self.max_cache_size_mb * 0.7: break def _remove_item(self, key_hash: str): try: cache_path = self._cache_path(key_hash) if os.path.exists(cache_path): os.remove(cache_path) if key_hash in self.cache_entries: del self.cache_entries[key_hash] except Exception as e: self.logger.error(f"Error removing cache item: {str(e)}") def clear(self): with self.lock: for key_hash in list(self.cache_entries.keys()): self._remove_item(key_hash) self.cache_entries = {} self.access_history.clear() self._save_index() def generate_key(self, image: np.ndarray) -> str: small_img = cv2.resize(image, (32, 32)) if len(small_img.shape) == 3: small_img = cv2.cvtColor(small_img, cv2.COLOR_BGR2GRAY) img_hash = hashlib.md5(small_img.tobytes()).hexdigest() return f"img:{img_hash}" class Normalizer: def __init__(self): self.logger = logging.getLogger(__name__) log_with_check("Normalizer initialized") self.char_mappings = { 'ك': 'ک', 'ي': 'ی', 'أ': 'ا', 'إ': 'ا', 'آ': 'ا', 'ة': 'ه', '٠': '0', '١': '1', '٢': '2', '٣': '3', '٤': '4', '٥': '5', '٦': '6', '٧': '7', '٨': '8', '٩': '9', '۰': '0', '۱': '1', '۲': '2', '۳': '3', '۴': '4', '۵': '5', '۶': '6', '۷': '7', '۸': '8', '۹': '9' } self.space_patterns = [ (r'\s+', ' '), (r'ـ+', 
    def normalize(self, text: str, normalize_chars: bool = True, normalize_spaces: bool = True) -> str:
        if not text:
            return text
        result = text
        if normalize_chars:
            for src, dst in self.char_mappings.items():
                result = result.replace(src, dst)
        if normalize_spaces:
            for pattern, replacement in self.space_patterns:
                result = re.sub(pattern, replacement, result)
        return result.strip()


class AdaptiveLearner:
    def __init__(self, model_dir: str = CONFIG["MODEL_DIR"]):
        os.makedirs(model_dir, exist_ok=True)
        self.model_dir = model_dir
        self.corrections = {}
        self.confidence_threshold = CONFIG["CONFIDENCE_THRESHOLD"]
        self.logger = logging.getLogger(__name__)
        self.lock = threading.Lock()
        self._load_corrections()
        log_with_check("AdaptiveLearner initialized")

    def _load_corrections(self):
        corrections_path = os.path.join(self.model_dir, "corrections.json")
        try:
            if os.path.exists(corrections_path):
                with open(corrections_path, 'r', encoding='utf-8') as f:
                    self.corrections = json.load(f)
                log_with_check(f"Loaded {sum(len(v) for k, v in self.corrections.items() if isinstance(v, dict))} text corrections")
            else:
                log_with_check("No corrections file found", False)
        except Exception as e:
            self.logger.error(f"Error loading corrections: {str(e)}")
            log_with_check("Failed to load corrections", False)
            self.corrections = {}

    def _save_corrections(self):
        with self.lock:
            corrections_path = os.path.join(self.model_dir, "corrections.json")
            try:
                with open(corrections_path, 'w', encoding='utf-8') as f:
                    json.dump(self.corrections, f, ensure_ascii=False, indent=2)
                log_with_check("Corrections saved")
            except Exception as e:
                self.logger.error(f"Error saving corrections: {str(e)}")
                log_with_check("Failed to save corrections", False)

    def apply_corrections(self, text: str, context: Dict) -> Tuple[str, float]:
        if not text:
            return text, 1.0
        language = context.get('language', 'unknown')
        corrected_text = text
        confidence = 1.0
        for pattern, replacement in self.corrections.get('general', {}).items():
            corrected_text = re.sub(pattern, replacement, corrected_text)
        for pattern, replacement in self.corrections.get(language, {}).items():
            corrected_text = re.sub(pattern, replacement, corrected_text)
        if text != corrected_text:
            similarity = SequenceMatcher(None, text, corrected_text).ratio()
            confidence = similarity
        return corrected_text, confidence

    def learn_correction(self, original: str, corrected: str, context: Dict):
        if original == corrected:
            return
        with self.lock:
            language = context.get('language', 'general')
            if language not in self.corrections:
                self.corrections[language] = {}
            if len(original) > 10:
                matcher = SequenceMatcher(None, original, corrected)
                for tag, i1, i2, j1, j2 in matcher.get_opcodes():
                    if tag == 'replace':
                        pattern = re.escape(original[i1:i2])
                        replacement = corrected[j1:j2]
                        self.corrections[language][pattern] = replacement
            else:
                self.corrections[language][re.escape(original)] = corrected
        self._save_corrections()


class ImageProcessor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        log_with_check("ImageProcessor initialized")

    @staticmethod
    def deskew(image: np.ndarray) -> np.ndarray:
        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image.copy()
            thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
            coords = np.column_stack(np.where(thresh > 0))
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            (h, w) = image.shape[:2]
            center = (w // 2, h // 2)
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            rotated = cv2.warpAffine(image, matrix, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
            return rotated
        except Exception as e:
            logging.error(f"Deskew error: {str(e)}")
            return image

    @staticmethod
    def remove_noise(image: np.ndarray) -> np.ndarray:
        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image.copy()
            return cv2.medianBlur(gray, 3)
        except Exception as e:
            logging.error(f"Noise removal error: {str(e)}")
            return image

    @staticmethod
    def adjust_contrast(image: np.ndarray, factor=1.5) -> np.ndarray:
        try:
            if len(image.shape) == 3:
                pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            else:
                pil_image = Image.fromarray(image)
            enhancer = ImageEnhance.Contrast(pil_image)
            enhanced_img = enhancer.enhance(factor)
            if len(image.shape) == 3:
                return cv2.cvtColor(np.array(enhanced_img), cv2.COLOR_RGB2BGR)
            return np.array(enhanced_img)
        except Exception as e:
            logging.error(f"Contrast adjustment error: {str(e)}")
            return image

    @staticmethod
    def sharpen(image: np.ndarray) -> np.ndarray:
        try:
            if len(image.shape) == 3:
                pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            else:
                pil_image = Image.fromarray(image)
            sharpened = pil_image.filter(ImageFilter.SHARPEN)
            if len(image.shape) == 3:
                return cv2.cvtColor(np.array(sharpened), cv2.COLOR_RGB2BGR)
            return np.array(sharpened)
        except Exception as e:
            logging.error(f"Sharpen error: {str(e)}")
            return image

    @staticmethod
    def binarize(image: np.ndarray, method="adaptive") -> np.ndarray:
        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image.copy()
            if method == "otsu":
                return cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
            return cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
        except Exception as e:
            logging.error(f"Binarization error: {str(e)}")
            return image

    def enhance_for_ocr(self, image: np.ndarray) -> np.ndarray:
        try:
            enhanced = image.copy()
            enhanced = self.deskew(enhanced)
            enhanced = self.remove_noise(enhanced)
            enhanced = self.adjust_contrast(enhanced, 1.5)
            enhanced = self.sharpen(enhanced)
            enhanced = self.binarize(enhanced, "adaptive")
            return enhanced
        except Exception as e:
            self.logger.error(f"Image enhancement error: {str(e)}")
            return image


class CloudOCRProvider:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        log_with_check("CloudOCRProvider initialized")
        self.azure_client = None
        self.google_client = None
        # Local fallback needs the same preprocessing pipeline; process_image()
        # referenced enhance_for_ocr without an ImageProcessor instance.
        self.img_processor = ImageProcessor()
        if CONFIG.get("AZURE_OCR_AVAILABLE", False) and CONFIG["AZURE_API_KEY"] and CONFIG["AZURE_ENDPOINT"]:
            try:
                from azure.cognitiveservices.vision.computervision import ComputerVisionClient
                from msrest.authentication import CognitiveServicesCredentials
                self.azure_client = ComputerVisionClient(CONFIG["AZURE_ENDPOINT"], CognitiveServicesCredentials(CONFIG["AZURE_API_KEY"]))
                log_with_check("Azure OCR initialized")
            except Exception as e:
                self.logger.error(f"Azure OCR initialization error: {str(e)}")
                log_with_check("Azure OCR initialization failed", False)
        if CONFIG.get("GOOGLE_OCR_AVAILABLE", False) and CONFIG["GOOGLE_APPLICATION_CREDENTIALS"]:
            try:
                from google.cloud import vision
                self.google_client = vision.ImageAnnotatorClient()
                log_with_check("Google OCR initialized")
            except Exception as e:
                self.logger.error(f"Google OCR initialization error: {str(e)}")
                log_with_check("Google OCR initialization failed", False)

    def is_available(self):
        return self.azure_client is not None or self.google_client is not None
    @Utils.retry(max_attempts=2, delay=1, exceptions=(Exception,))
    def process_with_azure(self, image_path):
        if not self.azure_client:
            return None
        try:
            with open(image_path, "rb") as image_file:
                image_data = image_file.read()
            from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
            recognize_results = self.azure_client.recognize_printed_text_in_stream(image_data)
            result = {"regions": [], "text": "", "language": "", "confidence": 0.0}
            all_text = []
            total_confidence = 0.0
            count = 0
            for region in recognize_results.regions:
                region_text = []
                region_info = {"bounding_box": region.bounding_box.split(","), "lines": []}
                for line in region.lines:
                    line_text = []
                    line_info = {"bounding_box": line.bounding_box.split(","), "words": []}
                    for word in line.words:
                        line_text.append(word.text)
                        total_confidence += word.confidence
                        count += 1
                        line_info["words"].append({
                            "text": word.text,
                            "confidence": word.confidence,
                            "bounding_box": word.bounding_box.split(",")
                        })
                    full_line = " ".join(line_text)
                    region_text.append(full_line)
                    line_info["text"] = full_line
                    region_info["lines"].append(line_info)
                full_region = "\n".join(region_text)
                all_text.append(full_region)
                region_info["text"] = full_region
                result["regions"].append(region_info)
            result["text"] = "\n\n".join(all_text)
            if count > 0:
                result["confidence"] = total_confidence / count
            result["language"] = recognize_results.language or "unknown"
            return result
        except Exception as e:
            self.logger.error(f"Azure OCR error: {str(e)}")
            return None

    @Utils.retry(max_attempts=2, delay=1, exceptions=(Exception,))
    def process_with_google(self, image_path):
        if not self.google_client:
            return None
        try:
            with open(image_path, "rb") as image_file:
                content = image_file.read()
            from google.cloud import vision
            image = vision.Image(content=content)
            response = self.google_client.text_detection(image=image)
            if response.error.message:
                self.logger.error(f"Google OCR API error: {response.error.message}")
                return None
            result = {"regions": [], "text": "", "language": "", "confidence": 0.0}
            full_text_annotation = response.full_text_annotation
            if full_text_annotation:
                result["text"] = full_text_annotation.text
                for page in full_text_annotation.pages:
                    for block in page.blocks:
                        block_info = {
                            "bounding_box": [[vertex.x, vertex.y] for vertex in block.bounding_box.vertices],
                            "paragraphs": [],
                            "text": ""
                        }
                        block_texts = []
                        for paragraph in block.paragraphs:
                            para_info = {
                                "bounding_box": [[vertex.x, vertex.y] for vertex in paragraph.bounding_box.vertices],
                                "words": [],
                                "text": ""
                            }
                            para_texts = []
                            for word in paragraph.words:
                                word_text = "".join([symbol.text for symbol in word.symbols])
                                para_texts.append(word_text)
                                word_info = {
                                    "text": word_text,
                                    "bounding_box": [[vertex.x, vertex.y] for vertex in word.bounding_box.vertices],
                                    "confidence": word.confidence
                                }
                                para_info["words"].append(word_info)
                            para_info["text"] = " ".join(para_texts)
                            block_texts.append(para_info["text"])
                            block_info["paragraphs"].append(para_info)
                        block_info["text"] = "\n".join(block_texts)
                        result["regions"].append(block_info)
                result["confidence"] = full_text_annotation.pages[0].confidence if full_text_annotation.pages else 0.0
            if result["text"]:
                result["language"] = Utils.detect_language(result["text"])
            return result
        except Exception as e:
            self.logger.error(f"Google OCR error: {str(e)}")
            return None

    def process_image(self, image_path, prefer_provider=None):
        providers = []
        if prefer_provider:
            if prefer_provider == "azure" and self.azure_client:
                providers = ["azure", "google", "local"]
            elif prefer_provider == "google" and self.google_client:
                providers = ["google", "azure", "local"]
            else:
                providers = ["local", "azure", "google"]
        else:
            if self.azure_client:
                providers.append("azure")
            if self.google_client:
                providers.append("google")
            providers.append("local")
        for provider in providers:
            try:
                if provider == "azure":
                    result = self.process_with_azure(image_path)
                    if result:
                        result["provider"] = "azure"
                        return result
                elif provider == "google":
                    result = self.process_with_google(image_path)
                    if result:
                        result["provider"] = "google"
                        return result
                elif provider == "local":
                    img = cv2.imread(image_path)
                    if img is None:
                        continue
                    enhanced = self.img_processor.enhance_for_ocr(img)
                    text = pytesseract.image_to_string(
                        enhanced,
                        lang=CONFIG["DEFAULT_LANGUAGE"],
                        config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}'
                    )
                    boxes = pytesseract.image_to_data(
                        enhanced,
                        lang=CONFIG["DEFAULT_LANGUAGE"],
                        config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}',
                        output_type=pytesseract.Output.DICT
                    )
                    result = {"text": text, "regions": [], "provider": "local", "language": CONFIG["DEFAULT_LANGUAGE"], "confidence": 0.0}
                    confidences = []
                    current_block = -1
                    current_region = {}
                    current_lines = []
                    for i in range(len(boxes['text'])):
                        if boxes['text'][i].strip():
                            confidences.append(boxes['conf'][i])
                            if boxes['block_num'][i] != current_block:
                                if current_block != -1 and current_lines:
                                    current_region["text"] = "\n".join(current_lines)
                                    result["regions"].append(current_region)
                                current_block = boxes['block_num'][i]
                                current_region = {
                                    "bounding_box": [boxes['left'][i], boxes['top'][i],
                                                     boxes['left'][i] + boxes['width'][i],
                                                     boxes['top'][i] + boxes['height'][i]],
                                    "lines": []
                                }
                                current_lines = []
                            line_num = boxes['line_num'][i]
                            if line_num >= len(current_lines):
                                current_lines.append(boxes['text'][i])
                            else:
                                current_lines[line_num] += " " + boxes['text'][i]
                    if current_block != -1 and current_lines:
                        current_region["text"] = "\n".join(current_lines)
                        result["regions"].append(current_region)
                    if confidences:
                        valid_confidences = [c for c in confidences if c > 0]
                        if valid_confidences:
                            result["confidence"] = sum(valid_confidences) / len(valid_confidences) / 100.0
                    return result
            except Exception as e:
                self.logger.error(f"OCR with {provider} failed: {str(e)}")
                continue
        return None


class ResourceManager:
    """System resource manager."""

    def __init__(self, max_memory_percent=CONFIG["MAX_MEMORY_PERCENT"],
                 max_cpu_percent=CONFIG["MAX_CPU_PERCENT"],
                 min_free_space_bytes=CONFIG["MIN_FREE_SPACE_BYTES"]):
        self.logger = logging.getLogger(__name__)
        self.max_memory_percent = max_memory_percent
        self.max_cpu_percent = max_cpu_percent
        self.min_free_space_bytes = min_free_space_bytes
        self.metrics_history = deque(maxlen=1000)
        self.last_cleanup = None
        self.resource_warnings = 0
        self.is_cleaning = False
        self.lock = threading.Lock()
        self.stats = {'total_processed': 0, 'successful_extractions': 0, 'failed_extractions': 0}
        log_with_check("ResourceManager initialized")

    def get_current_metrics(self) -> ResourceMetrics:
        try:
            memory = psutil.virtual_memory()
            cpu = psutil.cpu_percent(interval=0.5)
            disk = psutil.disk_usage('/')
            metrics = ResourceMetrics(memory.percent, cpu, disk.percent, datetime.now())
            with self.lock:
                self.metrics_history.append(metrics)
            return metrics
        except Exception as e:
            self.logger.error(f"Error getting metrics: {str(e)}")
            return ResourceMetrics(0.0, 0.0, 0.0, datetime.now())

    def check_resources(self) -> bool:
        # Only the is_cleaning flag is read under the lock; get_current_metrics()
        # acquires the same lock internally, so it is called after releasing it.
        with self.lock:
            if self.is_cleaning:
                return False
        metrics = self.get_current_metrics()
        needs_cleanup = (
            metrics.memory_usage > self.max_memory_percent
            or metrics.cpu_usage > self.max_cpu_percent
            or psutil.disk_usage('/').free < self.min_free_space_bytes
        )
        if needs_cleanup:
            self.cleanup_resources()
            return True
        return False

    def cleanup_resources(self):
        if self.is_cleaning:
            return
        self.is_cleaning = True
        self.logger.info("Starting resource cleanup")
        try:
            gc.collect()
            self._cleanup_temp_files()
            self.last_cleanup = datetime.now()
            log_with_check("Resource cleanup completed")
        except Exception as e:
            self.logger.error(f"Error during cleanup: {str(e)}")
            log_with_check("Resource cleanup failed", False)
        finally:
            self.is_cleaning = False

    def _cleanup_temp_files(self):
        try:
            count = 0
            for filename in os.listdir(CONFIG["TEMP_DIR"]):
                file_path = os.path.join(CONFIG["TEMP_DIR"], filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
                    count += 1
            self.logger.info(f"Cleaned {count} temp files")
        except Exception as e:
            self.logger.error(f"Error cleaning temp files: {str(e)}")


class LoadPredictor:
    def __init__(self, history_size: int = 1000, update_interval: int = 60):
        self.logger = logging.getLogger(__name__)
        self.history_size = history_size
        self.update_interval = update_interval
        self.resource_history = deque(maxlen=history_size)
        self.current_predictions = {'short_term': None, 'medium_term': None, 'long_term': None}
        self.thresholds = {'cpu_high': 80.0, 'memory_high': 99.5, 'prediction_confidence': 0.7}
        self.monitoring = False
        self.monitor_thread = None
        self._start_monitoring()
        log_with_check("LoadPredictor initialized")

    def _start_monitoring(self):
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        log_with_check("Monitoring thread started")

    def _monitor_loop(self):
        while self.monitoring:
            try:
                usage = self._collect_resource_usage()
                self.resource_history.append(usage)
                self._update_predictions()
                self._check_alerts()
                time.sleep(self.update_interval)
            except Exception as e:
                self.logger.error(f"Error in monitor loop: {str(e)}")
                log_with_check("Monitor loop error", False)

    def _collect_resource_usage(self) -> ResourceUsage:
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory_percent = psutil.virtual_memory().percent
            return ResourceUsage(cpu_percent, memory_percent, None, datetime.now())
        except Exception as e:
            self.logger.error(f"Error collecting resource usage: {str(e)}")
            return ResourceUsage(0.0, 0.0, None, datetime.now())

    def _update_predictions(self):
        try:
            self.current_predictions['short_term'] = self._predict_load(timedelta(minutes=5))
            self.current_predictions['medium_term'] = self._predict_load(timedelta(hours=1))
            self.current_predictions['long_term'] = self._predict_load(timedelta(days=1))
        except Exception as e:
            self.logger.error(f"Error updating predictions: {str(e)}")

    def _predict_load(self, time_window: timedelta) -> LoadPrediction:
        if len(self.resource_history) < 10:
            return LoadPrediction(50.0, 0.5, time_window, {})
        try:
            history_array = np.array([[usage.cpu_percent, usage.memory_percent] for usage in self.resource_history])
            x = np.arange(len(history_array))
            cpu_trend = np.polyfit(x, history_array[:, 0], 2)
            memory_trend = np.polyfit(x, history_array[:, 1], 2)
            future_point = len(history_array) + time_window.total_seconds() / self.update_interval
            predicted_cpu = np.polyval(cpu_trend, future_point)
            predicted_memory = np.polyval(memory_trend, future_point)
            confidence = self._calculate_prediction_confidence(history_array, cpu_trend, memory_trend)
            return LoadPrediction((predicted_cpu + predicted_memory) / 2, confidence, time_window,
                                  {'cpu': predicted_cpu, 'memory': predicted_memory})
        except Exception as e:
            self.logger.error(f"Error in load prediction: {str(e)}")
            return LoadPrediction(50.0, 0.3, time_window, {})

    def _calculate_prediction_confidence(self, history: np.ndarray, cpu_trend: np.ndarray, memory_trend: np.ndarray) -> float:
        try:
            x = np.arange(len(history))
            cpu_predictions = np.polyval(cpu_trend, x)
            memory_predictions = np.polyval(memory_trend, x)
            cpu_rmse = np.sqrt(np.mean((history[:, 0] - cpu_predictions) ** 2))
            memory_rmse = np.sqrt(np.mean((history[:, 1] - memory_predictions) ** 2))
            max_rmse = 50.0
            cpu_confidence = max(0.0, min(1.0, 1.0 - cpu_rmse / max_rmse))
            memory_confidence = max(0.0, min(1.0, 1.0 - memory_rmse / max_rmse))
            return 0.6 * cpu_confidence + 0.4 * memory_confidence
        except Exception as e:
            self.logger.error(f"Error calculating prediction confidence: {str(e)}")
            return 0.5

    def _check_alerts(self):
        current_usage = self.resource_history[-1] if self.resource_history else None
        if not current_usage:
            return
        alerts = []
        if current_usage.cpu_percent > self.thresholds['cpu_high']:
            alerts.append({'type': 'high_cpu', 'value': current_usage.cpu_percent})
        if current_usage.memory_percent > self.thresholds['memory_high']:
            alerts.append({'type': 'high_memory', 'value': current_usage.memory_percent})
        if alerts:
            self._handle_alerts(alerts)

    def _handle_alerts(self, alerts: List[Dict]):
        for alert in alerts:
            self.logger.warning(f"هشدار: {alert['type']} در سطح {alert['value']}%")

    def get_current_load(self) -> Dict:
        current_usage = self.resource_history[-1] if self.resource_history else None
        if not current_usage:
            return {}
        return {
            'current': {
                'cpu': current_usage.cpu_percent,
                'memory': current_usage.memory_percent,
                'timestamp': current_usage.timestamp.isoformat()
            },
            'predictions': {
                name: {
                    'load': pred.expected_load,
                    'confidence': pred.confidence,
                    'resources': pred.details
                } if pred else None
                for name, pred in self.current_predictions.items()
            }
        }


class DocumentProcessor:
    def __init__(self, config=None):
        self.config = config or ProcessingConfig()
        self.logger = logging.getLogger(__name__)
        self.cache = CacheManager()
        self.normalizer = Normalizer()
        self.learner = AdaptiveLearner(model_dir=CONFIG["MODEL_DIR"])
        self.img_processor = ImageProcessor()
        self.cloud_ocr = CloudOCRProvider()
        self.resource_manager = ResourceManager()
        self.load_predictor = LoadPredictor()
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)
        init_directories()
        self.setup_tesseract()
        log_with_check(f"DocumentProcessor initialized (version {CONFIG['VERSION']})")

    def setup_tesseract(self):
        try:
            tesseract_installed = False
            try:
                version = subprocess.check_output([CONFIG["TESSERACT_CMD"], '--version'], text=True)
                self.logger.info(f"Tesseract found: {version.split()[0]}")
                tesseract_installed = True
                log_with_check("Tesseract found")
            except (subprocess.CalledProcessError, FileNotFoundError):
                self.logger.warning(f"Tesseract not found in {CONFIG['TESSERACT_CMD']}, attempting to download data and install via system commands")
                log_with_check("Tesseract not found, attempting installation", False)
                # Method 1: install via subprocess.check_call
                try:
                    subprocess.check_call(['apt-get', 'update', '-y', '--fix-missing', '--allow-downgrades'])
                    subprocess.check_call(['apt-get', 'install', 'tesseract-ocr', '-y', '--no-install-recommends', '--fix-missing', '--allow-downgrades'])
                    os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
                    pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"]
                    version = pytesseract.get_tesseract_version()
self.logger.info(f"Tesseract installed via Method 1, version: {version}") tesseract_installed = True log_with_check("Tesseract installed via Method 1") except subprocess.CalledProcessError as e: self.logger.warning(f"Method 1 failed: {str(e)}, attempting Method 2") log_with_check("Tesseract installation Method 1 failed", False) # روش دوم: استفاده از os.system os.system('chmod 777 /tmp') os.system('apt-get update -y --fix-missing --allow-downgrades') os.system('apt-get install tesseract-ocr -y --no-install-recommends --fix-missing --allow-downgrades') os.system('pip install -q pytesseract') if __name__ == "__main__": print("YES") os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"]) pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"] try: version = pytesseract.get_tesseract_version() self.logger.info(f"Tesseract installed via Method 2, version: {version}") tesseract_installed = True log_with_check("Tesseract installed via Method 2") except Exception as e: self.logger.error(f"Method 2 failed: {str(e)}. Continuing with downloaded data if available.") log_with_check("Tesseract installation Method 2 failed", False) if not tesseract_installed and os.path.exists(CONFIG["TESSDATA_LOCAL"]): os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"]) self.logger.warning("Using downloaded Tesseract data without executable") log_with_check("Using downloaded Tesseract data", False) self._download_tesseract_data() try: version = pytesseract.get_tesseract_version() self.logger.info(f"Tesseract is ready, version: {version}") log_with_check("Tesseract setup completed") return True except Exception as e: self.logger.error(f"Tesseract setup failed: {str(e)}. Continuing with downloaded data if available.") log_with_check("Tesseract setup failed", False) return False except Exception as e: self.logger.error(f"Tesseract setup failed: {str(e)}") log_with_check("Tesseract setup failed due to exception", False) return False def _download_tesseract_data(self): try: os.makedirs(CONFIG["TESSDATA_LOCAL"], exist_ok=True) base_url = "https://github.com/tesseract-ocr/tessdata_best/raw/main/" languages = ["fas", "eng", "ara"] import tqdm for lang in languages: lang_file = os.path.join(CONFIG["TESSDATA_LOCAL"], f"{lang}.traineddata") if not os.path.exists(lang_file): self.logger.info(f"Downloading {lang}.traineddata...") response = requests.get(f"{base_url}{lang}.traineddata", stream=True) response.raise_for_status() with open(lang_file, 'wb') as f: total_size = int(response.headers.get('content-length', 0)) with tqdm.tqdm(total=total_size, unit='B', unit_scale=True, desc=lang) as pbar: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) pbar.update(len(chunk)) self.logger.info(f"Downloaded {lang}.traineddata successfully") log_with_check(f"Downloaded {lang}.traineddata") except Exception as e: self.logger.error(f"Error downloading Tesseract data: {str(e)}") log_with_check("Failed to download Tesseract data", False) @timed def process_pdf(self, pdf_path) -> Dict: self.logger.info(f"Processing PDF: {pdf_path} - Start") log_with_check(f"Starting PDF processing for {pdf_path}") if self.resource_manager.check_resources(): self.logger.warning("Resources checked and cleaned during PDF processing") log_with_check("Resources checked and cleaned") try: doc = fitz.open(pdf_path) result = { "title": os.path.basename(pdf_path), "page_count": len(doc), "pages": [], "metadata": self._extract_pdf_metadata(doc), "toc": self._extract_toc(doc) } futures = [] for 
            for page_num in range(len(doc)):
                future = self.executor.submit(self._process_page, doc, page_num)
                futures.append(future)
            for idx, future in enumerate(as_completed(futures)):
                page_result = future.result()
                result["pages"].append(page_result)
                self.logger.debug(f"Completed page {idx+1}/{len(doc)}")
                log_with_check(f"Completed page {idx+1}/{len(doc)}")
                gc.collect()  # free memory after each page
            result["pages"].sort(key=lambda x: x["page_num"])
            result["structure"] = self._analyze_document_structure(result)
            self.logger.info(f"PDF processing complete: {pdf_path} - End")
            log_with_check(f"PDF processing completed for {pdf_path}")
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['successful_extractions'] += 1
            gc.collect()  # free memory after the whole document is processed
            return result
        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")
            log_with_check(f"Error processing PDF: {str(e)}", False)
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['failed_extractions'] += 1
            return {"error": str(e), "traceback": traceback.format_exc()}

    def _extract_pdf_metadata(self, doc) -> Dict:
        metadata = {}
        for key, value in doc.metadata.items():
            if value:
                metadata[key] = value
        return metadata

    def _extract_toc(self, doc) -> List[Dict]:
        toc = []
        try:
            raw_toc = doc.get_toc()
            for level, title, page in raw_toc:
                toc.append({"level": level, "title": title, "page": page})
        except Exception:
            pass
        return toc

    def _process_page(self, doc, page_num: int) -> Dict:
        self.logger.debug(f"Processing page {page_num+1} - Start")
        log_with_check(f"Starting page {page_num+1} processing")
        page = doc[page_num]
        cache_key = f"page:{doc.name}:{page_num}:{hash(page.get_text())}"
        cached_result = self.cache.get(cache_key)
        if cached_result and self.config.use_caching:
            self.logger.debug(f"Using cached result for page {page_num+1}")
            log_with_check(f"Using cached result for page {page_num+1}")
            return cached_result
        result = {"page_num": page_num + 1, "width": page.rect.width, "height": page.rect.height, "text_regions": []}
        image_list = []
        try:
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            image_list.append(img)
            if not image_list or img.size == 0:
                with temp_file(".pdf") as temp_pdf:
                    doc.save(temp_pdf)
                    page_images = convert_from_path(temp_pdf, first_page=page_num+1, last_page=page_num+1, dpi=75)
                    if page_images:
                        np_image = np.array(page_images[0])
                        cv_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR)
                        image_list.append(cv_image)
        except Exception as e:
            self.logger.error(f"Error extracting page image: {str(e)}")
            log_with_check(f"Error extracting page {page_num+1} image: {str(e)}", False)
        for img in image_list:
            regions = self._perform_ocr(img)
            result["text_regions"].extend(regions)
        raw_text = page.get_text()
        if raw_text and raw_text.strip():
            result["text_direct"] = raw_text
        if self.config.use_caching:
            self.cache.set(cache_key, result)
        self.logger.debug(f"Processing page {page_num+1} - End")
        log_with_check(f"Completed page {page_num+1} processing")
        gc.collect()  # free memory after processing each page
        return result

    def _perform_ocr(self, image: np.ndarray) -> List[Dict]:
        cache_key = self.cache.generate_key(image)
        cached_result = self.cache.get(cache_key)
        if cached_result and self.config.use_caching:
            return cached_result
        regions = []
        processed_img = self.img_processor.enhance_for_ocr(image) if self.config.use_preprocessing else image.copy()
        try:
            cloud_result = None
            if self.cloud_ocr.is_available() and self.config.use_distributed:
                with temp_file(".png") as temp_img:
                    cv2.imwrite(temp_img, processed_img)
                    cloud_result = self.cloud_ocr.process_image(temp_img)
            if cloud_result:
                for region in cloud_result.get("regions", []):
                    region_text = region.get("text", "").strip()
                    if region_text:
                        normalized_text = self.normalizer.normalize(region_text)
                        corrected_text, corr_conf = self.learner.apply_corrections(
                            normalized_text, {"language": self.config.language}
                        )
                        regions.append({
                            "content": corrected_text,
                            "confidence": cloud_result.get("confidence", 0.8) * corr_conf,
                            "language": cloud_result.get("language", self.config.language),
                            "direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
                            "provider": cloud_result.get("provider", "cloud"),
                            "bounding_box": region.get("bounding_box")
                        })
            else:
                ocr_result = pytesseract.image_to_data(
                    processed_img,
                    lang=self.config.language,
                    config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}',
                    output_type=pytesseract.Output.DICT
                )
                current_block = -1
                current_text = []
                current_conf = []
                for i in range(len(ocr_result['text'])):
                    text = ocr_result['text'][i].strip()
                    conf = int(ocr_result['conf'][i])
                    block_num = ocr_result['block_num'][i]
                    if not text:
                        continue
                    if block_num != current_block:
                        if current_block != -1 and current_text:
                            full_text = ' '.join(current_text)
                            avg_conf = sum(current_conf) / len(current_conf) if current_conf else 0
                            normalized_text = self.normalizer.normalize(full_text)
                            corrected_text, corr_conf = self.learner.apply_corrections(
                                normalized_text, {"language": self.config.language}
                            )
                            regions.append({
                                "content": corrected_text,
                                "confidence": min(avg_conf * corr_conf / 100, 100),
                                "language": self.config.language,
                                "direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
                                "provider": "tesseract"
                            })
                        current_block = block_num
                        current_text = []
                        current_conf = []
                    current_text.append(text)
                    current_conf.append(conf)
                if current_text:
                    full_text = ' '.join(current_text)
                    avg_conf = sum(current_conf) / len(current_conf) if current_conf else 0
                    normalized_text = self.normalizer.normalize(full_text)
                    corrected_text, corr_conf = self.learner.apply_corrections(
                        normalized_text, {"language": self.config.language}
                    )
                    regions.append({
                        "content": corrected_text,
                        "confidence": min(avg_conf * corr_conf / 100, 100),
                        "language": self.config.language,
                        "direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
                        "provider": "tesseract"
                    })
        except Exception as e:
            self.logger.error(f"OCR error: {str(e)}")
            log_with_check("OCR failed", False)
        if self.config.use_caching:
            self.cache.set(cache_key, regions)
        return regions

    def _analyze_document_structure(self, doc_data: Dict) -> Dict:
        structure = {
            "title": doc_data.get("title", ""),
            "language": self.config.language,
            "page_count": doc_data.get("page_count", 0),
            "sections": [],
            "summary": ""
        }
        all_text = []
        for page in doc_data.get("pages", []):
            page_text = []
            for region in page.get("text_regions", []):
                if region.get("confidence", 0) >= self.config.confidence_threshold:
                    page_text.append(region.get("content", ""))
            if page_text:
                all_text.append(" ".join(page_text))
        toc = doc_data.get("toc", [])
        if toc:
            for item in toc:
                section = {"title": item.get("title", ""), "level": item.get("level", 1), "page": item.get("page", 1)}
                structure["sections"].append(section)
        if all_text:
            combined_text = "\n".join(all_text)
            structure["summary"] = combined_text[:500] + "..." if len(combined_text) > 500 else combined_text
        return structure

    def _process_image_task(self, data: Dict) -> Dict:
        try:
            image_path = data['image_path']
            settings = data.get('settings', {})
            result = self.extract_text_from_image(image_path)
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['successful_extractions'] += 1
            log_with_check(f"Image task completed for {image_path}")
            return {"status": "success", "result": result}
        except Exception as e:
            self.logger.error(f"Error processing image task: {str(e)}")
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['failed_extractions'] += 1
            log_with_check(f"Error processing image task: {str(e)}", False)
            return {"status": "error", "error": str(e)}

    def _process_document_task(self, data: Dict) -> Dict:
        try:
            pdf_path = data['pdf_path']
            settings = data.get('settings', {})
            result = self.process_pdf(pdf_path)
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['successful_extractions'] += 1
            log_with_check(f"Document task completed for {pdf_path}")
            return {"status": "success", "result": result}
        except Exception as e:
            self.logger.error(f"Error processing document task: {str(e)}")
            self.resource_manager.stats['total_processed'] += 1
            self.resource_manager.stats['failed_extractions'] += 1
            log_with_check(f"Error processing document task: {str(e)}", False)
            return {"status": "error", "error": str(e)}

    def extract_text_from_image(self, image_path: str) -> Dict:
        try:
            img = cv2.imread(image_path)
            if img is None:
                log_with_check(f"Failed to load image: {image_path}", False)
                return {"error": "Could not load image"}
            regions = self._perform_ocr(img)
            log_with_check(f"Extracted text from image: {image_path}")
            return {"regions": regions, "text": "\n".join([r.get("content", "") for r in regions])}
        except Exception as e:
            self.logger.error(f"Error extracting text from image: {str(e)}")
            log_with_check(f"Error extracting text from image: {str(e)}", False)
            return {"error": str(e)}

    def batch_process(self, file_paths: List[str], output_dir: str = None) -> Dict:
        output_dir = output_dir or CONFIG["OUTPUT_DIR"]
        os.makedirs(output_dir, exist_ok=True)
        results = {}
        for file_path in file_paths:
            file_ext = Utils.get_file_extension(file_path)
            file_name = os.path.basename(file_path)
            output_file = os.path.join(output_dir, file_name.replace(file_ext, ".json"))
            if file_ext.lower() in [".pdf"]:
                result = self.process_pdf(file_path)
            elif file_ext.lower() in [".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".webp"]:
                result = self.extract_text_from_image(file_path)
            else:
                result = {"error": f"Unsupported file type: {file_ext}"}
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            results[file_name] = {"status": "success" if "error" not in result else "error"}
            log_with_check(f"Processed {file_name} to {output_file}")
        return results


def extract_text_from_image(image_path: str, output_format: str = "text") -> Dict:
    processor = DocumentProcessor()
    regions = processor.extract_text_from_image(image_path)
    if "error" in regions:
        return regions
    if output_format == "text":
        return {"text": regions["text"]}
    return regions


def extract_text_from_pdf(pdf_path: str, output_format: str = "text") -> Dict:
    processor = DocumentProcessor()
    result = processor.process_pdf(pdf_path)
    if "error" in result:
        return result
    if output_format == "text":
        text = "\n".join([region["content"]
                          for page in result["pages"]
                          for region in page["text_regions"]
                          if "content" in region])
        return {"text": text}
    return result
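# Minimal usage sketch (never called by this module; illustrative only). It assumes
# Tesseract and the language data are already available. The file names below are
# hypothetical placeholders, not files shipped with the project.
def _example_usage():
    """Show how the two module-level helpers above can be called directly.

    Each helper builds a full DocumentProcessor, so when handling many files it is
    cheaper to create one processor and call DocumentProcessor.batch_process instead.
    """
    pdf_result = extract_text_from_pdf("sample.pdf", output_format="text")  # hypothetical path
    print(pdf_result.get("text", pdf_result.get("error")))
    img_result = extract_text_from_image("scan.png", output_format="json")  # hypothetical path
    print(len(img_result.get("regions", [])), "regions extracted")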
def launch_ui():
    processor = DocumentProcessor()

    def process_file(file, output_format):
        # Returns (download file update, extracted text) in the same order as the
        # click handler's outputs; error messages therefore go to the textbox.
        if file is None:
            logging.info("No file uploaded")
            return None, "لطفاً فایلی آپلود کنید"
        if not hasattr(file, 'name') or file.name is None:
            logging.warning("Invalid file object")
            return None, "فایل نامعتبر است"
        try:
            start_time = time.time()
            logging.info(f"Starting process for file: {file.name}")
            if file.name.endswith(('.jpg', '.png')):
                result = processor.extract_text_from_image(file.name)
            elif file.name.endswith('.pdf'):
                result = processor.process_pdf(file.name)
            else:
                logging.error(f"Unsupported file type: {file.name}")
                return None, "نوع فایل پشتیبانی نمیشود"
            elapsed_time = time.time() - start_time
            if "error" in result:
                logging.error(f"Error processing file {file.name}: {result['error']}")
                return None, f"خطا: {result['error']}"
            logging.info(f"Processing completed for {file.name} in {elapsed_time:.2f} seconds")
            output = result.get("text", "متن یافت نشد") if output_format == "text" else json.dumps(result, ensure_ascii=False, indent=2)
            base_name = os.path.splitext(os.path.basename(file.name))[0]
            output_file = os.path.join(
                CONFIG["TEMP_DIR"],
                f"{base_name}_output.txt" if output_format == "text" else f"{base_name}_output.json"
            )
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(output)
            log_with_check(f"Output file created: {output_file}")
            gc.collect()  # free memory after writing the output file
            return gr.update(value=output_file, label="دانلود خروجی", visible=True), output
        except Exception as e:
            logging.error(f"Unexpected error processing file {file.name}: {str(e)}")
            return None, f"خطای غیرمنتظره: {str(e)}"

    # Upgraded UI with a nicer theme and layout
    with gr.Blocks(title=CONFIG["UI_TITLE"], theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            f"""
# {CONFIG['UI_TITLE']}
سیستمی برای استخراج متن از تصاویر و PDFها با دقت بالا
""", elem_id="title" ) with gr.Row(elem_classes="main-row"): with gr.Column(scale=1, elem_classes="input-column"): file_input = gr.File(label="فایل را آپلود کنید", file_types=[".pdf", ".jpg", ".png", ".jpeg"]) output_format = gr.Dropdown(choices=["text", "json"], label="فرمت خروجی", value="text") submit_btn = gr.Button("پردازش", variant="primary") download_output = gr.File(label="دانلود خروجی", visible=False) with gr.Column(scale=2, elem_classes="output-column"): output_text = gr.Textbox(label="متن استخراجشده", lines=10, interactive=False) # اسکریپت جاوااسکریپت برای دانلود demo.js = """ function (file, output) { if (file) { const link = document.createElement('a'); link.href = URL.createObjectURL(file); link.download = file.name; document.body.appendChild(link); link.click(); document.body.removeChild(link); URL.revokeObjectURL(link.href); } return [file, output]; } """ # استایلهای سفارشی demo.css = """ .main-row { padding: 20px; background-color: #f5f5f5; border-radius: 10px; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); } .input-column, .output-column { padding: 10px; animation: fadeIn 0.5s ease-in; } @keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } } #title { font-family: 'Arial', sans-serif; font-size: 24px; margin-bottom: 20px; } """ submit_btn.click( fn=process_file, inputs=[file_input, output_format], outputs=[download_output, output_text] ).then( fn=lambda x: gr.update(visible=True) if x else gr.update(visible=False), inputs=[download_output], outputs=[download_output] ) try: demo.launch(server_name="0.0.0.0", server_port=7860) log_with_check("Gradio interface launched") except Exception as e: logging.error(f"Failed to launch Gradio interface: {str(e)}") log_with_check("Gradio interface launch failed", False) def setup_signal_handlers(): def signal_handler(sig, frame): logging.info("Shutdown signal received, cleaning up...") log_with_check("Starting shutdown cleanup") try: for filename in os.listdir(CONFIG["TEMP_DIR"]): file_path = os.path.join(CONFIG["TEMP_DIR"], filename) if os.path.isfile(file_path): file_age = time.time() - os.path.getmtime(file_path) if file_age > 3600: os.remove(file_path) gc.collect() # آزادسازی حافظه در زمان تعطیلی except: pass log_with_check("Cleanup completed") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def check_dependencies(): missing = [] try: pytesseract.get_tesseract_version() logging.info("Tesseract installed") log_with_check("Tesseract dependency checked") except: missing.append("Tesseract OCR") log_with_check("Tesseract dependency check failed", False) logging.info(f"Missing dependencies: {', '.join(missing)}" if missing else "All dependencies are present") log_with_check("Dependencies check completed") def install_tesseract_method1(): """روش اول: نصب Tesseract با استفاده از subprocess.check_call""" try: subprocess.check_call(['apt-get', 'update', '-y', '--fix-missing', '--allow-downgrades']) subprocess.check_call(['apt-get', 'install', 'tesseract-ocr', '-y', '--no-install-recommends', '--fix-missing', '--allow-downgrades', '--fix-broken']) os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"]) pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"] version = pytesseract.get_tesseract_version() logging.info(f"Tesseract installed via Method 1, version: {version}") log_with_check("Tesseract installed via Method 1") return True except subprocess.CalledProcessError as e: logging.warning(f"Method 1 failed: {str(e)}") log_with_check("Tesseract installation 
Method 1 failed", False) return False def install_tesseract_method2(): """روش دوم: نصب Tesseract با استفاده از os.system""" logging.warning("Attempting Method 2: manual installation") log_with_check("Attempting Method 2 installation", False) os.system('chmod 777 /tmp') os.system('apt-get update -y --fix-missing --allow-downgrades') os.system('apt-get install tesseract-ocr -y --no-install-recommends --fix-missing --allow-downgrades --fix-broken') os.system('pip install -q pytesseract') if __name__ == "__main__": print("YES") os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"]) pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"] try: version = pytesseract.get_tesseract_version() logging.info(f"Tesseract installed via Method 2, version: {version}") log_with_check("Tesseract installed via Method 2") return True except Exception as e: logging.error(f"Method 2 failed: {str(e)}") log_with_check("Tesseract installation Method 2 failed", False) return False def setup_tesseract_alternative(): """نصب Tesseract با دو روش به صورت متوالی""" log_with_check("Starting Tesseract setup") if not install_tesseract_method1(): install_tesseract_method2() if __name__ == "__main__": setup_signal_handlers() setup_tesseract_alternative() # ابتدا Tesseract را نصب میکنیم check_dependencies() # سپس وابستگیها را چک میکنیم try: launch_ui() except Exception as e: logging.error(f"Application failed to start: {str(e)}") log_with_check("Application startup failed", False)