#!/usr/bin/env python3
"""
Batch ECG Analysis Script

Processes the ECGs referenced by the patient index (Greenwichschooldata.csv)
from the ecg_uploads_greenwich/ directory using the ECG-FM Production API.

The index enriched with the analysis results is written to a separate
"enhanced" CSV (plus a timestamped backup copy); the input index file itself
is not modified.
"""

import json
import os
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas as pd
import requests

# Configuration
# NOTE: the relative paths below are resolved against the current working
# directory, not against the location of this script.
API_BASE_URL = "https://mystic-cbk-ecg-fm-api.hf.space"
ECG_DIR = "../ecg_uploads_greenwich/"
INDEX_FILE = "../Greenwichschooldata.csv"
OUTPUT_FILE = "../Greenwichschooldata_ECG_FM_Enhanced.csv"

# (output column, ECGFMAnalysis attribute) pairs, in output order.
_RESULT_COLUMNS = (
    ('ECG_FM_Rhythm', 'rhythm'),
    ('ECG_FM_HeartRate', 'heart_rate'),
    ('ECG_FM_QRS_Duration', 'qrs_duration'),
    ('ECG_FM_QT_Interval', 'qt_interval'),
    ('ECG_FM_PR_Interval', 'pr_interval'),
    ('ECG_FM_AxisDeviation', 'axis_deviation'),
    ('ECG_FM_Abnormalities', 'abnormalities'),
    ('ECG_FM_Confidence', 'confidence'),
    ('ECG_FM_SignalQuality', 'signal_quality'),
    ('ECG_FM_FeaturesCount', 'features_count'),
    ('ECG_FM_ProcessingTime', 'processing_time'),
    ('ECG_FM_AnalysisTimestamp', 'analysis_timestamp'),
    ('ECG_FM_APIStatus', 'api_status'),
    ('ECG_FM_ErrorMessage', 'error_message'),
)


# ECG-FM Analysis Results Structure
class ECGFMAnalysis:
    """Mutable container for the outcome of one ECG analysis attempt.

    Every field starts as ``None`` (``abnormalities`` as an empty list) and is
    filled in by ``analyze_ecg_with_api``. ``api_status`` is ``"Success"``
    only when the API answered with HTTP 200 and all expected fields could be
    read from the response.
    """

    def __init__(self) -> None:
        # Clinical values copied from the response's 'clinical_analysis' object.
        self.rhythm = None
        self.heart_rate = None
        self.qrs_duration = None
        self.qt_interval = None
        self.pr_interval = None
        self.axis_deviation = None
        self.abnormalities = []
        self.confidence = None
        # Technical metrics copied from the top level of the response.
        self.signal_quality = None
        self.features_count = None
        self.processing_time = None
        # Bookkeeping about the attempt itself.
        self.analysis_timestamp = None
        self.api_status = None
        self.error_message = None


def load_ecg_data(file_path: str, fs: int = 500) -> Optional[Dict[str, Any]]:
    """Load an ECG CSV file and build the request payload for the API.

    Each CSV column becomes one entry of ``signal`` (a list of sample lists).

    Args:
        file_path: Path of the CSV file to read.
        fs: Sampling rate in samples per second, sent to the API and used to
            derive ``recording_duration``. Defaults to 500.

    Returns:
        The payload dict, or ``None`` if the file could not be read or holds
        no samples (the error is printed, not raised).
    """
    try:
        df = pd.read_csv(file_path)

        # Convert to the format expected by the API: one list per column.
        signal = [df[col].tolist() for col in df.columns]
        if not signal or not signal[0]:
            # Nothing to analyse; report it here instead of sending an empty
            # recording to the API.
            raise ValueError("ECG file contains no samples")

        # NOTE(review): the twelve lead names are sent regardless of how many
        # columns the CSV actually has -- confirm the files are always 12-lead.
        payload = {
            "signal": signal,
            "fs": fs,
            "lead_names": ["I", "II", "III", "aVR", "aVL", "aVF",
                           "V1", "V2", "V3", "V4", "V5", "V6"],
            "recording_duration": len(signal[0]) / float(fs),
        }
        return payload
    except Exception as e:
        print(f"โŒ Error loading ECG data from {file_path}: {e}")
        return None


