watermelon / train_2.py
Xalphinions's picture
Upload folder using huggingface_hub
fdc673b verified
import os
import time
import argparse
import torch
import torchaudio
import torchvision
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from efficient_model import MobileNetGRUModel, EfficientNetCNNModel, SqueezeNetTransformerModel
# Print library version information so every run's log records which
# PyTorch / Torchaudio / Torchvision builds were in use.
# "\033[92m" ... "\033[0m" are ANSI escapes that colour the "INFO" tag green.
print(f"\033[92mINFO\033[0m: PyTorch version: {torch.__version__}")
print(f"\033[92mINFO\033[0m: Torchaudio version: {torchaudio.__version__}")
print(f"\033[92mINFO\033[0m: Torchvision version: {torchvision.__version__}")
# Device selection: prefer CUDA, then the MPS backend, and fall back to the
# CPU. The MPS check is only reached when CUDA is unavailable.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"\033[92mINFO\033[0m: Using device: {device}")
# Hyperparameters (using the best configuration from search)
# These are module-level globals read directly by the functions below.
batch_size = 4  # samples per batch for the train/val/test DataLoaders
epochs = 20  # number of passes over the training split
fc_hidden_size = 64  # forwarded to every model constructor as fc_hidden_size
learning_rate = 0.0005  # lr handed to the Adam optimizer
dropout_rate = 0.5  # forwarded to every model constructor as dropout_rate
# Model save directory: created as soon as this module is executed/imported
# (exist_ok=True makes the call a no-op when the folder is already there),
# so per-epoch checkpoint writes into models/ never hit a missing directory.
os.makedirs("./models/", exist_ok=True)
class PreprocessedDataset(Dataset):
    """Dataset over pre-processed ``.pt`` sample files stored in one directory.

    Every file is loaded with ``torch.load`` and must unpack into a 3-tuple
    ``(mfcc, image, label)``; ``mfcc`` and ``image`` are cast to float32 on
    access while ``label`` is returned exactly as stored.
    """

    def __init__(self, data_dir):
        """Index every ``*.pt`` file located directly inside ``data_dir``.

        Args:
            data_dir: Directory holding the serialized samples.

        Raises:
            OSError: If ``data_dir`` cannot be listed (propagated from
                ``os.listdir``).
        """
        self.data_dir = data_dir
        # os.listdir() yields entries in arbitrary, platform-dependent order.
        # Sorting keeps the index -> file mapping stable across runs and
        # machines, which matters for any seeded split built on top of it.
        self.samples = sorted(
            os.path.join(data_dir, file_name)
            for file_name in os.listdir(data_dir)
            if file_name.endswith(".pt")
        )

    def __len__(self):
        """Return the number of indexed sample files."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(mfcc, image, label)``."""
        mfcc, image, label = torch.load(self.samples[idx])
        return mfcc.float(), image.float(), label
def calculate_mae(outputs, labels):
    """Return the mean absolute difference of two tensors as a Python float."""
    absolute_error = (outputs - labels).abs()
    return absolute_error.mean().item()
