import os
import tempfile
import pandas as pd
import warnings
import torch
import numpy as np
from transformers import Trainer, TrainingArguments, set_seed
from tsfm_public import TimeSeriesPreprocessor
from tsfm_public.models.tinytimemixer import TinyTimeMixerForPrediction
from tsfm_public.toolkit.get_model import get_model
from tsfm_public.toolkit.dataset import ForecastDFDataset

# Constants
SEED = 42
TTM_MODEL_PATH = "iaravagni/ttm-finetuned-model"
CONTEXT_LENGTH = 52  # 4.33 hrs
PREDICTION_LENGTH = 6  # 30 mins
OUT_DIR = "ttm_finetuned_models/"
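# Note: the lengths above appear to assume a ~5-minute sampling interval,
# i.e. 52 context steps ≈ 4.33 hours and 6 prediction steps ≈ 30 minutes.
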
def setup_environment():
    """
    Set up the environment for model training and evaluation.
    Creates necessary directories and sets random seed for reproducibility.
    """
    set_seed(SEED)
    os.makedirs(OUT_DIR, exist_ok=True)
    os.makedirs(os.path.join(OUT_DIR, 'results'), exist_ok=True)


def load_dataset(file_path):
    """
    Load the dataset from the specified file path.

    Args:
        file_path (str): Path to the dataset CSV file

    Returns:
        pd.DataFrame: The loaded dataset
    """
    return pd.read_csv(file_path)


def prepare_data(data, timestamp_column):
    """
    Prepare the dataset by converting the timestamp column to datetime format.

    Args:
        data (pd.DataFrame): The dataset
        timestamp_column (str): Name of the timestamp column

    Returns:
        pd.DataFrame: The processed dataset
    """
    data[timestamp_column] = pd.to_datetime(data[timestamp_column])
    return data


def get_column_specs():
    """
    Define and return column specifications for the dataset.

    Returns:
        dict: Column specifications including timestamp, ID, target, and control columns
    """
    timestamp_column = "Timestamp"
    id_columns = ["patient_id"]
    target_columns = ["Glucose"]
    control_columns = ["Accelerometer", "Calories", "Carbs", "Sugar", "Gender", "HbA1c", "Age"]

    return {
        "timestamp_column": timestamp_column,
        "id_columns": id_columns,
        "target_columns": target_columns,
        "control_columns": control_columns,
    }

def create_test_only_dataset(ts_preprocessor, test_dataset, train_dataset=None, stride=1, enable_padding=True, **dataset_kwargs):
    """
    Create a preprocessed PyTorch dataset for testing only.

    Args:
        ts_preprocessor: TimeSeriesPreprocessor instance
        test_dataset: Pandas dataframe for testing
        train_dataset: Optional pandas dataframe used to fit the preprocessor's scaler
        stride: Stride used for creating the dataset
        enable_padding: If True, datasets are created with padding
        dataset_kwargs: Additional keyword arguments to pass to ForecastDFDataset

    Returns:
        ForecastDFDataset for testing
    """
    # Standardize the test dataframe
    test_data = ts_preprocessor._standardize_dataframe(test_dataset)

    # Fit the preprocessor on the training data if provided, otherwise on the test data
    if train_dataset is not None:
        train_data = ts_preprocessor._standardize_dataframe(train_dataset)
        ts_preprocessor.train(train_data)
    else:
        ts_preprocessor.train(test_data)

    # Use the test data as-is: the preprocessing step is intentionally skipped here
    # to avoid scaling errors
    test_data_prep = test_data.copy()

    # Specify columns
    column_specifiers = {
        "id_columns": ts_preprocessor.id_columns,
        "timestamp_column": ts_preprocessor.timestamp_column,
        "target_columns": ts_preprocessor.target_columns,
        "observable_columns": ts_preprocessor.observable_columns,
        "control_columns": ts_preprocessor.control_columns,
        "conditional_columns": ts_preprocessor.conditional_columns,
        "categorical_columns": ts_preprocessor.categorical_columns,
        "static_categorical_columns": ts_preprocessor.static_categorical_columns,
    }

    params = dict(column_specifiers)
    params["context_length"] = ts_preprocessor.context_length
    params["prediction_length"] = ts_preprocessor.prediction_length
    params["stride"] = stride
    params["enable_padding"] = enable_padding

    # Add the frequency token - this is critical for TinyTimeMixer
    params["frequency_token"] = ts_preprocessor.get_frequency_token(ts_preprocessor.freq)

    # Update with any additional kwargs
    params.update(**dataset_kwargs)

    # Create the ForecastDFDataset
    test_dataset = ForecastDFDataset(test_data_prep, **params)

    if len(test_dataset) == 0:
        raise RuntimeError("The generated test dataset is of zero length.")

    return test_dataset

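# Example call for create_test_only_dataset (this mirrors how zeroshot_eval below uses it):
#   dset_test = create_test_only_dataset(ts_preprocessor=tsp, test_dataset=test_df, train_dataset=train_df)
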
def zeroshot_eval(train_df, test_df, batch_size, context_length=CONTEXT_LENGTH, forecast_length=PREDICTION_LENGTH, model_path=TTM_MODEL_PATH):
    """
    Perform zero-shot evaluation of time series forecasting on test data.

    Args:
        train_df: Training dataframe
        test_df: Testing dataframe
        batch_size: Batch size for evaluation
        context_length: Number of time steps to use as context
        forecast_length: Number of time steps to predict
        model_path: Hugging Face model ID or local path of the TTM checkpoint to load

    Returns:
        dict: Dictionary containing predictions dataframe and metrics
    """
    column_specifiers = get_column_specs()

    # Create preprocessor with scaling disabled
    tsp = TimeSeriesPreprocessor(
        timestamp_column=column_specifiers["timestamp_column"],
        id_columns=column_specifiers["id_columns"],
        target_columns=column_specifiers["target_columns"],
        control_columns=column_specifiers["control_columns"],
        context_length=context_length,
        prediction_length=forecast_length,
        scaling=False,
        encode_categorical=False,
        force_return="zeropad",
    )

    # Load model
    zeroshot_model = get_model(
        model_path,
        context_length=context_length,
        prediction_length=forecast_length,
        freq_prefix_tuning=False,
        freq=None,
        prefer_l1_loss=False,
        prefer_longer_context=True,
    )

    # Create test dataset
    dset_test = create_test_only_dataset(ts_preprocessor=tsp, test_dataset=test_df, train_dataset=train_df)

    # Set up trainer
    temp_dir = tempfile.mkdtemp()
    zeroshot_trainer = Trainer(
        model=zeroshot_model,
        args=TrainingArguments(
            output_dir=temp_dir,
            per_device_eval_batch_size=batch_size,
            seed=SEED,
            report_to="none",
        ),
    )

    # Get predictions
    predictions_dict = zeroshot_trainer.predict(dset_test)

    # Process predictions
    processed_predictions = process_predictions(predictions_dict, tsp, column_specifiers["target_columns"])

    # Get evaluation metrics
    metrics = zeroshot_trainer.evaluate(dset_test)

    return {
        "predictions_df": processed_predictions,
        "metrics": metrics,
    }

def process_predictions(predictions_dict, tsp, target_columns):
    """
    Process the predictions from the Trainer into a usable DataFrame.

    Args:
        predictions_dict: Predictions from the Trainer
        tsp: TimeSeriesPreprocessor instance
        target_columns: List of target column names

    Returns:
        pd.DataFrame: DataFrame containing processed predictions
    """
    # Extract predictions
    if hasattr(predictions_dict, 'predictions'):
        raw_predictions = predictions_dict.predictions
    else:
        raw_predictions = predictions_dict.get('predictions', predictions_dict)

    # Handle tuple predictions (mean and uncertainty)
    if isinstance(raw_predictions, tuple):
        predictions = raw_predictions[0]
    else:
        predictions = raw_predictions

    # Get shape information
    n_samples, n_timesteps, n_features = predictions.shape

    # Create DataFrame for processed predictions
    processed_df = pd.DataFrame()

    # Extract predictions for each target and timestep
    for i, col in enumerate(target_columns):
        if i < n_features:
            for t in range(n_timesteps):
                processed_df[f"{col}_step_{t+1}"] = predictions[:, t, i]

    return processed_df

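# process_predictions yields one column per (target, horizon step) pair: with the single
# "Glucose" target and PREDICTION_LENGTH = 6, the columns are "Glucose_step_1" ...
# "Glucose_step_6", with one row per forecast window produced from the test dataset.
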
def simple_diagonal_averaging(predictions_df, test_data, context_length, step_columns):
    """
    Diagonally average the step-wise predictions for each patient.

    Each test row after the context window receives the mean of the step predictions
    in its corresponding prediction row. The last rows of each patient's series are
    handled explicitly so that all available predicted values are used.

    Args:
        predictions_df (pd.DataFrame): DataFrame with step-wise predictions
        test_data (pd.DataFrame): Original test data with patient IDs
        context_length (int): Number of context steps used by the model
        step_columns (list): List of step column names

    Returns:
        pd.DataFrame: DataFrame with averaged predictions
    """
    # Create a new dataframe for the final results
    final_df = test_data.copy()

    # Initialize the prediction column with zeros; rows inside the context window keep this value
    final_df['averaged_prediction'] = 0

    # Process each patient separately
    for patient_id in test_data['patient_id'].unique():
        # Get indices for this patient
        patient_mask = final_df['patient_id'] == patient_id
        patient_indices = final_df[patient_mask].index

        # Skip the first context_length rows for this patient
        start_idx = min(context_length, len(patient_indices))

        # For each row after the context window
        for i in range(start_idx, len(patient_indices)):
            row_idx = patient_indices[i]
            pred_row_idx = i - context_length

            # Skip if the prediction row index is negative
            if pred_row_idx < 0:
                continue

            # Get the corresponding prediction row
            if pred_row_idx < len(predictions_df):
                # Average the predictions for all steps
                avg_prediction = predictions_df.iloc[pred_row_idx][step_columns].mean()
                final_df.loc[row_idx, 'averaged_prediction'] = avg_prediction
            else:
                # We have run out of prediction rows: work out how many steps beyond
                # the last prediction row this test row lies
                excess_steps = pred_row_idx - len(predictions_df) + 1

                # Make sure we don't go beyond the available steps
                if excess_steps < len(step_columns):
                    # Use the last row of predictions, but only its remaining steps
                    relevant_steps = step_columns[excess_steps:]
                    if relevant_steps:
                        avg_prediction = predictions_df.iloc[-1][relevant_steps].mean()
                        final_df.loc[row_idx, 'averaged_prediction'] = avg_prediction

    return final_df

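# Illustrative index mapping for simple_diagonal_averaging (with CONTEXT_LENGTH = 52 and
# PREDICTION_LENGTH = 6): for a given patient, test row 52 is paired with prediction row 0
# and receives the mean of its six step predictions, row 53 pairs with prediction row 1,
# and so on. Once the prediction rows are exhausted, the later steps of the final
# prediction row are averaged instead.
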
def main():
    """
    Main function to execute the time series forecasting workflow.
    """
    # Setup
    # setup_environment()

    # Get dataset paths
    script_dir = os.path.dirname(os.path.abspath(__file__))
    test_file = os.path.join(script_dir, '..', 'data', 'processed', 'test_dataset.csv')
    train_file = os.path.join(script_dir, '..', 'data', 'processed', 'train_dataset.csv')

    # Load and prepare data
    test_data = load_dataset(test_file)
    train_data = load_dataset(train_file)

    column_specs = get_column_specs()
    test_data = prepare_data(test_data, column_specs["timestamp_column"])
    train_data = prepare_data(train_data, column_specs["timestamp_column"])

    # Run zero-shot evaluation
    results = zeroshot_eval(
        train_df=train_data,
        test_df=test_data,
        batch_size=8,
        context_length=CONTEXT_LENGTH,
        forecast_length=PREDICTION_LENGTH
    )

    # Get all step columns
    step_columns = [col for col in results["predictions_df"].columns if col.startswith("Glucose_step_")]

    # Apply simple diagonal averaging by patient
    final_results = simple_diagonal_averaging(
        results["predictions_df"],
        test_data,
        CONTEXT_LENGTH,
        step_columns
    )

    # Make sure the output directory exists before writing results
    output_dir = os.path.join(script_dir, '..', 'data', 'outputs')
    os.makedirs(output_dir, exist_ok=True)

    # Save raw predictions to CSV
    raw_predictions_path = os.path.join(output_dir, 'dl_predictions_raw.csv')
    results["predictions_df"].to_csv(raw_predictions_path, index=False)

    # Save final results to CSV
    final_results_path = os.path.join(output_dir, 'dl_predictions.csv')
    final_results.to_csv(final_results_path, index=False)


if __name__ == "__main__":
    main()
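
# Usage sketch (assuming the repository layout implied by the paths above): running this
# script expects ../data/processed/train_dataset.csv and ../data/processed/test_dataset.csv
# relative to the script's directory, and writes ../data/outputs/dl_predictions_raw.csv
# and ../data/outputs/dl_predictions.csv.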