| """ | |
| Analytics and Reporting API for Legal Dashboard | |
| ============================================== | |
| Provides comprehensive analytics, performance metrics, and reporting capabilities. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| from contextlib import contextmanager | |
| from fastapi import APIRouter, HTTPException, Depends, Query | |
| from fastapi.responses import StreamingResponse | |
| import csv | |
| import io | |
| from pydantic import BaseModel | |
| # Import services | |
| from ..services.cache_service import cache_service | |
| from ..services.notification_service import notification_service | |
| from ..api.auth import get_current_user, require_role | |
| logger = logging.getLogger(__name__) | |
# Pydantic models
class AnalyticsSummary(BaseModel):
    total_documents: int
    total_users: int
    total_ocr_processed: int
    total_scraping_sessions: int
    avg_processing_time: float
    success_rate: float
    cache_hit_rate: float
    system_uptime: float


class PerformanceMetrics(BaseModel):
    api_response_times: Dict[str, float]
    memory_usage: Dict[str, Any]
    cpu_usage: float
    disk_usage: Dict[str, Any]
    active_connections: int


class UserActivity(BaseModel):
    user_id: int
    username: str
    documents_processed: int
    last_activity: str
    total_processing_time: float
    success_rate: float


class DocumentAnalytics(BaseModel):
    document_id: int
    filename: str
    processing_time: float
    ocr_accuracy: Optional[float]
    file_size: int
    created_at: str
    status: str

# Database connection.
# @contextmanager is required here: a bare generator function cannot be used
# in a `with` block, which is how this helper is called throughout the module.
@contextmanager
def get_db_connection():
    """Yield a SQLite connection with name-based row access, closing it afterwards."""
    db_path = os.getenv("DATABASE_PATH", "legal_documents.db")
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()
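

# Usage sketch for the context manager above (assumes the schema referenced
# throughout this module, e.g. a `documents` table, already exists):
#
#     with get_db_connection() as conn:
#         total = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
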
# Router. The original route decorators were not preserved; the paths used
# below are assumptions and may need adjusting to match the app's URL scheme.
router = APIRouter()


@router.get("/summary", response_model=AnalyticsSummary)
async def get_analytics_summary(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Get comprehensive analytics summary"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Total documents
            cursor.execute("SELECT COUNT(*) FROM documents")
            total_documents = cursor.fetchone()[0]

            # Total users
            cursor.execute("SELECT COUNT(*) FROM users")
            total_users = cursor.fetchone()[0]

            # OCR processing stats
            cursor.execute("""
                SELECT COUNT(*) as total,
                       AVG(processing_time) as avg_time,
                       SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful
                FROM documents
                WHERE ocr_processed = 1
            """)
            ocr_stats = cursor.fetchone()
            total_ocr_processed = ocr_stats['total'] or 0
            avg_processing_time = ocr_stats['avg_time'] or 0
            success_rate = (ocr_stats['successful'] / total_ocr_processed * 100) if total_ocr_processed > 0 else 0

            # Scraping sessions
            cursor.execute("SELECT COUNT(*) FROM scraping_sessions")
            total_scraping_sessions = cursor.fetchone()[0]

            # Cache statistics
            cache_stats = cache_service.get_cache_stats()
            cache_hit_rate = cache_stats.get('hit_rate', 0)

            # System uptime (simplified - in production, track this properly)
            system_uptime = 99.5  # Placeholder

            return AnalyticsSummary(
                total_documents=total_documents,
                total_users=total_users,
                total_ocr_processed=total_ocr_processed,
                total_scraping_sessions=total_scraping_sessions,
                avg_processing_time=avg_processing_time,
                success_rate=success_rate,
                cache_hit_rate=cache_hit_rate,
                system_uptime=system_uptime,
            )
    except Exception as e:
        logger.error(f"Error getting analytics summary: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve analytics summary")


@router.get("/performance", response_model=PerformanceMetrics)
async def get_performance_metrics(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Get system performance metrics"""
    try:
        # Get cache statistics
        cache_stats = cache_service.get_cache_stats()

        # Simulated performance metrics (in production, source these from monitoring)
        api_response_times = {
            "documents": 150.0,
            "ocr": 2500.0,
            "search": 200.0,
            "analytics": 300.0,
        }
        memory_usage = {
            "total": "2.5GB",
            "used": "1.8GB",
            "available": "700MB",
            "percentage": 72.0,
        }
        cpu_usage = 45.5
        disk_usage = {
            "total": "50GB",
            "used": "35GB",
            "available": "15GB",
            "percentage": 70.0,
        }
        active_connections = (
            len(cache_service.active_connections)
            if hasattr(cache_service, 'active_connections') else 0
        )

        return PerformanceMetrics(
            api_response_times=api_response_times,
            memory_usage=memory_usage,
            cpu_usage=cpu_usage,
            disk_usage=disk_usage,
            active_connections=active_connections,
        )
    except Exception as e:
        logger.error(f"Error getting performance metrics: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve performance metrics")
@router.get("/user-activity", response_model=List[UserActivity])
async def get_user_activity(
    days: int = Query(30, description="Number of days to analyze"),
    current_user: Dict[str, Any] = Depends(require_role("admin"))
):
    """Get user activity analytics"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Get user activity for the specified period; the LEFT JOIN keeps
            # users with no documents in the window (their counts come back as 0).
            start_date = datetime.utcnow() - timedelta(days=days)
            cursor.execute("""
                SELECT
                    u.id,
                    u.username,
                    COUNT(d.id) as documents_processed,
                    MAX(d.created_at) as last_activity,
                    AVG(d.processing_time) as avg_processing_time,
                    SUM(CASE WHEN d.status = 'completed' THEN 1 ELSE 0 END) as successful_docs,
                    COUNT(d.id) as total_docs
                FROM users u
                LEFT JOIN documents d ON u.id = d.user_id
                    AND d.created_at >= ?
                GROUP BY u.id, u.username
                ORDER BY documents_processed DESC
            """, (start_date.isoformat(),))

            activities = []
            for row in cursor.fetchall():
                total_docs = row['total_docs'] or 0
                successful_docs = row['successful_docs'] or 0
                success_rate = (successful_docs / total_docs * 100) if total_docs > 0 else 0
                activities.append(UserActivity(
                    user_id=row['id'],
                    username=row['username'],
                    documents_processed=row['documents_processed'] or 0,
                    last_activity=row['last_activity'] or "Never",
                    # Note: the query yields a per-document average, not a sum.
                    total_processing_time=row['avg_processing_time'] or 0,
                    success_rate=success_rate,
                ))
            return activities
    except Exception as e:
        logger.error(f"Error getting user activity: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve user activity")


@router.get("/documents", response_model=List[DocumentAnalytics])
async def get_document_analytics(
    limit: int = Query(100, description="Number of documents to retrieve"),
    current_user: Dict[str, Any] = Depends(require_role("admin"))
):
    """Get document processing analytics"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    id,
                    filename,
                    processing_time,
                    ocr_accuracy,
                    file_size,
                    created_at,
                    status
                FROM documents
                ORDER BY created_at DESC
                LIMIT ?
            """, (limit,))

            analytics = []
            for row in cursor.fetchall():
                analytics.append(DocumentAnalytics(
                    document_id=row['id'],
                    filename=row['filename'],
                    processing_time=row['processing_time'] or 0,
                    ocr_accuracy=row['ocr_accuracy'],
                    file_size=row['file_size'] or 0,
                    created_at=row['created_at'],
                    status=row['status'],
                ))
            return analytics
    except Exception as e:
        logger.error(f"Error getting document analytics: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve document analytics")


@router.get("/export/csv")
async def export_analytics_csv(
    report_type: str = Query(
        ..., description="Type of report: summary, user_activity, document_analytics"),
    current_user: Dict[str, Any] = Depends(require_role("admin"))
):
    """Export analytics data as CSV"""
    try:
        if report_type == "summary":
            data = await get_analytics_summary(current_user)
            return _generate_summary_csv(data)
        elif report_type == "user_activity":
            data = await get_user_activity(30, current_user)
            return _generate_user_activity_csv(data)
        elif report_type == "document_analytics":
            data = await get_document_analytics(1000, current_user)
            return _generate_document_analytics_csv(data)
        else:
            raise HTTPException(status_code=400, detail="Invalid report type")
    except HTTPException:
        # Re-raise as-is so the 400 above is not masked by the generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Error exporting CSV: {e}")
        raise HTTPException(status_code=500, detail="Failed to export CSV")
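

# Example request against the assumed route above (actual path may differ):
#
#     GET /export/csv?report_type=user_activity
#
# returns a text/csv attachment named user_activity_<YYYYMMDD>.csv.
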
def _generate_summary_csv(data: AnalyticsSummary):
    """Generate CSV for analytics summary"""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["Metric", "Value"])
    writer.writerow(["Total Documents", data.total_documents])
    writer.writerow(["Total Users", data.total_users])
    writer.writerow(["Total OCR Processed", data.total_ocr_processed])
    writer.writerow(["Total Scraping Sessions", data.total_scraping_sessions])
    writer.writerow(["Average Processing Time", f"{data.avg_processing_time:.2f}s"])
    writer.writerow(["Success Rate", f"{data.success_rate:.2f}%"])
    writer.writerow(["Cache Hit Rate", f"{data.cache_hit_rate:.2f}%"])
    writer.writerow(["System Uptime", f"{data.system_uptime:.2f}%"])
    output.seek(0)
    return StreamingResponse(
        io.BytesIO(output.getvalue().encode()),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=analytics_summary_{datetime.now().strftime('%Y%m%d')}.csv"
        },
    )


def _generate_user_activity_csv(data: List[UserActivity]):
    """Generate CSV for user activity"""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["User ID", "Username", "Documents Processed",
                     "Last Activity", "Avg Processing Time", "Success Rate"])
    for activity in data:
        writer.writerow([
            activity.user_id,
            activity.username,
            activity.documents_processed,
            activity.last_activity,
            f"{activity.total_processing_time:.2f}s",
            f"{activity.success_rate:.2f}%",
        ])
    output.seek(0)
    return StreamingResponse(
        io.BytesIO(output.getvalue().encode()),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=user_activity_{datetime.now().strftime('%Y%m%d')}.csv"
        },
    )


def _generate_document_analytics_csv(data: List[DocumentAnalytics]):
    """Generate CSV for document analytics"""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["Document ID", "Filename", "Processing Time",
                     "OCR Accuracy", "File Size", "Created At", "Status"])
    for doc in data:
        writer.writerow([
            doc.document_id,
            doc.filename,
            f"{doc.processing_time:.2f}s",
            # Compare against None so a legitimate 0.0 accuracy is not shown as "N/A".
            f"{doc.ocr_accuracy:.2f}%" if doc.ocr_accuracy is not None else "N/A",
            f"{doc.file_size} bytes",
            doc.created_at,
            doc.status,
        ])
    output.seek(0)
    return StreamingResponse(
        io.BytesIO(output.getvalue().encode()),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=document_analytics_{datetime.now().strftime('%Y%m%d')}.csv"
        },
    )
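

# The three generators above share identical response plumbing; a small helper
# along these lines (a sketch, not wired in) would remove the duplication:
def _csv_response(output: io.StringIO, name: str) -> StreamingResponse:
    """Wrap an in-memory CSV in a dated text/csv attachment response."""
    output.seek(0)
    return StreamingResponse(
        io.BytesIO(output.getvalue().encode()),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename={name}_{datetime.now().strftime('%Y%m%d')}.csv"
        },
    )
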
@router.get("/cache/stats")
async def get_cache_statistics(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Get cache performance statistics"""
    try:
        stats = cache_service.get_cache_stats()
        return {
            "cache_stats": stats,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        logger.error(f"Error getting cache stats: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve cache statistics")


@router.get("/notifications/stats")
async def get_notification_statistics(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Get notification statistics"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Total notifications
            cursor.execute("SELECT COUNT(*) FROM notifications")
            total_notifications = cursor.fetchone()[0]

            # Notifications by type
            cursor.execute("""
                SELECT type, COUNT(*) as count
                FROM notifications
                GROUP BY type
            """)
            by_type = dict(cursor.fetchall())

            # Recent notifications (last 24 hours)
            yesterday = datetime.utcnow() - timedelta(days=1)
            cursor.execute("""
                SELECT COUNT(*) FROM notifications
                WHERE created_at >= ?
            """, (yesterday.isoformat(),))
            recent_notifications = cursor.fetchone()[0]

            return {
                "total_notifications": total_notifications,
                "recent_notifications": recent_notifications,
                "by_type": by_type,
                "timestamp": datetime.utcnow().isoformat(),
            }
    except Exception as e:
        logger.error(f"Error getting notification stats: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve notification statistics")


@router.get("/health")
async def get_system_health(current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Get system health status"""
    try:
        # Check database connectivity
        db_healthy = False
        try:
            with get_db_connection() as conn:
                conn.cursor().execute("SELECT 1")
            db_healthy = True
        except Exception as e:
            logger.warning(f"Database health check failed: {e}")

        # Check cache connectivity
        cache_healthy = False
        try:
            cache_service.get("health_check")
            cache_healthy = True
        except Exception as e:
            logger.warning(f"Cache health check failed: {e}")

        # Check disk space (simplified placeholder values)
        disk_usage = {
            "total": "50GB",
            "used": "35GB",
            "available": "15GB",
            "healthy": True,
        }

        # Check memory usage (simplified placeholder values)
        memory_usage = {
            "total": "8GB",
            "used": "6GB",
            "available": "2GB",
            "healthy": True,
        }

        return {
            "database": {
                "status": "healthy" if db_healthy else "unhealthy",
                "connected": db_healthy,
            },
            "cache": {
                "status": "healthy" if cache_healthy else "unhealthy",
                "connected": cache_healthy,
            },
            "disk": {
                "status": "healthy" if disk_usage["healthy"] else "warning",
                "usage": disk_usage,
            },
            "memory": {
                "status": "healthy" if memory_usage["healthy"] else "warning",
                "usage": memory_usage,
            },
            "overall_status": "healthy" if all(
                [db_healthy, cache_healthy, disk_usage["healthy"], memory_usage["healthy"]]
            ) else "warning",
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        logger.error(f"Error getting system health: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve system health")


@router.get("/trends")
async def get_analytics_trends(
    days: int = Query(30, description="Number of days to analyze"),
    current_user: Dict[str, Any] = Depends(require_role("admin"))
):
    """Get analytics trends over time"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Daily document processing trends. `days` is bound as a query
            # parameter rather than interpolated with str.format, which would
            # otherwise be an SQL injection vector.
            cursor.execute("""
                SELECT
                    DATE(created_at) as date,
                    COUNT(*) as documents_processed,
                    AVG(processing_time) as avg_processing_time,
                    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful
                FROM documents
                WHERE created_at >= date('now', '-' || ? || ' days')
                GROUP BY DATE(created_at)
                ORDER BY date
            """, (days,))

            daily_trends = []
            for row in cursor.fetchall():
                total = row['documents_processed']
                successful = row['successful']
                success_rate = (successful / total * 100) if total > 0 else 0
                daily_trends.append({
                    "date": row['date'],
                    "documents_processed": total,
                    "avg_processing_time": row['avg_processing_time'] or 0,
                    "success_rate": success_rate,
                })

            return {
                "daily_trends": daily_trends,
                "period_days": days,
                "timestamp": datetime.utcnow().isoformat(),
            }
    except Exception as e:
        logger.error(f"Error getting analytics trends: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve analytics trends")
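

# Minimal wiring sketch (assumes this module lives at app/api/analytics.py;
# the prefix is an assumption and should match the paths used above):
#
#     from fastapi import FastAPI
#     from app.api.analytics import router as analytics_router
#
#     app = FastAPI()
#     app.include_router(analytics_router, prefix="/analytics", tags=["analytics"])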