Edwin Salguero
Enhanced FRED ML with improved Reports & Insights page, fixed alignment analysis, and comprehensive analytics improvements
2469150
#!/usr/bin/env python3
"""
Enterprise-grade health check system for FRED ML
Comprehensive monitoring of all system components
"""
import sys | |
import os | |
import time | |
import json | |
import requests | |
import subprocess | |
from pathlib import Path | |
from typing import Dict, List, Any, Optional | |
from datetime import datetime, timedelta | |
import logging | |
# Put the parent of this script's directory on the import path so that
# project packages (e.g. ``config``, ``src``) can be imported from here.
sys.path.insert(
    0,
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)),
)
class HealthChecker:
    """Enterprise-grade health checker for FRED ML.

    Runs a fixed set of independent checks (Python environment, dependencies,
    configuration, file system, network, application modules, test suite and
    performance). Every check returns a dict with at least a ``"status"`` key
    set to ``"healthy"``, ``"warning"`` or ``"error"``. Findings are described
    in an optional ``"message"`` string (several findings are joined with
    ``"; "``); when the check itself fails, the dict carries an ``"error"``
    string instead.
    """

    # Relative severity of the status values, so that a later finding never
    # downgrades a more severe status recorded earlier in the same check.
    _SEVERITY = {"healthy": 0, "warning": 1, "error": 2}

    def __init__(self, project_root: Optional[Path] = None):
        """Create a checker.

        Args:
            project_root: Directory to inspect. Defaults to the parent of the
                directory containing this file.
        """
        if project_root is None:
            # resolve() first: for a bare relative ``__file__`` such as
            # "health_check.py", ``.parent.parent`` would collapse to "."
            # (the script directory) instead of its parent.
            project_root = Path(__file__).resolve().parent.parent
        self.project_root = Path(project_root)
        self.health_results = {}
        self.start_time = time.time()
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging (INFO level) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    @classmethod
    def _flag(cls, result: Dict[str, Any], status: str, message: Optional[str] = None) -> None:
        """Record a finding on ``result`` without losing earlier ones.

        The status is only raised, never lowered, and ``message`` (if given)
        is appended to any message already present, separated by ``"; "``.
        """
        current = result.get("status", "healthy")
        if cls._SEVERITY.get(status, 0) > cls._SEVERITY.get(current, 0):
            result["status"] = status
        if message:
            previous = result.get("message")
            result["message"] = f"{previous}; {message}" if previous else message

    def _check_failed(self, label: str, exc: Exception) -> Dict[str, Any]:
        """Log that the check named ``label`` raised and build its error result."""
        self.logger.error("%s check failed: %s", label, exc)
        return {
            "status": "error",
            "error": str(exc)
        }

    @staticmethod
    def _canonical_name(name: str) -> str:
        """Normalise a distribution name (lower case, runs of ``-_.`` -> ``-``)."""
        import re
        return re.sub(r"[-_.]+", "-", name).lower()

    @classmethod
    def _installed_distributions(cls) -> Dict[str, str]:
        """Return ``{canonical distribution name: version}`` for this environment."""
        try:
            from importlib import metadata
        except ImportError:
            # importlib.metadata is unavailable on old interpreters; fall back
            # to the setuptools API the check used originally.
            import pkg_resources
            return {
                cls._canonical_name(dist.project_name): dist.version
                for dist in pkg_resources.working_set
            }

        found = {}
        for dist in metadata.distributions():
            try:
                name = dist.metadata.get("Name")
                version = dist.version
            except Exception:
                # One distribution with unreadable metadata must not fail the
                # whole dependency check; it is simply not counted.
                continue
            if name:
                found[cls._canonical_name(name)] = version
        return found

    def check_python_environment(self) -> Dict[str, Any]:
        """Report interpreter/platform details.

        Warns when the interpreter is older than 3.9 or when it does not
        appear to be running inside a virtual environment.
        """
        self.logger.info("Checking Python environment...")
        try:
            import platform

            result = {
                "python_version": sys.version,
                "platform": platform.platform(),
                "architecture": platform.architecture(),
                "processor": platform.processor(),
                "status": "healthy"
            }

            # Check Python version
            version_ok = sys.version_info >= (3, 9)
            result["python_version_ok"] = version_ok
            if not version_ok:
                self._flag(result, "warning", "Python version should be 3.9+")

            # Check virtual environment: ``real_prefix`` is set by legacy
            # virtualenv, ``base_prefix != prefix`` by the stdlib ``venv``.
            in_virtual_env = hasattr(sys, 'real_prefix') or (
                hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
            )
            result["virtual_env"] = in_virtual_env
            if in_virtual_env:
                result["virtual_env_path"] = sys.prefix
            else:
                self._flag(result, "warning", "Not running in virtual environment")

            self.logger.info("Python environment check completed")
            return result
        except Exception as e:
            return self._check_failed("Python environment", e)

    def check_dependencies(self) -> Dict[str, Any]:
        """Check that the required third-party packages are installed.

        ``outdated_packages`` is always an empty list: no version comparison
        is performed, the key is kept for consumers of the report.
        """
        self.logger.info("Checking dependencies...")
        try:
            # Get installed packages
            installed_packages = self._installed_distributions()

            # Check required packages
            required_packages = [
                "pandas", "numpy", "matplotlib", "seaborn", "streamlit",
                "requests", "scikit-learn", "scipy", "statsmodels"
            ]
            missing_packages = [
                package for package in required_packages
                if self._canonical_name(package) not in installed_packages
            ]
            outdated_packages = []

            result = {
                "installed_packages": len(installed_packages),
                "required_packages": len(required_packages),
                "missing_packages": missing_packages,
                "outdated_packages": outdated_packages,
                "status": "healthy" if not missing_packages else "warning"
            }
            if missing_packages:
                result["message"] = f"Missing packages: {', '.join(missing_packages)}"

            self.logger.info("Dependencies check completed")
            return result
        except Exception as e:
            return self._check_failed("Dependencies", e)

    def check_configuration(self) -> Dict[str, Any]:
        """Check that the FRED API key and AWS credentials are configured.

        Reads the project configuration through ``config.settings.get_config``;
        if that import or call fails, the check reports an error.
        """
        self.logger.info("Checking configuration...")
        try:
            from config.settings import get_config
            config = get_config()

            result = {
                "fred_api_key_configured": bool(config.api.fred_api_key),
                "aws_configured": bool(config.aws.access_key_id and config.aws.secret_access_key),
                "environment": os.getenv("ENVIRONMENT", "development"),
                "log_level": config.logging.level,
                "status": "healthy"
            }

            # Check for required configuration; both findings are reported
            # when both are missing.
            if not result["fred_api_key_configured"]:
                self._flag(result, "warning", "FRED API key not configured")
            if not result["aws_configured"]:
                self._flag(result, "warning", "AWS credentials not configured (cloud features disabled)")

            self.logger.info("Configuration check completed")
            return result
        except Exception as e:
            return self._check_failed("Configuration", e)

    def check_file_system(self) -> Dict[str, Any]:
        """Check the expected project directories and the free disk space.

        Warns when any expected directory is missing or when less than 1 GB
        is free on the disk holding the project root.
        """
        self.logger.info("Checking file system...")
        try:
            import shutil

            root = self.project_root
            directories = {
                "project_root_exists": root.exists(),
                "src_directory_exists": (root / "src").exists(),
                "tests_directory_exists": (root / "tests").exists(),
                "config_directory_exists": (root / "config").exists(),
                "data_directory_exists": (root / "data").exists(),
                "logs_directory_exists": (root / "logs").exists(),
            }
            result = dict(directories)
            result["status"] = "healthy"

            # Check disk space
            try:
                disk_usage = shutil.disk_usage(root)
                result["disk_free_gb"] = disk_usage.free / (1024**3)
                result["disk_total_gb"] = disk_usage.total / (1024**3)
                result["disk_usage_percent"] = (1 - disk_usage.free / disk_usage.total) * 100
                if result["disk_free_gb"] < 1.0:
                    self._flag(result, "warning", "Low disk space")
            except (OSError, ZeroDivisionError):
                # Disk statistics are best-effort (e.g. the root is missing).
                result["disk_info"] = "unavailable"

            # Check for missing directories
            missing_dirs = [
                key.replace("_exists", "")
                for key, exists in directories.items()
                if not exists
            ]
            if missing_dirs:
                self._flag(result, "warning", f"Missing directories: {', '.join(missing_dirs)}")

            self.logger.info("File system check completed")
            return result
        except Exception as e:
            return self._check_failed("File system", e)

    def check_network_connectivity(self) -> Dict[str, Any]:
        """Probe the FRED API, general internet access and (if configured) AWS.

        An unreachable internet probe is an error; an unreachable FRED API or
        a failing AWS probe is a warning. Per-probe details are stored under
        ``"tests"``.
        """
        self.logger.info("Checking network connectivity...")
        try:
            result = {
                "status": "healthy",
                "tests": {}
            }
            tests = result["tests"]

            # Test FRED API connectivity. Any HTTP response counts as
            # reachable; the status code is only recorded, not judged.
            try:
                fred_response = requests.get(
                    "https://api.stlouisfed.org/fred/series?series_id=GDP&api_key=test&file_type=json",
                    timeout=10
                )
                tests["fred_api"] = {
                    "reachable": True,
                    "response_time": fred_response.elapsed.total_seconds(),
                    "status_code": fred_response.status_code
                }
            except Exception as e:
                tests["fred_api"] = {
                    "reachable": False,
                    "error": str(e)
                }
                self._flag(result, "warning")

            # Test general internet connectivity
            try:
                google_response = requests.get("https://www.google.com", timeout=5)
                tests["internet"] = {
                    "reachable": True,
                    "response_time": google_response.elapsed.total_seconds()
                }
            except Exception as e:
                tests["internet"] = {
                    "reachable": False,
                    "error": str(e)
                }
                self._flag(result, "error")

            # Test AWS connectivity (if configured)
            try:
                from config.settings import get_config
                config = get_config()
                if config.aws.access_key_id:
                    import boto3
                    boto3.client('sts').get_caller_identity()
                    tests["aws"] = {
                        "reachable": True,
                        "authenticated": True
                    }
                else:
                    tests["aws"] = {
                        "reachable": "not_configured"
                    }
            except Exception as e:
                tests["aws"] = {
                    "reachable": False,
                    "error": str(e)
                }
                self._flag(result, "warning")

            self.logger.info("Network connectivity check completed")
            return result
        except Exception as e:
            return self._check_failed("Network connectivity", e)

    def check_application_modules(self) -> Dict[str, Any]:
        """Check that the core application modules import and expose their class.

        A module that cannot be imported (for any reason) is recorded as
        ``importable: False``; a module that imports but lacks the expected
        class is recorded as ``class_available: False``. Either is a warning.
        """
        self.logger.info("Checking application modules...")
        try:
            import importlib

            result = {
                "status": "healthy",
                "modules": {}
            }

            # Test core module imports
            core_modules = [
                ("src.core.enhanced_fred_client", "EnhancedFREDClient"),
                ("src.analysis.comprehensive_analytics", "ComprehensiveAnalytics"),
                ("src.analysis.economic_forecasting", "EconomicForecaster"),
                ("src.analysis.economic_segmentation", "EconomicSegmentation"),
                ("src.analysis.statistical_modeling", "StatisticalModeling"),
                ("src.analysis.mathematical_fixes", "MathematicalFixes"),
            ]

            for module_name, class_name in core_modules:
                # Step 1: import. Anything raised here (ImportError, or an
                # error while the module body executes) means "not importable".
                try:
                    module_obj = importlib.import_module(module_name)
                except Exception as e:
                    result["modules"][module_name] = {
                        "importable": False,
                        "error": str(e)
                    }
                    self._flag(result, "warning")
                    continue

                # Step 2: the module imported; look up the expected class.
                try:
                    getattr(module_obj, class_name)
                except AttributeError as e:
                    result["modules"][module_name] = {
                        "importable": True,
                        "class_available": False,
                        "error": str(e)
                    }
                    self._flag(result, "warning")
                else:
                    result["modules"][module_name] = {
                        "importable": True,
                        "class_available": True
                    }

            self.logger.info("Application modules check completed")
            return result
        except Exception as e:
            return self._check_failed("Application modules", e)

    def check_test_suite(self) -> Dict[str, Any]:
        """Check the test directory layout and the presence of the test runner.

        Warns when any of ``tests/unit``, ``tests/integration`` or
        ``tests/e2e`` is missing, or when ``tests/run_tests.py`` does not exist.
        """
        self.logger.info("Checking test suite...")
        try:
            result = {
                "status": "healthy",
                "test_files": {}
            }

            # Check test directory structure
            for test_dir in ("tests/unit", "tests/integration", "tests/e2e"):
                dir_path = self.project_root / test_dir
                if dir_path.exists():
                    # Sorted so the report is stable across runs/platforms.
                    file_names = sorted(f.name for f in dir_path.glob("test_*.py"))
                    result["test_files"][test_dir] = {
                        "exists": True,
                        "file_count": len(file_names),
                        "files": file_names
                    }
                else:
                    result["test_files"][test_dir] = {
                        "exists": False,
                        "file_count": 0,
                        "files": []
                    }
                    self._flag(result, "warning")

            # Check test runner
            test_runner = self.project_root / "tests" / "run_tests.py"
            runner_exists = test_runner.exists()
            result["test_runner"] = {
                "exists": runner_exists,
                "executable": runner_exists and os.access(test_runner, os.X_OK)
            }
            if not runner_exists:
                self._flag(result, "warning")

            self.logger.info("Test suite check completed")
            return result
        except Exception as e:
            return self._check_failed("Test suite", e)

    def check_performance(self) -> Dict[str, Any]:
        """Sample CPU, memory and disk I/O figures via ``psutil``.

        Warns when CPU or memory usage exceeds 80 %, or when ``psutil`` is not
        installed. The CPU sample blocks for one second.
        """
        self.logger.info("Checking system performance...")
        try:
            import psutil

            result = {
                "status": "healthy",
                "performance": {}
            }
            performance = result["performance"]

            # CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            performance["cpu_usage"] = cpu_percent

            # Memory usage
            memory = psutil.virtual_memory()
            performance["memory_usage"] = memory.percent
            performance["memory_available_gb"] = memory.available / (1024**3)

            # Disk I/O (skipped when psutil returns no counters)
            disk_io = psutil.disk_io_counters()
            if disk_io:
                performance["disk_read_mb"] = disk_io.read_bytes / (1024**2)
                performance["disk_write_mb"] = disk_io.write_bytes / (1024**2)

            # Performance thresholds; both findings are kept if both apply.
            if cpu_percent > 80:
                self._flag(result, "warning", "High CPU usage")
            if memory.percent > 80:
                self._flag(result, "warning", "High memory usage")

            self.logger.info("Performance check completed")
            return result
        except ImportError:
            self.logger.warning("psutil not installed - performance monitoring disabled")
            return {
                "status": "warning",
                "message": "psutil not installed - install with: pip install psutil"
            }
        except Exception as e:
            return self._check_failed("Performance", e)

    def run_all_checks(self) -> Dict[str, Any]:
        """Run every check and return the aggregated report.

        Returns:
            A dict with ``timestamp`` (ISO string), ``duration`` (seconds spent
            in this call), ``overall_status`` and ``checks`` (per-check results,
            also stored on ``self.health_results``).
        """
        self.logger.info("Starting comprehensive health check...")
        # Restart the clock so ``duration`` covers this run only, not the time
        # elapsed since the checker was constructed.
        self.start_time = time.time()

        checks = [
            ("python_environment", self.check_python_environment),
            ("dependencies", self.check_dependencies),
            ("configuration", self.check_configuration),
            ("file_system", self.check_file_system),
            ("network_connectivity", self.check_network_connectivity),
            ("application_modules", self.check_application_modules),
            ("test_suite", self.check_test_suite),
            ("performance", self.check_performance),
        ]
        for check_name, check_func in checks:
            try:
                self.health_results[check_name] = check_func()
            except Exception as e:
                # Checks handle their own errors; this is a last line of
                # defence so one broken check cannot abort the whole run.
                self.logger.exception("Unhandled failure in %s check", check_name)
                self.health_results[check_name] = {
                    "status": "error",
                    "error": str(e)
                }

        # Calculate overall health
        overall_status = self._calculate_overall_health()
        return {
            "timestamp": datetime.now().isoformat(),
            "duration": time.time() - self.start_time,
            "overall_status": overall_status,
            "checks": self.health_results
        }

    def _calculate_overall_health(self) -> str:
        """Return the worst status among the stored results.

        ``"error"`` beats ``"warning"``; anything else (including no results)
        yields ``"healthy"``.
        """
        statuses = {check.get("status", "unknown") for check in self.health_results.values()}
        if "error" in statuses:
            return "error"
        if "warning" in statuses:
            return "warning"
        return "healthy"

    def print_health_report(self, health_report: Dict[str, Any]):
        """Print a human-readable version of a report from ``run_all_checks``."""
        from collections import Counter

        print("\n" + "=" * 60)
        print("🏥 FRED ML - SYSTEM HEALTH REPORT")
        print("=" * 60)

        overall_status = health_report["overall_status"]
        duration = health_report["duration"]
        checks = health_report["checks"]

        # Status indicator; statuses outside this table get the fallback icon.
        status_icons = {
            "healthy": "✅",
            "warning": "⚠️",
            "error": "❌"
        }
        unknown_icon = "❓"

        print(f"\nOverall Status: {status_icons.get(overall_status, unknown_icon)} {overall_status.upper()}")
        print(f"Check Duration: {duration:.2f} seconds")
        print(f"Timestamp: {health_report['timestamp']}")

        print("\n📊 Detailed Results:")
        for check_name, check_result in checks.items():
            status = check_result.get("status", "unknown")
            icon = status_icons.get(status, unknown_icon)
            print(f" {icon} {check_name.replace('_', ' ').title()}: {status}")
            if "message" in check_result:
                print(f" └─ {check_result['message']}")

        # Summary
        print("\n📋 Summary:")
        status_counts = Counter(
            check_result.get("status", "unknown") for check_result in checks.values()
        )
        for status, count in status_counts.items():
            icon = status_icons.get(status, unknown_icon)
            print(f" {icon} {status.title()}: {count} checks")

        # Recommendations
        print("\n💡 Recommendations:")
        if overall_status == "healthy":
            print(" ✅ System is healthy and ready for production use")
        elif overall_status == "warning":
            print(" ⚠️ System has some issues that should be addressed")
            for check_name, check_result in checks.items():
                if check_result.get("status") == "warning":
                    print(f" - Review {check_name.replace('_', ' ')} configuration")
        else:
            print(" ❌ System has critical issues that must be resolved")
            for check_name, check_result in checks.items():
                if check_result.get("status") == "error":
                    print(f" - Fix {check_name.replace('_', ' ')} issues")

    def save_health_report(self, health_report: Dict[str, Any], filename: str = "health_report.json"):
        """Write the report as JSON to ``filename`` under the project root.

        Saving is best-effort: a failure is logged, not raised. Values that
        JSON cannot encode natively are written as ``str(value)``.
        """
        report_path = self.project_root / filename
        try:
            # Explicit UTF-8 so the file does not depend on the locale encoding.
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(health_report, f, indent=2, default=str)
            self.logger.info("Health report saved to: %s", report_path)
        except Exception as e:
            self.logger.error("Failed to save health report: %s", e)
def main():
    """Command-line entry point.

    Runs all health checks, prints the report, optionally saves it as JSON
    (``--save-report`` / ``--output-file``) and exits with 1 for an overall
    "error", 2 for "warning" and 0 otherwise.
    """
    import argparse

    cli = argparse.ArgumentParser(description="FRED ML Health Checker")
    cli.add_argument("--save-report", action="store_true", help="Save health report to file")
    cli.add_argument("--output-file", default="health_report.json", help="Output file for health report")
    options = cli.parse_args()

    checker = HealthChecker()
    report = checker.run_all_checks()
    checker.print_health_report(report)

    if options.save_report:
        checker.save_health_report(report, options.output_file)

    # Exit with appropriate code: any status not listed here exits 0.
    exit_codes = {"error": 1, "warning": 2}
    sys.exit(exit_codes.get(report["overall_status"], 0))


if __name__ == "__main__":
    main()