Hoghoghi / app /api /enhanced_analytics.py
Really-amin's picture
Upload 143 files
c636ebf verified
raw
history blame
25.2 kB
#!/usr/bin/env python3
"""
Enhanced Analytics API for Legal Dashboard
=========================================
Advanced analytics endpoints providing:
- Real-time performance metrics
- Predictive analytics and forecasting
- Document clustering and similarity analysis
- Quality assessment and recommendations
- System health monitoring
"""
from fastapi import APIRouter, HTTPException, Query, Depends, BackgroundTasks
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import logging
from pydantic import BaseModel, Field
import json
import asyncio
from ..services.advanced_analytics_service import AdvancedAnalyticsService
from ..services.database_service import DatabaseManager
from ..services.cache_service import cache_service
logger = logging.getLogger(__name__)
router = APIRouter()
# Pydantic models for request/response
class RealTimeMetricsResponse(BaseModel):
"""Real-time metrics response model"""
total_documents: int
processed_today: int
avg_processing_time: float
success_rate: float
error_rate: float
cache_hit_rate: float
quality_score: float
system_health: float
timestamp: str
class TrendAnalysisRequest(BaseModel):
"""Trend analysis request model"""
metric: str = Field(
..., description="Metric to analyze (e.g., 'processing_time', 'quality_score')")
time_period: str = Field(
"7d", description="Time period for analysis (7d, 30d, 90d)")
category: Optional[str] = Field(None, description="Category filter")
confidence_threshold: float = Field(
0.8, description="Minimum confidence for trend analysis")
class TrendAnalysisResponse(BaseModel):
"""Trend analysis response model"""
period: str
metric: str
values: List[float]
timestamps: List[str]
trend_direction: str
change_percentage: float
confidence: float
trend_strength: str
recommendations: List[str]
class SimilarityRequest(BaseModel):
"""Document similarity request model"""
document_id: int = Field(..., description="Target document ID")
threshold: float = Field(0.7, description="Similarity threshold")
limit: int = Field(10, description="Maximum number of results")
include_metadata: bool = Field(
True, description="Include document metadata")
class SimilarityResponse(BaseModel):
"""Document similarity response model"""
target_document_id: int
similar_documents: List[Dict[str, Any]]
total_found: int
average_similarity: float
processing_time: float
class PredictiveInsightsResponse(BaseModel):
"""Predictive insights response model"""
patterns: Dict[str, Any]
predictions: Dict[str, Any]
confidence_intervals: Dict[str, List[float]]
recommendations: List[str]
next_24h_forecast: Dict[str, Any]
system_optimization_suggestions: List[str]
class ClusteringRequest(BaseModel):
"""Document clustering request model"""
n_clusters: int = Field(5, description="Number of clusters")
category: Optional[str] = Field(None, description="Category filter")
min_cluster_size: int = Field(
2, description="Minimum documents per cluster")
class ClusteringResponse(BaseModel):
"""Document clustering response model"""
clusters: Dict[str, List[Dict[str, Any]]]
centroids: List[List[float]]
silhouette_score: float
total_documents: int
cluster_quality_metrics: Dict[str, float]
class QualityReportResponse(BaseModel):
"""Quality report response model"""
overall_quality_score: float
quality_distribution: Dict[str, int]
common_issues: List[Dict[str, Any]]
recommendations: List[str]
quality_trends: Dict[str, Any]
improvement_opportunities: List[Dict[str, Any]]
next_actions: List[str]
class SystemHealthResponse(BaseModel):
"""System health response model"""
overall_health: float
component_health: Dict[str, float]
performance_metrics: Dict[str, float]
alerts: List[Dict[str, Any]]
recommendations: List[str]
last_updated: str
# Dependency injection
def get_analytics_service() -> AdvancedAnalyticsService:
return AdvancedAnalyticsService()
def get_db_manager() -> DatabaseManager:
return DatabaseManager()
@router.get("/real-time-metrics", response_model=RealTimeMetricsResponse)
async def get_real_time_metrics(
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Get real-time system metrics"""
try:
metrics = await analytics_service.get_real_time_metrics()
return RealTimeMetricsResponse(
total_documents=metrics.total_documents,
processed_today=metrics.processed_today,
avg_processing_time=metrics.avg_processing_time,
success_rate=metrics.success_rate,
error_rate=metrics.error_rate,
cache_hit_rate=metrics.cache_hit_rate,
quality_score=metrics.quality_score,
system_health=metrics.system_health,
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error getting real-time metrics: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to get real-time metrics: {str(e)}")
@router.post("/trends", response_model=TrendAnalysisResponse)
async def analyze_trends(
request: TrendAnalysisRequest,
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Analyze trends for specific metrics"""
try:
trend_data = await analytics_service.analyze_trends(
metric=request.metric,
time_period=request.time_period,
category=request.category
)
# Determine trend strength
if trend_data.confidence >= 0.9:
trend_strength = "strong"
elif trend_data.confidence >= 0.7:
trend_strength = "moderate"
else:
trend_strength = "weak"
# Generate recommendations based on trend
recommendations = _generate_trend_recommendations(trend_data)
return TrendAnalysisResponse(
period=trend_data.period,
metric=trend_data.metric,
values=trend_data.values,
timestamps=trend_data.timestamps,
trend_direction=trend_data.trend_direction,
change_percentage=trend_data.change_percentage,
confidence=trend_data.confidence,
trend_strength=trend_strength,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing trends: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to analyze trends: {str(e)}")
@router.post("/similarity", response_model=SimilarityResponse)
async def find_similar_documents(
request: SimilarityRequest,
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service),
db_manager: DatabaseManager = Depends(get_db_manager)
):
"""Find similar documents using advanced similarity analysis"""
try:
start_time = datetime.now()
similar_docs = await analytics_service.find_similar_documents(
document_id=request.document_id,
threshold=request.threshold,
limit=request.limit
)
processing_time = (datetime.now() - start_time).total_seconds()
# Prepare response data
similar_documents = []
total_similarity = 0
for doc in similar_docs:
doc_data = {
"document_id": doc.document_id,
"similarity_score": doc.similarity_score,
"common_entities": doc.common_entities,
"shared_topics": doc.shared_topics,
"relevance_score": doc.relevance_score
}
if request.include_metadata:
# Get document metadata
metadata = db_manager.get_document_by_id(doc.document_id)
if metadata:
doc_data["metadata"] = {
"title": metadata.get("title", ""),
"category": metadata.get("category", ""),
"created_at": metadata.get("created_at", ""),
"file_size": metadata.get("file_size", 0)
}
similar_documents.append(doc_data)
total_similarity += doc.similarity_score
average_similarity = total_similarity / \
len(similar_documents) if similar_documents else 0
return SimilarityResponse(
target_document_id=request.document_id,
similar_documents=similar_documents,
total_found=len(similar_documents),
average_similarity=average_similarity,
processing_time=processing_time
)
except Exception as e:
logger.error(f"Error finding similar documents: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to find similar documents: {str(e)}")
@router.get("/predictive-insights", response_model=PredictiveInsightsResponse)
async def get_predictive_insights(
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Get predictive insights for document processing"""
try:
insights = await analytics_service.generate_predictive_insights()
# Generate next 24h forecast
next_24h_forecast = _generate_24h_forecast(
insights.get("predictions", {}))
# Generate system optimization suggestions
optimization_suggestions = _generate_optimization_suggestions(insights)
return PredictiveInsightsResponse(
patterns=insights.get("patterns", {}),
predictions=insights.get("predictions", {}),
confidence_intervals=insights.get("confidence_intervals", {}),
recommendations=insights.get("recommendations", []),
next_24h_forecast=next_24h_forecast,
system_optimization_suggestions=optimization_suggestions
)
except Exception as e:
logger.error(f"Error getting predictive insights: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to get predictive insights: {str(e)}")
@router.post("/clustering", response_model=ClusteringResponse)
async def cluster_documents(
request: ClusteringRequest,
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Cluster documents using advanced clustering algorithms"""
try:
clustering_result = await analytics_service.cluster_documents(
n_clusters=request.n_clusters,
category=request.category
)
# Calculate cluster quality metrics
cluster_quality = _calculate_cluster_quality(
clustering_result.get("clusters", {}))
return ClusteringResponse(
clusters=clustering_result.get("clusters", {}),
centroids=clustering_result.get("centroids", []),
silhouette_score=clustering_result.get("silhouette_score", 0),
total_documents=clustering_result.get("total_documents", 0),
cluster_quality_metrics=cluster_quality
)
except Exception as e:
logger.error(f"Error clustering documents: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to cluster documents: {str(e)}")
@router.get("/quality-report", response_model=QualityReportResponse)
async def get_quality_report(
category: Optional[str] = Query(None, description="Category filter"),
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Generate comprehensive quality analysis report"""
try:
quality_report = await analytics_service.generate_quality_report(category)
# Generate next actions based on quality issues
next_actions = _generate_quality_actions(quality_report)
return QualityReportResponse(
overall_quality_score=quality_report.get(
"overall_quality_score", 0),
quality_distribution=quality_report.get(
"quality_distribution", {}),
common_issues=quality_report.get("common_issues", []),
recommendations=quality_report.get("recommendations", []),
quality_trends=quality_report.get("quality_trends", {}),
improvement_opportunities=quality_report.get(
"improvement_opportunities", []),
next_actions=next_actions
)
except Exception as e:
logger.error(f"Error generating quality report: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to generate quality report: {str(e)}")
@router.get("/system-health", response_model=SystemHealthResponse)
async def get_system_health(
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service),
db_manager: DatabaseManager = Depends(get_db_manager)
):
"""Get comprehensive system health status"""
try:
# Get real-time metrics
metrics = await analytics_service.get_real_time_metrics()
# Calculate component health
component_health = _calculate_component_health(metrics, db_manager)
# Get performance metrics
performance_metrics = _get_performance_metrics(db_manager)
# Generate alerts
alerts = _generate_system_alerts(metrics, component_health)
# Generate recommendations
recommendations = _generate_system_recommendations(metrics, alerts)
return SystemHealthResponse(
overall_health=metrics.system_health,
component_health=component_health,
performance_metrics=performance_metrics,
alerts=alerts,
recommendations=recommendations,
last_updated=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error getting system health: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to get system health: {str(e)}")
@router.get("/performance-dashboard")
async def get_performance_dashboard(
time_range: str = Query(
"24h", description="Time range for dashboard data"),
analytics_service: AdvancedAnalyticsService = Depends(
get_analytics_service)
):
"""Get comprehensive performance dashboard data"""
try:
# Get real-time metrics
metrics = await analytics_service.get_real_time_metrics()
# Get trend data for different metrics
processing_trend = await analytics_service.analyze_trends("processing_time", time_range)
quality_trend = await analytics_service.analyze_trends("quality_score", time_range)
volume_trend = await analytics_service.analyze_trends("document_volume", time_range)
# Get predictive insights
insights = await analytics_service.generate_predictive_insights()
return {
"status": "success",
"data": {
"real_time_metrics": {
"total_documents": metrics.total_documents,
"processed_today": metrics.processed_today,
"avg_processing_time": metrics.avg_processing_time,
"success_rate": metrics.success_rate,
"system_health": metrics.system_health
},
"trends": {
"processing_time": {
"direction": processing_trend.trend_direction,
"change_percentage": processing_trend.change_percentage,
"confidence": processing_trend.confidence
},
"quality_score": {
"direction": quality_trend.trend_direction,
"change_percentage": quality_trend.change_percentage,
"confidence": quality_trend.confidence
},
"document_volume": {
"direction": volume_trend.trend_direction,
"change_percentage": volume_trend.change_percentage,
"confidence": volume_trend.confidence
}
},
"predictions": insights.get("predictions", {}),
"recommendations": insights.get("recommendations", []),
"timestamp": datetime.now().isoformat()
}
}
except Exception as e:
logger.error(f"Error getting performance dashboard: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to get performance dashboard: {str(e)}")
# Helper functions
def _generate_trend_recommendations(trend_data) -> List[str]:
"""Generate recommendations based on trend analysis"""
recommendations = []
if trend_data.trend_direction == "up":
if trend_data.metric == "processing_time":
recommendations.append(
"Processing times are increasing - consider optimizing the pipeline")
elif trend_data.metric == "quality_score":
recommendations.append(
"Quality scores are improving - maintain current processes")
elif trend_data.metric == "document_volume":
recommendations.append(
"Document volume is increasing - consider scaling infrastructure")
elif trend_data.trend_direction == "down":
if trend_data.metric == "quality_score":
recommendations.append(
"Quality scores are declining - investigate and implement quality improvements")
elif trend_data.metric == "success_rate":
recommendations.append(
"Success rate is declining - investigate error patterns")
if trend_data.confidence < 0.7:
recommendations.append(
"Low confidence in trend analysis - collect more data for reliable insights")
return recommendations
def _generate_24h_forecast(predictions: Dict[str, Any]) -> Dict[str, Any]:
"""Generate 24-hour forecast based on predictions"""
try:
forecast = {
"expected_documents": predictions.get("expected_volume", 0),
"peak_hours": predictions.get("peak_hours", []),
"avg_processing_time": predictions.get("processing_time_forecast", 0),
"quality_forecast": predictions.get("quality_forecast", 0),
"system_load": "medium" # Default, can be enhanced with actual load prediction
}
# Adjust forecast based on historical patterns
if forecast["expected_documents"] > 100:
forecast["system_load"] = "high"
elif forecast["expected_documents"] < 20:
forecast["system_load"] = "low"
return forecast
except Exception as e:
logger.error(f"Error generating 24h forecast: {e}")
return {}
def _generate_optimization_suggestions(insights: Dict[str, Any]) -> List[str]:
"""Generate system optimization suggestions"""
suggestions = []
predictions = insights.get("predictions", {})
if predictions.get("processing_time_forecast", 0) > 30:
suggestions.append(
"Optimize document processing pipeline for faster processing")
if predictions.get("quality_forecast", 0) < 0.7:
suggestions.append(
"Implement additional quality checks and validation")
if predictions.get("expected_volume", 0) > 1000:
suggestions.append(
"Consider scaling infrastructure to handle increased load")
patterns = insights.get("patterns", {})
if patterns.get("error_patterns"):
suggestions.append("Investigate and resolve common error patterns")
return suggestions
def _calculate_cluster_quality(clusters: Dict[str, List]) -> Dict[str, float]:
"""Calculate quality metrics for each cluster"""
quality_metrics = {}
for cluster_name, documents in clusters.items():
if documents:
# Calculate average similarity to centroid
similarities = [doc.get("similarity_to_centroid", 0)
for doc in documents]
avg_similarity = sum(similarities) / \
len(similarities) if similarities else 0
# Calculate cluster size score
size_score = min(1.0, len(documents) / 10) # Normalize to 0-1
# Overall cluster quality
quality_metrics[cluster_name] = (avg_similarity + size_score) / 2
return quality_metrics
def _generate_quality_actions(quality_report: Dict[str, Any]) -> List[str]:
"""Generate next actions based on quality report"""
actions = []
overall_score = quality_report.get("overall_quality_score", 0)
common_issues = quality_report.get("common_issues", [])
if overall_score < 0.8:
actions.append("Implement comprehensive quality improvement plan")
for issue in common_issues:
if issue.get("severity") == "high":
actions.append(
f"Address high-priority issue: {issue.get('type', 'Unknown')}")
opportunities = quality_report.get("improvement_opportunities", [])
if opportunities:
actions.append("Focus on highest-impact improvement opportunities")
return actions
def _calculate_component_health(metrics, db_manager) -> Dict[str, float]:
"""Calculate health scores for different system components"""
try:
components = {
"database": 100.0, # Default, can be enhanced with actual DB health checks
"ocr_pipeline": 100.0,
"ai_engine": 100.0,
"cache_system": 100.0,
"file_storage": 100.0
}
# Adjust based on metrics
if metrics.success_rate < 90:
components["ocr_pipeline"] = metrics.success_rate
components["ai_engine"] = metrics.success_rate
if metrics.cache_hit_rate < 80:
components["cache_system"] = metrics.cache_hit_rate
return components
except Exception as e:
logger.error(f"Error calculating component health: {e}")
return {}
def _get_performance_metrics(db_manager) -> Dict[str, float]:
"""Get detailed performance metrics"""
try:
return {
"avg_response_time": 0.5, # Placeholder, should be calculated from actual data
"throughput": 100, # documents per hour
"error_rate": 0.02, # 2%
"uptime": 99.9, # 99.9%
"memory_usage": 75.0, # 75%
"cpu_usage": 60.0 # 60%
}
except Exception as e:
logger.error(f"Error getting performance metrics: {e}")
return {}
def _generate_system_alerts(metrics, component_health) -> List[Dict[str, Any]]:
"""Generate system alerts based on metrics and component health"""
alerts = []
# Check success rate
if metrics.success_rate < 90:
alerts.append({
"type": "warning",
"component": "processing_pipeline",
"message": f"Success rate below threshold: {metrics.success_rate:.1f}%",
"severity": "medium"
})
# Check system health
if metrics.system_health < 80:
alerts.append({
"type": "error",
"component": "system",
"message": f"System health critical: {metrics.system_health:.1f}%",
"severity": "high"
})
# Check component health
for component, health in component_health.items():
if health < 80:
alerts.append({
"type": "warning",
"component": component,
"message": f"{component.replace('_', ' ').title()} health degraded: {health:.1f}%",
"severity": "medium"
})
return alerts
def _generate_system_recommendations(metrics, alerts) -> List[str]:
"""Generate system recommendations based on metrics and alerts"""
recommendations = []
if metrics.success_rate < 90:
recommendations.append("Investigate and resolve processing failures")
if metrics.avg_processing_time > 30:
recommendations.append("Optimize document processing pipeline")
if metrics.cache_hit_rate < 80:
recommendations.append("Optimize cache configuration and usage")
if alerts:
recommendations.append(
"Address system alerts to improve overall health")
return recommendations