def evaluate_model(model, test_loader, criterion):
    """Evaluate ``model`` on every batch produced by ``test_loader``.

    Args:
        model: ``torch.nn.Module`` called as ``model(mfcc, image)``. Its output
            is indexed as ``output[i][0]``, i.e. one prediction per row.
        test_loader: Iterable of ``(mfcc, image, label)`` batches.
        criterion: Loss callable. It is assumed to return the *mean* loss of a
            batch (as ``torch.nn.MSELoss()`` does by default) so that the
            per-batch values can be re-weighted by batch size.

    Returns:
        Tuple ``(avg_loss, avg_mae, predictions, labels)`` where the two
        averages are per-sample means over the whole loader and the last two
        items are flat NumPy arrays in iteration order.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()

    # Send the inputs to wherever the model's weights live. Parameter-free
    # models fall back to the module-level ``device``.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    loss_total = 0.0
    mae_total = 0.0
    n_seen = 0
    all_predictions = []
    all_labels = []
    # For debugging: the first (at most) five (prediction, label) pairs.
    debug_samples = []

    with torch.no_grad():
        for mfcc, image, label in test_loader:
            mfcc = mfcc.to(target_device)
            image = image.to(target_device)
            label = label.to(target_device)

            output = model(mfcc, image)
            label = label.view(-1, 1).float()
            batch_len = label.shape[0]

            # Top up the debug list from the leading rows of this batch.
            still_needed = 5 - len(debug_samples)
            for row in range(min(len(output), still_needed)):
                debug_samples.append((output[row][0].item(), label[row][0].item()))

            # Weight each batch mean by its size: a plain mean of batch means
            # would over-weight a smaller final batch.
            loss_total += criterion(output, label).item() * batch_len
            mae_total += torch.abs(output - label).mean().item() * batch_len
            n_seen += batch_len

            # Store predictions and labels for additional analysis
            all_predictions.extend(output.cpu().numpy())
            all_labels.extend(label.cpu().numpy())

    if n_seen == 0:
        raise ValueError("evaluate_model received a loader that yields no samples")

    avg_loss = loss_total / n_seen
    avg_mae = mae_total / n_seen

    # Convert to numpy arrays for easier analysis
    all_predictions = np.array(all_predictions).flatten()
    all_labels = np.array(all_labels).flatten()

    # Print debug samples
    print("\nDEBUG SAMPLES (Prediction, Label):")
    for i, (pred, target) in enumerate(debug_samples):
        print(f"Sample {i+1}: Prediction = {pred:.4f}, Label = {target:.4f}, Difference = {abs(pred-target):.4f}")

    return avg_loss, avg_mae, all_predictions, all_labels
def _run_training_epoch(model, train_loader, criterion, optimizer, writer, global_step):
    """Run one optimisation pass over ``train_loader``.

    A failure inside the loop is reported and ends the pass early; whatever
    was accumulated up to that point is still returned.

    Returns:
        ``(loss_sum, mae_sum, n_batches, global_step)`` where the sums are over
        the per-batch values and ``global_step`` is the updated step counter.
    """
    model.train()
    loss_sum = 0.0
    mae_sum = 0.0
    n_batches = 0
    try:
        for mfcc, image, label in train_loader:
            mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
            optimizer.zero_grad()
            output = model(mfcc, image)
            label = label.view(-1, 1).float()
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            # Compute each metric once and reuse it for both the running
            # totals and the TensorBoard scalars.
            batch_loss = loss.item()
            batch_mae = calculate_mae(output, label)
            loss_sum += batch_loss
            mae_sum += batch_mae
            n_batches += 1
            writer.add_scalar("Training/Loss", batch_loss, global_step)
            writer.add_scalar("Training/MAE", batch_mae, global_step)
            global_step += 1
    except Exception as e:
        print(f"\033[91mERR!\033[0m: {e}")
    return loss_sum, mae_sum, n_batches, global_step


def _run_validation_epoch(model, val_loader, criterion):
    """Score ``model`` on ``val_loader`` without gradient tracking.

    Returns:
        ``(loss_sum, mae_sum, n_batches)`` summed over the completed batches.
    """
    model.eval()
    loss_sum = 0.0
    mae_sum = 0.0
    n_batches = 0
    with torch.no_grad():
        try:
            for mfcc, image, label in val_loader:
                mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
                output = model(mfcc, image)
                label = label.view(-1, 1).float()
                loss_sum += criterion(output, label).item()
                mae_sum += calculate_mae(output, label)
                n_batches += 1
        except Exception as e:
            print(f"\033[91mERR!\033[0m: {e}")
    return loss_sum, mae_sum, n_batches


def train_model(model_type):
    """Train, checkpoint and test one model architecture.

    The samples under ``./processed/`` are split 70/20/10 into train,
    validation and test subsets. A checkpoint is written to ``models/`` after
    every epoch; the one with the lowest validation loss is reloaded and
    evaluated on the test subset. Metrics are logged to TensorBoard under
    ``runs/<model_name>/``.

    Args:
        model_type: One of ``"mobilenet_gru"``, ``"efficientnet_cnn"`` or
            ``"squeezenet_transformer"``.

    Returns:
        Dict with keys ``model_name``, ``model_size`` (millions of
        parameters), ``val_loss``, ``test_loss``, ``test_mae`` and
        ``model_path``. If anything fails (including an unknown
        ``model_type``) the error is printed and a placeholder dict is
        returned instead: infinite losses, ``model_path=None`` and an extra
        ``error`` key holding the message.
    """
    try:
        # Create model based on type
        if model_type == "mobilenet_gru":
            model = MobileNetGRUModel(
                gru_hidden_size=32,
                gru_layers=1,
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate
            ).to(device)
            model_name = "MobileNetGRU"
        elif model_type == "efficientnet_cnn":
            model = EfficientNetCNNModel(
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate
            ).to(device)
            model_name = "EfficientNetCNN"
        elif model_type == "squeezenet_transformer":
            model = SqueezeNetTransformerModel(
                nhead=4,
                dim_feedforward=128,
                fc_hidden_size=fc_hidden_size,
                dropout_rate=dropout_rate
            ).to(device)
            model_name = "SqueezeNetTransformer"
        else:
            raise ValueError(f"Unknown model type: {model_type}")

        # Data loading
        data_dir = "./processed/"
        dataset = PreprocessedDataset(data_dir)
        n_samples = len(dataset)
        if n_samples == 0:
            # Fail with a clear message instead of a cryptic min()/division error.
            raise ValueError(f"No .pt samples found in {data_dir}")

        # Check label range on (at most) the first ten samples
        sample_labels = [dataset[i][2] for i in range(min(10, n_samples))]
        print("\nLABEL RANGE CHECK:")
        print(f"Sample labels: {sample_labels}")
        print(f"Min label: {min(sample_labels)}, Max label: {max(sample_labels)}")

        train_size = int(0.7 * n_samples)
        val_size = int(0.2 * n_samples)
        test_size = n_samples - train_size - val_size
        train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
            dataset, [train_size, val_size, test_size]
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Loss function and optimizer
        criterion = torch.nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        # Calculate model size
        model_size = sum(p.numel() for p in model.parameters()) / 1e6  # in millions

        # TensorBoard
        writer = SummaryWriter(f"runs/{model_name}/")
        try:
            global_step = 0
            print(f"\033[92mINFO\033[0m: Training {model_name} model for {epochs} epochs")
            print(f"\033[92mINFO\033[0m: Training samples: {len(train_dataset)}")
            print(f"\033[92mINFO\033[0m: Validation samples: {len(val_dataset)}")
            print(f"\033[92mINFO\033[0m: Test samples: {len(test_dataset)}")
            print(f"\033[92mINFO\033[0m: Batch size: {batch_size}")
            print(f"\033[92mINFO\033[0m: Learning rate: {learning_rate}")
            print(f"\033[92mINFO\033[0m: Dropout rate: {dropout_rate}")
            best_val_loss = float('inf')
            best_model_path = None
            print(f"\033[92mINFO\033[0m: Model parameters: {model_size:.2f}M")

            # Training loop
            for epoch in range(epochs):
                print(f"\033[92mINFO\033[0m: Training epoch ({epoch+1}/{epochs})")
                start_time = time.time()
                running_loss, running_mae, n_batches, global_step = _run_training_epoch(
                    model, train_loader, criterion, optimizer, writer, global_step
                )
                epoch_time = time.time() - start_time

                # Validation phase
                val_loss, val_mae, val_batches = _run_validation_epoch(
                    model, val_loader, criterion
                )

                # Without at least one completed batch the averages below are
                # undefined; say so explicitly rather than dividing by zero.
                if n_batches == 0:
                    raise RuntimeError(
                        f"No training batch completed in epoch {epoch+1}"
                    )
                if val_batches == 0:
                    raise RuntimeError(
                        f"No validation batch completed in epoch {epoch+1}"
                    )

                avg_train_loss = running_loss / n_batches
                avg_train_mae = running_mae / n_batches
                avg_val_loss = val_loss / val_batches
                avg_val_mae = val_mae / val_batches

                # Record validation metrics
                writer.add_scalar("Validation/Loss", avg_val_loss, epoch)
                writer.add_scalar("Validation/MAE", avg_val_mae, epoch)
                print(
                    f"Epoch [{epoch+1}/{epochs}], Time: {epoch_time:.2f}s, "
                    f"Train Loss: {avg_train_loss:.4f}, Train MAE: {avg_train_mae:.4f}, "
                    f"Val Loss: {avg_val_loss:.4f}, Val MAE: {avg_val_mae:.4f}"
                )

                # Save model checkpoint
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                model_path = f"models/{model_name}_model_{epoch+1}_{timestamp}.pt"
                torch.save(model.state_dict(), model_path)

                # Save the best model based on validation loss
                if avg_val_loss < best_val_loss:
                    best_val_loss = avg_val_loss
                    best_model_path = model_path
                    print(f"\033[92mINFO\033[0m: New best model saved with validation loss: {best_val_loss:.4f}")
                print(
                    f"\033[92mINFO\033[0m: Model checkpoint epoch [{epoch+1}/{epochs}] saved: {model_path}"
                )
            print("\033[92mINFO\033[0m: Training complete")

            # A NaN validation loss never compares below +inf, and zero epochs
            # never enter the loop; in both cases there is nothing to reload.
            if best_model_path is None:
                raise RuntimeError(
                    "No checkpoint was selected: validation loss never improved"
                )

            # Load the best model for testing
            print(f"\033[92mINFO\033[0m: Loading best model from {best_model_path} for testing")
            model.load_state_dict(torch.load(best_model_path))

            # Evaluate on test set
            test_loss, test_mae, predictions, labels = evaluate_model(model, test_loader, criterion)

            # Calculate additional metrics
            abs_errors = np.abs(predictions - labels)
            max_error = np.max(abs_errors)
            min_error = np.min(abs_errors)
            print("\n" + "="*50)
            print(f"TEST RESULTS FOR {model_name}:")
            print(f"Test Loss (MSE): {test_loss:.4f}")
            print(f"Mean Absolute Error: {test_mae:.4f}")
            print(f"Maximum Absolute Error: {max_error:.4f}")
            print(f"Minimum Absolute Error: {min_error:.4f}")

            # Add test results to TensorBoard
            writer.add_scalar("Test/MSE", test_loss, 0)
            writer.add_scalar("Test/MAE", test_mae, 0)
            writer.add_scalar("Test/Max_Error", max_error, 0)
            writer.add_scalar("Test/Min_Error", min_error, 0)
            # Create a histogram of absolute errors
            writer.add_histogram("Test/Absolute_Errors", abs_errors, 0)
            print("="*50)

            # Final summary
            print("\nTRAINING SUMMARY:")
            print(f"Model: {model_name}")
            print(f"Model Size: {model_size:.2f}M parameters")
            print(f"Best Validation Loss: {best_val_loss:.4f}")
            print(f"Final Test Loss: {test_loss:.4f}")
            print(f"Final Test MAE: {test_mae:.4f}")
            print(f"Best model saved at: {best_model_path}")
        finally:
            # Flush and release the event file on failure paths as well.
            writer.close()

        # Return metrics for comparison
        return {
            "model_name": model_name,
            "model_size": model_size,
            "val_loss": best_val_loss,
            "test_loss": test_loss,
            "test_mae": test_mae,
            "model_path": best_model_path
        }
    except Exception as e:
        print(f"\033[91mERR!\033[0m: Error training {model_type}: {e}")
        # Return a placeholder result
        return {
            "model_name": model_type,
            "model_size": 0,
            "val_loss": float('inf'),
            "test_loss": float('inf'),
            "test_mae": float('inf'),
            "model_path": None,
            "error": str(e)
        }
def test_cpu_inference(model_path, model_type):
    """Measure average CPU forward-pass latency of a saved model.

    The architecture named by ``model_type`` is rebuilt on the CPU, the weights
    in ``model_path`` are loaded into it, and the model is run on random dummy
    inputs: 10 untimed warm-up passes followed by 100 timed passes.

    Args:
        model_path: Path of a ``state_dict`` checkpoint matching ``model_type``.
        model_type: One of ``"mobilenet_gru"``, ``"efficientnet_cnn"`` or
            ``"squeezenet_transformer"``.

    Returns:
        Average wall-clock seconds per forward pass.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    # Create model based on type
    if model_type == "mobilenet_gru":
        model = MobileNetGRUModel(
            gru_hidden_size=32,
            gru_layers=1,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate
        )
        model_name = "MobileNetGRU"
    elif model_type == "efficientnet_cnn":
        model = EfficientNetCNNModel(
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate
        )
        model_name = "EfficientNetCNN"
    elif model_type == "squeezenet_transformer":
        model = SqueezeNetTransformerModel(
            nhead=4,
            dim_feedforward=128,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate
        )
        model_name = "SqueezeNetTransformer"
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    # Load model weights
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()

    # Create dummy input
    dummy_mfcc = torch.randn(1, 10, 376)  # Batch size 1, 10 time steps, 376 features
    dummy_image = torch.randn(1, 3, 224, 224)  # Batch size 1, 3 channels, 224x224 image

    num_runs = 100
    # Time pure inference: with autograd enabled every pass would also record
    # a graph, inflating both latency and memory.
    with torch.no_grad():
        # Warm-up
        for _ in range(10):
            model(dummy_mfcc, dummy_image)

        # Measure inference time with the monotonic, high-resolution clock;
        # time.time() can jump if the system clock is adjusted mid-benchmark.
        start_time = time.perf_counter()
        for _ in range(num_runs):
            model(dummy_mfcc, dummy_image)
        elapsed = time.perf_counter() - start_time

    avg_time = elapsed / num_runs
    print(f"\n{model_name} CPU Inference Time:")
    print(f"Average over {num_runs} runs: {avg_time*1000:.2f} ms")
    return avg_time