def _format_confidence(value: Any) -> str:
    """Format *value* with two decimals, falling back to ``str`` if it is not numeric."""
    try:
        return f"{value:.2f}"
    except (TypeError, ValueError):
        return str(value)


def analyze_ecg_with_api(ecg_file: str, patient_info: Dict[str, Any]) -> ECGFMAnalysis:
    """Analyze a single ECG using the ECG-FM Production API.

    Args:
        ecg_file: File name of the ECG, looked up inside ``ECG_DIR``.
        patient_info: Mapping (a dict or a pandas row) with the patient's
            'Patient Name', 'Age' and 'Gender'; used only for progress output.

    Returns:
        An ``ECGFMAnalysis``. This function does not raise: any failure is
        recorded in ``api_status`` / ``error_message`` instead.
    """
    analysis = ECGFMAnalysis()
    analysis.analysis_timestamp = datetime.now().isoformat()

    try:
        # Load ECG data
        ecg_path = os.path.join(ECG_DIR, ecg_file)
        payload = load_ecg_data(ecg_path)
        if payload is None:
            analysis.api_status = "Failed to load ECG data"
            return analysis

        # .get() so that a missing demographic column only degrades the log
        # line instead of aborting the analysis.
        patient_name = patient_info.get('Patient Name', 'Unknown')
        patient_age = patient_info.get('Age', 'Unknown')
        patient_gender = patient_info.get('Gender', 'Unknown')
        print(f" ๐Ÿ“ Processing: {ecg_file}")
        print(f" ๐Ÿ‘ค Patient: {patient_name} ({patient_age} {patient_gender})")

        # Test API health first
        try:
            health_response = requests.get(f"{API_BASE_URL}/health", timeout=30)
            if health_response.status_code != 200:
                analysis.api_status = f"API unhealthy: {health_response.status_code}"
                return analysis
        except Exception as e:
            analysis.api_status = f"API connection failed: {str(e)}"
            return analysis

        # Perform full ECG analysis
        response = requests.post(
            f"{API_BASE_URL}/analyze",
            json=payload,
            timeout=180,  # 3 minutes for full analysis
        )

        if response.status_code == 200:
            analysis_data = response.json()

            # Extract clinical analysis
            clinical = analysis_data['clinical_analysis']
            analysis.rhythm = clinical['rhythm']
            analysis.heart_rate = clinical['heart_rate']
            analysis.qrs_duration = clinical['qrs_duration']
            analysis.qt_interval = clinical['qt_interval']
            analysis.pr_interval = clinical['pr_interval']
            analysis.axis_deviation = clinical['axis_deviation']
            analysis.abnormalities = clinical['abnormalities']
            analysis.confidence = clinical['confidence']

            # Extract technical metrics
            analysis.signal_quality = analysis_data['signal_quality']
            analysis.features_count = len(analysis_data['features'])
            analysis.processing_time = analysis_data['processing_time']

            analysis.api_status = "Success"

            # The confidence is formatted defensively: a non-numeric value
            # must not turn an already successful analysis into an error.
            print(f" โœ… Analysis completed in {analysis.processing_time}s")
            print(f" ๐Ÿฅ Rhythm: {analysis.rhythm}, HR: {analysis.heart_rate} BPM")
            print(f" ๐Ÿ” Quality: {analysis.signal_quality}, Confidence: {_format_confidence(analysis.confidence)}")
        else:
            analysis.api_status = f"API error: {response.status_code}"
            analysis.error_message = response.text
            print(f" โŒ API error: {response.status_code} - {response.text}")

    except Exception as e:
        analysis.api_status = f"Processing error: {str(e)}"
        analysis.error_message = traceback.format_exc()
        print(f" โŒ Processing error: {str(e)}")

    return analysis


def _record_analysis(index_df: pd.DataFrame, index: Any, analysis: ECGFMAnalysis) -> None:
    """Copy the fields of *analysis* into the ECG_FM_* columns of row *index*."""
    for column, attribute in _RESULT_COLUMNS:
        value = getattr(analysis, attribute)
        if attribute == 'abnormalities':
            if not value:
                value = None
            elif not isinstance(value, str):
                # str() each item so a non-string entry cannot break the join.
                value = '; '.join(str(item) for item in value)
        index_df.at[index, column] = value


