""" Advanced Data Rating Service =========================== Production-grade rating service that evaluates scraped data quality, source credibility, completeness, and OCR accuracy for the Legal Dashboard OCR system. """ import logging import re import json import sqlite3 from datetime import datetime, timezone from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass from enum import Enum import hashlib from urllib.parse import urlparse import asyncio from pydantic import BaseModel, Field import numpy as np from collections import Counter logger = logging.getLogger(__name__) class RatingCriteria(Enum): """Available rating criteria""" SOURCE_CREDIBILITY = "source_credibility" CONTENT_COMPLETENESS = "content_completeness" OCR_ACCURACY = "ocr_accuracy" DATA_FRESHNESS = "data_freshness" CONTENT_RELEVANCE = "content_relevance" TECHNICAL_QUALITY = "technical_quality" class RatingLevel(Enum): """Rating levels""" EXCELLENT = "excellent" GOOD = "good" AVERAGE = "average" POOR = "poor" UNRATED = "unrated" @dataclass class RatingResult: """Result of a rating evaluation""" item_id: str overall_score: float criteria_scores: Dict[str, float] rating_level: RatingLevel confidence: float timestamp: datetime evaluator: str notes: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for storage""" return { 'item_id': self.item_id, 'overall_score': self.overall_score, 'criteria_scores': self.criteria_scores, 'rating_level': self.rating_level.value, 'confidence': self.confidence, 'timestamp': self.timestamp.isoformat(), 'evaluator': self.evaluator, 'notes': self.notes } class RatingConfig(BaseModel): """Configuration for rating evaluation""" source_credibility_weight: float = 0.25 content_completeness_weight: float = 0.25 ocr_accuracy_weight: float = 0.20 data_freshness_weight: float = 0.15 content_relevance_weight: float = 0.10 technical_quality_weight: float = 0.05 # Thresholds for rating levels excellent_threshold: float = 0.8 good_threshold: float = 0.6 average_threshold: float = 0.4 poor_threshold: float = 0.2 class RatingService: """Advanced data rating service with multiple evaluation criteria""" def __init__(self, db_path: str = "legal_documents.db", config: Optional[RatingConfig] = None): self.db_path = db_path self.config = config or RatingConfig() self._initialize_database() # Credible domains for source credibility self.credible_domains = { 'gov.ir', 'court.gov.ir', 'justice.gov.ir', 'mizanonline.ir', 'irna.ir', 'isna.ir', 'mehrnews.com', 'tasnimnews.com', 'farsnews.ir', 'entekhab.ir', 'khabaronline.ir' } # Legal document patterns self.legal_patterns = { 'contract': r'\b(قرارداد|contract|agreement|عهدنامه)\b', 'legal_document': r'\b(سند|document|legal|مدرک)\b', 'court_case': r'\b(پرونده|case|court|دادگاه)\b', 'law_article': r'\b(ماده|article|law|قانون)\b', 'legal_notice': r'\b(اعلان|notice|announcement|آگهی)\b', 'legal_decision': r'\b(رای|decision|verdict|حکم)\b', 'legal_procedure': r'\b(رویه|procedure|process|فرآیند)\b' } # Quality indicators self.quality_indicators = { 'structure': r'\b(فصل|بخش|ماده|تبصره|بند)\b', 'formality': r'\b(مطابق|طبق|بر اساس|مطابق با)\b', 'legal_terms': r'\b(حقوقی|قانونی|قضایی|دادگستری)\b', 'official_language': r'\b(دولت|وزارت|سازمان|اداره)\b' } def _initialize_database(self): """Initialize database tables for rating data""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Create rating_results table cursor.execute(""" CREATE TABLE IF NOT EXISTS rating_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id TEXT NOT NULL, overall_score REAL, criteria_scores TEXT, rating_level TEXT, confidence REAL, timestamp TEXT, evaluator TEXT, notes TEXT, FOREIGN KEY (item_id) REFERENCES scraped_items (id) ) """) # Create rating_history table for tracking changes cursor.execute(""" CREATE TABLE IF NOT EXISTS rating_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id TEXT NOT NULL, old_score REAL, new_score REAL, change_reason TEXT, timestamp TEXT, evaluator TEXT ) """) conn.commit() logger.info("✅ Rating database initialized successfully") except Exception as e: logger.error(f"❌ Failed to initialize rating database: {e}") def _evaluate_source_credibility(self, domain: str, url: str, metadata: Dict[str, Any]) -> float: """Evaluate source credibility based on domain and metadata""" score = 0.0 try: # Check if domain is in credible list if domain in self.credible_domains: score += 0.4 # Check for government domains if '.gov.' in domain or domain.endswith('.gov.ir'): score += 0.3 # Check for educational institutions if '.edu.' in domain or domain.endswith('.ac.ir'): score += 0.2 # Check for HTTPS if url.startswith('https://'): score += 0.1 # Check metadata for official indicators if metadata: title = metadata.get('title', '').lower() if any(indicator in title for indicator in ['دولت', 'وزارت', 'سازمان', 'اداره']): score += 0.2 return min(score, 1.0) except Exception as e: logger.error(f"Error evaluating source credibility: {e}") return 0.0 def _evaluate_content_completeness(self, content: str, title: str, word_count: int) -> float: """Evaluate content completeness""" score = 0.0 try: # Check word count (minimum 100 words for good content) if word_count >= 500: score += 0.3 elif word_count >= 200: score += 0.2 elif word_count >= 100: score += 0.1 # Check for structured content if re.search(r'\b(فصل|بخش|ماده|تبصره)\b', content): score += 0.2 # Check for legal document patterns legal_pattern_count = 0 for pattern in self.legal_patterns.values(): if re.search(pattern, content, re.IGNORECASE): legal_pattern_count += 1 if legal_pattern_count >= 3: score += 0.3 elif legal_pattern_count >= 1: score += 0.2 # Check for quality indicators quality_count = 0 for pattern in self.quality_indicators.values(): if re.search(pattern, content, re.IGNORECASE): quality_count += 1 if quality_count >= 2: score += 0.2 return min(score, 1.0) except Exception as e: logger.error(f"Error evaluating content completeness: {e}") return 0.0 def _evaluate_ocr_accuracy(self, content: str, language: str) -> float: """Evaluate OCR accuracy based on content quality""" score = 0.0 try: # Check for common OCR errors ocr_errors = 0 total_chars = len(content) # Check for repeated characters (common OCR error) repeated_chars = len(re.findall(r'(.)\1{2,}', content)) if total_chars > 0: ocr_errors += repeated_chars / total_chars # Check for mixed scripts (indicates OCR issues) persian_chars = len(re.findall(r'[\u0600-\u06FF]', content)) english_chars = len(re.findall(r'[a-zA-Z]', content)) if persian_chars > 0 and english_chars > 0: # Mixed content is normal for legal documents if persian_chars / (persian_chars + english_chars) > 0.7: score += 0.3 else: score += 0.1 # Check for proper sentence structure sentences = re.split(r'[.!?]', content) proper_sentences = sum(1 for s in sentences if len(s.strip()) > 10) if len(sentences) > 0: sentence_quality = proper_sentences / len(sentences) score += sentence_quality * 0.3 # Penalize for OCR errors if ocr_errors < 0.01: score += 0.2 elif ocr_errors < 0.05: score += 0.1 # Check for proper formatting if re.search(r'\n\s*\n', content): # Paragraph breaks score += 0.1 return min(score, 1.0) except Exception as e: logger.error(f"Error evaluating OCR accuracy: {e}") return 0.0 def _evaluate_data_freshness(self, timestamp: str, metadata: Dict[str, Any]) -> float: """Evaluate data freshness""" score = 0.0 try: # Parse timestamp if isinstance(timestamp, str): try: item_time = datetime.fromisoformat( timestamp.replace('Z', '+00:00')) except: item_time = datetime.now(timezone.utc) else: item_time = timestamp current_time = datetime.now(timezone.utc) age_days = (current_time - item_time).days # Score based on age if age_days <= 30: score = 1.0 elif age_days <= 90: score = 0.8 elif age_days <= 365: score = 0.6 elif age_days <= 1095: # 3 years score = 0.4 else: score = 0.2 return score except Exception as e: logger.error(f"Error evaluating data freshness: {e}") return 0.5 # Default to average def _evaluate_content_relevance(self, content: str, title: str, strategy: str) -> float: """Evaluate content relevance to legal domain""" score = 0.0 try: # Count legal terms legal_terms = 0 for pattern in self.legal_patterns.values(): matches = re.findall(pattern, content, re.IGNORECASE) legal_terms += len(matches) # Score based on legal term density if legal_terms >= 10: score += 0.4 elif legal_terms >= 5: score += 0.3 elif legal_terms >= 2: score += 0.2 elif legal_terms >= 1: score += 0.1 # Check title relevance title_legal_terms = 0 for pattern in self.legal_patterns.values(): if re.search(pattern, title, re.IGNORECASE): title_legal_terms += 1 if title_legal_terms >= 1: score += 0.3 # Check for official language official_indicators = len(re.findall( r'\b(دولت|وزارت|سازمان|اداره|قانون|حقوق)\b', content)) if official_indicators >= 3: score += 0.3 elif official_indicators >= 1: score += 0.1 return min(score, 1.0) except Exception as e: logger.error(f"Error evaluating content relevance: {e}") return 0.0 def _evaluate_technical_quality(self, content: str, metadata: Dict[str, Any]) -> float: """Evaluate technical quality of the content""" score = 0.0 try: # Check for proper structure if re.search(r'\b(ماده|بند|تبصره|فصل)\b', content): score += 0.3 # Check for proper formatting if '\n\n' in content: # Paragraph breaks score += 0.2 # Check for consistent language persian_ratio = len(re.findall( r'[\u0600-\u06FF]', content)) / max(len(content), 1) if 0.3 <= persian_ratio <= 0.9: # Good mix or mostly Persian score += 0.2 # Check for metadata quality if metadata and len(metadata) >= 3: score += 0.1 # Check for content length consistency if len(content) >= 200: score += 0.2 return min(score, 1.0) except Exception as e: logger.error(f"Error evaluating technical quality: {e}") return 0.0 def _calculate_confidence(self, criteria_scores: Dict[str, float]) -> float: """Calculate confidence level based on criteria consistency""" try: scores = list(criteria_scores.values()) if not scores: return 0.0 # Calculate standard deviation mean_score = np.mean(scores) variance = np.mean([(s - mean_score) ** 2 for s in scores]) std_dev = np.sqrt(variance) # Higher confidence for consistent scores confidence = max(0.5, 1.0 - std_dev) return confidence except Exception as e: logger.error(f"Error calculating confidence: {e}") return 0.5 def _determine_rating_level(self, overall_score: float) -> RatingLevel: """Determine rating level based on overall score""" if overall_score >= self.config.excellent_threshold: return RatingLevel.EXCELLENT elif overall_score >= self.config.good_threshold: return RatingLevel.GOOD elif overall_score >= self.config.average_threshold: return RatingLevel.AVERAGE elif overall_score >= self.config.poor_threshold: return RatingLevel.POOR else: return RatingLevel.UNRATED async def rate_item(self, item_data: Dict[str, Any], evaluator: str = "auto") -> RatingResult: """Rate a scraped item based on all criteria""" try: item_id = item_data['id'] # Extract item properties url = item_data.get('url', '') title = item_data.get('title', '') content = item_data.get('content', '') metadata = item_data.get('metadata', {}) timestamp = item_data.get('timestamp', '') domain = item_data.get('domain', '') word_count = item_data.get('word_count', 0) language = item_data.get('language', 'unknown') strategy = item_data.get('strategy_used', 'general') # Evaluate each criterion source_credibility = self._evaluate_source_credibility( domain, url, metadata) content_completeness = self._evaluate_content_completeness( content, title, word_count) ocr_accuracy = self._evaluate_ocr_accuracy(content, language) data_freshness = self._evaluate_data_freshness(timestamp, metadata) content_relevance = self._evaluate_content_relevance( content, title, strategy) technical_quality = self._evaluate_technical_quality( content, metadata) # Calculate weighted overall score criteria_scores = { 'source_credibility': source_credibility, 'content_completeness': content_completeness, 'ocr_accuracy': ocr_accuracy, 'data_freshness': data_freshness, 'content_relevance': content_relevance, 'technical_quality': technical_quality } overall_score = ( source_credibility * self.config.source_credibility_weight + content_completeness * self.config.content_completeness_weight + ocr_accuracy * self.config.ocr_accuracy_weight + data_freshness * self.config.data_freshness_weight + content_relevance * self.config.content_relevance_weight + technical_quality * self.config.technical_quality_weight ) # Calculate confidence confidence = self._calculate_confidence(criteria_scores) # Determine rating level rating_level = self._determine_rating_level(overall_score) # Create rating result rating_result = RatingResult( item_id=item_id, overall_score=round(overall_score, 3), criteria_scores={k: round(v, 3) for k, v in criteria_scores.items()}, rating_level=rating_level, confidence=round(confidence, 3), timestamp=datetime.now(timezone.utc), evaluator=evaluator ) # Store rating result await self._store_rating_result(rating_result) # Update item rating in scraped_items table await self._update_item_rating(item_id, overall_score) logger.info( f"✅ Rated item {item_id}: {rating_level.value} ({overall_score:.3f})") return rating_result except Exception as e: logger.error( f"Error rating item {item_data.get('id', 'unknown')}: {e}") raise async def _store_rating_result(self, rating_result: RatingResult): """Store rating result in database""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO rating_results (item_id, overall_score, criteria_scores, rating_level, confidence, timestamp, evaluator, notes) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( rating_result.item_id, rating_result.overall_score, json.dumps(rating_result.criteria_scores), rating_result.rating_level.value, rating_result.confidence, rating_result.timestamp.isoformat(), rating_result.evaluator, rating_result.notes )) conn.commit() except Exception as e: logger.error(f"Error storing rating result: {e}") async def _update_item_rating(self, item_id: str, rating_score: float): """Update rating score in scraped_items table""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Get current rating for history cursor.execute( "SELECT rating_score FROM scraped_items WHERE id = ?", (item_id,)) result = cursor.fetchone() old_score = result[0] if result else 0.0 # Update rating cursor.execute(""" UPDATE scraped_items SET rating_score = ?, processing_status = 'rated' WHERE id = ? """, (rating_score, item_id)) # Store in history if score changed if abs(old_score - rating_score) > 0.01: cursor.execute(""" INSERT INTO rating_history (item_id, old_score, new_score, change_reason, timestamp, evaluator) VALUES (?, ?, ?, ?, ?, ?) """, ( item_id, old_score, rating_score, "Auto re-evaluation", datetime.now(timezone.utc).isoformat(), "auto" )) conn.commit() except Exception as e: logger.error(f"Error updating item rating: {e}") async def get_rating_summary(self) -> Dict[str, Any]: """Get comprehensive rating summary""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Overall statistics cursor.execute(""" SELECT COUNT(*) as total_rated, AVG(overall_score) as avg_score, MIN(overall_score) as min_score, MAX(overall_score) as max_score, AVG(confidence) as avg_confidence FROM rating_results """) stats = cursor.fetchone() # Rating level distribution cursor.execute(""" SELECT rating_level, COUNT(*) FROM rating_results GROUP BY rating_level """) level_distribution = dict(cursor.fetchall()) # Criteria averages cursor.execute("SELECT criteria_scores FROM rating_results") criteria_scores = cursor.fetchall() criteria_averages = {} if criteria_scores: all_criteria = {} for row in criteria_scores: if row[0]: criteria = json.loads(row[0]) for key, value in criteria.items(): if key not in all_criteria: all_criteria[key] = [] all_criteria[key].append(value) for key, values in all_criteria.items(): criteria_averages[key] = round(np.mean(values), 3) # Recent ratings cursor.execute(""" SELECT COUNT(*) FROM rating_results WHERE timestamp > datetime('now', '-24 hours') """) recent_ratings = cursor.fetchone()[0] return { 'total_rated': stats[0] if stats else 0, 'average_score': round(stats[1], 3) if stats and stats[1] else 0, 'score_range': { 'min': round(stats[2], 3) if stats and stats[2] else 0, 'max': round(stats[3], 3) if stats and stats[3] else 0 }, 'average_confidence': round(stats[4], 3) if stats and stats[4] else 0, 'rating_level_distribution': level_distribution, 'criteria_averages': criteria_averages, 'recent_ratings_24h': recent_ratings } except Exception as e: logger.error(f"Error getting rating summary: {e}") return {} async def get_item_rating_history(self, item_id: str) -> List[Dict[str, Any]]: """Get rating history for a specific item""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT old_score, new_score, change_reason, timestamp, evaluator FROM rating_history WHERE item_id = ? ORDER BY timestamp DESC """, (item_id,)) history = [] for row in cursor.fetchall(): history.append({ 'old_score': row[0], 'new_score': row[1], 'change_reason': row[2], 'timestamp': row[3], 'evaluator': row[4] }) return history except Exception as e: logger.error(f"Error getting rating history: {e}") return [] async def re_evaluate_item(self, item_id: str, evaluator: str = "manual") -> Optional[RatingResult]: """Re-evaluate a specific item""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, url, title, content, metadata, timestamp, source_url, word_count, language, strategy_used, domain FROM scraped_items WHERE id = ? """, (item_id,)) row = cursor.fetchone() if not row: logger.warning( f"Item {item_id} not found for re-evaluation") return None item_data = { 'id': row[0], 'url': row[1], 'title': row[2], 'content': row[3], 'metadata': json.loads(row[4]) if row[4] else {}, 'timestamp': row[5], 'source_url': row[6], 'word_count': row[7], 'language': row[8], 'strategy_used': row[9], 'domain': row[10] } return await self.rate_item(item_data, evaluator) except Exception as e: logger.error(f"Error re-evaluating item {item_id}: {e}") return None async def get_low_quality_items(self, threshold: float = 0.4, limit: int = 50) -> List[Dict[str, Any]]: """Get items with low quality ratings""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT si.id, si.url, si.title, si.rating_score, si.processing_status, si.timestamp FROM scraped_items si WHERE si.rating_score < ? AND si.rating_score > 0 ORDER BY si.rating_score ASC LIMIT ? """, (threshold, limit)) items = [] for row in cursor.fetchall(): items.append({ 'id': row[0], 'url': row[1], 'title': row[2], 'rating_score': row[3], 'processing_status': row[4], 'timestamp': row[5] }) return items except Exception as e: logger.error(f"Error getting low quality items: {e}") return []