"""
AI Service for Legal Dashboard
=============================
Advanced AI-powered features for legal document analysis including:
- Intelligent document scoring and classification
- Legal entity extraction and recognition
- Sentiment analysis for legal documents
- Smart search and recommendation engine
- Document similarity analysis
"""
import hashlib
import json
import logging
import re
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

logger = logging.getLogger(__name__)


class AIScoringEngine:
    """
    Advanced AI scoring engine for legal documents.

    Provides intelligent analysis, classification, and recommendations.
    """

    def __init__(self):
        """Initialize the AI scoring engine."""
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words=None,  # keep Persian stop words for legal context
            ngram_range=(1, 3)
        )
        self.document_vectors = {}
        self.legal_keywords = self._load_legal_keywords()
        self.entity_patterns = self._load_entity_patterns()
        self.sentiment_indicators = self._load_sentiment_indicators()
        self.classification_categories = self._load_classification_categories()

    def _load_legal_keywords(self) -> Dict[str, List[str]]:
        """Load Persian legal keywords for different categories."""
        return {
            "قانون": [  # law / statutes
                "قانون", "ماده", "تبصره", "بند", "فصل", "باب", "مصوبه", "تصویب",
                "مجلس", "شورای", "ملی", "اساسی", "مدنی", "جزایی", "تجاری"
            ],
            "قرارداد": [  # contracts
                "قرارداد", "عقد", "مفاد", "طرفین", "متعاهدین", "شرایط", "ماده",
                "بند", "مبلغ", "پرداخت", "تعهد", "مسئولیت", "ضمانت"
            ],
            "احکام": [  # court rulings
                "حکم", "رای", "دادگاه", "قاضی", "شعبه", "دعوی", "خواهان",
                "خوانده", "شهادت", "دلیل", "اثبات", "قانونی", "محکوم"
            ],
            "مالی": [  # financial
                "مالیات", "درآمد", "سود", "زیان", "دارایی", "بدهی", "حساب",
                "ترازنامه", "صورت", "مالی", "دریافتی", "پرداختی"
            ],
            "اداری": [  # administrative
                "اداره", "سازمان", "وزارت", "دولت", "مقام", "مسئول", "کارمند",
                "مقررات", "دستورالعمل", "بخشنامه", "آیین‌نامه"
            ]
        }

    def _load_entity_patterns(self) -> Dict[str, str]:
        """Load regex patterns for legal entity extraction.

        All groups are non-capturing so that the full match can be used
        as the extracted entity (see _extract_entities).
        """
        return {
            "نام_شخص": r"(?:[آ-ی]{2,}\s+){2,}",  # person name: 2+ consecutive Persian words
            "نام_شرکت": r"(?:شرکت|موسسه|سازمان|بنیاد)\s+[آ-ی\s]+",  # company / organization
            "شماره_قرارداد": r"شماره\s*:?\s*\d+/\d+/\d+",  # contract number
            "تاریخ": r"\d{1,2}/\d{1,2}/\d{2,4}",  # date
            "مبلغ": r"\d{1,3}(?:,\d{3})*\s*(?:ریال|تومان|دلار|یورو)",  # amount with currency
            "شماره_ملی": r"\d{10}",  # national ID (10 digits)
            "کد_پستی": r"\d{10}",  # postal code (also 10 digits)
            "شماره_تلفن": r"\d{2,4}-\d{3,4}-\d{4}"  # phone number
        }

    def _load_sentiment_indicators(self) -> Dict[str, List[str]]:
        """Load Persian sentiment indicators for legal documents."""
        return {
            "positive": [
                "موافق", "تایید", "قبول", "اجازه", "مجوز", "تصویب", "قانونی",
                "مشروع", "صحیح", "درست", "مناسب", "مطلوب", "سودمند"
            ],
            "negative": [
                "مخالف", "رد", "عدم", "ممنوع", "غیرقانونی", "نامشروع",
                "نادرست", "نامناسب", "مضر", "خطرناک"
            ],
            "neutral": [
                "ماده", "بند", "تبصره", "قانون", "مقررات", "شرایط",
                "مفاد", "طرفین", "تاریخ", "مبلغ", "شماره"
            ]
        }

    def _load_classification_categories(self) -> Dict[str, Dict]:
        """Load document classification categories with weights."""
        return {
            "قرارداد": {  # contracts
                "keywords": ["قرارداد", "عقد", "طرفین", "مفاد"],
                "weight": 0.4,
                "patterns": ["طرفین", "متعاهدین", "شرایط"]
            },
            "احکام_قضایی": {  # court rulings
                "keywords": ["حکم", "رای", "دادگاه", "قاضی"],
                "weight": 0.35,
                "patterns": ["شعبه", "خواهان", "خوانده"]
            },
            "قوانین": {  # statutes
                "keywords": ["قانون", "ماده", "تبصره", "مجلس"],
                "weight": 0.3,
                "patterns": ["مصوبه", "تصویب", "اساسی"]
            },
            "مقررات_اداری": {  # administrative regulations
                "keywords": ["مقررات", "دستورالعمل", "آیین‌نامه"],
                "weight": 0.25,
                "patterns": ["اداره", "سازمان", "وزارت"]
            },
            "اسناد_مالی": {  # financial documents
                "keywords": ["مالی", "حساب", "ترازنامه", "صورت"],
                "weight": 0.2,
                "patterns": ["درآمد", "سود", "زیان"]
            }
        }

    def analyze_document(self, text: str, metadata: Dict = None) -> Dict[str, Any]:
        """
        Comprehensive document analysis: scoring, classification, and insights.

        Args:
            text: Document text content
            metadata: Additional document metadata

        Returns:
            Dictionary containing analysis results
        """
        try:
            # Basic text preprocessing
            cleaned_text = self._preprocess_text(text)

            # Perform the individual analyses
            analysis = {
                "basic_metrics": self._calculate_basic_metrics(cleaned_text),
                "classification": self._classify_document(cleaned_text),
                "entities": self._extract_entities(cleaned_text),
                "sentiment": self._analyze_sentiment(cleaned_text),
                "keywords": self._extract_keywords(cleaned_text),
                "quality_score": self._calculate_quality_score(cleaned_text, metadata),
                "recommendations": self._generate_recommendations(cleaned_text, metadata),
                "timestamp": datetime.now().isoformat()
            }

            # Add similarity analysis if we already have indexed documents
            if self.document_vectors:
                analysis["similarity"] = self._find_similar_documents(cleaned_text)

            return analysis
        except Exception as e:
            logger.error(f"Error in document analysis: {e}")
            return {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def _preprocess_text(self, text: str) -> str:
        """Clean and normalize Persian text."""
        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text.strip())
        # Normalize Arabic variants to their Persian forms
        text = text.replace('ي', 'ی').replace('ك', 'ک')
        # Drop characters outside the Arabic-script Unicode blocks,
        # whitespace, digits, and the separators used in dates and numbers
        text = re.sub(
            r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s\d\-\./]',
            '', text)
        return text

    def _calculate_basic_metrics(self, text: str) -> Dict[str, Any]:
        """Calculate basic document metrics."""
        words = text.split()
        sentences = [s.strip() for s in re.split(r'[.!?؟]', text) if s.strip()]
        return {
            "word_count": len(words),
            "sentence_count": len(sentences),
            "avg_sentence_length": len(words) / len(sentences) if sentences else 0,
            "unique_words": len(set(words)),
            "vocabulary_diversity": len(set(words)) / len(words) if words else 0,
            "legal_terms_count": self._count_legal_terms(text)
        }

    def _count_legal_terms(self, text: str) -> int:
        """Count occurrences of known legal terms in the document."""
        count = 0
        for category_terms in self.legal_keywords.values():
            for term in category_terms:
                count += text.count(term)
        return count

    def _classify_document(self, text: str) -> Dict[str, float]:
        """Classify the document into legal categories."""
        scores = {}
        for category, config in self.classification_categories.items():
            score = 0.0
            weight = config["weight"]
            # Keyword matches count at full weight
            for keyword in config["keywords"]:
                if keyword in text:
                    score += weight
            # Pattern matches count at half weight
            for pattern in config["patterns"]:
                if pattern in text:
                    score += weight * 0.5
            scores[category] = min(score, 1.0)

        # Normalize so the category scores sum to 1
        total_score = sum(scores.values())
        if total_score > 0:
            scores = {k: v / total_score for k, v in scores.items()}
        return scores

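    # A worked example of the scoring above (illustrative numbers): a text
    # containing "قرارداد" and "طرفین" hits two contract keywords
    # (2 * 0.4 = 0.8) plus one contract pattern (0.4 * 0.5 = 0.2), so the
    # contract category caps at 1.0 before normalization; categories with
    # no hits stay at 0, so the normalized distribution favors contracts.
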
    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract legal entities from text."""
        entities = {}
        for entity_type, pattern in self.entity_patterns.items():
            # Use the full match text so multi-group patterns do not
            # degrade into tuples or last-group fragments under findall.
            matches = [m.group(0).strip() for m in re.finditer(pattern, text)]
            if matches:
                entities[entity_type] = list(set(matches))
        return entities

    def _analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze the sentiment of a legal document."""
        sentiment_scores = {"positive": 0, "negative": 0, "neutral": 0}
        total_words = len(text.split())
        if total_words == 0:
            return sentiment_scores

        for sentiment, indicators in self.sentiment_indicators.items():
            count = sum(text.count(indicator) for indicator in indicators)
            sentiment_scores[sentiment] = count / total_words

        # Normalize scores so they sum to 1
        total = sum(sentiment_scores.values())
        if total > 0:
            sentiment_scores = {k: v / total for k, v in sentiment_scores.items()}
        return sentiment_scores

    def _extract_keywords(self, text: str) -> List[Tuple[str, float]]:
        """Extract important keywords with TF-IDF scores."""
        try:
            # Use a local vectorizer so keyword extraction does not refit
            # self.vectorizer, which must keep a stable feature space for
            # the similarity index built in update_document_vector.
            keyword_vectorizer = TfidfVectorizer(
                max_features=1000, stop_words=None, ngram_range=(1, 3))
            tfidf_matrix = keyword_vectorizer.fit_transform([text])
            feature_names = keyword_vectorizer.get_feature_names_out()

            # Pair each term with its TF-IDF score
            scores = tfidf_matrix.toarray()[0]
            keywords = list(zip(feature_names, scores))

            # Sort by score and return the top 20 keywords
            keywords.sort(key=lambda x: x[1], reverse=True)
            return keywords[:20]
        except Exception as e:
            logger.error(f"Error extracting keywords: {e}")
            return []

    def _calculate_quality_score(self, text: str, metadata: Dict = None) -> float:
        """Calculate an overall document quality score in [0, 1]."""
        score = 0.0

        # Length factor (100-2000 words is treated as optimal for legal text)
        word_count = len(text.split())
        if 100 <= word_count <= 2000:
            score += 0.3
        elif word_count > 2000:
            score += 0.2
        else:
            score += 0.1

        # Legal-term density factor
        legal_terms = self._count_legal_terms(text)
        if legal_terms > 0:
            density = legal_terms / word_count
            if 0.01 <= density <= 0.1:
                score += 0.3
            elif density > 0.1:
                score += 0.2
            else:
                score += 0.1

        # Structure factor (presence of article/clause/chapter markers)
        structure_indicators = ["ماده", "بند", "تبصره", "فصل", "باب"]
        structure_count = sum(text.count(indicator) for indicator in structure_indicators)
        if structure_count > 0:
            score += 0.2

        # Completeness factor (dates, numbers, signatures, seals)
        completeness_indicators = ["تاریخ", "شماره", "امضا", "مهر"]
        completeness_count = sum(text.count(indicator) for indicator in completeness_indicators)
        if completeness_count >= 2:
            score += 0.2

        return min(score, 1.0)

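    # A worked example of the quality score (illustrative): a 500-word
    # contract with 10 legal terms (density 0.02), at least one "ماده",
    # and both a date and a number scores 0.3 + 0.3 + 0.2 + 0.2 = 1.0.
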
    def _generate_recommendations(self, text: str, metadata: Dict = None) -> List[str]:
        """Generate improvement recommendations (user-facing messages in Persian)."""
        recommendations = []

        # Too short: suggest adding more detail
        if len(text.split()) < 100:
            recommendations.append(
                "مستندات کافی نیست. پیشنهاد می‌شود جزئیات بیشتری اضافه شود.")

        # No article/clause structure: suggest using one
        if "ماده" not in text and "بند" not in text:
            recommendations.append(
                "ساختار حقوقی مشخص نیست. پیشنهاد می‌شود از ساختار ماده و بند استفاده شود.")

        # No date found: suggest adding one
        if not re.search(r'\d{1,2}/\d{1,2}/\d{2,4}', text):
            recommendations.append(
                "تاریخ مشخص نشده است. پیشنهاد می‌شود تاریخ مستندات اضافه شود.")

        # No signature or seal mentioned
        if "امضا" not in text and "مهر" not in text:
            recommendations.append(
                "امضا یا مهر مشخص نشده است. پیشنهاد می‌شود امضا اضافه شود.")

        # No amounts found: suggest stating exact figures
        if not re.search(r'\d{1,3}(?:,\d{3})*', text):
            recommendations.append(
                "مبالغ مشخص نشده است. پیشنهاد می‌شود مبالغ دقیق ذکر شود.")

        return recommendations

    def _find_similar_documents(self, text: str) -> List[Dict[str, Any]]:
        """Find similar documents using TF-IDF vectors and cosine similarity."""
        try:
            # Vectorize the current document; self.vectorizer must already
            # be fitted on the same corpus that produced document_vectors.
            current_vector = self.vectorizer.transform([text])
            similarities = []
            for doc_id, doc_vector in self.document_vectors.items():
                similarity = cosine_similarity(current_vector, doc_vector)[0][0]
                similarities.append({
                    "document_id": doc_id,
                    "similarity_score": float(similarity),
                    "category": "similar_document"
                })

            # Sort by similarity and return the top 5 matches
            similarities.sort(key=lambda x: x["similarity_score"], reverse=True)
            return similarities[:5]
        except Exception as e:
            logger.error(f"Error finding similar documents: {e}")
            return []

    def update_document_vector(self, doc_id: str, text: str):
        """Index a document's TF-IDF vector for later similarity analysis."""
        try:
            vector = self.vectorizer.transform([text])
            self.document_vectors[doc_id] = vector
        except Exception as e:
            logger.error(f"Error updating document vector: {e}")

    def get_ai_insights(self, documents: List[Dict]) -> Dict[str, Any]:
        """Generate AI insights across multiple documents."""
        try:
            insights = {
                "document_trends": self._analyze_trends(documents),
                "common_entities": self._find_common_entities(documents),
                "category_distribution": self._analyze_category_distribution(documents),
                "quality_metrics": self._calculate_overall_quality(documents),
                "recommendations": self._generate_system_recommendations(documents)
            }
            return insights
        except Exception as e:
            logger.error(f"Error generating AI insights: {e}")
            return {"error": str(e)}

    def _analyze_trends(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze trends across documents (placeholder)."""
        return {"trend_analysis": "Not implemented yet"}

    def _find_common_entities(self, documents: List[Dict]) -> Dict[str, Any]:
        """Find common entities across documents (placeholder)."""
        return {"common_entities": "Not implemented yet"}

    def _analyze_category_distribution(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze the distribution of document categories (placeholder)."""
        return {"category_distribution": "Not implemented yet"}

    def _calculate_overall_quality(self, documents: List[Dict]) -> Dict[str, Any]:
        """Calculate overall quality metrics (placeholder)."""
        return {"overall_quality": "Not implemented yet"}

    def _generate_system_recommendations(self, documents: List[Dict]) -> List[str]:
        """Generate system-wide recommendations (placeholder)."""
        return ["سیستم در حال بهبود است"]  # "The system is being improved."