"""
ماژول یکپارچه سیستم OCR پارسی
این کد شامل تنظیمات، ابزارهای کمکی، پردازش اسناد و تصاویر، و رابط کاربری Gradio با بهبودهای جدید است.
طراحی شده برای اجرا روی CPU بدون نیاز به GPU.
"""
import os
import re
import gc
import time
import json
import uuid
import logging
import hashlib
import traceback
import threading
import subprocess
import contextlib
import signal
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache, wraps
from difflib import SequenceMatcher
from collections import deque
from dataclasses import dataclass, field
import cv2
import numpy as np
import fitz # PyMuPDF
import pytesseract
import requests
from pdf2image import convert_from_path
from PIL import Image, ImageEnhance, ImageFilter
import gradio as gr
import multiprocessing
import psutil
import platform
import socket
import shutil
import glob
from pathlib import Path
from colorama import Fore, Style, init
# Initialize colorama
init()
# Configuration settings
CONFIG = {
"TESSDATA_LOCAL": os.environ.get("TESSDATA_PATH", "tessdata"),
"TESSERACT_CMD": os.environ.get("TESSERACT_CMD", "/usr/bin/tesseract"),
"CACHE_DIR": os.environ.get("CACHE_DIR", "cache"),
"MODEL_DIR": os.environ.get("MODEL_DIR", "models"),
"LOG_DIR": os.environ.get("LOG_DIR", "logs"),
"TEMP_DIR": os.environ.get("TEMP_DIR", "temp"),
"OUTPUT_DIR": os.environ.get("OUTPUT_DIR", "output"),
"DATASET_DIR": os.environ.get("DATASET_DIR", "datasets"),
"MAX_CACHE_SIZE_MB": int(os.environ.get("MAX_CACHE_SIZE_MB", "2048")),
"MAX_WORKERS": min(multiprocessing.cpu_count(), 1),
"MAX_MEMORY_PERCENT": float(os.environ.get("MAX_MEMORY_PERCENT", "99.5")), # آستانه حافظه
"MAX_CPU_PERCENT": float(os.environ.get("MAX_CPU_PERCENT", "80.0")),
"MIN_FREE_SPACE_BYTES": 1024**3,
"DEFAULT_LANGUAGE": os.environ.get("DEFAULT_LANGUAGE", "fas"),
"SUPPORTED_LANGUAGES": ["fas", "eng", "ara", "eng+fas", "fas+eng", "ara+fas"],
"CONFIDENCE_THRESHOLD": float(os.environ.get("CONFIDENCE_THRESHOLD", "0.65")),
"PAGE_SEGMENTATION_MODE": os.environ.get("PAGE_SEGMENTATION_MODE", "3"),
"OCR_ENGINE_MODE": os.environ.get("OCR_ENGINE_MODE", "1"),
"AZURE_API_KEY": os.environ.get("AZURE_API_KEY", ""),
"AZURE_ENDPOINT": os.environ.get("AZURE_ENDPOINT", ""),
"GOOGLE_APPLICATION_CREDENTIALS": os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""),
"DEBUG_MODE": os.environ.get("DEBUG_MODE", "false").lower() == "true",
"VERSION": "2.0.1",
"UI_TITLE": "سیستم OCR پارسی"
}
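# Usage sketch (illustrative only; not called anywhere in this module): CONFIG is
# populated from environment variables once, at import time, so overrides such as
# DEFAULT_LANGUAGE or MAX_CACHE_SIZE_MB must be set before this module is imported.
def _example_show_effective_config():
    for key in ("DEFAULT_LANGUAGE", "MAX_WORKERS", "MAX_CACHE_SIZE_MB", "CONFIDENCE_THRESHOLD"):
        print(f"{key} = {CONFIG[key]}")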
# Configure colorized logging
logging.basicConfig(level=logging.INFO, format=f"{Fore.GREEN}%(asctime)s - %(name)s - %(levelname)s - {Fore.RESET}%(message)s")
def log_with_check(message, success=True):
"""لاگ با تیک یا علامت خطا و جداسازی با خط چین"""
separator = f"{Fore.YELLOW}---{Style.RESET_ALL}"
check = f"{Fore.GREEN}{Style.RESET_ALL}" if success else f"{Fore.RED}{Style.RESET_ALL}"
logging.info(f"{separator}\n{check} {message}\n{separator}")
def init_directories():
"""ایجاد دایرکتوری‌های مورد نیاز"""
directories = [
CONFIG["CACHE_DIR"], CONFIG["MODEL_DIR"], CONFIG["LOG_DIR"],
CONFIG["TEMP_DIR"], CONFIG["OUTPUT_DIR"], CONFIG["DATASET_DIR"],
CONFIG["TESSDATA_LOCAL"]
]
for directory in directories:
os.makedirs(directory, exist_ok=True)
log_with_check("Directories initialized")
def check_optional_libs():
"""بررسی دسترسی به کتابخانه‌های اختیاری"""
CONFIG["AZURE_OCR_AVAILABLE"] = False
CONFIG["GOOGLE_OCR_AVAILABLE"] = False
try:
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
CONFIG["AZURE_OCR_AVAILABLE"] = True
except ImportError:
pass
try:
from google.cloud import vision
CONFIG["GOOGLE_OCR_AVAILABLE"] = True
except ImportError:
pass
log_with_check("Optional libraries checked")
# Decorators
def timed(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
logging.debug(f"{func.__name__} took {time.time() - start_time:.2f} seconds")
return result
return wrapper
@contextlib.contextmanager
def temp_file(suffix=None):
"""مدیریت فایل‌های موقت"""
temp_path = os.path.join(CONFIG["TEMP_DIR"], f"temp_{uuid.uuid4()}{suffix or ''}")
try:
yield temp_path
finally:
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
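# Usage sketch (illustrative only; not called anywhere in this module): the caller
# gets a unique path inside TEMP_DIR and the file is removed on exit, even on error.
def _example_temp_file_usage(image: np.ndarray) -> None:
    with temp_file(".png") as path:
        cv2.imwrite(path, image)  # write a scratch copy for an external tool
        logging.info(f"temporary image written ({os.path.getsize(path)} bytes)")
    # the file no longer exists here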
class Utils:
@staticmethod
def detect_language(text):
if any('\u0600' <= c <= '\u06FF' for c in text):
return 'fa'
return 'en'
@staticmethod
def is_rtl_language(lang_code):
return lang_code in ['fa', 'fas', 'ar', 'ara']
@staticmethod
def get_file_extension(file_path):
return os.path.splitext(file_path)[1].lower()
@staticmethod
def get_human_readable_size(size, decimal_places=2):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024.0:
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
@staticmethod
def retry(max_attempts=3, delay=1, exceptions=(Exception,)):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts - 1:
raise
logging.warning(f"Retry {attempt+1}/{max_attempts} for {func.__name__} after error: {str(e)}")
time.sleep(delay)
return wrapper
return decorator
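# Usage sketch (illustrative only; not called anywhere in this module): Utils.retry
# wraps a flaky callable and retries it with a fixed delay, re-raising after the
# final attempt. The URL below is a placeholder.
@Utils.retry(max_attempts=3, delay=2, exceptions=(requests.RequestException,))
def _example_download_with_retry(url: str = "https://example.com/file.bin") -> bytes:
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.content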
@dataclass
class ResourceUsage:
cpu_percent: float
memory_percent: float
gpu_memory_percent: Optional[float] = None
timestamp: datetime = None
@dataclass
class LoadPrediction:
expected_load: float
confidence: float
time_window: timedelta
details: Dict
@dataclass
class ResourceMetrics:
memory_usage: float
cpu_usage: float
disk_usage: float
timestamp: datetime
@dataclass
class ScanRegion:
content: str
confidence: float
language: str = 'unknown'
direction: str = 'ltr'
@dataclass
class DocumentStructure:
title: str
language: str
page_count: int
toc: Dict
sections: List[Dict]
metadata: Dict
attributes: Dict
@dataclass
class ProcessingConfig:
use_preprocessing: bool = True
use_caching: bool = True
max_workers: int = CONFIG["MAX_WORKERS"]
language: str = CONFIG["DEFAULT_LANGUAGE"]
    confidence_threshold: float = CONFIG["CONFIDENCE_THRESHOLD"]
    use_distributed: bool = False  # referenced in DocumentProcessor._perform_ocr; cloud OCR is opt-in
class CacheManager:
def __init__(self, cache_dir=CONFIG["CACHE_DIR"], max_cache_size_mb=CONFIG["MAX_CACHE_SIZE_MB"]):
self.cache_dir = cache_dir
self.max_cache_size_mb = max_cache_size_mb
self.cache_entries = {}
self.access_history = deque(maxlen=1000)
self.logger = logging.getLogger(__name__)
        self.lock = threading.RLock()  # re-entrant: set() calls _clean_cache() while already holding the lock
os.makedirs(self.cache_dir, exist_ok=True)
self._load_index()
log_with_check("CacheManager initialized")
@lru_cache(maxsize=1024)
def _hash_key(self, key: str) -> str:
return hashlib.md5(key.encode('utf-8')).hexdigest()
def _cache_path(self, key_hash: str) -> str:
return os.path.join(self.cache_dir, f"{key_hash}.cache")
def _load_index(self):
index_path = os.path.join(self.cache_dir, "index.json")
try:
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
self.cache_entries = cache_data.get('entries', {})
log_with_check(f"Loaded {len(self.cache_entries)} cache entries")
except Exception as e:
self.logger.error(f"Error loading cache index: {str(e)}")
log_with_check("Failed to load cache index", False)
def _save_index(self):
index_path = os.path.join(self.cache_dir, "index.json")
try:
with open(index_path, 'w', encoding='utf-8') as f:
json.dump({'entries': self.cache_entries}, f)
log_with_check("Cache index saved")
except Exception as e:
self.logger.error(f"Error saving cache index: {str(e)}")
log_with_check("Failed to save cache index", False)
def get(self, key: str) -> Any:
with self.lock:
key_hash = self._hash_key(key)
if key_hash not in self.cache_entries:
return None
cache_path = self._cache_path(key_hash)
if not os.path.exists(cache_path):
del self.cache_entries[key_hash]
self._save_index()
return None
try:
with open(cache_path, 'rb') as f:
import pickle
result = pickle.load(f)
self.cache_entries[key_hash]['last_access'] = datetime.now().isoformat()
self.access_history.append(key_hash)
return result
except Exception as e:
self.logger.error(f"Error retrieving from cache: {str(e)}")
return None
def set(self, key: str, value: Any, expire_seconds: int = 86400) -> bool:
with self.lock:
if self._check_cache_size() > self.max_cache_size_mb:
self._clean_cache()
key_hash = self._hash_key(key)
cache_path = self._cache_path(key_hash)
try:
with open(cache_path, 'wb') as f:
import pickle
pickle.dump(value, f)
now = datetime.now()
self.cache_entries[key_hash] = {
'key': key,
'created': now.isoformat(),
'last_access': now.isoformat(),
'expires': (now + timedelta(seconds=expire_seconds)).isoformat(),
'size': os.path.getsize(cache_path)
}
self.access_history.append(key_hash)
self._save_index()
return True
except Exception as e:
self.logger.error(f"Error storing in cache: {str(e)}")
return False
def _check_cache_size(self) -> float:
total_size = sum(entry.get('size', 0) for entry in self.cache_entries.values())
return total_size / (1024 * 1024)
def _clean_cache(self):
with self.lock:
now = datetime.now()
expired_keys = [k for k, v in self.cache_entries.items() if datetime.fromisoformat(v['expires']) < now]
for key in expired_keys:
self._remove_item(key)
if self._check_cache_size() > self.max_cache_size_mb * 0.8:
access_counts = {key: self.access_history.count(key) for key in set(self.access_history)}
to_remove = sorted(
[k for k in self.cache_entries.keys() if k not in expired_keys],
key=lambda k: access_counts.get(k, 0)
)
for key in to_remove:
self._remove_item(key)
if self._check_cache_size() < self.max_cache_size_mb * 0.7:
break
def _remove_item(self, key_hash: str):
try:
cache_path = self._cache_path(key_hash)
if os.path.exists(cache_path):
os.remove(cache_path)
if key_hash in self.cache_entries:
del self.cache_entries[key_hash]
except Exception as e:
self.logger.error(f"Error removing cache item: {str(e)}")
def clear(self):
with self.lock:
for key_hash in list(self.cache_entries.keys()):
self._remove_item(key_hash)
self.cache_entries = {}
self.access_history.clear()
self._save_index()
def generate_key(self, image: np.ndarray) -> str:
small_img = cv2.resize(image, (32, 32))
if len(small_img.shape) == 3:
small_img = cv2.cvtColor(small_img, cv2.COLOR_BGR2GRAY)
img_hash = hashlib.md5(small_img.tobytes()).hexdigest()
return f"img:{img_hash}"
class Normalizer:
def __init__(self):
self.logger = logging.getLogger(__name__)
log_with_check("Normalizer initialized")
self.char_mappings = {
'ك': 'ک', 'ي': 'ی', 'أ': 'ا', 'إ': 'ا', 'آ': 'ا', 'ة': 'ه',
'٠': '0', '١': '1', '٢': '2', '٣': '3', '٤': '4',
'٥': '5', '٦': '6', '٧': '7', '٨': '8', '٩': '9',
'۰': '0', '۱': '1', '۲': '2', '۳': '3', '۴': '4',
'۵': '5', '۶': '6', '۷': '7', '۸': '8', '۹': '9'
}
self.space_patterns = [
(r'\s+', ' '), (r'ـ+', ''), (r'[.]{2,}', '...'),
(r'[\u200c\u200f\u200e]+', '\u200c')
]
def normalize(self, text: str, normalize_chars: bool = True, normalize_spaces: bool = True) -> str:
if not text:
return text
result = text
if normalize_chars:
for src, dst in self.char_mappings.items():
result = result.replace(src, dst)
if normalize_spaces:
for pattern, replacement in self.space_patterns:
result = re.sub(pattern, replacement, result)
return result.strip()
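# Usage sketch (illustrative only): Normalizer maps Arabic presentation forms and
# Arabic/Persian digits to one canonical form and collapses repeated whitespace, so
# downstream corrections and comparisons see consistent text.
def _example_normalizer() -> str:
    normalizer = Normalizer()
    # "كتاب" uses Arabic kaf and an Eastern Arabic digit
    return normalizer.normalize("كتاب   شماره  ٣")  # -> "کتاب شماره 3"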
class AdaptiveLearner:
def __init__(self, model_dir: str = CONFIG["MODEL_DIR"]):
os.makedirs(model_dir, exist_ok=True)
self.model_dir = model_dir
self.corrections = {}
self.confidence_threshold = CONFIG["CONFIDENCE_THRESHOLD"]
self.logger = logging.getLogger(__name__)
self.lock = threading.Lock()
self._load_corrections()
log_with_check("AdaptiveLearner initialized")
def _load_corrections(self):
corrections_path = os.path.join(self.model_dir, "corrections.json")
try:
if os.path.exists(corrections_path):
with open(corrections_path, 'r', encoding='utf-8') as f:
self.corrections = json.load(f)
log_with_check(f"Loaded {sum(len(v) for k, v in self.corrections.items() if isinstance(v, dict))} text corrections")
else:
log_with_check("No corrections file found", False)
except Exception as e:
self.logger.error(f"Error loading corrections: {str(e)}")
log_with_check("Failed to load corrections", False)
self.corrections = {}
def _save_corrections(self):
with self.lock:
corrections_path = os.path.join(self.model_dir, "corrections.json")
try:
with open(corrections_path, 'w', encoding='utf-8') as f:
json.dump(self.corrections, f, ensure_ascii=False, indent=2)
log_with_check("Corrections saved")
except Exception as e:
self.logger.error(f"Error saving corrections: {str(e)}")
log_with_check("Failed to save corrections", False)
def apply_corrections(self, text: str, context: Dict) -> Tuple[str, float]:
if not text:
return text, 1.0
language = context.get('language', 'unknown')
corrected_text = text
confidence = 1.0
for pattern, replacement in self.corrections.get('general', {}).items():
corrected_text = re.sub(pattern, replacement, corrected_text)
for pattern, replacement in self.corrections.get(language, {}).items():
corrected_text = re.sub(pattern, replacement, corrected_text)
if text != corrected_text:
similarity = SequenceMatcher(None, text, corrected_text).ratio()
confidence = similarity
return corrected_text, confidence
def learn_correction(self, original: str, corrected: str, context: Dict):
if original == corrected:
return
with self.lock:
language = context.get('language', 'general')
if language not in self.corrections:
self.corrections[language] = {}
if len(original) > 10:
matcher = SequenceMatcher(None, original, corrected)
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'replace':
pattern = re.escape(original[i1:i2])
replacement = corrected[j1:j2]
self.corrections[language][pattern] = replacement
else:
self.corrections[language][re.escape(original)] = corrected
self._save_corrections()
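# Usage sketch (illustrative only; not called anywhere in this module): AdaptiveLearner
# stores user corrections as regex replacements keyed by language, then applies them to
# later OCR output, lowering confidence in proportion to how much the text changed.
def _example_adaptive_learning() -> Tuple[str, float]:
    learner = AdaptiveLearner()
    learner.learn_correction("كتاب", "کتاب", {"language": "fas"})
    return learner.apply_corrections("كتاب", {"language": "fas"})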
class ImageProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
log_with_check("ImageProcessor initialized")
@staticmethod
def deskew(image: np.ndarray) -> np.ndarray:
try:
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy()
thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
coords = np.column_stack(np.where(thresh > 0))
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
angle = -(90 + angle)
else:
angle = -angle
(h, w) = image.shape[:2]
center = (w // 2, h // 2)
matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated = cv2.warpAffine(image, matrix, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
return rotated
except Exception as e:
logging.error(f"Deskew error: {str(e)}")
return image
@staticmethod
def remove_noise(image: np.ndarray) -> np.ndarray:
try:
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy()
return cv2.medianBlur(gray, 3)
except Exception as e:
logging.error(f"Noise removal error: {str(e)}")
return image
@staticmethod
def adjust_contrast(image: np.ndarray, factor=1.5) -> np.ndarray:
try:
if len(image.shape) == 3:
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
else:
pil_image = Image.fromarray(image)
enhancer = ImageEnhance.Contrast(pil_image)
enhanced_img = enhancer.enhance(factor)
if len(image.shape) == 3:
return cv2.cvtColor(np.array(enhanced_img), cv2.COLOR_RGB2BGR)
return np.array(enhanced_img)
except Exception as e:
logging.error(f"Contrast adjustment error: {str(e)}")
return image
@staticmethod
def sharpen(image: np.ndarray) -> np.ndarray:
try:
if len(image.shape) == 3:
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
else:
pil_image = Image.fromarray(image)
sharpened = pil_image.filter(ImageFilter.SHARPEN)
if len(image.shape) == 3:
return cv2.cvtColor(np.array(sharpened), cv2.COLOR_RGB2BGR)
return np.array(sharpened)
except Exception as e:
logging.error(f"Sharpen error: {str(e)}")
return image
@staticmethod
def binarize(image: np.ndarray, method="adaptive") -> np.ndarray:
try:
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy()
if method == "otsu":
return cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
return cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
except Exception as e:
logging.error(f"Binarization error: {str(e)}")
return image
def enhance_for_ocr(self, image: np.ndarray) -> np.ndarray:
try:
enhanced = image.copy()
enhanced = self.deskew(enhanced)
enhanced = self.remove_noise(enhanced)
enhanced = self.adjust_contrast(enhanced, 1.5)
enhanced = self.sharpen(enhanced)
enhanced = self.binarize(enhanced, "adaptive")
return enhanced
except Exception as e:
self.logger.error(f"Image enhancement error: {str(e)}")
return image
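# Usage sketch (illustrative only): enhance_for_ocr() applies deskew -> denoise ->
# contrast -> sharpen -> adaptive binarization; each stage falls back to its input on
# failure, so the worst case is the original image.
def _example_preprocess_for_ocr(image_path: str) -> Optional[np.ndarray]:
    img = cv2.imread(image_path)
    if img is None:
        return None
    return ImageProcessor().enhance_for_ocr(img)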
class CloudOCRProvider:
def __init__(self):
self.logger = logging.getLogger(__name__)
log_with_check("CloudOCRProvider initialized")
self.azure_client = None
self.google_client = None
if CONFIG.get("AZURE_OCR_AVAILABLE", False) and CONFIG["AZURE_API_KEY"] and CONFIG["AZURE_ENDPOINT"]:
try:
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from msrest.authentication import CognitiveServicesCredentials
self.azure_client = ComputerVisionClient(CONFIG["AZURE_ENDPOINT"], CognitiveServicesCredentials(CONFIG["AZURE_API_KEY"]))
log_with_check("Azure OCR initialized")
except Exception as e:
self.logger.error(f"Azure OCR initialization error: {str(e)}")
log_with_check("Azure OCR initialization failed", False)
if CONFIG.get("GOOGLE_OCR_AVAILABLE", False) and CONFIG["GOOGLE_APPLICATION_CREDENTIALS"]:
try:
from google.cloud import vision
self.google_client = vision.ImageAnnotatorClient()
log_with_check("Google OCR initialized")
except Exception as e:
self.logger.error(f"Google OCR initialization error: {str(e)}")
log_with_check("Google OCR initialization failed", False)
def is_available(self):
return self.azure_client is not None or self.google_client is not None
@Utils.retry(max_attempts=2, delay=1, exceptions=(Exception,))
def process_with_azure(self, image_path):
if not self.azure_client:
return None
try:
with open(image_path, "rb") as image_file:
image_data = image_file.read()
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
recognize_results = self.azure_client.recognize_printed_text_in_stream(image_data)
result = {"regions": [], "text": "", "language": "", "confidence": 0.0}
all_text = []
total_confidence = 0.0
count = 0
for region in recognize_results.regions:
region_text = []
region_info = {"bounding_box": region.bounding_box.split(","), "lines": []}
for line in region.lines:
line_text = []
line_info = {"bounding_box": line.bounding_box.split(","), "words": []}
for word in line.words:
line_text.append(word.text)
total_confidence += word.confidence
count += 1
line_info["words"].append({"text": word.text, "confidence": word.confidence, "bounding_box": word.bounding_box.split(",")})
full_line = " ".join(line_text)
region_text.append(full_line)
line_info["text"] = full_line
region_info["lines"].append(line_info)
full_region = "\n".join(region_text)
all_text.append(full_region)
region_info["text"] = full_region
result["regions"].append(region_info)
result["text"] = "\n\n".join(all_text)
if count > 0:
result["confidence"] = total_confidence / count
result["language"] = recognize_results.language or "unknown"
return result
except Exception as e:
self.logger.error(f"Azure OCR error: {str(e)}")
return None
@Utils.retry(max_attempts=2, delay=1, exceptions=(Exception,))
def process_with_google(self, image_path):
if not self.google_client:
return None
try:
with open(image_path, "rb") as image_file:
content = image_file.read()
from google.cloud import vision
image = vision.Image(content=content)
response = self.google_client.text_detection(image=image)
if response.error.message:
self.logger.error(f"Google OCR API error: {response.error.message}")
return None
result = {"regions": [], "text": "", "language": "", "confidence": 0.0}
full_text_annotation = response.full_text_annotation
if full_text_annotation:
result["text"] = full_text_annotation.text
for page in full_text_annotation.pages:
for block in page.blocks:
block_info = {"bounding_box": [[vertex.x, vertex.y] for vertex in block.bounding_box.vertices], "paragraphs": [], "text": ""}
block_texts = []
for paragraph in block.paragraphs:
para_info = {"bounding_box": [[vertex.x, vertex.y] for vertex in paragraph.bounding_box.vertices], "words": [], "text": ""}
para_texts = []
for word in paragraph.words:
word_text = "".join([symbol.text for symbol in word.symbols])
para_texts.append(word_text)
word_info = {"text": word_text, "bounding_box": [[vertex.x, vertex.y] for vertex in word.bounding_box.vertices], "confidence": word.confidence}
para_info["words"].append(word_info)
para_info["text"] = " ".join(para_texts)
block_texts.append(para_info["text"])
block_info["paragraphs"].append(para_info)
block_info["text"] = "\n".join(block_texts)
result["regions"].append(block_info)
result["confidence"] = full_text_annotation.pages[0].confidence if full_text_annotation.pages else 0.0
if result["text"]:
result["language"] = Utils.detect_language(result["text"])
return result
except Exception as e:
self.logger.error(f"Google OCR error: {str(e)}")
return None
def process_image(self, image_path, prefer_provider=None):
providers = []
if prefer_provider:
if prefer_provider == "azure" and self.azure_client:
providers = ["azure", "google", "local"]
elif prefer_provider == "google" and self.google_client:
providers = ["google", "azure", "local"]
else:
providers = ["local", "azure", "google"]
else:
if self.azure_client:
providers.append("azure")
if self.google_client:
providers.append("google")
providers.append("local")
for provider in providers:
try:
if provider == "azure":
result = self.process_with_azure(image_path)
if result:
result["provider"] = "azure"
return result
elif provider == "google":
result = self.process_with_google(image_path)
if result:
result["provider"] = "google"
return result
elif provider == "local":
img = cv2.imread(image_path)
if img is None:
continue
                    enhanced = ImageProcessor().enhance_for_ocr(img)  # CloudOCRProvider has no enhance_for_ocr of its own
text = pytesseract.image_to_string(enhanced, lang=CONFIG["DEFAULT_LANGUAGE"],
config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}')
boxes = pytesseract.image_to_data(enhanced, lang=CONFIG["DEFAULT_LANGUAGE"],
config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}',
output_type=pytesseract.Output.DICT)
result = {"text": text, "regions": [], "provider": "local", "language": CONFIG["DEFAULT_LANGUAGE"], "confidence": 0.0}
confidences = []
current_block = -1
current_region = {}
current_lines = []
for i in range(len(boxes['text'])):
if boxes['text'][i].strip():
confidences.append(boxes['conf'][i])
if boxes['block_num'][i] != current_block:
if current_block != -1 and current_lines:
current_region["text"] = "\n".join(current_lines)
result["regions"].append(current_region)
current_block = boxes['block_num'][i]
current_region = {"bounding_box": [boxes['left'][i], boxes['top'][i],
boxes['left'][i] + boxes['width'][i],
boxes['top'][i] + boxes['height'][i]], "lines": []}
current_lines = []
line_num = boxes['line_num'][i]
if line_num >= len(current_lines):
current_lines.append(boxes['text'][i])
else:
current_lines[line_num] += " " + boxes['text'][i]
if current_block != -1 and current_lines:
current_region["text"] = "\n".join(current_lines)
result["regions"].append(current_region)
if confidences:
valid_confidences = [c for c in confidences if c > 0]
if valid_confidences:
result["confidence"] = sum(valid_confidences) / len(valid_confidences) / 100.0
return result
except Exception as e:
self.logger.error(f"OCR with {provider} failed: {str(e)}")
continue
return None
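# Usage sketch (illustrative only; not called anywhere in this module): process_image()
# tries providers in order (preferred cloud provider first, local Tesseract as the
# fallback) and returns the first result, tagged with a "provider" field.
def _example_ocr_with_fallback(image_path: str) -> str:
    provider = CloudOCRProvider()
    result = provider.process_image(image_path, prefer_provider="google")
    if not result:
        return ""
    return f'[{result.get("provider", "?")}] {result.get("text", "")}'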
class ResourceManager:
"""کلاس مدیریت منابع سیستمی"""
def __init__(self, max_memory_percent=CONFIG["MAX_MEMORY_PERCENT"], max_cpu_percent=CONFIG["MAX_CPU_PERCENT"], min_free_space_bytes=CONFIG["MIN_FREE_SPACE_BYTES"]):
self.logger = logging.getLogger(__name__)
self.max_memory_percent = max_memory_percent
self.max_cpu_percent = max_cpu_percent
self.min_free_space_bytes = min_free_space_bytes
self.metrics_history = deque(maxlen=1000)
self.last_cleanup = None
self.resource_warnings = 0
self.is_cleaning = False
        self.lock = threading.RLock()  # re-entrant: check_resources() calls get_current_metrics() while holding the lock
self.stats = {'total_processed': 0, 'successful_extractions': 0, 'failed_extractions': 0}
log_with_check("ResourceManager initialized")
def get_current_metrics(self) -> ResourceMetrics:
try:
memory = psutil.virtual_memory()
cpu = psutil.cpu_percent(interval=0.5)
disk = psutil.disk_usage('/')
metrics = ResourceMetrics(memory.percent, cpu, disk.percent, datetime.now())
with self.lock:
self.metrics_history.append(metrics)
return metrics
except Exception as e:
self.logger.error(f"Error getting metrics: {str(e)}")
return ResourceMetrics(0.0, 0.0, 0.0, datetime.now())
def check_resources(self) -> bool:
with self.lock:
if self.is_cleaning:
return False
metrics = self.get_current_metrics()
needs_cleanup = (
metrics.memory_usage > self.max_memory_percent or
metrics.cpu_usage > self.max_cpu_percent or
psutil.disk_usage('/').free < self.min_free_space_bytes
)
if needs_cleanup:
self.cleanup_resources()
return True
return False
def cleanup_resources(self):
if self.is_cleaning:
return
self.is_cleaning = True
self.logger.info("Starting resource cleanup")
try:
gc.collect()
self._cleanup_temp_files()
self.last_cleanup = datetime.now()
log_with_check("Resource cleanup completed")
except Exception as e:
self.logger.error(f"Error during cleanup: {str(e)}")
log_with_check("Resource cleanup failed", False)
finally:
self.is_cleaning = False
def _cleanup_temp_files(self):
try:
count = 0
for filename in os.listdir(CONFIG["TEMP_DIR"]):
file_path = os.path.join(CONFIG["TEMP_DIR"], filename)
if os.path.isfile(file_path):
os.remove(file_path)
count += 1
self.logger.info(f"Cleaned {count} temp files")
except Exception as e:
self.logger.error(f"Error cleaning temp files: {str(e)}")
class LoadPredictor:
def __init__(self, history_size: int = 1000, update_interval: int = 60):
self.logger = logging.getLogger(__name__)
self.history_size = history_size
self.update_interval = update_interval
self.resource_history = deque(maxlen=history_size)
self.current_predictions = {'short_term': None, 'medium_term': None, 'long_term': None}
self.thresholds = {'cpu_high': 80.0, 'memory_high': 99.5, 'prediction_confidence': 0.7}
self.monitoring = False
self.monitor_thread = None
self._start_monitoring()
log_with_check("LoadPredictor initialized")
def _start_monitoring(self):
self.monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
log_with_check("Monitoring thread started")
def _monitor_loop(self):
while self.monitoring:
try:
usage = self._collect_resource_usage()
self.resource_history.append(usage)
self._update_predictions()
self._check_alerts()
time.sleep(self.update_interval)
except Exception as e:
self.logger.error(f"Error in monitor loop: {str(e)}")
log_with_check("Monitor loop error", False)
def _collect_resource_usage(self) -> ResourceUsage:
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
return ResourceUsage(cpu_percent, memory_percent, None, datetime.now())
except Exception as e:
self.logger.error(f"Error collecting resource usage: {str(e)}")
return ResourceUsage(0.0, 0.0, None, datetime.now())
def _update_predictions(self):
try:
self.current_predictions['short_term'] = self._predict_load(timedelta(minutes=5))
self.current_predictions['medium_term'] = self._predict_load(timedelta(hours=1))
self.current_predictions['long_term'] = self._predict_load(timedelta(days=1))
except Exception as e:
self.logger.error(f"Error updating predictions: {str(e)}")
def _predict_load(self, time_window: timedelta) -> LoadPrediction:
if len(self.resource_history) < 10:
return LoadPrediction(50.0, 0.5, time_window, {})
try:
history_array = np.array([[usage.cpu_percent, usage.memory_percent] for usage in self.resource_history])
x = np.arange(len(history_array))
cpu_trend = np.polyfit(x, history_array[:, 0], 2)
memory_trend = np.polyfit(x, history_array[:, 1], 2)
future_point = len(history_array) + time_window.total_seconds() / self.update_interval
predicted_cpu = np.polyval(cpu_trend, future_point)
predicted_memory = np.polyval(memory_trend, future_point)
confidence = self._calculate_prediction_confidence(history_array, cpu_trend, memory_trend)
return LoadPrediction((predicted_cpu + predicted_memory) / 2, confidence, time_window, {'cpu': predicted_cpu, 'memory': predicted_memory})
except Exception as e:
self.logger.error(f"Error in load prediction: {str(e)}")
return LoadPrediction(50.0, 0.3, time_window, {})
def _calculate_prediction_confidence(self, history: np.ndarray, cpu_trend: np.ndarray, memory_trend: np.ndarray) -> float:
try:
x = np.arange(len(history))
cpu_predictions = np.polyval(cpu_trend, x)
memory_predictions = np.polyval(memory_trend, x)
cpu_rmse = np.sqrt(np.mean((history[:, 0] - cpu_predictions) ** 2))
memory_rmse = np.sqrt(np.mean((history[:, 1] - memory_predictions) ** 2))
max_rmse = 50.0
cpu_confidence = max(0.0, min(1.0, 1.0 - cpu_rmse / max_rmse))
memory_confidence = max(0.0, min(1.0, 1.0 - memory_rmse / max_rmse))
return 0.6 * cpu_confidence + 0.4 * memory_confidence
except Exception as e:
self.logger.error(f"Error calculating prediction confidence: {str(e)}")
return 0.5
def _check_alerts(self):
current_usage = self.resource_history[-1] if self.resource_history else None
if not current_usage:
return
alerts = []
if current_usage.cpu_percent > self.thresholds['cpu_high']:
alerts.append({'type': 'high_cpu', 'value': current_usage.cpu_percent})
if current_usage.memory_percent > self.thresholds['memory_high']:
alerts.append({'type': 'high_memory', 'value': current_usage.memory_percent})
if alerts:
self._handle_alerts(alerts)
def _handle_alerts(self, alerts: List[Dict]):
for alert in alerts:
self.logger.warning(f"هشدار: {alert['type']} در سطح {alert['value']}%")
def get_current_load(self) -> Dict:
current_usage = self.resource_history[-1] if self.resource_history else None
if not current_usage:
return {}
return {
'current': {
'cpu': current_usage.cpu_percent,
'memory': current_usage.memory_percent,
'timestamp': current_usage.timestamp.isoformat()
},
'predictions': {
name: {
'load': pred.expected_load,
'confidence': pred.confidence,
'resources': pred.details
} if pred else None
for name, pred in self.current_predictions.items()
}
}
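# Usage sketch (illustrative only; not called anywhere in this module): LoadPredictor
# samples CPU/memory in a background thread and fits a quadratic trend over the history
# to extrapolate load; get_current_load() exposes the latest sample and the predictions.
def _example_report_load(predictor: LoadPredictor) -> str:
    load = predictor.get_current_load()
    if not load:
        return "no samples collected yet"
    current = load["current"]
    return f"CPU {current['cpu']:.1f}% / RAM {current['memory']:.1f}%"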
class DocumentProcessor:
def __init__(self, config=None):
self.config = config or ProcessingConfig()
self.logger = logging.getLogger(__name__)
self.cache = CacheManager()
self.normalizer = Normalizer()
self.learner = AdaptiveLearner(model_dir=CONFIG["MODEL_DIR"])
self.img_processor = ImageProcessor()
self.cloud_ocr = CloudOCRProvider()
self.resource_manager = ResourceManager()
self.load_predictor = LoadPredictor()
self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)
init_directories()
self.setup_tesseract()
log_with_check(f"DocumentProcessor initialized (version {CONFIG['VERSION']})")
def setup_tesseract(self):
try:
tesseract_installed = False
try:
                version = subprocess.check_output([CONFIG["TESSERACT_CMD"], '--version'], text=True)
                self.logger.info(f"Tesseract found: {version.splitlines()[0]}")
                tesseract_installed = True
                log_with_check("Tesseract found")
            except (subprocess.CalledProcessError, FileNotFoundError):
                self.logger.warning(f"Tesseract not found at {CONFIG['TESSERACT_CMD']}, attempting to download data and install via system commands")
                log_with_check("Tesseract not found, attempting installation", False)
                # Method 1: install via subprocess.check_call
try:
subprocess.check_call(['apt-get', 'update', '-y', '--fix-missing', '--allow-downgrades'])
subprocess.check_call(['apt-get', 'install', 'tesseract-ocr', '-y', '--no-install-recommends', '--fix-missing', '--allow-downgrades'])
os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"]
version = pytesseract.get_tesseract_version()
self.logger.info(f"Tesseract installed via Method 1, version: {version}")
tesseract_installed = True
log_with_check("Tesseract installed via Method 1")
except subprocess.CalledProcessError as e:
self.logger.warning(f"Method 1 failed: {str(e)}, attempting Method 2")
log_with_check("Tesseract installation Method 1 failed", False)
                    # Method 2: install via os.system
                    os.system('chmod 777 /tmp')
                    os.system('apt-get update -y --fix-missing --allow-downgrades')
                    os.system('apt-get install tesseract-ocr -y --no-install-recommends --fix-missing --allow-downgrades')
                    os.system('pip install -q pytesseract')
os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"]
try:
version = pytesseract.get_tesseract_version()
self.logger.info(f"Tesseract installed via Method 2, version: {version}")
tesseract_installed = True
log_with_check("Tesseract installed via Method 2")
except Exception as e:
self.logger.error(f"Method 2 failed: {str(e)}. Continuing with downloaded data if available.")
log_with_check("Tesseract installation Method 2 failed", False)
if not tesseract_installed and os.path.exists(CONFIG["TESSDATA_LOCAL"]):
os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
self.logger.warning("Using downloaded Tesseract data without executable")
log_with_check("Using downloaded Tesseract data", False)
self._download_tesseract_data()
try:
version = pytesseract.get_tesseract_version()
self.logger.info(f"Tesseract is ready, version: {version}")
log_with_check("Tesseract setup completed")
return True
except Exception as e:
self.logger.error(f"Tesseract setup failed: {str(e)}. Continuing with downloaded data if available.")
log_with_check("Tesseract setup failed", False)
return False
except Exception as e:
self.logger.error(f"Tesseract setup failed: {str(e)}")
log_with_check("Tesseract setup failed due to exception", False)
return False
def _download_tesseract_data(self):
try:
os.makedirs(CONFIG["TESSDATA_LOCAL"], exist_ok=True)
base_url = "https://github.com/tesseract-ocr/tessdata_best/raw/main/"
languages = ["fas", "eng", "ara"]
import tqdm
for lang in languages:
lang_file = os.path.join(CONFIG["TESSDATA_LOCAL"], f"{lang}.traineddata")
if not os.path.exists(lang_file):
self.logger.info(f"Downloading {lang}.traineddata...")
response = requests.get(f"{base_url}{lang}.traineddata", stream=True)
response.raise_for_status()
with open(lang_file, 'wb') as f:
total_size = int(response.headers.get('content-length', 0))
with tqdm.tqdm(total=total_size, unit='B', unit_scale=True, desc=lang) as pbar:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
self.logger.info(f"Downloaded {lang}.traineddata successfully")
log_with_check(f"Downloaded {lang}.traineddata")
except Exception as e:
self.logger.error(f"Error downloading Tesseract data: {str(e)}")
log_with_check("Failed to download Tesseract data", False)
@timed
def process_pdf(self, pdf_path) -> Dict:
self.logger.info(f"Processing PDF: {pdf_path} - Start")
log_with_check(f"Starting PDF processing for {pdf_path}")
if self.resource_manager.check_resources():
self.logger.warning("Resources checked and cleaned during PDF processing")
log_with_check("Resources checked and cleaned")
try:
doc = fitz.open(pdf_path)
result = {
"title": os.path.basename(pdf_path),
"page_count": len(doc),
"pages": [],
"metadata": self._extract_pdf_metadata(doc),
"toc": self._extract_toc(doc)
}
futures = []
for page_num in range(len(doc)):
future = self.executor.submit(self._process_page, doc, page_num)
futures.append(future)
for idx, future in enumerate(as_completed(futures)):
page_result = future.result()
result["pages"].append(page_result)
self.logger.debug(f"Completed page {idx+1}/{len(doc)}")
log_with_check(f"Completed page {idx+1}/{len(doc)}")
                gc.collect()  # free memory after each page
result["pages"].sort(key=lambda x: x["page_num"])
result["structure"] = self._analyze_document_structure(result)
self.logger.info(f"PDF processing complete: {pdf_path} - End")
log_with_check(f"PDF processing completed for {pdf_path}")
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['successful_extractions'] += 1
            gc.collect()  # free memory after the whole document is processed
return result
except Exception as e:
self.logger.error(f"Error processing PDF: {str(e)}")
log_with_check(f"Error processing PDF: {str(e)}", False)
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['failed_extractions'] += 1
return {"error": str(e), "traceback": traceback.format_exc()}
def _extract_pdf_metadata(self, doc) -> Dict:
metadata = {}
for key, value in doc.metadata.items():
if value:
metadata[key] = value
return metadata
def _extract_toc(self, doc) -> List[Dict]:
toc = []
try:
raw_toc = doc.get_toc()
for level, title, page in raw_toc:
toc.append({"level": level, "title": title, "page": page})
except:
pass
return toc
def _process_page(self, doc, page_num: int) -> Dict:
self.logger.debug(f"Processing page {page_num+1} - Start")
log_with_check(f"Starting page {page_num+1} processing")
page = doc[page_num]
cache_key = f"page:{doc.name}:{page_num}:{hash(page.get_text())}"
cached_result = self.cache.get(cache_key)
if cached_result and self.config.use_caching:
self.logger.debug(f"Using cached result for page {page_num+1}")
log_with_check(f"Using cached result for page {page_num+1}")
return cached_result
result = {"page_num": page_num + 1, "width": page.rect.width, "height": page.rect.height, "text_regions": []}
image_list = []
try:
pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
image_list.append(img)
if not image_list or img.size == 0:
with temp_file(".pdf") as temp_pdf:
                    doc.save(temp_pdf)  # Document.save expects a file path, not an open file object
page_images = convert_from_path(temp_pdf, first_page=page_num+1, last_page=page_num+1, dpi=75)
if page_images:
np_image = np.array(page_images[0])
cv_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR)
image_list.append(cv_image)
except Exception as e:
self.logger.error(f"Error extracting page image: {str(e)}")
log_with_check(f"Error extracting page {page_num+1} image: {str(e)}", False)
for img in image_list:
regions = self._perform_ocr(img)
result["text_regions"].extend(regions)
raw_text = page.get_text()
if raw_text and raw_text.strip():
result["text_direct"] = raw_text
if self.config.use_caching:
self.cache.set(cache_key, result)
self.logger.debug(f"Processing page {page_num+1} - End")
log_with_check(f"Completed page {page_num+1} processing")
        gc.collect()  # free memory after processing each page
return result
def _perform_ocr(self, image: np.ndarray) -> List[Dict]:
cache_key = self.cache.generate_key(image)
cached_result = self.cache.get(cache_key)
if cached_result and self.config.use_caching:
return cached_result
regions = []
processed_img = self.img_processor.enhance_for_ocr(image) if self.config.use_preprocessing else image.copy()
try:
cloud_result = None
if self.cloud_ocr.is_available() and self.config.use_distributed:
with temp_file(".png") as temp_img:
cv2.imwrite(temp_img, processed_img)
cloud_result = self.cloud_ocr.process_image(temp_img)
if cloud_result:
for region in cloud_result.get("regions", []):
region_text = region.get("text", "").strip()
if region_text:
normalized_text = self.normalizer.normalize(region_text)
corrected_text, corr_conf = self.learner.apply_corrections(
normalized_text, {"language": self.config.language}
)
regions.append({
"content": corrected_text,
"confidence": cloud_result.get("confidence", 0.8) * corr_conf,
"language": cloud_result.get("language", self.config.language),
"direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
"provider": cloud_result.get("provider", "cloud"),
"bounding_box": region.get("bounding_box")
})
else:
ocr_result = pytesseract.image_to_data(
processed_img,
lang=self.config.language,
config=f'--oem {CONFIG["OCR_ENGINE_MODE"]} --psm {CONFIG["PAGE_SEGMENTATION_MODE"]}',
output_type=pytesseract.Output.DICT
)
current_block = -1
current_text = []
current_conf = []
for i in range(len(ocr_result['text'])):
text = ocr_result['text'][i].strip()
conf = int(ocr_result['conf'][i])
block_num = ocr_result['block_num'][i]
if not text:
continue
if block_num != current_block:
if current_block != -1 and current_text:
full_text = ' '.join(current_text)
avg_conf = sum(current_conf) / len(current_conf) if current_conf else 0
normalized_text = self.normalizer.normalize(full_text)
corrected_text, corr_conf = self.learner.apply_corrections(
normalized_text, {"language": self.config.language}
)
regions.append({
"content": corrected_text,
"confidence": min(avg_conf * corr_conf / 100, 100),
"language": self.config.language,
"direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
"provider": "tesseract"
})
current_block = block_num
current_text = []
current_conf = []
current_text.append(text)
current_conf.append(conf)
if current_text:
full_text = ' '.join(current_text)
avg_conf = sum(current_conf) / len(current_conf) if current_conf else 0
normalized_text = self.normalizer.normalize(full_text)
corrected_text, corr_conf = self.learner.apply_corrections(
normalized_text, {"language": self.config.language}
)
regions.append({
"content": corrected_text,
"confidence": min(avg_conf * corr_conf / 100, 100),
"language": self.config.language,
"direction": "rtl" if Utils.is_rtl_language(self.config.language) else "ltr",
"provider": "tesseract"
})
except Exception as e:
self.logger.error(f"OCR error: {str(e)}")
log_with_check("OCR failed", False)
if self.config.use_caching:
self.cache.set(cache_key, regions)
return regions
def _analyze_document_structure(self, doc_data: Dict) -> Dict:
structure = {
"title": doc_data.get("title", ""),
"language": self.config.language,
"page_count": doc_data.get("page_count", 0),
"sections": [],
"summary": ""
}
all_text = []
for page in doc_data.get("pages", []):
page_text = []
for region in page.get("text_regions", []):
if region.get("confidence", 0) >= self.config.confidence_threshold:
page_text.append(region.get("content", ""))
if page_text:
all_text.append(" ".join(page_text))
toc = doc_data.get("toc", [])
if toc:
for item in toc:
section = {"title": item.get("title", ""), "level": item.get("level", 1), "page": item.get("page", 1)}
structure["sections"].append(section)
if all_text:
combined_text = "\n".join(all_text)
structure["summary"] = combined_text[:500] + "..." if len(combined_text) > 500 else combined_text
return structure
def _process_image_task(self, data: Dict) -> Dict:
try:
image_path = data['image_path']
settings = data.get('settings', {})
result = self.extract_text_from_image(image_path)
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['successful_extractions'] += 1
log_with_check(f"Image task completed for {image_path}")
return {"status": "success", "result": result}
except Exception as e:
self.logger.error(f"Error processing image task: {str(e)}")
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['failed_extractions'] += 1
log_with_check(f"Error processing image task: {str(e)}", False)
return {"status": "error", "error": str(e)}
def _process_document_task(self, data: Dict) -> Dict:
try:
pdf_path = data['pdf_path']
settings = data.get('settings', {})
result = self.process_pdf(pdf_path)
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['successful_extractions'] += 1
log_with_check(f"Document task completed for {pdf_path}")
return {"status": "success", "result": result}
except Exception as e:
self.logger.error(f"Error processing document task: {str(e)}")
self.resource_manager.stats['total_processed'] += 1
self.resource_manager.stats['failed_extractions'] += 1
log_with_check(f"Error processing document task: {str(e)}", False)
return {"status": "error", "error": str(e)}
def extract_text_from_image(self, image_path: str) -> Dict:
try:
img = cv2.imread(image_path)
if img is None:
log_with_check(f"Failed to load image: {image_path}", False)
return {"error": "Could not load image"}
regions = self._perform_ocr(img)
log_with_check(f"Extracted text from image: {image_path}")
return {"regions": regions, "text": "\n".join([r.get("content", "") for r in regions])}
except Exception as e:
self.logger.error(f"Error extracting text from image: {str(e)}")
log_with_check(f"Error extracting text from image: {str(e)}", False)
return {"error": str(e)}
def batch_process(self, file_paths: List[str], output_dir: str = None) -> Dict:
output_dir = output_dir or CONFIG["OUTPUT_DIR"]
os.makedirs(output_dir, exist_ok=True)
results = {}
for file_path in file_paths:
file_ext = Utils.get_file_extension(file_path)
file_name = os.path.basename(file_path)
output_file = os.path.join(output_dir, file_name.replace(file_ext, ".json"))
if file_ext.lower() in [".pdf"]:
result = self.process_pdf(file_path)
elif file_ext.lower() in [".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".webp"]:
result = self.extract_text_from_image(file_path)
else:
result = {"error": f"Unsupported file type: {file_ext}"}
with open(output_file, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
results[file_name] = {"status": "success" if "error" not in result else "error"}
log_with_check(f"Processed {file_name} to {output_file}")
return results
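# Usage sketch (illustrative only; not called anywhere in this module): batch_process()
# dispatches PDFs to process_pdf() and images to extract_text_from_image(), writing one
# JSON file per input into the output directory. The file names below are placeholders.
def _example_batch_run() -> Dict:
    processor = DocumentProcessor()
    files = ["sample_scan.pdf", "sample_page.jpg"]  # hypothetical input files
    return processor.batch_process(files, output_dir=CONFIG["OUTPUT_DIR"])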
def extract_text_from_image(image_path: str, output_format: str = "text") -> Dict:
processor = DocumentProcessor()
regions = processor.extract_text_from_image(image_path)
if "error" in regions:
return regions
if output_format == "text":
return {"text": regions["text"]}
return regions
def extract_text_from_pdf(pdf_path: str, output_format: str = "text") -> Dict:
processor = DocumentProcessor()
result = processor.process_pdf(pdf_path)
if "error" in result:
return result
if output_format == "text":
text = "\n".join([region["content"] for page in result["pages"] for region in page["text_regions"] if "content" in region])
return {"text": text}
return result
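# Usage sketch (illustrative only): the two convenience wrappers above build a fresh
# DocumentProcessor per call, which is simple but heavy (Tesseract setup, cache,
# monitoring thread); for repeated use, keep one DocumentProcessor instance and call
# its methods directly, as the Gradio UI below does.
def _example_cli_extract(path: str) -> str:
    if Utils.get_file_extension(path) == ".pdf":
        return extract_text_from_pdf(path, output_format="text").get("text", "")
    return extract_text_from_image(path, output_format="text").get("text", "")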
def launch_ui():
processor = DocumentProcessor()
def process_file(file, output_format):
        # outputs are [download_output, output_text]; user-facing messages go to the textbox
        if file is None:
            logging.info("No file uploaded")
            return None, "لطفاً فایلی آپلود کنید"
        if not hasattr(file, 'name') or file.name is None:
            logging.warning("Invalid file object")
            return None, "فایل نامعتبر است"
try:
start_time = time.time()
logging.info(f"Starting process for file: {file.name}")
            if file.name.lower().endswith(('.jpg', '.jpeg', '.png')):
                result = processor.extract_text_from_image(file.name)
            elif file.name.lower().endswith('.pdf'):
result = processor.process_pdf(file.name)
else:
logging.error(f"Unsupported file type: {file.name}")
return "نوع فایل پشتیبانی نمی‌شود", None
elapsed_time = time.time() - start_time
if "error" in result:
logging.error(f"Error processing file {file.name}: {result['error']}")
return f"خطا: {result['error']}", None
logging.info(f"Processing completed for {file.name} in {elapsed_time:.2f} seconds")
            pdf_text = "\n".join(r.get("content", "") for p in result.get("pages", []) for r in p.get("text_regions", []))  # process_pdf keeps text inside page regions
            output = (result.get("text") or pdf_text or "متن یافت نشد") if output_format == "text" else json.dumps(result, ensure_ascii=False, indent=2)
            base_name = os.path.splitext(os.path.basename(file.name))[0]
            output_file = os.path.join(CONFIG["TEMP_DIR"], f"{base_name}_output.txt" if output_format == "text" else f"{base_name}_output.json")
with open(output_file, "w", encoding="utf-8") as f:
f.write(output)
log_with_check(f"Output file created: {output_file}")
            gc.collect()  # free memory after writing the output file
return gr.update(value=output_file, label="دانلود خروجی", visible=True), output
except Exception as e:
logging.error(f"Unexpected error processing file {file.name}: {str(e)}")
return f"خطای غیرمنتظره: {str(e)}", None
    # Improved UI with a nicer theme and layout
with gr.Blocks(title=CONFIG["UI_TITLE"], theme=gr.themes.Soft()) as demo:
gr.Markdown(
f"""
# {CONFIG['UI_TITLE']}
<p style="text-align: center; color: #4CAF50;">سیستمی برای استخراج متن از تصاویر و PDFها با دقت بالا</p>
""",
elem_id="title"
)
with gr.Row(elem_classes="main-row"):
with gr.Column(scale=1, elem_classes="input-column"):
file_input = gr.File(label="فایل را آپلود کنید", file_types=[".pdf", ".jpg", ".png", ".jpeg"])
output_format = gr.Dropdown(choices=["text", "json"], label="فرمت خروجی", value="text")
submit_btn = gr.Button("پردازش", variant="primary")
download_output = gr.File(label="دانلود خروجی", visible=False)
with gr.Column(scale=2, elem_classes="output-column"):
output_text = gr.Textbox(label="متن استخراج‌شده", lines=10, interactive=False)
        # JavaScript snippet for client-side download
demo.js = """
function (file, output) {
if (file) {
const link = document.createElement('a');
link.href = URL.createObjectURL(file);
link.download = file.name;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
URL.revokeObjectURL(link.href);
}
return [file, output];
}
"""
        # Custom styles
demo.css = """
.main-row {
padding: 20px;
background-color: #f5f5f5;
border-radius: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
.input-column, .output-column {
padding: 10px;
animation: fadeIn 0.5s ease-in;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
#title {
font-family: 'Arial', sans-serif;
font-size: 24px;
margin-bottom: 20px;
}
"""
submit_btn.click(
fn=process_file,
inputs=[file_input, output_format],
outputs=[download_output, output_text]
).then(
fn=lambda x: gr.update(visible=True) if x else gr.update(visible=False),
inputs=[download_output],
outputs=[download_output]
)
try:
demo.launch(server_name="0.0.0.0", server_port=7860)
log_with_check("Gradio interface launched")
except Exception as e:
logging.error(f"Failed to launch Gradio interface: {str(e)}")
log_with_check("Gradio interface launch failed", False)
def setup_signal_handlers():
def signal_handler(sig, frame):
logging.info("Shutdown signal received, cleaning up...")
log_with_check("Starting shutdown cleanup")
try:
for filename in os.listdir(CONFIG["TEMP_DIR"]):
file_path = os.path.join(CONFIG["TEMP_DIR"], filename)
if os.path.isfile(file_path):
file_age = time.time() - os.path.getmtime(file_path)
if file_age > 3600:
os.remove(file_path)
            gc.collect()  # free memory during shutdown
except:
pass
log_with_check("Cleanup completed")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def check_dependencies():
missing = []
try:
pytesseract.get_tesseract_version()
logging.info("Tesseract installed")
log_with_check("Tesseract dependency checked")
except:
missing.append("Tesseract OCR")
log_with_check("Tesseract dependency check failed", False)
logging.info(f"Missing dependencies: {', '.join(missing)}" if missing else "All dependencies are present")
log_with_check("Dependencies check completed")
def install_tesseract_method1():
"""روش اول: نصب Tesseract با استفاده از subprocess.check_call"""
try:
subprocess.check_call(['apt-get', 'update', '-y', '--fix-missing', '--allow-downgrades'])
subprocess.check_call(['apt-get', 'install', 'tesseract-ocr', '-y', '--no-install-recommends', '--fix-missing', '--allow-downgrades', '--fix-broken'])
os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"]
version = pytesseract.get_tesseract_version()
logging.info(f"Tesseract installed via Method 1, version: {version}")
log_with_check("Tesseract installed via Method 1")
return True
except subprocess.CalledProcessError as e:
logging.warning(f"Method 1 failed: {str(e)}")
log_with_check("Tesseract installation Method 1 failed", False)
return False
def install_tesseract_method2():
"""روش دوم: نصب Tesseract با استفاده از os.system"""
logging.warning("Attempting Method 2: manual installation")
log_with_check("Attempting Method 2 installation", False)
os.system('chmod 777 /tmp')
os.system('apt-get update -y --fix-missing --allow-downgrades')
os.system('apt-get install tesseract-ocr -y --no-install-recommends --fix-missing --allow-downgrades --fix-broken')
os.system('pip install -q pytesseract')
os.environ["TESSDATA_PREFIX"] = os.path.abspath(CONFIG["TESSDATA_LOCAL"])
pytesseract.pytesseract.tesseract_cmd = CONFIG["TESSERACT_CMD"]
try:
version = pytesseract.get_tesseract_version()
logging.info(f"Tesseract installed via Method 2, version: {version}")
log_with_check("Tesseract installed via Method 2")
return True
except Exception as e:
logging.error(f"Method 2 failed: {str(e)}")
log_with_check("Tesseract installation Method 2 failed", False)
return False
def setup_tesseract_alternative():
"""نصب Tesseract با دو روش به صورت متوالی"""
log_with_check("Starting Tesseract setup")
if not install_tesseract_method1():
install_tesseract_method2()
if __name__ == "__main__":
setup_signal_handlers()
    setup_tesseract_alternative()  # install Tesseract first
    check_dependencies()  # then check the remaining dependencies
try:
launch_ui()
except Exception as e:
logging.error(f"Application failed to start: {str(e)}")
log_with_check("Application startup failed", False)