# The name starts with "test_" but this is a benchmark helper that needs real
# arguments; tell pytest not to collect it as a test (it would otherwise look
# for fixtures called "model_path" and "model_type").
test_cpu_inference.__test__ = False
# Script entry point: train the requested architecture(s), benchmark each
# trained checkpoint on the CPU, then print a comparison table.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train and evaluate efficient models")
    parser.add_argument(
        "--model",
        type=str,
        choices=["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer", "all"],
        default="all",
        help="Model architecture to train"
    )
    args = parser.parse_args()

    train_all = args.model == "all"
    if train_all:
        selected_types = ["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer"]
    else:
        selected_types = [args.model]

    results = []
    for model_type in selected_types:
        if train_all:
            # The banner is only shown when several models are trained in a row.
            print(f"\n\n{'='*50}")
            print(f"TRAINING {model_type.upper()}")
            print(f"{'='*50}\n")
        result = train_model(model_type)
        results.append(result)

        # Test CPU inference
        if result["model_path"] is None:
            # train_model() returned its failure placeholder: there is no
            # checkpoint to benchmark. Skip it so one failed model does not
            # abort the comparison of the others.
            print(f"\033[91mERR!\033[0m: Skipping CPU inference test for {model_type}: no trained checkpoint")
            result["inference_time"] = float('inf')
        else:
            result["inference_time"] = test_cpu_inference(result["model_path"], model_type)

    # Compare results
    print("\n\n" + "="*80)
    print("MODEL COMPARISON")
    print("="*80)
    print(f"{'Model':<25} {'Size (M)':<10} {'Val Loss':<10} {'Test Loss':<10} {'Test MAE':<10} {'CPU Time (ms)':<15}")
    print("-"*80)
    for result in results:
        print(f"{result['model_name']:<25} {result['model_size']:<10.2f} {result['val_loss']:<10.4f} "
              f"{result['test_loss']:<10.4f} {result['test_mae']:<10.4f} {result['inference_time']*1000:<15.2f}")
    print("="*80)

    # Find best model among the runs that actually produced a checkpoint
    trained_results = [r for r in results if r["model_path"] is not None]
    if trained_results:
        best_model = min(trained_results, key=lambda x: x["test_mae"])
        print(f"\nBEST MODEL: {best_model['model_name']}")
        print(f"Test MAE: {best_model['test_mae']:.4f}")
        print(f"CPU Inference Time: {best_model['inference_time']*1000:.2f} ms")
        print(f"Model Path: {best_model['model_path']}")
    else:
        print("\033[91mERR!\033[0m: No model was trained successfully; nothing to rank")