```python
import requests
import time
import json
import csv
import sqlite3
import logging
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from dataclasses import dataclass, asdict
from pathlib import Path
import re
from bs4 import BeautifulSoup
import pandas as pd

try:
    from hazm import Normalizer, WordTokenizer, SentenceTokenizer
    from transformers import AutoTokenizer, AutoModel
    import torch
    import numpy as np
    from sklearn.metrics.pairwise import cosine_similarity
    NLP_AVAILABLE = True
except ImportError as e:
    NLP_AVAILABLE = False
    logging.warning(f"⚠️ NLP libraries not available: {e}")
# Create required directories
log_dir = '/app/logs'
data_dir = '/app/data'
cache_dir = '/app/cache'
os.makedirs(log_dir, exist_ok=True)
os.makedirs(data_dir, exist_ok=True)
os.makedirs(cache_dir, exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'legal_scraper.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Iranian legal sources
IRANIAN_LEGAL_SOURCES = [
    "https://rc.majlis.ir",
    "https://dolat.ir",
    "https://iribnews.ir",
    "https://www.irna.ir",
    "https://www.tasnimnews.com",
    "https://www.mehrnews.com",
    "https://www.farsnews.ir"
]
@dataclass
class LegalDocument:
    """A single scraped legal document plus its NLP-derived metadata."""
    title: str
    content: str
    source_url: str
    document_type: str
    date_published: Optional[str] = None
    date_scraped: Optional[str] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None
    summary: Optional[str] = None
    importance_score: float = 0.0
    sentiment_score: float = 0.0
    keywords: Optional[List[str]] = None
    legal_entities: Optional[List[str]] = None
    embedding: Optional[List[float]] = None
    language: str = "fa"

    def __post_init__(self):
        if self.date_scraped is None:
            self.date_scraped = datetime.now().isoformat()
        if self.tags is None:
            self.tags = []
        if self.keywords is None:
            self.keywords = []
        if self.legal_entities is None:
            self.legal_entities = []
        if self.embedding is None:
            self.embedding = []
class PersianNLPProcessor:
    """Persian text utilities: normalization, keywords, summaries, sentiment, entities, and BERT embeddings."""

    def __init__(self):
        self.normalizer = None
        self.tokenizer = None
        self.sentence_tokenizer = None
        self.model = None
        self.model_tokenizer = None
        if NLP_AVAILABLE:
            try:
                logger.info("Initializing Persian NLP components...")
                self.normalizer = Normalizer()
                self.tokenizer = WordTokenizer()
                self.sentence_tokenizer = SentenceTokenizer()
                if os.getenv("ENVIRONMENT") != "huggingface_free":
                    self.model = AutoModel.from_pretrained("HooshvareLab/bert-fa-base-uncased", cache_dir="/app/cache")
                    self.model_tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-fa-base-uncased", cache_dir="/app/cache")
                logger.info("Persian NLP components initialized")
            except Exception as e:
                logger.warning(f"Failed to initialize NLP components: {e}. Falling back to basic text processing.")
                self.model = None
                self.model_tokenizer = None
    def normalize_text(self, text: str) -> str:
        if self.normalizer:
            return self.normalizer.normalize(text)
        return text

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        if not NLP_AVAILABLE or not self.tokenizer:
            return []
        try:
            normalized_text = self.normalize_text(text)
            tokens = self.tokenizer.tokenize(normalized_text)
            word_freq = {}
            for token in tokens:
                if len(token) > 2 and token not in self.tokenizer.separators:
                    word_freq[token] = word_freq.get(token, 0) + 1
            sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            return [word for word, freq in sorted_words[:top_n] if not re.match(r'[\d\s.,!?]', word)]
        except Exception as e:
            logger.error(f"Keyword extraction failed: {e}")
            return []
    def generate_summary(self, text: str, max_length: int = 100) -> str:
        if not NLP_AVAILABLE or not self.sentence_tokenizer:
            return text[:max_length] + "..." if len(text) > max_length else text
        try:
            sentences = self.sentence_tokenizer.tokenize(text)
            if not sentences:
                return text[:max_length] + "..." if len(text) > max_length else text
            summary = sentences[0]
            current_length = len(summary)
            for sentence in sentences[1:]:
                if current_length + len(sentence) <= max_length:
                    summary += " " + sentence
                    current_length += len(sentence)
                else:
                    break
            return summary
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return text[:max_length] + "..." if len(text) > max_length else text

    def get_embedding(self, text: str) -> List[float]:
        if not NLP_AVAILABLE or not self.model or not self.model_tokenizer:
            return []
        try:
            inputs = self.model_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
            with torch.no_grad():
                outputs = self.model(**inputs)
            embedding = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy().tolist()
            return embedding
        except Exception as e:
            logger.error(f"Embedding generation failed: {e}")
            return []
    def calculate_sentiment(self, text: str) -> float:
        if not NLP_AVAILABLE:
            return 0.0
        try:
            positive_words = {'مثبت', 'خوب', 'عالی', 'موفق', 'قانونی', 'مفید'}
            negative_words = {'منفی', 'بد', 'ناکام', 'غیرقانونی', 'مضر'}
            tokens = set(self.tokenizer.tokenize(self.normalize_text(text)))
            pos_score = len(tokens & positive_words)
            neg_score = len(tokens & negative_words)
            total = pos_score + neg_score
            return (pos_score - neg_score) / total if total > 0 else 0.0
        except Exception as e:
            logger.error(f"Sentiment analysis failed: {e}")
            return 0.0

    def extract_legal_entities(self, text: str) -> List[str]:
        if not NLP_AVAILABLE:
            return []
        try:
            patterns = [
                r'قانون\s+[\w\s]+',  # Laws
                r'ماده\s+\d+',  # Articles
                r'دادگاه\s+[\w\s]+',  # Courts
                r'[\w\s]+شورا'  # Councils
            ]
            entities = []
            normalized_text = self.normalize_text(text)
            for pattern in patterns:
                matches = re.findall(pattern, normalized_text)
                entities.extend(matches)
            return list(set(entities))
        except Exception as e:
            logger.error(f"Legal entity extraction failed: {e}")
            return []
class EnhancedLegalScraper:
    """Scraper for Iranian legal sources: respects robots.txt and a crawl delay, stores documents in SQLite, and enriches them with Persian NLP."""

    def __init__(self, delay: float = 2.0, db_path: str = "/app/data/legal_scraper.db"):
        self.nlp = PersianNLPProcessor() if NLP_AVAILABLE else None
        self.session = requests.Session()
        self.delay = delay
        self.last_request_time = 0
        self.db_path = db_path
        self.robots_cache = {}
        self.user_agent = "LegalDataCollector/2.0 (Educational Research; Contact: [email protected])"
        self.session.headers.update({
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'fa,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
        self._init_database()
    def _init_database(self):
        try:
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS legal_documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    source_url TEXT UNIQUE NOT NULL,
                    document_type TEXT NOT NULL,
                    date_published TEXT,
                    date_scraped TEXT NOT NULL,
                    category TEXT,
                    tags TEXT,
                    summary TEXT,
                    importance_score REAL DEFAULT 0.0,
                    sentiment_score REAL DEFAULT 0.0,
                    keywords TEXT,
                    legal_entities TEXT,
                    embedding TEXT,
                    language TEXT DEFAULT 'fa'
                )
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_url ON legal_documents(source_url)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_type ON legal_documents(document_type)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_date_published ON legal_documents(date_published)')
            conn.commit()
            conn.close()
            logger.info(f"Database initialized: {self.db_path}")
        except Exception as e:
            logger.error(f"Database initialization failed: {e}")
            raise
    def _can_fetch(self, url: str) -> bool:
        try:
            domain = urlparse(url).netloc
            if domain not in self.robots_cache:
                robots_url = f"https://{domain}/robots.txt"
                rp = RobotFileParser()
                rp.set_url(robots_url)
                try:
                    rp.read()
                    self.robots_cache[domain] = rp
                except Exception as e:
                    logger.warning(f"Could not read robots.txt for {domain}: {e}")
                    self.robots_cache[domain] = None
            rp = self.robots_cache[domain]
            if rp is None:
                return True
            return rp.can_fetch(self.user_agent, url)
        except Exception as e:
            logger.error(f"Error checking robots.txt for {url}: {e}")
            return True

    def _respect_delay(self):
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.delay:
            time.sleep(self.delay - time_since_last)
        self.last_request_time = time.time()
    def _fetch_page(self, url: str, timeout: int = 30) -> Optional[BeautifulSoup]:
        try:
            if not self._can_fetch(url):
                logger.warning(f"Robots.txt disallows fetching: {url}")
                return None
            self._respect_delay()
            logger.info(f"Fetching: {url}")
            response = self.session.get(url, timeout=timeout)
            response.raise_for_status()
            response.encoding = response.apparent_encoding
            return BeautifulSoup(response.text, 'html.parser')
        except requests.RequestException as e:
            logger.error(f"Request failed for {url}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error parsing {url}: {e}")
            return None

    def _extract_article_title(self, soup: BeautifulSoup) -> str:
        selectors = [
            'h1.title', 'h1', '.article-title', '.post-title',
            '.news-title', 'title', '.headline'
        ]
        for selector in selectors:
            elem = soup.select_one(selector)
            if elem:
                title = elem.get_text(strip=True)
                if title and len(title) > 10:
                    return title
        return "Unknown Title"
    def _extract_article_content(self, soup: BeautifulSoup) -> str:
        for unwanted in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            unwanted.decompose()
        selectors = [
            '.article-content', '.post-content', '.news-content',
            '.content', 'article', '.main-content', 'main'
        ]
        for selector in selectors:
            elem = soup.select_one(selector)
            if elem:
                content = elem.get_text(strip=True)
                if len(content) > 200:
                    return content
        body = soup.find('body')
        if body:
            return body.get_text(strip=True)
        return soup.get_text(strip=True)

    def _extract_article_date(self, soup: BeautifulSoup) -> Optional[str]:
        date_meta = soup.find('meta', {'name': 'date'}) or soup.find('meta', {'property': 'article:published_time'})
        if date_meta:
            return date_meta.get('content')
        date_selectors = ['.date', '.published', '.timestamp', '.article-date']
        for selector in date_selectors:
            elem = soup.select_one(selector)
            if elem:
                date_text = elem.get_text(strip=True)
                patterns = [
                    r'(\d{4}/\d{1,2}/\d{1,2})',
                    r'(\d{1,2}/\d{1,2}/\d{4})',
                    r'(\d{4}-\d{1,2}-\d{1,2})'
                ]
                for pattern in patterns:
                    match = re.search(pattern, date_text)
                    if match:
                        return match.group(1)
        return None
    def _calculate_importance(self, doc_type: str, content: str) -> float:
        if not self.nlp:
            return 0.5
        keywords = self.nlp.extract_keywords(content)
        important_terms = {'قانون', 'ماده', 'دادگاه', 'حکم', 'آییننامه', 'مصوبه'}
        score = 0.5
        if doc_type in ('law', 'ruling'):
            score += 0.3
        if any(term in keywords for term in important_terms):
            score += 0.2
        return min(score, 1.0)
    def scrape_real_sources(self, source_urls: Optional[List[str]] = None, max_docs: int = 10) -> List[LegalDocument]:
        if not source_urls:
            source_urls = IRANIAN_LEGAL_SOURCES
        documents = []
        max_docs_per_source = max_docs // len(source_urls) + 1
        for base_url in source_urls:
            try:
                is_majlis = 'rc.majlis.ir' in base_url
                if is_majlis:
                    # Scrape laws from Majlis
                    law_urls = [f"{base_url}/fa/law/show/{i}" for i in range(100000, 100000 + max_docs_per_source)]
                    for url in law_urls[:max_docs_per_source]:
                        try:
                            soup = self._fetch_page(url)
                            if not soup:
                                continue
                            title = self._extract_article_title(soup)
                            content = self._extract_article_content(soup)
                            if len(content) < 100:
                                continue
                            date_published = self._extract_article_date(soup)
                            doc = LegalDocument(
                                title=title,
                                content=content,
                                source_url=url,
                                document_type="law",
                                date_published=date_published,
                                category="legislation",
                                tags=["قانون", "مجلس"]
                            )
                            if self.nlp:
                                doc.summary = self.nlp.generate_summary(content)
                                doc.keywords = self.nlp.extract_keywords(content)
                                doc.sentiment_score = self.nlp.calculate_sentiment(content)
                                doc.legal_entities = self.nlp.extract_legal_entities(content)
                                doc.embedding = self.nlp.get_embedding(content)
                            doc.importance_score = self._calculate_importance("law", content)
                            documents.append(doc)
                            self.save_document(doc)
                            logger.info(f"Scraped law: {title[:50]}...")
                        except Exception as e:
                            logger.error(f"Error scraping law {url}: {e}")
                            continue
                else:
                    # Scrape news articles
                    soup = self._fetch_page(base_url)
                    if not soup:
                        continue
                    article_links = []
                    for link in soup.find_all('a', href=True):
                        href = link['href']
                        full_url = urljoin(base_url, href)
                        if any(keyword in href.lower() for keyword in ['news', 'article', 'post', 'اخبار']):
                            article_links.append(full_url)
                    article_links = article_links[:max_docs_per_source]
                    for article_url in article_links:
                        try:
                            article_soup = self._fetch_page(article_url)
                            if not article_soup:
                                continue
                            title = self._extract_article_title(article_soup)
                            content = self._extract_article_content(article_soup)
                            if len(content) < 100:
                                continue
                            date_published = self._extract_article_date(article_soup)
                            doc = LegalDocument(
                                title=title,
                                content=content,
                                source_url=article_url,
                                document_type="news",
                                date_published=date_published,
                                category="legal_news",
                                tags=["اخبار", "حقوقی"]
                            )
                            if self.nlp:
                                doc.summary = self.nlp.generate_summary(content)
                                doc.keywords = self.nlp.extract_keywords(content)
                                doc.sentiment_score = self.nlp.calculate_sentiment(content)
                                doc.legal_entities = self.nlp.extract_legal_entities(content)
                                doc.embedding = self.nlp.get_embedding(content)
                            doc.importance_score = self._calculate_importance("news", content)
                            documents.append(doc)
                            self.save_document(doc)
                            logger.info(f"Scraped news: {title[:50]}...")
                        except Exception as e:
                            logger.error(f"Error scraping news {article_url}: {e}")
                            continue
            except Exception as e:
                logger.error(f"Error scraping source {base_url}: {e}")
                continue
        return documents[:max_docs]
    def save_document(self, doc: LegalDocument) -> bool:
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO legal_documents
                (title, content, source_url, document_type, date_published,
                 date_scraped, category, tags, summary, importance_score,
                 sentiment_score, keywords, legal_entities, embedding, language)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                doc.title,
                doc.content,
                doc.source_url,
                doc.document_type,
                doc.date_published,
                doc.date_scraped,
                doc.category,
                json.dumps(doc.tags, ensure_ascii=False),
                doc.summary,
                doc.importance_score,
                doc.sentiment_score,
                json.dumps(doc.keywords, ensure_ascii=False),
                json.dumps(doc.legal_entities, ensure_ascii=False),
                json.dumps(doc.embedding, ensure_ascii=False),
                doc.language
            ))
            conn.commit()
            conn.close()
            return True
        except Exception as e:
            logger.error(f"Failed to save document {doc.source_url}: {e}")
            return False
    def _text_search(self, query: str, limit: int = 20) -> List[Dict]:
        try:
            normalized_query = self.nlp.normalize_text(query) if self.nlp else query
            query_words = normalized_query.split()
            if not query_words:
                return []
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Bind the LIKE terms as parameters instead of interpolating them into the SQL string
            like_clauses = ["content LIKE ?"] * len(query_words)
            params = [f"%{word}%" for word in query_words]
            query_sql = f'''
                SELECT title, content, source_url, document_type, date_published,
                       date_scraped, category, tags, summary, importance_score,
                       sentiment_score, keywords, legal_entities, embedding, language
                FROM legal_documents
                WHERE {' AND '.join(like_clauses)}
                ORDER BY importance_score DESC, date_scraped DESC
                LIMIT ?
            '''
            cursor.execute(query_sql, (*params, limit))
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            results = []
            for row in rows:
                doc_dict = dict(zip(columns, row))
                doc_dict['tags'] = json.loads(doc_dict['tags']) if doc_dict['tags'] else []
                doc_dict['keywords'] = json.loads(doc_dict['keywords']) if doc_dict['keywords'] else []
                doc_dict['legal_entities'] = json.loads(doc_dict['legal_entities']) if doc_dict['legal_entities'] else []
                doc_dict['embedding'] = json.loads(doc_dict['embedding']) if doc_dict['embedding'] else []
                results.append(doc_dict)
            conn.close()
            return results
        except Exception as e:
            logger.error(f"Text search failed: {e}")
            return []
    def search_with_similarity(self, query: str, limit: int = 20) -> List[Dict]:
        if not self.nlp or not NLP_AVAILABLE:
            return self._text_search(query, limit)
        try:
            query_embedding = self.nlp.get_embedding(query)
            if not query_embedding:
                return self._text_search(query, limit)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                SELECT title, content, source_url, document_type, date_published,
                       date_scraped, category, tags, summary, importance_score,
                       sentiment_score, keywords, legal_entities, embedding, language
                FROM legal_documents
                ORDER BY importance_score DESC, date_scraped DESC
            ''')
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            documents = []
            for row in rows:
                doc_dict = dict(zip(columns, row))
                doc_dict['tags'] = json.loads(doc_dict['tags']) if doc_dict['tags'] else []
                doc_dict['keywords'] = json.loads(doc_dict['keywords']) if doc_dict['keywords'] else []
                doc_dict['legal_entities'] = json.loads(doc_dict['legal_entities']) if doc_dict['legal_entities'] else []
                doc_dict['embedding'] = json.loads(doc_dict['embedding']) if doc_dict['embedding'] else []
                documents.append(doc_dict)
            conn.close()
            if not documents:
                return []
            results = []
            query_embedding = np.array(query_embedding).reshape(1, -1)
            for doc in documents:
                if not doc['embedding']:
                    continue
                doc_embedding = np.array(doc['embedding']).reshape(1, -1)
                similarity = cosine_similarity(query_embedding, doc_embedding)[0][0]
                doc['similarity_score'] = float(similarity)
                results.append(doc)
            results.sort(key=lambda x: (x['similarity_score'], x['importance_score']), reverse=True)
            return results[:limit]
        except Exception as e:
            logger.error(f"Similarity search failed: {e}")
            return self._text_search(query, limit)
    def export_to_csv(self, filename: Optional[str] = None) -> bool:
        if filename is None:
            filename = f"/app/data/legal_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM legal_documents ORDER BY date_scraped DESC')
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            df = pd.DataFrame(rows, columns=columns)
            for col in ['tags', 'keywords', 'legal_entities', 'embedding']:
                if col in df.columns:
                    df[col] = df[col].apply(lambda x: json.loads(x) if x else [])
            df.to_csv(filename, index=False, encoding='utf-8')
            conn.close()
            logger.info(f"Data exported to {filename}")
            return True
        except Exception as e:
            logger.error(f"Export failed: {e}")
            return False
    def get_enhanced_statistics(self) -> Dict:
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            stats = {}
            cursor.execute('SELECT COUNT(*) FROM legal_documents')
            stats['total_documents'] = cursor.fetchone()[0]
            cursor.execute('SELECT document_type, COUNT(*) FROM legal_documents GROUP BY document_type')
            stats['by_type'] = dict(cursor.fetchall())
            cursor.execute('SELECT category, COUNT(*) FROM legal_documents GROUP BY category')
            stats['by_category'] = dict(cursor.fetchall())
            cursor.execute('''
                SELECT DATE(date_scraped) as day, COUNT(*)
                FROM legal_documents
                GROUP BY DATE(date_scraped)
                ORDER BY day DESC
                LIMIT 7
            ''')
            stats['recent_activity'] = dict(cursor.fetchall())
            cursor.execute('SELECT keywords FROM legal_documents WHERE keywords IS NOT NULL')
            all_keywords = []
            for row in cursor.fetchall():
                keywords = json.loads(row[0]) if row[0] else []
                all_keywords.extend(keywords)
            keyword_freq = {}
            for kw in all_keywords:
                keyword_freq[kw] = keyword_freq.get(kw, 0) + 1
            stats['top_keywords'] = dict(sorted(keyword_freq.items(), key=lambda x: x[1], reverse=True)[:10])
            cursor.execute('''
                SELECT
                    SUM(CASE WHEN importance_score > 0.7 THEN 1 ELSE 0 END) as high,
                    SUM(CASE WHEN importance_score BETWEEN 0.3 AND 0.7 THEN 1 ELSE 0 END) as medium,
                    SUM(CASE WHEN importance_score < 0.3 THEN 1 ELSE 0 END) as low
                FROM legal_documents
            ''')
            imp_dist = cursor.fetchone()
            stats['importance_distribution'] = {
                'high': imp_dist[0] or 0,
                'medium': imp_dist[1] or 0,
                'low': imp_dist[2] or 0
            }
            conn.close()
            return stats
        except Exception as e:
            logger.error(f"Statistics failed: {e}")
            return {}
```
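
For reference, a minimal usage sketch of how the scraper might be driven end to end. It assumes the module above is saved as `legal_scraper.py` (the module name, the query string, and the writable `/app/...` directories are assumptions, not part of the original code):

```python
# Minimal sketch, assuming the code above lives in legal_scraper.py
# and that /app/data, /app/logs, and /app/cache are writable.
from legal_scraper import EnhancedLegalScraper

scraper = EnhancedLegalScraper(delay=2.0)

# Crawl a small batch from the default Iranian sources; robots.txt and the
# per-request delay are enforced, and each document is saved to SQLite.
docs = scraper.scrape_real_sources(max_docs=5)
print(f"Scraped {len(docs)} documents")

# Search: embedding similarity when the Persian BERT model is available,
# otherwise it falls back to the LIKE-based text search.
results = scraper.search_with_similarity("قانون مالیات", limit=5)
for r in results:
    print(r['title'], r.get('similarity_score'))

# Snapshot the corpus and its aggregate statistics.
scraper.export_to_csv()
print(scraper.get_enhanced_statistics())
```

Because every request goes through the robots.txt check and the 2-second crawl delay, even a small `max_docs` run can take several minutes.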