# app.py - سیستم پیشرفته استخراج و تحلیل اسناد حقوقی فارسی import os import gc import sys import time import json import logging import resource import requests import threading import re import random from pathlib import Path from datetime import datetime from typing import List, Dict, Any, Optional, Tuple, Union from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urljoin, urlparse import hashlib import gradio as gr import pandas as pd import torch from bs4 import BeautifulSoup from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, pipeline, logging as transformers_logging ) import warnings # تنظیمات اولیه warnings.filterwarnings('ignore') transformers_logging.set_verbosity_error() # محدودیت حافظه برای HF Spaces try: resource.setrlimit(resource.RLIMIT_AS, (2*1024*1024*1024, 2*1024*1024*1024)) except: pass # تنظیمات محیط os.environ['TRANSFORMERS_CACHE'] = '/tmp/hf_cache' os.environ['HF_HOME'] = '/tmp/hf_cache' os.environ['TORCH_HOME'] = '/tmp/torch_cache' os.environ['TOKENIZERS_PARALLELISM'] = 'false' # تنظیم لاگ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # SVG Icons SVG_ICONS = { 'search': '🔍', 'document': '📄', 'analyze': '🤖', 'export': '📊', 'settings': '⚙️', 'preview': '👁️', 'link': '🔗', 'success': '✅', 'error': '❌', 'warning': '⚠️' } # منابع حقوقی معتبر ایران LEGAL_SOURCES_CONFIG = { "مجلس شورای اسلامی": { "base_url": "https://rc.majlis.ir", "patterns": ["/fa/law/", "/fa/report/", "/fa/news/"], "selectors": [".main-content", ".article-body", ".content", "article"], "delay_range": (2, 5), "max_depth": 2 }, "پورتال ملی قوانین": { "base_url": "https://www.dotic.ir", "patterns": ["/portal/", "/law/", "/regulation/"], "selectors": [".content-area", ".law-content", ".main-text"], "delay_range": (1, 4), "max_depth": 2 }, "قوه قضاییه": { "base_url": "https://www.judiciary.ir", "patterns": ["/fa/news/", "/fa/verdict/", "/fa/law/"], "selectors": [".news-content", ".verdict-text", ".main-content"], "delay_range": (3, 6), "max_depth": 2 }, "وزارت دادگستری": { "base_url": "https://www.moj.ir", "patterns": ["/fa/news/", "/fa/law/", "/fa/regulation/"], "selectors": [".news-body", ".law-text", ".content"], "delay_range": (2, 4), "max_depth": 2 }, "دیوان عدالت اداری": { "base_url": "https://www.adcourt.ir", "patterns": ["/fa/verdict/", "/fa/news/"], "selectors": [".verdict-content", ".news-text"], "delay_range": (1, 3), "max_depth": 2 } } # واژگان حقوقی فارسی پیشرفته PERSIAN_LEGAL_DICTIONARY = { "قوانین_اساسی": [ "قانون اساسی", "اصول قانون اساسی", "مجلس شورای اسلامی", "شورای نگهبان", "رهبری", "جمهوری اسلامی", "حاکمیت ملی", "ولایت فقیه" ], "قوانین_عادی": [ "ماده", "تبصره", "اصل", "فصل", "باب", "قسمت", "بخش", "کتاب", "قانون مدنی", "قانون جزا", "قانون آیین دادرسی", "قانون تجارت" ], "مقررات_اجرایی": [ "آییننامه", "مقرره", "دستورالعمل", "شیوهنامه", "بند", "جزء", "فقره", "ضابطه", "رهنمود", "دستور", "اعلامیه", "ابلاغیه" ], "اصطلاحات_حقوقی": [ "شخص حقیقی", "شخص حقوقی", "حق", "تکلیف", "مسئولیت", "جرم", "مجازات", "دعوا", "طرف دعوا", "خواهان", "خوانده", "شاکی", "متهم", "مجنیعلیه" ], "نهادهای_قضایی": [ "دادگاه", "قاضی", "دادرس", "مدعیالعموم", "وکیل", "کارشناس", "مترجم", "رای", "حکم", "قرار", "اجرائیه", "کیفرخواست", "لایحه دفاعیه" ], "اصطلاحات_اداری": [ "وزارت", "اداره", "سازمان", "مدیر", "مقام", "مسئول", "کارمند", "کارگزار", "بخشنامه", "تصویبنامه", "مصوبه", "تصمیم", "نظریه", "استعلام" ], "مفاهیم_مالی": [ "مالیات", "عوارض", "پرداخت", "وجه", "ریال", "درهم", "خسارت", "دیه", "تأمین", "ضمانت", "وثیقه", "سپرده", "جریمه", "جزای نقدی" ] } @dataclass class ProcessingProgress: current_step: str = "" progress: float = 0.0 total_documents: int = 0 processed_documents: int = 0 status: str = "آماده" error: Optional[str] = None class MemoryManager: """مدیریت هوشمند حافظه برای HF Spaces""" @staticmethod def get_memory_usage() -> float: try: with open('/proc/self/status') as f: for line in f: if line.startswith('VmRSS:'): return float(line.split()[1]) / 1024 return 0.0 except: return 0.0 @staticmethod def check_memory_available(required_mb: float) -> bool: try: current_usage = MemoryManager.get_memory_usage() return current_usage + required_mb < 1800 except: return True @staticmethod def cleanup_memory(): gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() class SmartTextProcessor: """پردازشگر هوشمند متن با قابلیتهای پیشرفته""" def __init__(self): self.sentence_endings = ['۔', '.', '!', '؟', '?', ';', '؛'] self.paragraph_indicators = ['ماده', 'تبصره', 'بند', 'الف', 'ب', 'ج', 'د', 'ه', 'و'] # الگوهای شناسایی ارجاعات حقوقی self.citation_patterns = [ r'ماده\s*(\d+)', r'تبصره\s*(\d+)', r'بند\s*([الف-ی]|\d+)', r'فصل\s*(\d+)', r'باب\s*(\d+)', r'قسمت\s*(\d+)', r'اصل\s*(\d+)' ] # الگوهای پاکسازی متن self.cleanup_patterns = [ (r'\s+', ' '), (r'([۰-۹])\s+([۰-۹])', r'\1\2'), (r'([a-zA-Z])\s+([a-zA-Z])', r'\1\2'), (r'([ا-ی])\s+(ها|های|ان|ات|ین)', r'\1\2'), (r'(می|نمی|خواهد)\s+(شود|گردد|باشد)', r'\1\2'), ] def normalize_persian_text(self, text: str) -> str: """نرمالسازی پیشرفته متن فارسی""" if not text: return "" # نرمالسازی کاراکترهای فارسی persian_normalization = { 'ي': 'ی', 'ك': 'ک', 'ة': 'ه', 'ؤ': 'و', 'إ': 'ا', 'أ': 'ا', 'ء': '', 'ئ': 'ی', '٠': '۰', '١': '۱', '٢': '۲', '٣': '۳', '٤': '۴', '٥': '۵', '٦': '۶', '٧': '۷', '٨': '۸', '٩': '۹' } for old, new in persian_normalization.items(): text = text.replace(old, new) # اعمال الگوهای پاکسازی for pattern, replacement in self.cleanup_patterns: text = re.sub(pattern, replacement, text) # حذف کاراکترهای غیرضروری text = re.sub(r'[^\u0600-\u06FF\u200C\u200D\s\w\d.,;:!؟()«»\-]', '', text) return text.strip() def detect_long_sentences(self, text: str) -> List[Dict[str, Any]]: """تشخیص جملات طولانی و پیچیده""" sentences = self._split_into_sentences(text) long_sentences = [] for i, sentence in enumerate(sentences): sentence_info = { 'index': i, 'text': sentence, 'word_count': len(sentence.split()), 'is_long': False, 'suggestions': [] } # بررسی طول جمله if sentence_info['word_count'] > 30: sentence_info['is_long'] = True sentence_info['suggestions'].append('جمله بسیار طولانی - تقسیم توصیه میشود') # بررسی پیچیدگی complexity_score = self._calculate_complexity(sentence) if complexity_score > 5: sentence_info['suggestions'].append('جمله پیچیده - سادهسازی توصیه میشود') if sentence_info['is_long'] or sentence_info['suggestions']: long_sentences.append(sentence_info) return long_sentences def _split_into_sentences(self, text: str) -> List[str]: """تقسیم متن به جملات""" boundaries = self.detect_sentence_boundaries(text) sentences = [] start = 0 for boundary in boundaries: sentence = text[start:boundary].strip() if sentence and len(sentence) > 5: sentences.append(sentence) start = boundary # جمله آخر if start < len(text): last_sentence = text[start:].strip() if last_sentence and len(last_sentence) > 5: sentences.append(last_sentence) return sentences def _calculate_complexity(self, sentence: str) -> float: """محاسبه پیچیدگی جمله""" complexity = 0 # تعداد کلمات ربط conjunctions = ['که', 'اگر', 'چون', 'زیرا', 'ولی', 'اما', 'درحالیکه', 'درصورتیکه'] complexity += sum(sentence.count(conj) for conj in conjunctions) * 0.5 # تعداد ویرگول complexity += sentence.count('،') * 0.3 # تعداد کلمات complexity += len(sentence.split()) * 0.02 # جملات تودرتو if sentence.count('(') > 0: complexity += sentence.count('(') * 0.5 return complexity def detect_sentence_boundaries(self, text: str) -> List[int]: """تشخیص هوشمند مرزهای جمله در متون حقوقی فارسی""" boundaries = [] for i, char in enumerate(text): if char in self.sentence_endings: is_real_ending = True # بررسی برای اعداد و اختصارات if i > 0 and text[i-1].isdigit() and char == '.': is_real_ending = False # بررسی برای اختصارات رایج if i > 2: prev_text = text[max(0, i-10):i].strip() if any(abbr in prev_text for abbr in ['ماده', 'بند', 'ج.ا.ا', 'ق.م', 'ق.ج']): if char == '.' and i < len(text) - 1 and not text[i+1].isspace(): is_real_ending = False if is_real_ending: if i < len(text) - 1: next_char = text[i + 1] if next_char.isspace() or next_char in '«"\'': boundaries.append(i + 1) else: boundaries.append(i + 1) return boundaries def reconstruct_legal_text(self, content_fragments: List[str]) -> str: """بازسازی هوشمند متن حقوقی از قطعات پراکنده""" if not content_fragments: return "" # مرحله 1: نرمالسازی تمام قطعات normalized_fragments = [] for fragment in content_fragments: normalized = self.normalize_persian_text(fragment) if normalized and len(normalized.strip()) > 10: normalized_fragments.append(normalized) if not normalized_fragments: return "" # مرحله 2: ادغام هوشمند قطعات combined_text = self._smart_join_fragments(normalized_fragments) # مرحله 3: اعمال قالببندی حقوقی formatted = self._apply_legal_formatting(combined_text) return formatted def _smart_join_fragments(self, fragments: List[str]) -> str: """ادغام هوشمند قطعات با در نظر گیری زمینه""" if len(fragments) == 1: return fragments[0] result = [fragments[0]] for i in range(1, len(fragments)): current_fragment = fragments[i] prev_fragment = result[-1] # بررسی ادامه جمله if self._should_continue_sentence(prev_fragment, current_fragment): result[-1] += ' ' + current_fragment # بررسی ادغام بدون فاصله (نیمفاصله) elif self._should_join_without_space(prev_fragment, current_fragment): result[-1] += current_fragment # شروع پاراگراف جدید elif self._is_new_paragraph(current_fragment): result.append('\n\n' + current_fragment) else: result.append(' ' + current_fragment) return ''.join(result) def _should_continue_sentence(self, prev: str, current: str) -> bool: """تشخیص ادامه جمله""" # کلمات ادامهدهنده continuation_words = ['که', 'تا', 'اگر', 'چون', 'زیرا', 'ولی', 'اما', 'و', 'یا'] # اگر جمله قبلی ناتمام باشد if not any(prev.endswith(end) for end in self.sentence_endings): return True # اگر جمله فعلی با کلمه ادامه شروع شود if any(current.strip().startswith(word) for word in continuation_words): return True return False def _should_join_without_space(self, prev: str, current: str) -> bool: """تشخیص ادغام بدون فاصله""" # پسوندها و پیشوندها suffixes = ['ها', 'های', 'ان', 'ات', 'ین', 'تان', 'شان', 'تون', 'شون'] prefixes = ['می', 'نمی', 'برمی', 'درمی'] current_stripped = current.strip() # بررسی پسوندها if any(current_stripped.startswith(suffix) for suffix in suffixes): return True # بررسی ادامه فعل if prev.endswith(('می', 'نمی', 'خواهد', 'است')): verb_continuations = ['شود', 'گردد', 'باشد', 'کرد', 'کند'] if any(current_stripped.startswith(cont) for cont in verb_continuations): return True return False def _is_new_paragraph(self, text: str) -> bool: """تشخیص شروع پاراگراف جدید""" text_stripped = text.strip() # شاخصهای پاراگراف در متون حقوقی paragraph_starters = [ 'ماده', 'تبصره', 'بند', 'فصل', 'باب', 'قسمت', 'کتاب', 'الف)', 'ب)', 'ج)', 'د)', 'ه)', 'و)', 'ز)', 'ح)', 'ط)', '۱-', '۲-', '۳-', '۴-', '۵-', '۶-', '۷-', '۸-', '۹-', '۱۰-' ] return any(text_stripped.startswith(starter) for starter in paragraph_starters) def _apply_legal_formatting(self, text: str) -> str: """اعمال قالببندی مخصوص اسناد حقوقی""" lines = text.split('\n') formatted_lines = [] for line in lines: line = line.strip() if not line: continue # قالببندی مواد و تبصرهها if any(line.startswith(indicator) for indicator in ['ماده', 'تبصره']): formatted_lines.append(f"\n{line}") # قالببندی بندها elif any(line.startswith(indicator) for indicator in ['الف)', 'ب)', 'ج)']): formatted_lines.append(f" {line}") # قالببندی فصول و ابواب elif any(line.startswith(indicator) for indicator in ['فصل', 'باب', 'کتاب']): formatted_lines.append(f"\n\n{line.upper()}\n") else: formatted_lines.append(line) return '\n'.join(formatted_lines) def extract_legal_entities(self, text: str) -> Dict[str, List[str]]: """استخراج موجودیتهای حقوقی از متن""" entities = { 'articles': [], 'citations': [], 'legal_terms': [], 'organizations': [], 'laws': [] } # استخراج مواد و تبصرهها for pattern in self.citation_patterns: matches = re.findall(pattern, text) if matches: entities['citations'].extend(matches) # استخراج اصطلاحات حقوقی for category, terms in PERSIAN_LEGAL_DICTIONARY.items(): found_terms = [term for term in terms if term in text] entities['legal_terms'].extend(found_terms) # استخراج نام قوانین law_patterns = [ r'قانون\s+([^۔\.\n]{5,50})', r'آییننامه\s+([^۔\.\n]{5,50})', r'مقرره\s+([^۔\.\n]{5,50})' ] for pattern in law_patterns: matches = re.findall(pattern, text) entities['laws'].extend(matches) return entities class AntiDDoSManager: """مدیریت ضد حملات DDoS و تنوع درخواستها""" def __init__(self): self.request_history = {} self.user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0' ] def get_request_delay(self, domain: str) -> float: """محاسبه تأخیر مناسب برای هر منبع""" source_info = self._identify_source(domain) if source_info and 'delay_range' in source_info: min_delay, max_delay = source_info['delay_range'] return random.uniform(min_delay, max_delay) # تأخیر پیشفرض return random.uniform(1, 3) def get_random_headers(self) -> Dict[str, str]: """تولید هدرهای تصادفی""" return { 'User-Agent': random.choice(self.user_agents), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': random.choice(['fa,en;q=0.9', 'fa-IR,fa;q=0.9,en;q=0.8', 'fa;q=0.9,en;q=0.8']), 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': random.choice(['no-cache', 'max-age=0', 'no-store']) } def _identify_source(self, domain: str) -> Optional[Dict]: """شناسایی منبع بر اساس دامنه""" for source_name, config in LEGAL_SOURCES_CONFIG.items(): base_domain = config['base_url'].replace('https://', '').replace('http://', '') if base_domain in domain: return config return None def should_allow_request(self, domain: str) -> bool: """بررسی اینکه آیا درخواست مجاز است""" current_time = time.time() if domain not in self.request_history: self.request_history[domain] = [] # حذف درخواستهای قدیمی (بیش از 1 ساعت) self.request_history[domain] = [ req_time for req_time in self.request_history[domain] if current_time - req_time < 3600 ] # بررسی تعداد درخواستها در ساعت گذشته if len(self.request_history[domain]) >= 50: # حداکثر 50 درخواست در ساعت return False # ثبت درخواست جدید self.request_history[domain].append(current_time) return True class ModelManager: """مدیریت پیشرفته مدلهای هوش مصنوعی""" def __init__(self): self.models = {} self.model_status = {} def load_models_progressively(self, progress_callback=None) -> Dict[str, Any]: """بارگذاری تدریجی مدلها با مدیریت حافظه""" logger.info("🚀 شروع بارگذاری مدلهای هوش مصنوعی...") if progress_callback: progress_callback("آمادهسازی محیط...", 0.05) # ایجاد دایرکتوری کش cache_dir = "/tmp/hf_cache" os.makedirs(cache_dir, exist_ok=True) # فاز 1: Tokenizer try: if MemoryManager.check_memory_available(100): logger.info("📝 بارگذاری Tokenizer...") if progress_callback: progress_callback("بارگذاری Tokenizer...", 0.2) self.models['tokenizer'] = AutoTokenizer.from_pretrained( "HooshvareLab/bert-fa-base-uncased", cache_dir=cache_dir, local_files_only=False ) self.model_status['tokenizer'] = 'loaded' logger.info("✅ Tokenizer بارگذاری شد") else: self.model_status['tokenizer'] = 'memory_insufficient' except Exception as e: logger.error(f"❌ خطا در بارگذاری Tokenizer: {e}") self.model_status['tokenizer'] = 'failed' # فاز 2: مدل طبقهبندی متن try: if MemoryManager.check_memory_available(400): logger.info("🏷️ بارگذاری مدل طبقهبندی...") if progress_callback: progress_callback("بارگذاری مدل طبقهبندی...", 0.5) self.models['classifier'] = pipeline( "text-classification", model="HooshvareLab/bert-fa-base-uncased-clf-persiannews", tokenizer="HooshvareLab/bert-fa-base-uncased-clf-persiannews", device=-1, return_all_scores=True, model_kwargs={"cache_dir": cache_dir} ) self.model_status['classifier'] = 'loaded' logger.info("✅ مدل طبقهبندی بارگذاری شد") else: self.model_status['classifier'] = 'memory_insufficient' except Exception as e: logger.error(f"❌ خطا در بارگذاری مدل طبقهبندی: {e}") self.model_status['classifier'] = 'failed' # فاز 3: مدل تشخیص موجودیت try: if MemoryManager.check_memory_available(500): logger.info("👤 بارگذاری مدل NER...") if progress_callback: progress_callback("بارگذاری مدل تشخیص موجودیت...", 0.8) self.models['ner'] = pipeline( "ner", model="HooshvareLab/bert-fa-base-uncased-ner", tokenizer="HooshvareLab/bert-fa-base-uncased-ner", device=-1, aggregation_strategy="simple", model_kwargs={"cache_dir": cache_dir} ) self.model_status['ner'] = 'loaded' logger.info("✅ مدل NER بارگذاری شد") else: self.model_status['ner'] = 'memory_insufficient' except Exception as e: logger.error(f"❌ خطا در بارگذاری مدل NER: {e}") self.model_status['ner'] = 'failed' if progress_callback: progress_callback("تکمیل بارگذاری مدلها", 1.0) # پاکسازی حافظه MemoryManager.cleanup_memory() loaded_count = sum(1 for status in self.model_status.values() if status == 'loaded') logger.info(f"🎯 {loaded_count} مدل با موفقیت بارگذاری شد") return self.models def get_model_status(self) -> str: """دریافت وضعیت مدلها""" status_lines = [] status_icons = { 'loaded': '✅', 'failed': '❌', 'memory_insufficient': '⚠️', 'not_loaded': '⏳' } for model_name, status in self.model_status.items(): icon = status_icons.get(status, '❓') status_persian = { 'loaded': 'بارگذاری شده', 'failed': 'خطا در بارگذاری', 'memory_insufficient': 'حافظه ناکافی', 'not_loaded': 'بارگذاری نشده' }.get(status, 'نامشخص') status_lines.append(f"{icon} {model_name}: {status_persian}") memory_usage = MemoryManager.get_memory_usage() status_lines.append(f"\n💾 مصرف حافظه: {memory_usage:.1f} MB") return '\n'.join(status_lines) class LegalDocumentScraper: """استخراجکننده پیشرفته اسناد حقوقی""" def __init__(self, model_manager: ModelManager, text_processor: SmartTextProcessor): self.model_manager = model_manager self.text_processor = text_processor self.anti_ddos = AntiDDoSManager() self.session = self._create_session() def _create_session(self) -> requests.Session: """ایجاد session با تنظیمات بهینه""" session = requests.Session() # تنظیم adapter برای retry from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) return session def scrape_legal_source(self, url: str, progress_callback=None) -> Dict[str, Any]: """استخراج هوشمند سند حقوقی""" try: domain = urlparse(url).netloc # بررسی مجوز if not self.anti_ddos.should_allow_request(domain): raise Exception("تعداد درخواستها از حد مجاز تجاوز کرده است") if progress_callback: progress_callback(f"🔗 اتصال به {domain}...", 0.1) # اعمال تأخیر ضد DDoS delay = self.anti_ddos.get_request_delay(domain) time.sleep(delay) # درخواست اصلی headers = self.anti_ddos.get_random_headers() response = self.session.get(url, headers=headers, timeout=30, allow_redirects=True) response.raise_for_status() response.encoding = 'utf-8' if progress_callback: progress_callback("📄 تجزیه محتوای HTML...", 0.3) # تجزیه HTML soup = BeautifulSoup(response.content, 'html.parser') self._clean_soup(soup) # شناسایی منبع source_info = self._identify_legal_source(url) if progress_callback: progress_callback("🎯 استخراج محتوای هدفمند...", 0.5) # استخراج محتوا content_fragments = self._extract_content_intelligently(soup, source_info) # بازسازی متن reconstructed_text = self.text_processor.reconstruct_legal_text(content_fragments) if not reconstructed_text or len(reconstructed_text.strip()) < 50: raise Exception("محتوای کافی استخراج نشد") if progress_callback: progress_callback("🔧 تحلیل و پردازش متن...", 0.7) # تحلیلهای پیشرفته quality_assessment = self._assess_content_quality(reconstructed_text) legal_entities = self.text_processor.extract_legal_entities(reconstructed_text) long_sentences = self.text_processor.detect_long_sentences(reconstructed_text) ai_analysis = self._apply_ai_analysis(reconstructed_text) # نتیجه نهایی result = { 'url': url, 'source_info': source_info, 'title': self._extract_title(soup), 'content': reconstructed_text, 'word_count': len(reconstructed_text.split()), 'character_count': len(reconstructed_text), 'quality_assessment': quality_assessment, 'legal_entities': legal_entities, 'long_sentences': long_sentences, 'ai_analysis': ai_analysis, 'extraction_metadata': { 'method': 'advanced_extraction', 'fragments_count': len(content_fragments), 'response_size': len(response.content), 'encoding': response.encoding, 'anti_ddos_delay': delay }, 'timestamp': datetime.now().isoformat(), 'status': 'موفق' } if progress_callback: progress_callback("✅ استخراج با موفقیت تکمیل شد", 1.0) return result except requests.RequestException as e: return self._create_error_result(url, f"خطای شبکه: {str(e)}") except Exception as e: return self._create_error_result(url, f"خطای پردازش: {str(e)}") def _identify_legal_source(self, url: str) -> Dict[str, Any]: """شناسایی منبع حقوقی""" domain = urlparse(url).netloc for source_name, config in LEGAL_SOURCES_CONFIG.items(): if config['base_url'].replace('https://', '').replace('http://', '') in domain: return { 'name': source_name, 'type': 'official', 'credibility': 'high', 'config': config } # بررسی الگوهای عمومی سایتهای حقوقی legal_indicators = ['law', 'legal', 'court', 'judiciary', 'قانون', 'حقوق', 'دادگاه'] if any(indicator in domain.lower() for indicator in legal_indicators): return { 'name': 'منبع حقوقی شناخته نشده', 'type': 'legal_related', 'credibility': 'medium', 'config': {'max_depth': 1, 'delay_range': (2, 4)} } return { 'name': 'منبع عمومی', 'type': 'general', 'credibility': 'low', 'config': {'max_depth': 0, 'delay_range': (3, 6)} } def _clean_soup(self, soup: BeautifulSoup) -> None: """پاکسازی پیشرفته HTML""" # حذف عناصر غیرضروری unwanted_tags = [ 'script', 'style', 'nav', 'footer', 'header', 'aside', 'advertisement', 'ads', 'sidebar', 'menu', 'breadcrumb', 'social', 'share', 'comment', 'popup', 'modal' ] for tag in unwanted_tags: for element in soup.find_all(tag): element.decompose() # حذف عناصر با کلاسهای مشخص unwanted_classes = [ 'ad', 'ads', 'advertisement', 'sidebar', 'menu', 'nav', 'footer', 'header', 'social', 'share', 'comment', 'popup' ] for class_name in unwanted_classes: for element in soup.find_all(class_=lambda x: x and class_name in ' '.join(x).lower()): element.decompose() def _extract_content_intelligently(self, soup: BeautifulSoup, source_info: Dict) -> List[str]: """استخراج هوشمند محتوا""" fragments = [] # استراتژی 1: استفاده از تنظیمات منبع شناخته شده if source_info.get('config') and source_info['config'].get('selectors'): for selector in source_info['config']['selectors']: elements = soup.select(selector) for element in elements: text = self._extract_clean_text(element) if self._is_valid_content(text): fragments.append(text) # استراتژی 2: جستجوی محتوای اصلی if not fragments: main_selectors = [ 'main', 'article', '.main-content', '.content', '.post-content', '.article-content', '.news-content', '.law-content', '.legal-text', '#main', '#content', '#article', '.document-content' ] for selector in main_selectors: elements = soup.select(selector) for element in elements: text = self._extract_clean_text(element) if self._is_valid_content(text): fragments.append(text) if fragments: break # استراتژی 3: استخراج از پاراگرافها if not fragments: for tag in ['p', 'div', 'section', 'article']: elements = soup.find_all(tag) for element in elements: text = self._extract_clean_text(element) if self._is_valid_content(text, min_length=30): fragments.append(text) # استراتژی 4: fallback به body if not fragments: body = soup.find('body') if body: text = self._extract_clean_text(body) if text: paragraphs = [p.strip() for p in text.split('\n') if p.strip()] fragments.extend(p for p in paragraphs if self._is_valid_content(p, min_length=20)) return fragments def _extract_clean_text(self, element) -> str: """استخراج متن تمیز""" if not element: return "" # حذف عناصر تودرتو غیرضروری for unwanted in element.find_all(['script', 'style', 'nav', 'aside']): unwanted.decompose() text = element.get_text(separator=' ', strip=True) text = re.sub(r'\s+', ' ', text) return text.strip() def _is_valid_content(self, text: str, min_length: int = 50) -> bool: """بررسی اعتبار محتوا""" if not text or len(text) < min_length: return False # بررسی نسبت کاراکترهای فارسی persian_chars = sum(1 for c in text if '\u0600' <= c <= '\u06FF') persian_ratio = persian_chars / len(text) if text else 0 if persian_ratio < 0.2: return False # بررسی کلمات کلیدی حقوقی legal_keywords = ['قانون', 'ماده', 'تبصره', 'مقرره', 'آییننامه', 'دادگاه', 'حکم', 'رای'] has_legal_content = any(keyword in text for keyword in legal_keywords) return has_legal_content or persian_ratio > 0.5 def _extract_title(self, soup: BeautifulSoup) -> str: """استخراج عنوان""" title_selectors = [ 'h1', 'h2', 'title', '.page-title', '.article-title', '.post-title', '.document-title', '.news-title', '.law-title' ] for selector in title_selectors: element = soup.select_one(selector) if element: title = element.get_text(strip=True) if title and 5 < len(title) < 200: return re.sub(r'\s+', ' ', title) return "بدون عنوان" def _assess_content_quality(self, text: str) -> Dict[str, Any]: """ارزیابی کیفیت محتوا""" assessment = { 'overall_score': 0, 'factors': {}, 'issues': [], 'strengths': [] } if not text: assessment['issues'].append('متن خالی') return assessment factors = {} # طول متن word_count = len(text.split()) if word_count >= 100: factors['length'] = min(100, word_count / 10) assessment['strengths'].append('طول مناسب متن') else: factors['length'] = word_count assessment['issues'].append('متن کوتاه') # نسبت فارسی persian_chars = sum(1 for c in text if '\u0600' <= c <= '\u06FF') persian_ratio = persian_chars / len(text) if text else 0 factors['persian_content'] = persian_ratio * 100 if persian_ratio >= 0.7: assessment['strengths'].append('محتوای فارسی غنی') elif persian_ratio < 0.3: assessment['issues'].append('محتوای فارسی کم') # محتوای حقوقی legal_terms_count = 0 for category, terms in PERSIAN_LEGAL_DICTIONARY.items(): legal_terms_count += sum(1 for term in terms if term in text) factors['legal_content'] = min(100, legal_terms_count * 5) if legal_terms_count >= 10: assessment['strengths'].append('غنی از اصطلاحات حقوقی') elif legal_terms_count < 3: assessment['issues'].append('فقر اصطلاحات حقوقی') # ساختار متن structure_score = 0 if 'ماده' in text: structure_score += 20 if any(indicator in text for indicator in ['تبصره', 'بند', 'فصل']): structure_score += 15 if re.search(r'[۰-۹]+', text): structure_score += 10 factors['structure'] = structure_score if structure_score >= 30: assessment['strengths'].append('ساختار منظم حقوقی') # محاسبه امتیاز کلی assessment['factors'] = factors assessment['overall_score'] = sum(factors.values()) / len(factors) if factors else 0 assessment['overall_score'] = min(100, max(0, assessment['overall_score'])) return assessment def _apply_ai_analysis(self, text: str) -> Dict[str, Any]: """اعمال تحلیل هوش مصنوعی""" analysis = { 'classification': None, 'entities': [], 'confidence_scores': {}, 'model_performance': {} } if not text or len(text.split()) < 10: return analysis # آمادهسازی متن text_sample = ' '.join(text.split()[:300]) # طبقهبندی متن if 'classifier' in self.model_manager.models: try: start_time = time.time() classification_result = self.model_manager.models['classifier'](text_sample) if classification_result: analysis['classification'] = classification_result[:3] analysis['confidence_scores']['classification'] = classification_result[0]['score'] analysis['model_performance']['classification_time'] = time.time() - start_time except Exception as e: logger.error(f"خطا در طبقهبندی: {e}") # تشخیص موجودیت if 'ner' in self.model_manager.models: try: start_time = time.time() ner_result = self.model_manager.models['ner'](text_sample[:1000]) if ner_result: high_confidence_entities = [ entity for entity in ner_result if entity.get('score', 0) > 0.5 ] analysis['entities'] = high_confidence_entities[:15] analysis['model_performance']['ner_time'] = time.time() - start_time except Exception as e: logger.error(f"خطا در تشخیص موجودیت: {e}") return analysis def _create_error_result(self, url: str, error_message: str) -> Dict[str, Any]: """ایجاد نتیجه خطا""" return { 'url': url, 'error': error_message, 'status': 'ناموفق', 'timestamp': datetime.now().isoformat(), 'content': '', 'word_count': 0, 'quality_assessment': {'overall_score': 0, 'issues': [error_message]} } class PersianLegalScraperApp: """اپلیکیشن اصلی با رابط کاربری پیشرفته""" def __init__(self): self.text_processor = SmartTextProcessor() self.model_manager = ModelManager() self.scraper = None self.results = [] self.processing_stats = { 'total_processed': 0, 'successful': 0, 'failed': 0, 'total_words': 0 } # مقداردهی اولیه self._initialize_system() def _initialize_system(self): """مقداردهی سیستم""" try: logger.info("🚀 مقداردهی سیستم...") self.model_manager.load_models_progressively() self.scraper = LegalDocumentScraper(self.model_manager, self.text_processor) logger.info("✅ سیستم آماده است") except Exception as e: logger.error(f"❌ خطا در مقداردهی: {e}") def process_single_url(self, url: str, progress=gr.Progress()) -> Tuple[str, str, str]: """پردازش یک URL""" if not url or not url.strip(): return "❌ خطا: آدرس معتبری وارد کنید", "", "" url = url.strip() if not url.startswith(('http://', 'https://')): url = 'https://' + url try: progress(0.0, desc="شروع پردازش...") def progress_callback(message: str, value: float): progress(value, desc=message) result = self.scraper.scrape_legal_source(url, progress_callback) if result.get('status') == 'موفق': # بهروزرسانی آمار self.processing_stats['total_processed'] += 1 self.processing_stats['successful'] += 1 self.processing_stats['total_words'] += result.get('word_count', 0) # ذخیره نتیجه self.results.append(result) # تنظیم خروجیها status_text = self._format_single_result(result) analysis_text = self._format_analysis_result(result) content_text = result.get('content', '') return status_text, content_text, analysis_text else: self.processing_stats['total_processed'] += 1 self.processing_stats['failed'] += 1 error_msg = result.get('error', 'خطای نامشخص') return f"❌ خطا: {error_msg}", "", "" except Exception as e: self.processing_stats['total_processed'] += 1 self.processing_stats['failed'] += 1 logger.error(f"خطا در پردازش: {e}") return f"❌ خطای سیستمی: {str(e)}", "", "" def _format_single_result(self, result: Dict[str, Any]) -> str: """قالببندی نتیجه با آیکونهای SVG""" quality = result.get('quality_assessment', {}) source_info = result.get('source_info', {}) legal_entities = result.get('legal_entities', {}) long_sentences = result.get('long_sentences', []) lines = [ f"{SVG_ICONS['success']} **وضعیت**: {result.get('status')}", f"{SVG_ICONS['document']} **عنوان**: {result.get('title', 'بدون عنوان')}", f"🏛️ **منبع**: {source_info.get('name', 'نامشخص')} ({source_info.get('credibility', 'نامشخص')})", f"📊 **آمار محتوا**: {result.get('word_count', 0):,} کلمه، {result.get('character_count', 0):,} کاراکتر", f"🎯 **کیفیت کلی**: {quality.get('overall_score', 0):.1f}/100", f"📈 **محتوای فارسی**: {quality.get('factors', {}).get('persian_content', 0):.1f}%", f"⚖️ **محتوای حقوقی**: {quality.get('factors', {}).get('legal_content', 0):.1f}/100", f"📚 **ارجاعات حقوقی**: {len(legal_entities.get('citations', []))} مورد", f"🏷️ **اصطلاحات حقوقی**: {len(legal_entities.get('legal_terms', []))} مورد" ] # جملات طولانی if long_sentences: lines.append(f"{SVG_ICONS['warning']} **جملات طولانی**: {len(long_sentences)} مورد") # نقاط قوت strengths = quality.get('strengths', []) if strengths: lines.append(f"\n✨ **نقاط قوت**: {' | '.join(strengths)}") # مشکلات issues = quality.get('issues', []) if issues: lines.append(f"{SVG_ICONS['warning']} **نکات**: {' | '.join(issues)}") lines.append(f"🕐 **زمان**: {result.get('timestamp', '')[:19]}") return '\n'.join(lines) def _format_analysis_result(self, result: Dict[str, Any]) -> str: """قالببندی تحلیل هوش مصنوعی""" ai_analysis = result.get('ai_analysis', {}) legal_entities = result.get('legal_entities', {}) long_sentences = result.get('long_sentences', []) lines = [ f"{SVG_ICONS['analyze']} **تحلیل هوش مصنوعی**\n", f"📊 **وضعیت مدلها**: {self.model_manager.get_model_status()}\n" ] # طبقهبندی classification = ai_analysis.get('classification') if classification: lines.append("🏷️ **طبقهبندی محتوا**:") for i, item in enumerate(classification[:3], 1): label = item.get('label', 'نامشخص') score = item.get('score', 0) lines.append(f"{i}. {label}: {score:.1%}") # موجودیتهای شناسایی شده entities = ai_analysis.get('entities', []) if entities: lines.append("\n👥 **موجودیتهای شناسایی شده**:") for entity in entities[:8]: # 8 مورد اول word = entity.get('word', '') label = entity.get('entity_group', '') score = entity.get('score', 0) lines.append(f"• {word} ({label}): {score:.1%}") # ارجاعات حقوقی citations = legal_entities.get('citations', []) if citations: lines.append(f"\n📚 **ارجاعات حقوقی**: {len(citations)} مورد") unique_citations = list(set(citations))[:10] lines.append(f"نمونه: {', '.join(unique_citations)}") # قوانین شناسایی شده laws = legal_entities.get('laws', []) if laws: lines.append(f"\n⚖️ **قوانین شناسایی شده**: {len(laws)} مورد") for law in laws[:3]: lines.append(f"• {law}") # جملات طولانی if long_sentences: lines.append(f"\n{SVG_ICONS['warning']} **جملات طولانی شناسایی شده**:") for i, sentence_info in enumerate(long_sentences[:3], 1): word_count = sentence_info.get('word_count', 0) suggestions = sentence_info.get('suggestions', []) lines.append(f"{i}. {word_count} کلمه - {', '.join(suggestions[:2])}") # عملکرد مدل performance = ai_analysis.get('model_performance', {}) if performance: lines.append(f"\n⏱️ **عملکرد**: ") if 'classification_time' in performance: lines.append(f"طبقهبندی: {performance['classification_time']:.2f}s") if 'ner_time' in performance: lines.append(f"تشخیص موجودیت: {performance['ner_time']:.2f}s") return '\n'.join(lines) def process_multiple_urls(self, urls_text: str, progress=gr.Progress()) -> Tuple[str, str]: """پردازش چندین URL""" if not urls_text or not urls_text.strip(): return "❌ لطفا لیست آدرسها را وارد کنید", "" urls = [url.strip() for url in urls_text.split('\n') if url.strip()] if not urls: return "❌ آدرس معتبری یافت نشد", "" # محدودیت تعداد برای HF Spaces if len(urls) > 10: urls = urls[:10] warning_msg = f"{SVG_ICONS['warning']} به دلیل محدودیتها، تنها 10 آدرس اول پردازش میشود.\n\n" else: warning_msg = "" results = [] total_urls = len(urls) try: progress(0.0, desc=f"شروع پردازش {total_urls} آدرس...") for i, url in enumerate(urls): if not url.startswith(('http://', 'https://')): url = 'https://' + url progress_value = i / total_urls progress(progress_value, desc=f"پردازش {i+1} از {total_urls}: {url[:50]}...") def progress_callback(message: str, value: float): overall_progress = progress_value + (value * (1/total_urls)) progress(overall_progress, desc=f"{message} ({i+1}/{total_urls})") result = self.scraper.scrape_legal_source(url, progress_callback) results.append(result) # بهروزرسانی آمار self.processing_stats['total_processed'] += 1 if result.get('status') == 'موفق': self.processing_stats['successful'] += 1 self.processing_stats['total_words'] += result.get('word_count', 0) else: self.processing_stats['failed'] += 1 # پاکسازی حافظه هر 3 درخواست if (i + 1) % 3 == 0: MemoryManager.cleanup_memory() time.sleep(1) # کمی استراحت progress(1.0, desc="تکمیل پردازش") # ذخیره نتایج self.results.extend(results) # تنظیم خروجیها summary_text = warning_msg + self._format_batch_summary(results) detailed_results = self._format_batch_details(results) return summary_text, detailed_results except Exception as e: logger.error(f"خطا در پردازش دستهای: {e}") return f"❌ خطای سیستمی: {str(e)}", "" def _format_batch_summary(self, results: List[Dict[str, Any]]) -> str: """خلاصه پردازش دستهای با آیکونها""" total = len(results) successful = sum(1 for r in results if r.get('status') == 'موفق') failed = total - successful if successful > 0: total_words = sum(r.get('word_count', 0) for r in results if r.get('status') == 'موفق') avg_quality = sum(r.get('quality_assessment', {}).get('overall_score', 0) for r in results if r.get('status') == 'موفق') / successful else: total_words = 0 avg_quality = 0 lines = [ f"{SVG_ICONS['analyze']} **خلاصه پردازش دستهای**", f"📈 **کل آدرسها**: {total}", f"{SVG_ICONS['success']} **موفق**: {successful} ({successful/total*100:.1f}%)", f"{SVG_ICONS['error']} **ناموفق**: {failed} ({failed/total*100:.1f}%)", f"📝 **کل کلمات**: {total_words:,}", f"🎯 **میانگین کیفیت**: {avg_quality:.1f}/100", f"💾 **حافظه**: {MemoryManager.get_memory_usage():.1f} MB", f"🕐 **زمان**: {datetime.now().strftime('%H:%M:%S')}" ] return '\n'.join(lines) def _format_batch_details(self, results: List[Dict[str, Any]]) -> str: """جزئیات نتایج دستهای""" lines = [] for i, result in enumerate(results, 1): url = result.get('url', '') status = result.get('status', '') if status == 'موفق': title = result.get('title', 'بدون عنوان') word_count = result.get('word_count', 0) quality = result.get('quality_assessment', {}).get('overall_score', 0) source = result.get('source_info', {}).get('name', 'نامشخص') lines.extend([ f"\n**{i}. {SVG_ICONS['success']} {title}**", f"{SVG_ICONS['link']} {url[:70]}{'...' if len(url) > 70 else ''}", f"🏛️ منبع: {source}", f"📊 {word_count:,} کلمه | کیفیت: {quality:.1f}/100" ]) else: error = result.get('error', 'خطای نامشخص') lines.extend([ f"\n**{i}. {SVG_ICONS['error']} ناموفق**", f"{SVG_ICONS['link']} {url[:70]}{'...' if len(url) > 70 else ''}", f"❗ {error[:80]}{'...' if len(error) > 80 else ''}" ]) return '\n'.join(lines) def export_results(self) -> Tuple[str, Optional[str]]: """صادرات نتایج""" if not self.results: return f"{SVG_ICONS['error']} نتیجهای برای صادرات وجود ندارد", None try: successful_results = [r for r in self.results if r.get('status') == 'موفق'] if not successful_results: return f"{SVG_ICONS['error']} نتیجه موفقی برای صادرات وجود ندارد", None export_data = [] for result in successful_results: quality = result.get('quality_assessment', {}) source_info = result.get('source_info', {}) ai_analysis = result.get('ai_analysis', {}) # استخراج اطلاعات طبقهبندی classification = ai_analysis.get('classification', []) top_class = classification[0].get('label', '') if classification else '' export_data.append({ 'آدرس': result.get('url', ''), 'عنوان': result.get('title', ''), 'منبع': source_info.get('name', ''), 'اعتبار منبع': source_info.get('credibility', ''), 'تعداد کلمات': result.get('word_count', 0), 'کیفیت کلی': round(quality.get('overall_score', 0), 1), 'محتوای فارسی (%)': round(quality.get('factors', {}).get('persian_content', 0), 1), 'محتوای حقوقی': round(quality.get('factors', {}).get('legal_content', 0), 1), 'طبقهبندی AI': top_class, 'زمان استخراج': result.get('timestamp', ''), 'محتوا': result.get('content', '')[:2000] + '...' if len(result.get('content', '')) > 2000 else result.get('content', '') }) df = pd.DataFrame(export_data) # ذخیره فایل timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') csv_path = f"/tmp/legal_scraping_results_{timestamp}.csv" df.to_csv(csv_path, index=False, encoding='utf-8-sig') summary = f"{SVG_ICONS['export']} {len(export_data)} سند با موفقیت صادر شد" return summary, csv_path except Exception as e: logger.error(f"خطا در صادرات: {e}") return f"{SVG_ICONS['error']} خطا در صادرات: {str(e)}", None def get_system_status(self) -> str: """وضعیت سیستم""" model_status = self.model_manager.get_model_status() memory_usage = MemoryManager.get_memory_usage() lines = [ f"{SVG_ICONS['settings']} **وضعیت سیستم**\n", f"💾 **حافظه**: {memory_usage:.1f} MB", f"📊 **آمار کلی**:", f" • کل پردازش شده: {self.processing_stats['total_processed']}", f" • موفق: {self.processing_stats['successful']}", f" • ناموفق: {self.processing_stats['failed']}", f" • کل کلمات: {self.processing_stats['total_words']:,}", f"\n🤖 **مدلها**:", model_status, f"\n⏰ **آخرین بروزرسانی**: {datetime.now().strftime('%Y/%m/%d %H:%M:%S')}" ] return '\n'.join(lines) def clear_results(self) -> str: """پاکسازی نتایج و حافظه""" self.results.clear() self.processing_stats = { 'total_processed': 0, 'successful': 0, 'failed': 0, 'total_words': 0 } MemoryManager.cleanup_memory() return f"{SVG_ICONS['success']} نتایج و حافظه پاکسازی شد" def create_interface(self): """ایجاد رابط کاربری Gradio""" # CSS سفارشی برای RTL و فونت فارسی custom_css = """ .rtl { direction: rtl; text-align: right; font-family: 'Vazirmatn', 'Tahoma', sans-serif; } .persian-title { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; text-align: center; margin-bottom: 20px; font-family: 'Vazirmatn', 'Tahoma', sans-serif; } .status-box { border: 1px solid #ddd; border-radius: 8px; padding: 15px; background-color: #f9f9f9; direction: rtl; font-family: 'Vazirmatn', 'Tahoma', sans-serif; } .gradio-container { font-family: 'Vazirmatn', 'Tahoma', sans-serif !important; } """ with gr.Blocks( title="سیستم پیشرفته استخراج و تحلیل اسناد حقوقی فارسی", css=custom_css, theme=gr.themes.Soft() ) as interface: # عنوان اصلی gr.HTML(f"""
{SVG_ICONS['analyze']} مجهز به مدلهای BERT فارسی | 📊 تحلیل هوشمند محتوا | ⚡ بهینهسازی شده برای Hugging Face Spaces
🎯 منابع معتبر: مجلس، قوه قضاییه، وزارت دادگستری، دیوان عدالت اداری