def update_index_with_ecg_fm_results(index_df: pd.DataFrame) -> pd.DataFrame:
    """Run the ECG-FM analysis for every row and store the results in place.

    Adds the ``ECG_FM_*`` columns to *index_df*, then analyzes the file named
    in each row's 'ECG File Path'. Rows without a path, or whose file is not
    present in ``ECG_DIR``, are skipped and counted neither as success nor as
    failure.

    Args:
        index_df: Patient index; modified in place.

    Returns:
        The same DataFrame object, with the result columns filled in.
    """
    # Add new columns for ECG-FM results
    for column, _ in _RESULT_COLUMNS:
        index_df[column] = None

    # Process each ECG file
    total_files = len(index_df)
    successful_analyses = 0
    failed_analyses = 0

    print(f"\n๐Ÿš€ Starting batch ECG analysis for {total_files} patients...")
    print("=" * 80)

    # `position` is the 1-based row number used in messages; the index label
    # is used only for addressing cells, so a non-integer index works too.
    for position, (index, row) in enumerate(index_df.iterrows(), start=1):
        try:
            # Extract ECG filename from path
            ecg_path = row['ECG File Path']
            if pd.isna(ecg_path) or ecg_path == "":
                print(f"โš ๏ธ Skipping row {position}: No ECG file path")
                continue

            ecg_file = os.path.basename(ecg_path)

            # Check if ECG file exists
            if not os.path.exists(os.path.join(ECG_DIR, ecg_file)):
                print(f"โš ๏ธ Skipping row {position}: ECG file not found: {ecg_file}")
                continue

            print(f"\n๐Ÿ“Š Processing {position}/{total_files}: {ecg_file}")

            # Perform ECG analysis and store the results
            analysis = analyze_ecg_with_api(ecg_file, row)
            _record_analysis(index_df, index, analysis)

            if analysis.api_status == "Success":
                successful_analyses += 1
            else:
                failed_analyses += 1

            # Add delay to avoid overwhelming the API
            time.sleep(2)

        except Exception as e:
            print(f"โŒ Error processing row {position}: {str(e)}")
            index_df.at[index, 'ECG_FM_APIStatus'] = f"Row processing error: {str(e)}"
            failed_analyses += 1

    # Guard against an empty index, which would otherwise divide by zero.
    success_rate = (successful_analyses / total_files) * 100 if total_files else 0.0

    print("\n" + "=" * 80)
    print("๐Ÿ BATCH ANALYSIS COMPLETE!")
    print(f"๐Ÿ“Š Total files: {total_files}")
    print(f"โœ… Successful analyses: {successful_analyses}")
    print(f"โŒ Failed analyses: {failed_analyses}")
    print(f"๐Ÿ“ˆ Success rate: {success_rate:.1f}%")

    return index_df


def _print_numeric_summary(df: pd.DataFrame, column: str, label: str,
                           unit: str, decimals: int) -> None:
    """Print mean and range of *column*; print nothing if it has no numeric values."""
    # The result columns are created with dtype object, so coerce explicitly
    # instead of relying on object-dtype arithmetic.
    values = pd.to_numeric(df[column], errors='coerce').dropna()
    if values.empty:
        return
    print(f"{label} - Mean: {values.mean():.{decimals}f}{unit}, "
          f"Range: {values.min():.{decimals}f}-{values.max():.{decimals}f}{unit}")


