""" پلتفرم پیشرفته هوشمند اسناد حقوقی ایران - نسخه ارتقاء یافته مجهز به مدل‌های SOTA فارسی، سیستم کش هوشمند و امتیازدهی پیشرفته """ import os import gc import sys import time import json import logging import hashlib import resource import requests import threading import re import random import sqlite3 import pickle import tempfile from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Any, Optional, Tuple, Union from dataclasses import dataclass, field from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor from urllib.parse import urljoin, urlparse import warnings import asyncio from functools import lru_cache import numpy as np import gradio as gr import pandas as pd import torch from bs4 import BeautifulSoup from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, pipeline, logging as transformers_logging, AutoModel ) from sentence_transformers import SentenceTransformer, util from hazm import Normalizer, word_tokenize, Lemmatizer import faiss # === تنظیمات اولیه پیشرفته === warnings.filterwarnings('ignore') transformers_logging.set_verbosity_error() try: resource.setrlimit(resource.RLIMIT_AS, (4*1024*1024*1024, 4*1024*1024*1024)) except: pass # ایجاد دایرکتوری‌های مورد نیاز WORK_DIR = Path("/tmp/legal_scraper_data") WORK_DIR.mkdir(exist_ok=True, parents=True) os.environ.update({ 'TRANSFORMERS_CACHE': str(WORK_DIR / 'hf_cache'), 'HF_HOME': str(WORK_DIR / 'hf_cache'), 'TORCH_HOME': str(WORK_DIR / 'torch_cache'), 'TOKENIZERS_PARALLELISM': 'false', 'CUDA_VISIBLE_DEVICES': '0' if torch.cuda.is_available() else '' }) # ایجاد دایرکتوری‌های کش for cache_dir in [os.environ['TRANSFORMERS_CACHE'], os.environ['HF_HOME'], os.environ['TORCH_HOME']]: os.makedirs(cache_dir, exist_ok=True) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # === ثابت‌های سیستم با مسیرهای اصلاح شده === DB_PATH = str(WORK_DIR / "iranian_legal_archive_advanced.sqlite") CACHE_DB_PATH = str(WORK_DIR / "cache_system.sqlite") EMBEDDINGS_CACHE_PATH = str(WORK_DIR / "embeddings_cache.pkl") VECTOR_INDEX_PATH = str(WORK_DIR / "faiss_index.bin") # آیکون‌های بهبود یافته SVG_ICONS = { 'search': '🔍', 'document': '📄', 'analyze': '🤖', 'export': '📊', 'settings': '⚙️', 'link': '🔗', 'success': '✅', 'error': '❌', 'warning': '⚠️', 'database': '🗄️', 'crawler': '🔄', 'brain': '🧠', 'cache': '⚡', 'score': '📈', 'classify': '🏷️', 'similarity': '🎯', 'legal': '⚖️', 'home': '🏠', 'stats': '📈', 'process': '🔧' } # منابع حقوقی با پیکربندی پیشرفته LEGAL_SOURCES = { "مجلس شورای اسلامی": { "base_url": "https://rc.majlis.ir", "patterns": ["/fa/law/show/", "/fa/report/show/"], "selectors": [".main-content", ".article-body", "article"], "delay_range": (2, 5), "priority": 1, "reliability_score": 0.95 }, "پورتال ملی قوانین": { "base_url": "https://www.dotic.ir", "patterns": ["/portal/law/", "/regulation/"], "selectors": [".content-area", ".law-content"], "delay_range": (1, 4), "priority": 1, "reliability_score": 0.90 }, "قوه قضاییه": { "base_url": "https://www.judiciary.ir", "patterns": ["/fa/news/", "/fa/verdict/"], "selectors": [".news-content", ".main-content"], "delay_range": (3, 6), "priority": 2, "reliability_score": 0.85 }, "وزارت دادگستری": { "base_url": "https://www.moj.ir", "patterns": ["/fa/news/", "/fa/regulation/"], "selectors": [".content-area", ".news-content"], "delay_range": (2, 4), "priority": 2, "reliability_score": 0.80 } } # واژگان تخصصی حقوقی گسترده PERSIAN_LEGAL_TERMS = { "قوانین_اساسی": ["قانون اساسی", "مجلس شورای اسلامی", "شورای نگهبان", "ولایت فقیه", "اصول قانون اساسی"], "قوانین_عادی": ["ماده", "تبصره", "فصل", "باب", "قانون مدنی", "قانون جزا", "قانون تجارت", "قانون کار"], "اصطلاحات_حقوقی": ["شخص حقیقی", "شخص حقوقی", "دعوا", "خواهان", "خوانده", "مجازات", "قرارداد", "تعهد"], "نهادهای_قضایی": ["دادگاه", "قاضی", "وکیل", "مدعی‌العموم", "رای", "حکم", "دادنامه", "قرار"], "اصطلاحات_مالی": ["مالیات", "عوارض", "جریمه", "خسارت", "تأمین", "ضمانت", "وثیقه", "دیه"], "جرائم": ["جنایت", "جنحه", "تخلف", "قصاص", "دیه", "تعزیر", "حدود", "قذف"] } # مدل‌های SOTA پیشنهادی AVAILABLE_MODELS = { "classification": { "fabert": "sbunlp/fabert", "parsbert": "HooshvareLab/bert-base-parsbert-uncased", "legal_roberta": "lexlms/legal-roberta-base" }, "embedding": { "maux_gte": "xmanii/maux-gte-persian", "sentence_transformer": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" }, "ner": { "parsbert_ner": "HooshvareLab/bert-fa-base-uncased-ner" } } @dataclass class ProcessingResult: """نتیجه پردازش سند""" url: str title: str content: str source: str quality_score: float classification: Dict[str, float] sentiment_score: float legal_entities: List[str] embeddings: Optional[np.ndarray] processing_time: float cache_hit: bool = False @dataclass class SystemMetrics: """متریک‌های عملکرد سیستم""" total_processed: int = 0 cache_hits: int = 0 classification_accuracy: float = 0.0 avg_processing_time: float = 0.0 memory_usage: float = 0.0 active_crawlers: int = 0 # === سیستم کش هوشمند === class IntelligentCacheSystem: """سیستم کش هوشمند با TTL و اولویت‌بندی""" def __init__(self, cache_db_path: str = CACHE_DB_PATH): self.cache_db_path = cache_db_path self.memory_cache = {} self.access_count = {} self.max_memory_items = 1000 self._init_database() def _init_database(self): """ایجاد پایگاه داده کش""" try: with sqlite3.connect(self.cache_db_path) as conn: conn.execute(''' CREATE TABLE IF NOT EXISTS cache_entries ( key TEXT PRIMARY KEY, value BLOB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, access_count INTEGER DEFAULT 1, ttl_seconds INTEGER DEFAULT 3600, priority INTEGER DEFAULT 1 ) ''') conn.execute(''' CREATE INDEX IF NOT EXISTS idx_created_ttl ON cache_entries(created_at, ttl_seconds) ''') except Exception as e: logger.error(f"خطا در ایجاد پایگاه داده کش: {e}") def _generate_key(self, url: str, model_type: str = "general") -> str: """تولید کلید یکتا برای کش""" content = f"{url}:{model_type}" return hashlib.md5(content.encode()).hexdigest() def get(self, url: str, model_type: str = "general") -> Optional[Dict]: """دریافت از کش با بررسی TTL""" key = self._generate_key(url, model_type) # بررسی memory cache if key in self.memory_cache: self.access_count[key] = self.access_count.get(key, 0) + 1 return self.memory_cache[key] # بررسی database cache try: with sqlite3.connect(self.cache_db_path) as conn: cursor = conn.execute(''' SELECT value, created_at, ttl_seconds, access_count FROM cache_entries WHERE key = ? ''', (key,)) row = cursor.fetchone() if row: value_blob, created_at, ttl_seconds, access_count = row created_time = datetime.fromisoformat(created_at) # بررسی انقضاء if datetime.now() - created_time < timedelta(seconds=ttl_seconds): # بروزرسانی شمارنده دسترسی conn.execute( 'UPDATE cache_entries SET access_count = access_count + 1 WHERE key = ?', (key,) ) # اضافه به memory cache value = pickle.loads(value_blob) self._add_to_memory_cache(key, value) return value else: # حذف entry منقضی conn.execute('DELETE FROM cache_entries WHERE key = ?', (key,)) except Exception as e: logger.error(f"خطا در دریافت از کش: {e}") return None def set(self, url: str, value: Dict, model_type: str = "general", ttl_seconds: int = 3600, priority: int = 1): """ذخیره در کش""" key = self._generate_key(url, model_type) # ذخیره در memory cache self._add_to_memory_cache(key, value) # ذخیره در database try: with sqlite3.connect(self.cache_db_path) as conn: value_blob = pickle.dumps(value) conn.execute(''' INSERT OR REPLACE INTO cache_entries (key, value, ttl_seconds, priority) VALUES (?, ?, ?, ?) ''', (key, value_blob, ttl_seconds, priority)) except Exception as e: logger.error(f"خطا در ذخیره در کش: {e}") def _add_to_memory_cache(self, key: str, value: Dict): """اضافه کردن به memory cache با مدیریت حد""" if len(self.memory_cache) >= self.max_memory_items: # حذف کم‌استفاده‌ترین item if self.access_count: least_used_key = min(self.access_count.keys(), key=self.access_count.get) if least_used_key in self.memory_cache: del self.memory_cache[least_used_key] if least_used_key in self.access_count: del self.access_count[least_used_key] self.memory_cache[key] = value self.access_count[key] = self.access_count.get(key, 0) + 1 def cleanup_expired(self): """پاکسازی entries منقضی""" try: with sqlite3.connect(self.cache_db_path) as conn: conn.execute(''' DELETE FROM cache_entries WHERE datetime(created_at, '+' || ttl_seconds || ' seconds') < datetime('now') ''') conn.commit() except Exception as e: logger.error(f"خطا در پاکسازی کش: {e}") def get_stats(self) -> Dict: """آمار کش""" try: with sqlite3.connect(self.cache_db_path) as conn: cursor = conn.execute(''' SELECT COUNT(*) as total_entries, SUM(access_count) as total_accesses, AVG(access_count) as avg_accesses FROM cache_entries ''') stats = cursor.fetchone() return { 'memory_cache_size': len(self.memory_cache), 'database_entries': stats[0] if stats and stats[0] else 0, 'total_accesses': stats[1] if stats and stats[1] else 0, 'average_accesses': round(stats[2], 2) if stats and stats[2] else 0 } except Exception as e: logger.error(f"خطا در دریافت آمار کش: {e}") return { 'memory_cache_size': len(self.memory_cache), 'database_entries': 0, 'total_accesses': 0, 'average_accesses': 0 } # === سیستم امتیازدهی پیشرفته === class AdvancedScoringSystem: """سیستم امتیازدهی پیشرفته با وزن‌دهی چندگانه""" def __init__(self): self.weights = { 'content_length': 0.15, 'legal_terms_density': 0.25, 'source_reliability': 0.20, 'structure_quality': 0.15, 'linguistic_quality': 0.15, 'citation_count': 0.10 } try: self.normalizer = Normalizer() self.lemmatizer = Lemmatizer() except Exception as e: logger.warning(f"خطا در بارگذاری ابزارهای پردازش متن: {e}") self.normalizer = None self.lemmatizer = None def calculate_comprehensive_score(self, content: str, source_info: Dict, legal_entities: List[str]) -> Dict[str, float]: """محاسبه امتیاز جامع""" scores = {} try: # امتیاز طول محتوا scores['content_length'] = self._score_content_length(content) # تراکم اصطلاحات حقوقی scores['legal_terms_density'] = self._score_legal_terms_density(content) # قابلیت اعتماد منبع scores['source_reliability'] = source_info.get('reliability_score', 0.5) # کیفیت ساختار scores['structure_quality'] = self._score_structure_quality(content) # کیفیت زبانی scores['linguistic_quality'] = self._score_linguistic_quality(content) # تعداد ارجاعات scores['citation_count'] = self._score_citations(content, legal_entities) # محاسبه امتیاز نهایی final_score = sum( scores[factor] * self.weights[factor] for factor in scores ) scores['final_score'] = min(100, max(0, final_score * 100)) except Exception as e: logger.error(f"خطا در محاسبه امتیاز: {e}") scores = { 'content_length': 0.5, 'legal_terms_density': 0.5, 'source_reliability': 0.5, 'structure_quality': 0.5, 'linguistic_quality': 0.5, 'citation_count': 0.5, 'final_score': 50.0 } return scores def _score_content_length(self, content: str) -> float: """امتیازدهی بر اساس طول محتوا""" try: word_count = len(content.split()) if word_count < 50: return word_count / 50 * 0.5 elif word_count > 2000: return 1.0 else: return 0.5 + (word_count - 50) / 1950 * 0.5 except: return 0.5 def _score_legal_terms_density(self, content: str) -> float: """امتیازدهی تراکم اصطلاحات حقوقی""" try: total_terms = 0 content_lower = content.lower() for category, terms in PERSIAN_LEGAL_TERMS.items(): for term in terms: total_terms += content_lower.count(term.lower()) words = len(content.split()) if words == 0: return 0.0 density = total_terms / words return min(1.0, density * 20) # نرمال‌سازی except: return 0.5 def _score_structure_quality(self, content: str) -> float: """امتیازدهی کیفیت ساختار""" try: score = 0.0 # بررسی وجود ساختار مواد و تبصره‌ها if 'ماده' in content: score += 0.3 if 'تبصره' in content: score += 0.2 if 'فصل' in content or 'باب' in content: score += 0.2 # بررسی پاراگراف‌بندی paragraphs = content.split('\n') if len(paragraphs) > 2: score += 0.3 return min(1.0, score) except: return 0.5 def _score_linguistic_quality(self, content: str) -> float: """امتیازدهی کیفیت زبانی""" try: score = 0.0 if not content: return 0.0 # نسبت کاراکترهای فارسی persian_chars = sum(1 for c in content if '\u0600' <= c <= '\u06FF') persian_ratio = persian_chars / len(content) score += persian_ratio * 0.5 # بررسی وجود علائم نگارشی punctuation_count = sum(1 for c in content if c in '.,;:!؟') words_count = len(content.split()) if words_count > 0: punctuation_ratio = punctuation_count / words_count score += min(0.3, punctuation_ratio * 3) # بررسی طول متوسط جملات sentences = re.split(r'[.؟!]', content) if sentences: avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if 10 <= avg_sentence_length <= 25: score += 0.2 return min(1.0, score) except: return 0.5 def _score_citations(self, content: str, legal_entities: List[str]) -> float: """امتیازدهی ارجاعات قانونی""" try: citation_patterns = [ r'ماده\s*\d+', r'تبصره\s*\d+', r'بند\s*\d+', r'فصل\s*\d+', r'قانون\s+[آ-ی\s]+', r'مصوبه\s+[آ-ی\s]+' ] total_citations = 0 for pattern in citation_patterns: total_citations += len(re.findall(pattern, content)) # اضافه کردن موجودیت‌های قانونی total_citations += len(legal_entities) # نرمال‌سازی (هر 5 ارجاع = امتیاز کامل) return min(1.0, total_citations / 5) except: return 0.5 # === سیستم طبقه‌بندی هوشمند === class IntelligentClassificationSystem: """سیستم طبقه‌بندی هوشمند چندمرحله‌ای""" def __init__(self, cache_system: IntelligentCacheSystem): self.cache_system = cache_system self.models = {} self.is_ready = False # دسته‌های حقوقی self.legal_categories = { 'قانون': ['قانون', 'مقررات', 'آیین‌نامه'], 'دادنامه': ['دادنامه', 'رای', 'حکم'], 'قرارداد': ['قرارداد', 'توافق‌نامه', 'پروتکل'], 'لایحه': ['لایحه', 'طرح', 'پیشنهاد'], 'بخشنامه': ['بخشنامه', 'دستورالعمل', 'رهنمود'], 'نظریه': ['نظریه', 'استعلام', 'پاسخ'] } def load_models(self): """بارگذاری مدل‌های طبقه‌بندی""" try: logger.info("شروع بارگذاری مدل‌ها...") # بارگذاری ساده مدل embedding try: self.models['embedder'] = SentenceTransformer( AVAILABLE_MODELS['embedding']['sentence_transformer'] ) logger.info("مدل embedding بارگذاری شد") except Exception as e: logger.warning(f"خطا در بارگذاری مدل embedding: {e}") self.is_ready = True logger.info("سیستم طبقه‌بندی آماده است") except Exception as e: logger.error(f"خطا در بارگذاری مدل‌ها: {e}") self.is_ready = False def classify_document(self, content: str, use_cache: bool = True) -> Dict: """طبقه‌بندی هوشمند سند""" if not content or not content.strip(): return {'error': 'محتوا خالی است'} # بررسی کش cache_key = hashlib.md5(content.encode()).hexdigest() if use_cache: cached = self.cache_system.get(cache_key, 'classification') if cached: cached['cache_hit'] = True return cached start_time = time.time() result = {} try: # متن نمونه برای پردازش text_sample = ' '.join(content.split()[:400]) # طبقه‌بندی بر اساس قوانین result['rule_based_classification'] = self._rule_based_classify(content) # استخراج موجودیت‌های حقوقی result['legal_entities'] = self._extract_legal_entities(content) # تولید embedding if 'embedder' in self.models: try: result['embedding'] = self.models['embedder'].encode(text_sample) except Exception as e: logger.warning(f"خطا در تولید embedding: {e}") result['embedding'] = None # محاسبه اعتماد ترکیبی result['confidence_score'] = self._calculate_combined_confidence(result) result['processing_time'] = time.time() - start_time result['cache_hit'] = False # ذخیره در کش if use_cache: self.cache_system.set(cache_key, result, 'classification', ttl_seconds=7200) except Exception as e: logger.error(f"خطا در طبقه‌بندی: {e}") result['error'] = str(e) return result def _rule_based_classify(self, content: str) -> Dict[str, float]: """طبقه‌بندی بر اساس قوانین""" scores = {} content_lower = content.lower() for category, keywords in self.legal_categories.items(): score = 0.0 for keyword in keywords: count = content_lower.count(keyword.lower()) score += count * 0.1 scores[category] = min(1.0, score) # نرمال‌سازی امتیازها total_score = sum(scores.values()) if total_score > 0: scores = {k: v/total_score for k, v in scores.items()} return scores def _extract_legal_entities(self, content: str) -> List[str]: """استخراج موجودیت‌های حقوقی""" entities = [] try: # الگوهای ارجاعات قانونی patterns = [ r'ماده\s*(\d+)', r'تبصره\s*(\d+)', r'بند\s*([الف-ی]|\d+)', r'فصل\s*(\d+)', r'قانون\s+([آ-ی\s]{5,50})', r'مصوبه\s+([آ-ی\s]{5,50})' ] for pattern in patterns: matches = re.findall(pattern, content) entities.extend([str(m) for m in matches]) except Exception as e: logger.error(f"خطا در استخراج موجودیت‌ها: {e}") return list(set(entities))[:20] def _calculate_combined_confidence(self, result: Dict) -> float: """محاسبه اعتماد ترکیبی""" try: confidence = 0.0 weights = {'rule_based': 0.7, 'entities': 0.3} # اعتماد طبقه‌بندی قانونی if 'rule_based_classification' in result: rule_conf = max(result['rule_based_classification'].values()) if result['rule_based_classification'] else 0 confidence += rule_conf * weights['rule_based'] # تعداد موجودیت‌های قانونی entity_count = len(result.get('legal_entities', [])) entity_conf = min(1.0, entity_count / 5) confidence += entity_conf * weights['entities'] return round(confidence, 3) except: return 0.5 # === مدیریت پایگاه داده پیشرفته === class AdvancedDatabaseManager: """مدیریت پیشرفته پایگاه داده با بهینه‌سازی‌های جدید""" def __init__(self, db_path: str = DB_PATH): self.db_path = db_path self._init_database() def _init_database(self): """ایجاد پایگاه داده با جداول پیشرفته""" try: with sqlite3.connect(self.db_path) as conn: # جدول اسناد اصلی conn.execute(''' CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT UNIQUE NOT NULL, title TEXT, source TEXT, content TEXT, word_count INTEGER, quality_scores TEXT, classification_result TEXT, legal_entities TEXT, embedding_vector BLOB, scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, processing_status TEXT DEFAULT 'pending' ) ''') # ایجاد ایندکس‌ها برای جدول documents conn.execute('CREATE INDEX IF NOT EXISTS idx_documents_source ON documents(source)') conn.execute('CREATE INDEX IF NOT EXISTS idx_documents_scraped_at ON documents(scraped_at)') conn.execute('CREATE INDEX IF NOT EXISTS idx_documents_status ON documents(processing_status)') conn.execute('CREATE INDEX IF NOT EXISTS idx_documents_updated ON documents(last_updated)') # جدول متریک‌های عملکرد conn.execute(''' CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, metric_name TEXT, metric_value REAL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # ایندکس‌ها برای جدول metrics conn.execute('CREATE INDEX IF NOT EXISTS idx_metrics_name ON performance_metrics(metric_name)') conn.execute('CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON performance_metrics(timestamp)') # جدول لاگ‌های پردازش conn.execute(''' CREATE TABLE IF NOT EXISTS processing_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, document_id INTEGER, operation TEXT, status TEXT, details TEXT, processing_time REAL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (document_id) REFERENCES documents (id) ) ''') # ایندکس‌ها برای جدول logs conn.execute('CREATE INDEX IF NOT EXISTS idx_logs_operation ON processing_logs(operation)') conn.execute('CREATE INDEX IF NOT EXISTS idx_logs_status ON processing_logs(status)') conn.execute('CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON processing_logs(timestamp)') conn.execute('CREATE INDEX IF NOT EXISTS idx_logs_document_id ON processing_logs(document_id)') except Exception as e: logger.error(f"خطا در ایجاد پایگاه داده: {e}") def save_document_advanced(self, result: ProcessingResult) -> bool: """ذخیره پیشرفته سند""" try: with sqlite3.connect(self.db_path) as conn: # محاسبه embedding bytes embedding_blob = None if result.embeddings is not None: embedding_blob = result.embeddings.tobytes() conn.execute(''' INSERT OR REPLACE INTO documents (url, title, source, content, word_count, quality_scores, classification_result, legal_entities, embedding_vector, processing_status, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'completed', datetime('now')) ''', ( result.url, result.title, result.source, result.content, len(result.content.split()), json.dumps({'quality_score': result.quality_score}, ensure_ascii=False), json.dumps(result.classification, ensure_ascii=False), json.dumps(result.legal_entities, ensure_ascii=False), embedding_blob )) # ثبت لاگ document_id = conn.lastrowid conn.execute(''' INSERT INTO processing_logs (document_id, operation, status, processing_time) VALUES (?, 'save', 'success', ?) ''', (document_id, result.processing_time)) return True except Exception as e: logger.error(f"خطا در ذخیره پیشرفته: {e}") return False def get_documents_with_embeddings(self) -> List[Tuple]: """دریافت اسناد همراه با embeddings""" try: with sqlite3.connect(self.db_path) as conn: return conn.execute(''' SELECT id, url, title, content, embedding_vector FROM documents WHERE processing_status = 'completed' AND content IS NOT NULL ORDER BY last_updated DESC ''').fetchall() except Exception as e: logger.error(f"خطا در دریافت اسناد: {e}") return [] def get_advanced_stats(self) -> Dict: """آمار پیشرفته سیستم""" try: with sqlite3.connect(self.db_path) as conn: # آمار کلی total_docs = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0] # آمار بر اساس منبع source_stats = conn.execute(''' SELECT source, COUNT(*), AVG(word_count) FROM documents GROUP BY source ORDER BY COUNT(*) DESC ''').fetchall() # آمار عملکرد avg_processing_time = conn.execute(''' SELECT AVG(processing_time) FROM processing_logs WHERE operation = 'save' AND status = 'success' ''').fetchone()[0] or 0 # آمار وضعیت پردازش status_stats = conn.execute(''' SELECT processing_status, COUNT(*) FROM documents GROUP BY processing_status ''').fetchall() return { 'total_documents': total_docs, 'source_statistics': source_stats, 'average_processing_time': round(avg_processing_time, 2), 'status_statistics': status_stats, 'last_updated': datetime.now().isoformat() } except Exception as e: logger.error(f"خطا در دریافت آمار: {e}") return { 'total_documents': 0, 'source_statistics': [], 'average_processing_time': 0, 'status_statistics': [], 'last_updated': datetime.now().isoformat() } # === سیستم جستجوی معنایی پیشرفته === class SemanticSearchEngine: """موتور جستجوی معنایی با ایندکس FAISS""" def __init__(self, cache_system: IntelligentCacheSystem): self.cache_system = cache_system self.embedder = None self.faiss_index = None self.documents = [] self.document_embeddings = None self.is_ready = False def initialize(self): """مقداردهی اولیه موتور جستجو""" try: self.embedder = SentenceTransformer( AVAILABLE_MODELS['embedding']['sentence_transformer'] ) logger.info("مدل embedding بارگذاری شد") # بارگذاری ایندکس موجود در صورت وجود if os.path.exists(VECTOR_INDEX_PATH) and os.path.exists(EMBEDDINGS_CACHE_PATH): self._load_existing_index() self.is_ready = True except Exception as e: logger.error(f"خطا در مقداردهی موتور جستجو: {e}") def build_index(self, documents: List[Tuple], progress_callback=None): """ساخت ایندکس FAISS""" try: if not documents: return False if progress_callback: progress_callback("استخراج متون...", 0.1) self.documents = documents texts = [doc[3] for doc in documents if doc[3]] # content field if progress_callback: progress_callback(f"تولید embedding برای {len(texts)} سند...", 0.3) # تولید embeddings با batch processing batch_size = 16 all_embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] batch_embeddings = self.embedder.encode( batch, convert_to_tensor=True, show_progress_bar=False ) all_embeddings.append(batch_embeddings) if progress_callback: progress = 0.3 + (i / len(texts)) * 0.6 progress_callback(f"پردازش batch {i//batch_size + 1}...", progress) # ترکیب embeddings self.document_embeddings = torch.cat(all_embeddings, dim=0).cpu().numpy() if progress_callback: progress_callback("ساخت ایندکس FAISS...", 0.9) # ساخت ایندکس FAISS dimension = self.document_embeddings.shape[1] self.faiss_index = faiss.IndexFlatIP(dimension) # Inner Product for cosine similarity # نرمال‌سازی برای cosine similarity faiss.normalize_L2(self.document_embeddings) self.faiss_index.add(self.document_embeddings) # ذخیره ایندکس self._save_index() if progress_callback: progress_callback("تکمیل ایندکس‌سازی", 1.0) logger.info(f"ایندکس با {len(documents)} سند آماده شد") return True except Exception as e: logger.error(f"خطا در ساخت ایندکس: {e}") return False def search(self, query: str, top_k: int = 10, threshold: float = 0.3) -> List[Dict]: """جستجوی معنایی پیشرفته""" if not self.is_ready or self.faiss_index is None: return [] try: # بررسی کش cache_key = f"search:{hashlib.md5(query.encode()).hexdigest()}:{top_k}" cached = self.cache_system.get(cache_key, 'search') if cached: return cached # تولید embedding برای query query_embedding = self.embedder.encode([query], convert_to_tensor=True) query_embedding = query_embedding.cpu().numpy() faiss.normalize_L2(query_embedding) # جستجو در ایندکس similarities, indices = self.faiss_index.search(query_embedding, top_k * 2) results = [] for i, (similarity, idx) in enumerate(zip(similarities[0], indices[0])): if similarity < threshold: continue if idx < len(self.documents): doc = self.documents[idx] results.append({ 'rank': i + 1, 'document_id': doc[0], 'url': doc[1], 'title': doc[2], 'content_preview': doc[3][:300] + '...' if len(doc[3]) > 300 else doc[3], 'similarity_score': float(similarity), 'relevance_category': self._categorize_relevance(similarity) }) # مرتب‌سازی نهایی results = results[:top_k] # ذخیره در کش self.cache_system.set(cache_key, results, 'search', ttl_seconds=1800) return results except Exception as e: logger.error(f"خطا در جستجو: {e}") return [] def _categorize_relevance(self, similarity: float) -> str: """دسته‌بندی میزان ارتباط""" if similarity >= 0.8: return "بسیار مرتبط" elif similarity >= 0.6: return "مرتبط" elif similarity >= 0.4: return "نسبتاً مرتبط" else: return "کم‌ارتباط" def _save_index(self): """ذخیره ایندکس و embeddings""" try: if self.faiss_index: faiss.write_index(self.faiss_index, VECTOR_INDEX_PATH) if self.document_embeddings is not None: with open(EMBEDDINGS_CACHE_PATH, 'wb') as f: pickle.dump({ 'embeddings': self.document_embeddings, 'documents_info': [(doc[0], doc[1], doc[2]) for doc in self.documents] }, f) logger.info("ایندکس ذخیره شد") except Exception as e: logger.error(f"خطا در ذخیره ایندکس: {e}") def _load_existing_index(self): """بارگذاری ایندکس موجود""" try: self.faiss_index = faiss.read_index(VECTOR_INDEX_PATH) with open(EMBEDDINGS_CACHE_PATH, 'rb') as f: cache_data = pickle.load(f) self.document_embeddings = cache_data['embeddings'] # بازسازی documents از اطلاعات ذخیره شده # نیاز به query از دیتابیس برای محتوای کامل logger.info("ایندکس موجود بارگذاری شد") except Exception as e: logger.error(f"خطا در بارگذاری ایندکس: {e}") # === سیستم اسکرپر ساده === class SimpleWebScraper: """سیستم اسکرپر ساده برای تست""" def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) def scrape_document(self, url: str) -> Dict: """اسکرپ ساده یک سند""" try: response = self.session.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # استخراج عنوان title = "" title_tag = soup.find('title') if title_tag: title = title_tag.get_text().strip() # استخراج محتوا content = "" for tag in soup.find_all(['p', 'div', 'article']): if tag.get_text().strip(): content += tag.get_text().strip() + "\n" return { 'status': 'موفق', 'title': title, 'content': content, 'source_info': {'name': urlparse(url).netloc}, 'quality_assessment': {'overall_score': 0.7} } except Exception as e: logger.error(f"خطا در اسکرپ {url}: {e}") return { 'status': 'ناموفق', 'error': str(e) } # === اپلیکیشن اصلی پیشرفته === class AdvancedLegalScrapingApp: """اپلیکیشن اصلی پیشرفته اسکرپینگ حقوقی""" def __init__(self): logger.info("🚀 شروع راه‌اندازی سیستم پیشرفته...") # اجزای اصلی سیستم self.cache_system = IntelligentCacheSystem() self.scoring_system = AdvancedScoringSystem() self.db_manager = AdvancedDatabaseManager() self.classification_system = IntelligentClassificationSystem(self.cache_system) self.search_engine = SemanticSearchEngine(self.cache_system) self.scraper = SimpleWebScraper() # متریک‌های سیستم self.system_metrics = SystemMetrics() logger.info("✅ سیستم آماده است") def initialize_models(self, progress_callback=None) -> str: """مقداردهی مدل‌ها""" try: if progress_callback: progress_callback("بارگذاری مدل‌های طبقه‌بندی...", 0.3) self.classification_system.load_models() if progress_callback: progress_callback("مقداردهی موتور جستجو...", 0.7) self.search_engine.initialize() if progress_callback: progress_callback("تکمیل مقداردهی", 1.0) return "✅ تمام مدل‌ها با موفقیت بارگذاری شدند" except Exception as e: error_msg = f"❌ خطا در بارگذاری مدل‌ها: {str(e)}" logger.error(error_msg) return error_msg def process_urls_intelligent(self, urls_text: str) -> Tuple[str, str]: """پردازش هوشمند URLs""" if not urls_text or not urls_text.strip(): return "❌ لطفاً URLs را وارد کنید", "" urls = [url.strip() for url in urls_text.split('\n') if url.strip()] if not urls: return "❌ URL معتبر یافت نشد", "" results = [] total_processing_time = 0 for url in urls[:5]: # محدودیت برای تست start_time = time.time() try: # اسکرپ سند scraped_result = self.scraper.scrape_document(url) if scraped_result.get('status') == 'موفق': # طبقه‌بندی classification = self.classification_system.classify_document( scraped_result.get('content', '') ) # امتیازدهی quality_scores = self.scoring_system.calculate_comprehensive_score( scraped_result.get('content', ''), scraped_result.get('source_info', {}), classification.get('legal_entities', []) ) # ساخت ProcessingResult processing_result = ProcessingResult( url=url, title=scraped_result.get('title', ''), content=scraped_result.get('content', ''), source=scraped_result.get('source_info', {}).get('name', ''), quality_score=quality_scores.get('final_score', 0), classification=classification.get('rule_based_classification', {}), sentiment_score=0.5, legal_entities=classification.get('legal_entities', []), embeddings=classification.get('embedding'), processing_time=classification.get('processing_time', 0) ) # ذخیره در دیتابیس self.db_manager.save_document_advanced(processing_result) # ذخیره در کش cache_data = { 'title': processing_result.title, 'quality_score': processing_result.quality_score, 'classification': processing_result.classification, 'legal_entities': processing_result.legal_entities, 'status': 'موفق' } self.cache_system.set(url, cache_data, 'comprehensive', ttl_seconds=7200) results.append(cache_data) else: results.append({'status': 'ناموفق', 'error': scraped_result.get('error', '')}) total_processing_time += time.time() - start_time except Exception as e: logger.error(f"خطا در پردازش {url}: {e}") results.append({'status': 'ناموفق', 'error': str(e)}) # تأخیر بین درخواست‌ها time.sleep(1) # تولید گزارش جامع successful = sum(1 for r in results if r.get('status') == 'موفق') failed = len(results) - successful avg_quality = sum(r.get('quality_score', 0) for r in results if r.get('quality_score')) / max(successful, 1) summary = f""" 📊 **گزارش پردازش هوشمند** ✅ **موفق**: {successful} سند ❌ **ناموفق**: {failed} سند 📈 **میانگین کیفیت**: {avg_quality:.1f}/100 ⏱️ **زمان کل**: {total_processing_time:.2f} ثانیه 🎯 **نتایج کیفیت**: • بالا (>80): {sum(1 for r in results if r.get('quality_score', 0) > 80)} • متوسط (50-80): {sum(1 for r in results if 50 <= r.get('quality_score', 0) <= 80)} • پایین (<50): {sum(1 for r in results if 0 < r.get('quality_score', 0) < 50)} """ # جزئیات details = "\n".join( f"📄 {r.get('title', 'بدون عنوان')[:50]}... - امتیاز: {r.get('quality_score', 0):.1f}" for r in results if r.get('status') == 'موفق' ) return summary, details def build_intelligent_search_index(self, progress_callback=None) -> str: """ساخت ایندکس جستجوی هوشمند""" try: if progress_callback: progress_callback("دریافت اسناد از دیتابیس...", 0.1) documents = self.db_manager.get_documents_with_embeddings() if not documents: return "❌ هیچ سندی برای ایندکس‌سازی یافت نشد" success = self.search_engine.build_index(documents, progress_callback) if success: return f"✅ ایندکس جستجو با {len(documents)} سند آماده شد" else: return "❌ خطا در ساخت ایندکس" except Exception as e: return f"❌ خطا در ساخت ایندکس: {str(e)}" def intelligent_semantic_search(self, query: str, limit: int = 10) -> Tuple[str, pd.DataFrame]: """جستجوی معنایی هوشمند""" if not query or not query.strip(): return "❌ لطفاً عبارت جستجو را وارد کنید", pd.DataFrame() try: results = self.search_engine.search(query, top_k=limit) if not results: return "❌ نتیجه‌ای یافت نشد", pd.DataFrame() # تولید خلاصه summary = f""" 🔍 **نتایج جستجو برای**: "{query}" 📊 **آمار**: • تعداد نتایج: {len(results)} • بهترین امتیاز: {max(r['similarity_score'] for r in results):.3f} • میانگین امتیاز: {sum(r['similarity_score'] for r in results) / len(results):.3f} 🎯 **توزیع ارتباط**: • بسیار مرتبط: {sum(1 for r in results if r['relevance_category'] == 'بسیار مرتبط')} • مرتبط: {sum(1 for r in results if r['relevance_category'] == 'مرتبط')} • نسبتاً مرتبط: {sum(1 for r in results if r['relevance_category'] == 'نسبتاً مرتبط')} """ # تولید DataFrame df_data = [] for result in results: df_data.append({ 'رتبه': result['rank'], 'عنوان': result['title'][:50] + '...' if len(result['title']) > 50 else result['title'], 'امتیاز شباهت': f"{result['similarity_score']:.3f}", 'میزان ارتباط': result['relevance_category'], 'پیش‌نمایش محتوا': result['content_preview'][:100] + '...' }) df = pd.DataFrame(df_data) return summary, df except Exception as e: return f"❌ خطا در جستجو: {str(e)}", pd.DataFrame() def export_advanced_data(self) -> Tuple[str, Optional[str]]: """صدور داده‌های پیشرفته""" try: # دریافت آمار stats = self.db_manager.get_advanced_stats() cache_stats = self.cache_system.get_stats() # تولید گزارش جامع report = { 'system_info': { 'export_time': datetime.now().isoformat(), 'total_documents': stats['total_documents'], 'cache_performance': cache_stats }, 'database_statistics': stats, 'performance_metrics': { 'cache_hit_ratio': cache_stats['total_accesses'] / max(cache_stats['database_entries'], 1), 'avg_processing_time': stats['average_processing_time'] } } # ذخیره در فایل موقت temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') json.dump(report, temp_file, ensure_ascii=False, indent=2) temp_file.close() status = f""" 📊 **گزارش صدور داده‌ها** ✅ **موفقیت آمیز** 📄 **تعداد اسناد**: {stats['total_documents']} ⚡ **کارایی کش**: {cache_stats['memory_cache_size']} items در حافظه 📈 **میانگین زمان پردازش**: {stats['average_processing_time']} ثانیه 📅 **زمان صدور**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ return status, temp_file.name except Exception as e: return f"❌ خطا در صدور داده‌ها: {str(e)}", None def get_comprehensive_system_status(self) -> str: """وضعیت جامع سیستم""" try: # آمار پایگاه داده db_stats = self.db_manager.get_advanced_stats() # آمار کش cache_stats = self.cache_system.get_stats() # آمار مدل‌ها models_ready = self.classification_system.is_ready search_ready = self.search_engine.is_ready status_parts = [ "## 🏠 وضعیت سیستم پیشرفته اسناد حقوقی", "", "### 📊 آمار کلی", f"• **تعداد کل اسناد**: {db_stats['total_documents']}", f"• **میانگین زمان پردازش**: {db_stats['average_processing_time']} ثانیه", f"• **آخرین بروزرسانی**: {db_stats['last_updated'][:19]}", "", "### ⚡ عملکرد کش", f"• **حافظه فعال**: {cache_stats['memory_cache_size']} آیتم", f"• **پایگاه داده کش**: {cache_stats['database_entries']} ورودی", f"• **تعداد دسترسی‌ها**: {cache_stats['total_accesses']}", f"• **میانگین استفاده**: {cache_stats['average_accesses']}", "", "### 🧠 وضعیت مدل‌ها", f"• **سیستم طبقه‌بندی**: {'🟢 آماده' if models_ready else '🔴 غیرفعال'}", f"• **موتور جستجو**: {'🟢 آماده' if search_ready else '🔴 غیرفعال'}", "", "### 📈 آمار منابع" ] # اضافه کردن آمار منابع for source, count, avg_words in db_stats.get('source_statistics', []): if source: status_parts.append(f"• **{source}**: {count} سند (میانگین {avg_words:.0f} کلمه)") # وضعیت پردازش status_parts.extend([ "", "### 🔧 وضعیت پردازش" ]) for status, count in db_stats.get('status_statistics', []): status_parts.append(f"• **{status}**: {count} سند") return "\n".join(status_parts) except Exception as e: return f"❌ خطا در دریافت وضعیت سیستم: {str(e)}" def create_advanced_interface(self): """ایجاد رابط کاربری پیشرفته""" # تنظیمات CSS سفارشی برای بهبود ظاهر custom_css = """ .rtl { direction: rtl; text-align: right; } .status-success { color: #10b981; font-weight: bold; } .status-error { color: #ef4444; font-weight: bold; } .metric-card { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 10px; color: white; margin: 10px 0; } .gradio-container { font-family: 'Vazir', 'Tahoma', sans-serif; } """ with gr.Blocks( theme=gr.themes.Soft(), css=custom_css, title="🏛️ سیستم پیشرفته اسناد حقوقی ایران" ) as interface: # هدر اصلی gr.HTML("""

