import requests
import time
import json
import csv
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from pathlib import Path
import re

import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from bs4 import BeautifulSoup

try:
    import torch
    from transformers import AutoTokenizer, AutoModel
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    print("⚠️ PyTorch not available, running without advanced NLP features")

try:
    import hazm
    from hazm import Normalizer, word_tokenize, sent_tokenize
    HAZM_AVAILABLE = True
except ImportError:
    HAZM_AVAILABLE = False
    print("⚠️ Hazm not available, using basic text processing")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('legal_scraper.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Predefined Iranian legal and news sources
IRANIAN_LEGAL_SOURCES = [
    "https://www.irna.ir",         # Islamic Republic News Agency (IRNA)
    "https://www.tasnimnews.com",  # Tasnim News Agency
    "https://www.mehrnews.com",    # Mehr News Agency
    "https://www.farsnews.ir",     # Fars News Agency
    "https://iribnews.ir",         # IRIB News Agency
    "https://www.dolat.ir",        # Government portal
    "https://rc.majlis.ir",        # Majlis Research Center
]


@dataclass
class LegalDocument:
    """Enhanced legal document with NLP features"""
    title: str
    content: str
    source_url: str
    document_type: str
    date_published: Optional[str] = None
    date_scraped: Optional[str] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None
    summary: Optional[str] = None
    importance_score: float = 0.0
    sentiment_score: float = 0.0
    legal_entities: Optional[List[str]] = None
    keywords: Optional[List[str]] = None
    embedding: Optional[List[float]] = None
    language: str = "fa"

    def __post_init__(self):
        if self.date_scraped is None:
            self.date_scraped = datetime.now().isoformat()
        if self.tags is None:
            self.tags = []
        if self.legal_entities is None:
            self.legal_entities = []
        if self.keywords is None:
            self.keywords = []


class PersianNLPProcessor:
    """Persian NLP processor using available models"""

    def __init__(self):
        if HAZM_AVAILABLE:
            self.normalizer = Normalizer()
        else:
            self.normalizer = None

        # Only reference torch when it was actually imported; otherwise
        # torch.device() would raise NameError when PyTorch is missing.
        self.device = torch.device('cpu') if TORCH_AVAILABLE else None
        self.tokenizer = None
        self.model = None

        if TORCH_AVAILABLE:
            try:
                model_names = [
                    "HooshvareLab/bert-fa-base-uncased",
                    "HooshvareLab/bert-base-parsbert-uncased",
                    "distilbert-base-multilingual-cased"
                ]
                for model_name in model_names:
                    try:
                        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                        self.model = AutoModel.from_pretrained(model_name)
                        self.model.to(self.device)
                        logger.info(f"✅ Loaded model: {model_name}")
                        break
                    except Exception as e:
                        logger.warning(f"⚠️ Failed to load {model_name}: {e}")
                        continue
            except Exception as e:
                logger.error(f"❌ Failed to load any Persian BERT model: {e}")

        # Keyword lists used for rule-based document classification
        self.legal_categories = {
            'قانون': ['قانون', 'ماده', 'بند', 'فصل', 'تبصره', 'اصلاحیه'],
            'رای': ['رای', 'حکم', 'دادگاه', 'قاضی', 'محکوم', 'دادرسی'],
            'آیین‌نامه': ['آیین‌نامه', 'دستورالعمل', 'بخشنامه', 'مقررات'],
            'اخبار': ['خبر', 'گزارش', 'اعلام', 'اطلاعیه', 'بیانیه'],
            'نظریه': ['نظریه', 'تفسیر', 'استعلام', 'پاسخ', 'رأی']
        }

        self.tfidf = None
        self._init_tfidf()

    def _init_tfidf(self):
        """Initialize TF-IDF vectorizer"""
        try:
            self.tfidf = TfidfVectorizer(
                max_features=1000,
                stop_words=self._get_persian_stopwords(),
                ngram_range=(1, 2),
                min_df=1,
                # The vectorizer is fit on a single document at a time; a fractional
                # max_df below 1.0 would prune every term and raise a ValueError.
                max_df=1.0
            )
        except Exception as e:
logger.error(f"TF-IDF initialization failed: {e}") def _get_persian_stopwords(self) -> List[str]: """Get Persian stopwords""" return [ 'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'را', 'و', 'است', 'برای', 'تا', 'کرد', 'شد', 'می', 'خود', 'هم', 'نیز', 'یا', 'اما', 'اگر', 'چون', 'پس', 'بعد', 'قبل', 'روی', 'زیر', 'کنار', 'داخل', 'نیست', 'بود', 'باشد', 'کند', 'کنند', 'شود', 'گردد', 'دارد', 'دارند' ] def normalize_text(self, text: str) -> str: """Normalize Persian text""" if not text: return "" try: text = re.sub(r'[^\w\s\u0600-\u06FF]', ' ', text) text = re.sub(r'\s+', ' ', text) if self.normalizer: text = self.normalizer.normalize(text) return text.strip() except Exception as e: logger.error(f"Text normalization failed: {e}") return text.strip() def extract_keywords(self, text: str, top_k: int = 10) -> List[str]: """Extract keywords using TF-IDF""" try: if not self.tfidf or not text: return [] normalized_text = self.normalize_text(text) if HAZM_AVAILABLE: tokens = word_tokenize(normalized_text) processed_text = ' '.join(tokens) else: processed_text = normalized_text tfidf_matrix = self.tfidf.fit_transform([processed_text]) feature_names = self.tfidf.get_feature_names_out() scores = tfidf_matrix.toarray()[0] keyword_scores = list(zip(feature_names, scores)) keyword_scores.sort(key=lambda x: x[1], reverse=True) return [kw[0] for kw in keyword_scores[:top_k] if kw[1] > 0] except Exception as e: logger.error(f"Keyword extraction failed: {e}") return [] def classify_document(self, text: str) -> Tuple[str, float]: """Classify document type with confidence score""" try: normalized_text = self.normalize_text(text.lower()) scores = {} for category, keywords in self.legal_categories.items(): score = 0 for keyword in keywords: count = normalized_text.count(keyword) score += count * (len(keyword) / 5) if len(normalized_text) > 0: scores[category] = score / (len(normalized_text) / 1000) else: scores[category] = 0 if not scores or max(scores.values()) == 0: return "عمومی", 0.0 best_category = max(scores.items(), key=lambda x: x[1]) total_score = sum(scores.values()) confidence = min(best_category[1] / total_score, 1.0) if total_score > 0 else 0.0 return best_category[0], confidence except Exception as e: logger.error(f"Document classification failed: {e}") return "عمومی", 0.0 def calculate_importance_score(self, doc: LegalDocument) -> float: """Calculate document importance score""" try: score = 0.0 title_lower = doc.title.lower() high_importance_words = ['قانون', 'اساسی', 'حکم', 'رای', 'مصوبه'] medium_importance_words = ['آیین‌نامه', 'بخشنامه', 'دستورالعمل'] for word in high_importance_words: if word in title_lower: score += 0.3 break for word in medium_importance_words: if word in title_lower: score += 0.2 break content_length = len(doc.content) if content_length > 5000: score += 0.25 elif content_length > 2000: score += 0.15 elif content_length > 500: score += 0.1 if doc.date_published: try: date_formats = ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y'] pub_date = None for fmt in date_formats: try: pub_date = datetime.strptime(doc.date_published, fmt) break except: continue if pub_date: days_old = (datetime.now() - pub_date).days if days_old < 30: score += 0.25 elif days_old < 365: score += 0.15 elif days_old < 1825: score += 0.05 except: pass legal_keywords = ['قانون', 'ماده', 'بند', 'حکم', 'رای', 'دادگاه', 'محکمه'] content_lower = doc.content.lower() keyword_count = sum(content_lower.count(kw) for kw in legal_keywords) word_count = len(doc.content.split()) if word_count > 0: keyword_density = keyword_count / 
                score += min(keyword_density * 5, 0.2)
            type_bonuses = {
                'law': 0.2,
                'ruling': 0.15,
                'regulation': 0.1,
                'news': 0.05
            }
            score += type_bonuses.get(doc.document_type, 0)
            return min(score, 1.0)
        except Exception as e:
            logger.error(f"Importance score calculation failed: {e}")
            return 0.0

    def extract_legal_entities(self, text: str) -> List[str]:
        """Extract legal entities from text"""
        try:
            entities = []
            # Non-capturing groups so re.findall returns the full match rather than group tuples.
            patterns = {
                'قوانین': r'قانون\s+[\u0600-\u06FF\s]{3,30}',
                'مواد': r'ماده\s+\d+[\u0600-\u06FF\s]*',
                'دادگاه‌ها': r'دادگاه\s+[\u0600-\u06FF\s]{3,30}',
                'مراجع': r'(?:وزارت|سازمان|اداره|شورای|کمیته)\s+[\u0600-\u06FF\s]{3,30}',
                'احکام': r'(?:حکم|رای)\s+(?:شماره\s+)?\d+',
            }
            for entity_type, pattern in patterns.items():
                matches = re.findall(pattern, text)
                for match in matches:
                    clean_match = re.sub(r'\s+', ' ', match.strip())
                    if len(clean_match) > 5 and len(clean_match) < 100:
                        entities.append(clean_match)
            unique_entities = list(dict.fromkeys(entities))
            return unique_entities[:15]
        except Exception as e:
            logger.error(f"Entity extraction failed: {e}")
            return []

    def get_text_embedding(self, text: str) -> Optional[List[float]]:
        """Get text embedding using available model"""
        if not self.model or not self.tokenizer or not TORCH_AVAILABLE:
            return None
        try:
            normalized_text = self.normalize_text(text)
            if len(normalized_text) > 512:
                normalized_text = normalized_text[:512]
            if not normalized_text:
                return None
            inputs = self.tokenizer(
                normalized_text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Embedding generation failed: {e}")
            return None

    def generate_summary(self, text: str, max_length: int = 200) -> str:
        """Generate text summary"""
        try:
            if len(text) <= max_length:
                return text
            if HAZM_AVAILABLE:
                sentences = sent_tokenize(text)
            else:
                sentences = re.split(r'[.!?]+', text)
                sentences = [s.strip() for s in sentences if s.strip()]
            if len(sentences) <= 2:
                return text[:max_length] + "..." if len(text) > max_length else text
            keywords = self.extract_keywords(text, top_k=15)
            sentence_scores = []
            for sentence in sentences:
                if len(sentence) < 20:
                    continue
                score = 0
                sentence_lower = sentence.lower()
                for kw in keywords:
                    if kw in sentence_lower:
                        score += 1
                legal_terms = ['قانون', 'ماده', 'حکم', 'رای', 'دادگاه']
                for term in legal_terms:
                    if term in sentence_lower:
                        score += 0.5
                if len(sentence) > 200:
                    score *= 0.8
                sentence_scores.append((sentence, score))
            sentence_scores.sort(key=lambda x: x[1], reverse=True)
            selected_sentences = []
            current_length = 0
            for sentence, score in sentence_scores:
                if current_length + len(sentence) <= max_length:
                    selected_sentences.append(sentence)
                    current_length += len(sentence)
                else:
                    break
            if not selected_sentences:
                return text[:max_length] + "..."
            summary = ' '.join(selected_sentences)
            return summary if len(summary) <= max_length else summary[:max_length] + "..."
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return text[:max_length] + "..." if len(text) > max_length else text
    def process_document(self, doc: LegalDocument) -> LegalDocument:
        """Process document with all available NLP features"""
        try:
            logger.info(f"Processing document: {doc.title[:50]}...")
            doc.keywords = self.extract_keywords(doc.content)
            doc_type, confidence = self.classify_document(doc.content)
            if confidence > 0.3:
                doc.category = doc_type
            doc.importance_score = self.calculate_importance_score(doc)
            doc.legal_entities = self.extract_legal_entities(doc.content)
            doc.summary = self.generate_summary(doc.content)
            doc.embedding = self.get_text_embedding(doc.content)
            logger.info(f"✅ Processed: {doc.title[:30]}... (Score: {doc.importance_score:.2f})")
            return doc
        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            return doc


class EnhancedLegalScraper:
    """Enhanced legal scraper with real web scraping and NLP"""

    def __init__(self, delay: float = 1.0):
        self.delay = delay
        self.session = requests.Session()
        try:
            self.nlp_processor = PersianNLPProcessor()
            logger.info("✅ NLP processor initialized")
        except Exception as e:
            logger.error(f"❌ NLP processor initialization failed: {e}")
            self.nlp_processor = None
        self.db_path = self._get_db_path()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'fa,en-US;q=0.7,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        self._init_database()

    def _get_db_path(self) -> str:
        """Get appropriate database path for the environment"""
        possible_paths = [
            "/tmp/legal_scraper.db",
            "./data/legal_scraper.db",
            "legal_scraper.db"
        ]
        for path in possible_paths:
            try:
                Path(path).parent.mkdir(parents=True, exist_ok=True)
                return path
            except:
                continue
        return ":memory:"

    def _init_database(self):
        """Initialize enhanced database with NLP fields"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS legal_documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    source_url TEXT UNIQUE NOT NULL,
                    document_type TEXT NOT NULL,
                    date_published TEXT,
                    date_scraped TEXT NOT NULL,
                    category TEXT,
                    tags TEXT,
                    summary TEXT,
                    importance_score REAL DEFAULT 0.0,
                    sentiment_score REAL DEFAULT 0.0,
                    legal_entities TEXT,
                    keywords TEXT,
                    embedding TEXT,
                    language TEXT DEFAULT 'fa',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            indexes = [
                'CREATE INDEX IF NOT EXISTS idx_source_url ON legal_documents(source_url)',
                'CREATE INDEX IF NOT EXISTS idx_document_type ON legal_documents(document_type)',
                'CREATE INDEX IF NOT EXISTS idx_importance_score ON legal_documents(importance_score DESC)',
                'CREATE INDEX IF NOT EXISTS idx_category ON legal_documents(category)',
                'CREATE INDEX IF NOT EXISTS idx_date_published ON legal_documents(date_published)',
                'CREATE INDEX IF NOT EXISTS idx_date_scraped ON legal_documents(date_scraped DESC)'
            ]
            for index in indexes:
                cursor.execute(index)
            conn.commit()
            conn.close()
            logger.info(f"✅ Database initialized: {self.db_path}")
        except Exception as e:
            logger.error(f"❌ Database initialization failed: {e}")
            raise

    def save_document(self, doc: LegalDocument) -> bool:
        """Save enhanced document to database"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO legal_documents
                    (title, content, source_url, document_type,
                     date_published, date_scraped, category, tags, summary,
                     importance_score, sentiment_score, legal_entities, keywords, embedding, language)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                doc.title,
                doc.content,
                doc.source_url,
                doc.document_type,
                doc.date_published,
                doc.date_scraped,
                doc.category,
                json.dumps(doc.tags, ensure_ascii=False) if doc.tags else None,
                doc.summary,
                doc.importance_score,
                doc.sentiment_score,
                json.dumps(doc.legal_entities, ensure_ascii=False) if doc.legal_entities else None,
                json.dumps(doc.keywords, ensure_ascii=False) if doc.keywords else None,
                json.dumps(doc.embedding) if doc.embedding else None,
                doc.language
            ))
            conn.commit()
            conn.close()
            return True
        except Exception as e:
            logger.error(f"Failed to save document {doc.source_url}: {e}")
            return False

    def get_enhanced_statistics(self) -> Dict:
        """Get comprehensive statistics with NLP insights"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            stats = {}
            cursor.execute('SELECT COUNT(*) FROM legal_documents')
            stats['total_documents'] = cursor.fetchone()[0]
            cursor.execute('SELECT document_type, COUNT(*) FROM legal_documents GROUP BY document_type')
            stats['by_type'] = dict(cursor.fetchall())
            cursor.execute('SELECT category, COUNT(*) FROM legal_documents WHERE category IS NOT NULL GROUP BY category')
            stats['by_category'] = dict(cursor.fetchall())
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.7')
            high_importance = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.3 AND importance_score < 0.7')
            medium_importance = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score < 0.3')
            low_importance = cursor.fetchone()[0]
            stats['importance_distribution'] = {
                'high': high_importance,
                'medium': medium_importance,
                'low': low_importance
            }
            cursor.execute('SELECT keywords FROM legal_documents WHERE keywords IS NOT NULL')
            all_keywords = []
            for row in cursor.fetchall():
                try:
                    keywords = json.loads(row[0])
                    all_keywords.extend(keywords)
                except:
                    continue
            if all_keywords:
                keyword_counts = {}
                for kw in all_keywords:
                    keyword_counts[kw] = keyword_counts.get(kw, 0) + 1
                top_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:25]
                stats['top_keywords'] = dict(top_keywords)
            cursor.execute('''
                SELECT DATE(date_scraped) as day, COUNT(*)
                FROM legal_documents
                WHERE date_scraped >= date('now', '-7 days')
                GROUP BY DATE(date_scraped)
                ORDER BY day DESC
            ''')
            stats['recent_activity'] = dict(cursor.fetchall())
            cursor.execute('''
                SELECT document_type, AVG(importance_score)
                FROM legal_documents
                GROUP BY document_type
            ''')
            stats['avg_importance_by_type'] = dict(cursor.fetchall())
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE embedding IS NOT NULL')
            stats['documents_with_embeddings'] = cursor.fetchone()[0]
            cursor.execute('SELECT language, COUNT(*) FROM legal_documents GROUP BY language')
            stats['by_language'] = dict(cursor.fetchall())
            conn.close()
            return stats
        except Exception as e:
            logger.error(f"Statistics generation failed: {e}")
            return {
                'total_documents': 0,
                'by_type': {},
                'by_category': {},
                'importance_distribution': {'high': 0, 'medium': 0, 'low': 0},
                'top_keywords': {},
                'recent_activity': {},
                'avg_importance_by_type': {},
                'documents_with_embeddings': 0,
                'by_language': {}
            }

    def search_with_similarity(self, query: str, limit: int = 20) -> List[Dict]:
        """Advanced search using embeddings and similarity"""
        if not self.nlp_processor or not self.nlp_processor.model:
            return self._text_search(query, limit)
        try:
            query_embedding = self.nlp_processor.get_text_embedding(query)
            if not query_embedding:
                return self._text_search(query, limit)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, title, content, source_url, document_type,
                       importance_score, summary, embedding
                FROM legal_documents
                WHERE embedding IS NOT NULL
            ''')
            results = []
            query_vector = np.array(query_embedding)
            for row in cursor.fetchall():
                try:
                    doc_embedding = json.loads(row[7])
                    doc_vector = np.array(doc_embedding)
                    similarity = cosine_similarity([query_vector], [doc_vector])[0][0]
                    combined_score = (similarity * 0.7) + (row[5] * 0.3)
                    results.append({
                        'id': row[0],
                        'title': row[1],
                        'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
                        'source_url': row[3],
                        'document_type': row[4],
                        'importance_score': row[5],
                        'summary': row[6],
                        'similarity_score': similarity,
                        'combined_score': combined_score
                    })
                except Exception as e:
                    logger.error(f"Error processing document embedding: {e}")
                    continue
            results.sort(key=lambda x: x['combined_score'], reverse=True)
            conn.close()
            return results[:limit]
        except Exception as e:
            logger.error(f"Similarity search failed: {e}")
            return self._text_search(query, limit)

    def _text_search(self, query: str, limit: int = 20) -> List[Dict]:
        """Fallback text search"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            if self.nlp_processor:
                normalized_query = self.nlp_processor.normalize_text(query)
            else:
                normalized_query = query
            query_words = normalized_query.split()
            search_conditions = []
            params = []
            for word in query_words:
                search_conditions.append("(title LIKE ? OR content LIKE ?)")
                params.extend([f'%{word}%', f'%{word}%'])
            where_clause = " OR ".join(search_conditions)
            cursor.execute(f'''
                SELECT id, title, content, source_url, document_type,
                       importance_score, summary
                FROM legal_documents
                WHERE {where_clause}
                ORDER BY importance_score DESC
                LIMIT ?
            ''', params + [limit])
            results = []
            for row in cursor.fetchall():
                results.append({
                    'id': row[0],
                    'title': row[1],
                    'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
                    'source_url': row[3],
                    'document_type': row[4],
                    'importance_score': row[5],
                    'summary': row[6],
                    'similarity_score': 0.0
                })
            conn.close()
            return results
        except Exception as e:
            logger.error(f"Text search failed: {e}")
            return []

    def export_to_csv(self, filename: Optional[str] = None) -> str:
        """Export data to CSV with full details"""
        try:
            if not filename:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"legal_documents_{timestamp}.csv"
            conn = sqlite3.connect(self.db_path)
            query = '''
                SELECT title, content, source_url, document_type, date_published,
                       date_scraped, category, summary, importance_score, keywords, legal_entities
                FROM legal_documents
                ORDER BY importance_score DESC, date_scraped DESC
            '''
            df = pd.read_sql_query(query, conn)
            conn.close()
            for col in ['keywords', 'legal_entities']:
                if col in df.columns:
                    df[col] = df[col].apply(lambda x: ', '.join(json.loads(x)) if x else '')
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            logger.info(f"✅ Data exported to CSV: {filename}")
            return filename
        except Exception as e:
            logger.error(f"CSV export failed: {e}")
            return ""

    def scrape_real_sources(self, urls: List[str] = IRANIAN_LEGAL_SOURCES, max_docs: int = 20) -> List[LegalDocument]:
        """Real web scraping implementation with source-specific extraction"""
        documents = []
        for i, url in enumerate(urls):
            if len(documents) >= max_docs:
                break
            try:
                logger.info(f"🔄 Scraping {i+1}/{len(urls)}: {url}")
                time.sleep(self.delay)
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                if response.encoding == 'ISO-8859-1':
                    response.encoding = response.apparent_encoding
                soup = BeautifulSoup(response.content, 'html.parser')
                # Extract documents using source-specific logic
                extracted_items = self._extract_source_specific_content(soup, url, max_docs - len(documents))
                for item in extracted_items:
                    if len(documents) >= max_docs:
                        break
                    doc = LegalDocument(
                        title=item['title'],
                        content=item['content'],
                        source_url=item['url'],
                        document_type=self._determine_document_type(item['title'], item['content']),
                        date_published=item['date']
                    )
                    if self.nlp_processor:
                        doc = self.nlp_processor.process_document(doc)
                    documents.append(doc)
                    logger.info(f"✅ Extracted: {doc.title[:50]}...")
            except Exception as e:
                logger.error(f"❌ Error scraping {url}: {e}")
                continue
        documents.sort(key=lambda x: x.importance_score, reverse=True)
        return documents

    def _extract_source_specific_content(self, soup: BeautifulSoup, url: str, max_items: int) -> List[Dict]:
        """Extract content based on source-specific selectors"""
        if 'irna.ir' in url:
            return self._extract_irna_content(soup, url, max_items)
        elif 'tasnimnews.com' in url:
            return self._extract_tasnim_content(soup, url, max_items)
        elif 'mehrnews.com' in url:
            return self._extract_mehr_content(soup, url, max_items)
        elif 'farsnews.ir' in url:
            return self._extract_fars_content(soup, url, max_items)
        else:
            return self._extract_generic_content(soup, url, max_items)

    def _extract_irna_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from IRNA"""
        items = []
        try:
            articles = soup.select('.news-item, .article, .story')[:max_items]
            for article in articles:
                # Look up the title within this article block, not the whole page
                title_elem = article.select_one('h1, h2, h3, .title, .headline, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('main, .main-content, .content, article')
                if main_content:
                    title = soup.select_one('h1, title')
                    title_text = title.get_text(strip=True) if title else "خبر ایرنا"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"IRNA extraction error: {e}")
        return items

    def _extract_tasnim_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Tasnim"""
        items = []
        try:
            articles = soup.select('.news-box, .item, .story-item')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.news-content, .story-body, main')
                if main_content:
                    title = soup.select_one('h1, .news-title')
                    title_text = title.get_text(strip=True) if title else "خبر تسنیم"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Tasnim extraction error: {e}")
        return items

    def _extract_mehr_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Mehr News"""
        items = []
        try:
            articles = soup.select('.news-item, .article-item, .story')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, .headline')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.content, .news-body, article')
                if main_content:
                    title = soup.select_one('h1, .page-title')
                    title_text = title.get_text(strip=True) if title else "خبر مهر"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Mehr extraction error: {e}")
        return items

    def _extract_fars_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Fars News"""
        items = []
        try:
            articles = soup.select('.news, .item, .story-item')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.news-content, .story, main')
                if main_content:
                    title = soup.select_one('h1, .news-title')
                    title_text = title.get_text(strip=True) if title else "خبر فارس"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Fars extraction error: {e}")
        return items

    def _extract_generic_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Generic content extraction for unknown sources"""
        items = []
        try:
            articles = soup.select('article, .article, .post, .news-item, .story')[:max_items]
            for article in articles:
                title_elem = article.select_one('h1, h2, h3, .title, .headline')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 150:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                title_elem = soup.select_one('h1, title')
                content_elem = soup.select_one('main, .main-content, .content, .entry-content, body')
                if title_elem and content_elem:
                    for unwanted in content_elem(['script', 'style', 'nav', 'header', 'footer']):
                        unwanted.decompose()
                    title = title_elem.get_text(strip=True)
                    content = content_elem.get_text(strip=True)
                    if len(title) > 5 and len(content) > 200:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Generic extraction error: {e}")
        return items

    def _extract_document_from_soup(self, soup: BeautifulSoup, url: str) -> Optional[LegalDocument]:
        """Extract main document from BeautifulSoup object using source-specific logic"""
        try:
            items = self._extract_source_specific_content(soup, url, 1)
            if not items:
                return None
            item = items[0]
            return LegalDocument(
                title=item['title'],
                content=item['content'],
                source_url=item['url'],
                document_type=self._determine_document_type(item['title'], item['content']),
                date_published=item['date']
            )
        except Exception as e:
            logger.error(f"Document extraction failed: {e}")
            return None

    def _extract_additional_articles(self, soup: BeautifulSoup, base_url: str) -> List[LegalDocument]:
        """Extract additional articles from the same page using source-specific logic"""
        documents = []
        try:
            items = self._extract_source_specific_content(soup, base_url, 3)
            for item in items:
                doc = LegalDocument(
                    title=item['title'],
                    content=item['content'],
                    source_url=item['url'],
                    document_type=self._determine_document_type(item['title'], item['content']),
                    date_published=item['date']
                )
                documents.append(doc)
        except Exception as e:
            logger.error(f"Additional articles extraction failed: {e}")
        return documents[:3]

    def _determine_document_type(self, title: str, content: str) -> str:
        """Determine document type based on content"""
        text = (title + " " + content).lower()
        if any(word in text for word in ['قانون', 'ماده', 'فصل', 'بند', 'تبصره']):
            return 'law'
        elif any(word in text for word in ['رای', 'حکم', 'دادگاه', 'قاضی']):
            return 'ruling'
        elif any(word in text for word in ['آیین‌نامه', 'دستورالعمل', 'بخشنامه']):
            return 'regulation'
        elif any(word in text for word in ['خبر', 'اعلام', 'گزارش', 'اطلاعیه']):
            return 'news'
        else:
            return 'general'

    def _extract_date(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract publication date"""
        try:
            date_selectors = [
                'meta[name="article:published_time"]',
                'meta[property="article:published_time"]',
                'meta[name="date"]',
                'meta[name="DC.date"]',
                '.date', '.publish-date', '.article-date',
                'time[datetime]'
            ]
            for selector in date_selectors:
                element = soup.select_one(selector)
                if element:
                    date_str = element.get('content') or element.get('datetime') or element.get_text()
                    if date_str:
                        return self._normalize_date(date_str)
            text = soup.get_text()
            persian_date_patterns = [
                r'(\d{4}/\d{1,2}/\d{1,2})',
                r'(\d{1,2}/\d{1,2}/\d{4})',
                r'(\d{4}-\d{1,2}-\d{1,2})'
            ]
            for pattern in persian_date_patterns:
                match = re.search(pattern, text)
                if match:
                    return match.group(1)
            return None
        except Exception:
            return None

    def _normalize_date(self, date_str: str) -> Optional[str]:
        """Normalize date string to standard format"""
        try:
            date_str = re.sub(r'[^\d/\-:]', ' ', date_str).strip()
            formats = [
                '%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y',
                '%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S'
            ]
            for fmt in formats:
                try:
                    parsed_date = datetime.strptime(date_str, fmt)
                    return parsed_date.strftime('%Y-%m-%d')
                except ValueError:
                    continue
            return date_str
        except Exception:
            return None
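

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative addition, not part of the original
# module): one way to wire the classes above together — scrape a few sources,
# persist the processed documents, then inspect statistics, run a search, and
# export to CSV. The query string and output filename are arbitrary examples.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    scraper = EnhancedLegalScraper(delay=2.0)

    # Scrape a small batch and persist whatever was extracted
    docs = scraper.scrape_real_sources(max_docs=5)
    for doc in docs:
        scraper.save_document(doc)

    # Corpus-level statistics (counts, importance distribution, top keywords, ...)
    stats = scraper.get_enhanced_statistics()
    logger.info(f"Total documents stored: {stats['total_documents']}")

    # Embedding-based search when a model is loaded, LIKE-based text search otherwise
    for hit in scraper.search_with_similarity("قانون مالیات", limit=5):
        logger.info(f"{hit['importance_score']:.2f}  {hit['title'][:60]}")

    # Dump the full table to a UTF-8 CSV for downstream analysis
    scraper.export_to_csv("legal_documents_example.csv")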