def generate_analysis_summary(index_df: pd.DataFrame) -> None:
    """Print summary statistics for the rows whose analysis succeeded.

    Args:
        index_df: DataFrame carrying the ``ECG_FM_*`` result columns.
    """
    print("\n๐Ÿ“Š ECG-FM ANALYSIS SUMMARY")
    print("=" * 50)

    # Filter successful analyses
    successful_df = index_df[index_df['ECG_FM_APIStatus'] == 'Success']

    if successful_df.empty:
        print("โŒ No successful analyses to summarize")
        return

    print(f"๐Ÿ“ Total successful analyses: {len(successful_df)}")

    _print_numeric_summary(successful_df, 'ECG_FM_HeartRate', "๐Ÿ’“ Heart Rate", " BPM", 1)
    _print_numeric_summary(successful_df, 'ECG_FM_QRS_Duration', "๐Ÿ“ QRS Duration", " ms", 1)
    _print_numeric_summary(successful_df, 'ECG_FM_QT_Interval', "โฑ๏ธ QT Interval", " ms", 1)

    # Signal Quality Distribution
    quality_counts = successful_df['ECG_FM_SignalQuality'].value_counts()
    print("๐Ÿ” Signal Quality Distribution:")
    for quality, count in quality_counts.items():
        print(f" {quality}: {count} ({count/len(successful_df)*100:.1f}%)")

    _print_numeric_summary(successful_df, 'ECG_FM_Confidence', "๐ŸŽฏ Analysis Confidence", "", 2)
    _print_numeric_summary(successful_df, 'ECG_FM_ProcessingTime', "โšก Processing Time", "s", 3)


def _confirm_continue() -> bool:
    """Ask whether to continue; treat a closed stdin (EOF) as "no"."""
    try:
        answer = input("Continue anyway? (y/n): ")
    except EOFError:
        return False
    return answer.strip().lower() == 'y'


def main():
    """Run the batch ECG analysis and save the enhanced dataset."""
    print("๐Ÿงช ECG-FM BATCH ANALYSIS SYSTEM")
    print("=" * 60)
    print(f"๐ŸŒ API URL: {API_BASE_URL}")
    print(f"๐Ÿ“ ECG Directory: {ECG_DIR}")
    print(f"๐Ÿ“‹ Index File: {INDEX_FILE}")
    print(f"๐Ÿ’พ Output File: {OUTPUT_FILE}")
    print()

    # Check if files exist
    if not os.path.exists(INDEX_FILE):
        print(f"โŒ Index file not found: {INDEX_FILE}")
        return

    if not os.path.exists(ECG_DIR):
        print(f"โŒ ECG directory not found: {ECG_DIR}")
        return

    # Load index file
    try:
        print("๐Ÿ“ Loading patient index file...")
        index_df = pd.read_csv(INDEX_FILE)
        print(f"โœ… Loaded {len(index_df)} patient records")
    except Exception as e:
        print(f"โŒ Error loading index file: {e}")
        return

    # Check API health. The confirmation prompt is kept outside the try block
    # so that a problem while prompting is not reported as an API failure.
    api_healthy = False
    try:
        print("๐Ÿฅ Checking API health...")
        health_response = requests.get(f"{API_BASE_URL}/health", timeout=30)
        if health_response.status_code == 200:
            health_data = health_response.json()
            print(f"โœ… API healthy - Models loaded: {health_data['models_loaded']}")
            api_healthy = True
        else:
            print(f"โš ๏ธ API health check failed: {health_response.status_code}")
    except Exception as e:
        print(f"โš ๏ธ API health check failed: {e}")

    if not api_healthy and not _confirm_continue():
        return

    # Process all ECGs
    enhanced_df = update_index_with_ecg_fm_results(index_df)

    # Generate summary
    generate_analysis_summary(enhanced_df)

    # Save enhanced dataset
    try:
        print(f"\n๐Ÿ’พ Saving enhanced dataset to: {OUTPUT_FILE}")
        enhanced_df.to_csv(OUTPUT_FILE, index=False)
        print("โœ… Enhanced dataset saved successfully!")
    except Exception as e:
        print(f"โŒ Error saving enhanced dataset: {e}")
        # Do not announce completion when the results were not written.
        return

    # Also save a backup with timestamp. Handled separately so that a backup
    # failure is not reported as a failure of the main save.
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"../Greenwichschooldata_ECG_FM_Backup_{timestamp}.csv"
        enhanced_df.to_csv(backup_file, index=False)
        print(f"๐Ÿ’พ Backup saved to: {backup_file}")
    except Exception as e:
        print(f"Warning: could not save backup copy: {e}")

    print(f"\n๐ŸŽ‰ BATCH ANALYSIS COMPLETE!")
    print(f"๐Ÿ“Š Enhanced dataset: {OUTPUT_FILE}")
    print(f"๐Ÿ”— Monitor your API at: {API_BASE_URL}")


if __name__ == "__main__":
    main()