🏛️ سیستم پیشرفته اسناد حقوقی ایران

پلتفرم هوشمند تحلیل و دسته‌بندی متون حقوقی با قابلیت‌های پیشرفته

""") # تب‌های اصلی with gr.Tabs(): # تب خانه و داشبورد with gr.Tab("🏠 داشبورد اصلی"): with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 🚀 مقداردهی سیستم") initialize_btn = gr.Button( "⚡ بارگذاری مدل‌های هوش مصنوعی", variant="primary", size="lg" ) initialization_status = gr.Textbox( label="وضعیت مقداردهی", interactive=False, elem_classes=["rtl"] ) with gr.Column(scale=1): gr.Markdown("### 📊 آمار سریع") quick_stats = gr.HTML() # تب پردازش هوشمند with gr.Tab("🤖 پردازش هوشمند اسناد"): gr.Markdown("### 🧠 پردازش و تحلیل هوشمند اسناد حقوقی") with gr.Row(): urls_input = gr.Textbox( label="📝 آدرس اسناد (هر خط یک URL)", placeholder="https://rc.majlis.ir/fa/law/show/123456\nhttps://www.dotic.ir/portal/law/789", lines=5, elem_classes=["rtl"] ) with gr.Row(): process_intelligent_btn = gr.Button( "🚀 شروع پردازش هوشمند", variant="primary", size="lg" ) clear_cache_btn = gr.Button("🗑️ پاک کردن کش", variant="secondary") with gr.Row(): intelligent_summary = gr.Textbox( label="📊 گزارش جامع پردازش", interactive=False, lines=8, elem_classes=["rtl"] ) intelligent_details = gr.Textbox( label="📋 جزئیات و امتیازها", interactive=False, lines=8, elem_classes=["rtl"] ) # تب جستجوی هوشمند with gr.Tab("🔍 جستجوی معنایی پیشرفته"): gr.Markdown("### 🎯 جستجوی معنایی با الگوریتم‌های پیشرفته") with gr.Row(): build_advanced_index_btn = gr.Button("🔧 ساخت ایندکس هوشمند", variant="secondary") advanced_index_status = gr.Textbox( label="📈 وضعیت ایندکس", interactive=False, elem_classes=["rtl"] ) with gr.Row(): search_query = gr.Textbox( label="🔍 عبارت جستجو", placeholder="مسئولیت کیفری اشخاص حقوقی", elem_classes=["rtl"], scale=3 ) search_limit = gr.Slider( minimum=5, maximum=20, value=10, step=1, label="📊 تعداد نتایج", scale=1 ) advanced_search_btn = gr.Button("🎯 جستجوی هوشمند", variant="primary") advanced_search_results_text = gr.Markdown(elem_classes=["rtl"]) advanced_search_results_df = gr.DataFrame( label="📋 نتایج تفصیلی", interactive=False ) # تب مدیریت سیستم with gr.Tab("📊 مدیریت سیستم پیشرفته"): with gr.Row(): with gr.Column(): gr.Markdown("### 🏥 وضعیت جامع سیستم") comprehensive_status = gr.Markdown(elem_classes=["rtl"]) with gr.Row(): refresh_status_btn = gr.Button("🔄 بروزرسانی") optimize_system_btn = gr.Button("⚡ بهینه‌سازی سیستم") with gr.Column(): gr.Markdown("### 📤 صدور و پشتیبان‌گیری") advanced_export_btn = gr.Button("📊 صدور داده‌های پیشرفته", variant="primary") advanced_export_status = gr.Textbox( label="📋 وضعیت صدور", interactive=False, elem_classes=["rtl"] ) advanced_export_file = gr.File(label="📁 فایل صادر شده") # === اتصال Event Handlers === # مقداردهی سیستم initialize_btn.click( fn=self.initialize_models, outputs=[initialization_status], show_progress=True ) # پردازش هوشمند process_intelligent_btn.click( fn=self.process_urls_intelligent, inputs=[urls_input], outputs=[intelligent_summary, intelligent_details], show_progress=True ) # پاک کردن کش clear_cache_btn.click( fn=lambda: self.cache_system.cleanup_expired() or "🗑️ کش پاکسازی شد", outputs=[intelligent_summary] ) # ساخت ایندکس هوشمند build_advanced_index_btn.click( fn=self.build_intelligent_search_index, outputs=[advanced_index_status], show_progress=True ) # جستجوی هوشمند advanced_search_btn.click( fn=self.intelligent_semantic_search, inputs=[search_query, search_limit], outputs=[advanced_search_results_text, advanced_search_results_df] ) # مدیریت سیستم refresh_status_btn.click( fn=self.get_comprehensive_system_status, outputs=[comprehensive_status] ) optimize_system_btn.click( fn=lambda: (gc.collect(), self.cache_system.cleanup_expired(), "⚡ سیستم بهینه‌سازی شد")[2], outputs=[advanced_export_status] ) # صدور پیشرفته advanced_export_btn.click( fn=self.export_advanced_data, outputs=[advanced_export_status, advanced_export_file] ) # بارگذاری اولیه interface.load( fn=self.get_comprehensive_system_status, outputs=[comprehensive_status] ) return interface # === تابع اصلی === def main(): """تابع اصلی اجرای برنامه""" try: # ایجاد اپلیکیشن app = AdvancedLegalScrapingApp() # ایجاد رابط کاربری interface = app.create_advanced_interface() # اجرای برنامه interface.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True, favicon_path=None, ssl_verify=False ) except Exception as e: logger.error(f"خطا در اجرای برنامه: {e}") print(f"❌ خطا در راه‌اندازی: {e}") if __name__ == "__main__": main()