#!/usr/bin/env python3
"""
NEBULA v0.4 TRAINING SYSTEM
Equipo NEBULA: Francisco Angulo de Lafuente y Ángel
SISTEMA DE ENTRENAMIENTO COMPLETO PARA NEBULA v0.4
- Training loop optimizado para RTX GPUs con mixed precision
- Dataset generator de sudokus realistas validado
- Early stopping con validation metrics
- Checkpoint saving y model persistence
- Comprehensive logging y monitoring
- Constraint-aware training schedule
PASO A PASO: Entrenamiento riguroso según nuestros criterios
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
import numpy as np
import math
import time
import json
import os
from typing import Dict, Tuple, Optional, List
from dataclasses import dataclass
import random
# Import our unified model and dataset functions
from NEBULA_UNIFIED_v04 import NEBULA_HRM_Sudoku_v04
@dataclass
class TrainingConfig:
"""Configuration para training setup"""
epochs: int = 50
batch_size: int = 32
learning_rate: float = 1e-3
weight_decay: float = 1e-5
constraint_weight_start: float = 2.0
constraint_weight_end: float = 5.0
distillation_weight: float = 0.3
validation_split: float = 0.2
early_stopping_patience: int = 10
checkpoint_every: int = 5
mixed_precision: bool = True
gradient_clip_norm: float = 1.0
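# Usage sketch (illustrative only): since TrainingConfig is a dataclass, individual
# fields can be overridden at construction while the others keep their defaults, e.g.
#
#     config = TrainingConfig(epochs=30, batch_size=16)   # remaining fields use the defaults above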
class NEBULASudokuDataset:
"""
Dataset generator para sudokus usando backtracking validado
Basado en nuestro generador probado que produce sudokus válidos
"""
def __init__(self, num_samples: int, mask_rate: float = 0.65, device: str = 'cuda'):
self.num_samples = num_samples
self.mask_rate = mask_rate
self.device = device
def generate_batch(self, batch_size: int) -> Tuple[torch.Tensor, torch.Tensor]:
"""Generate batch of sudoku input-target pairs"""
inputs = []
targets = []
for _ in range(batch_size):
# Generate complete sudoku using our validated backtracking
full_sudoku = self.generate_full_sudoku()
# Create masked version for input
input_sudoku = self.mask_sudoku(full_sudoku, self.mask_rate)
inputs.append(torch.tensor(input_sudoku, dtype=torch.long))
targets.append(torch.tensor(full_sudoku, dtype=torch.long))
return torch.stack(inputs).to(self.device), torch.stack(targets).to(self.device)
def generate_full_sudoku(self, seed: Optional[int] = None) -> List[List[int]]:
"""Generate complete valid sudoku using backtracking"""
if seed is not None:
random.seed(seed)
digits = list(range(1, 10))
grid = [[0]*9 for _ in range(9)]
        # Randomized cell order for variability
cells = [(i, j) for i in range(9) for j in range(9)]
random.shuffle(cells)
def is_valid(grid, r, c, val):
# Check row
for j in range(9):
if grid[r][j] == val:
return False
# Check column
for i in range(9):
if grid[i][c] == val:
return False
# Check 3x3 box
br, bc = (r // 3) * 3, (c // 3) * 3
for i in range(br, br+3):
for j in range(bc, bc+3):
if grid[i][j] == val:
return False
return True
def backtrack(idx=0):
if idx >= 81:
return True
i, j = cells[idx]
choices = digits[:]
random.shuffle(choices)
for val in choices:
if is_valid(grid, i, j, val):
grid[i][j] = val
if backtrack(idx + 1):
return True
grid[i][j] = 0
return False
success = backtrack(0)
if not success:
# Fallback: try with ordered cells
grid = [[0]*9 for _ in range(9)]
cells = [(i, j) for i in range(9) for j in range(9)]
success = backtrack(0)
if not success:
raise RuntimeError("Failed to generate valid sudoku")
return grid
def mask_sudoku(self, full_grid: List[List[int]], mask_rate: float) -> List[List[int]]:
"""Create masked sudoku for training input"""
masked = [row[:] for row in full_grid] # Deep copy
# Calculate cells to keep
total_cells = 81
cells_to_keep = int(total_cells * (1.0 - mask_rate))
# Get all positions
positions = [(i, j) for i in range(9) for j in range(9)]
random.shuffle(positions)
# Mask cells (set to 0) except for cells_to_keep
for i, (r, c) in enumerate(positions):
if i >= cells_to_keep:
masked[r][c] = 0
return masked
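# Optional sanity-check sketch (not part of the original pipeline): verifies that a
# completed grid from generate_full_sudoku() is a valid sudoku, i.e. every row, column
# and 3x3 box is a permutation of 1..9. The helper name is ours.
def _is_valid_complete_sudoku(grid: List[List[int]]) -> bool:
    """Return True if `grid` is a fully filled, rule-consistent 9x9 sudoku."""
    expected = set(range(1, 10))
    rows = [set(row) for row in grid]
    cols = [set(grid[i][j] for i in range(9)) for j in range(9)]
    boxes = [set(grid[br + i][bc + j] for i in range(3) for j in range(3))
             for br in (0, 3, 6) for bc in (0, 3, 6)]
    return all(group == expected for group in rows + cols + boxes)
# Example usage (sketch):
#     ds = NEBULASudokuDataset(num_samples=10, device='cpu')
#     assert _is_valid_complete_sudoku(ds.generate_full_sudoku())
#     inputs, targets = ds.generate_batch(4)   # both tensors have shape [4, 9, 9], dtype long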
class NEBULATrainer:
"""
NEBULA v0.4 Training System
Comprehensive training system con:
- Mixed precision training optimizado para RTX
- Constraint-aware loss scheduling
- Advanced optimization strategies
- Comprehensive validation y monitoring
"""
def __init__(self, config: TrainingConfig, device: str = 'cuda'):
self.config = config
self.device = device
print(f"[NEBULA TRAINER] Inicializando sistema de entrenamiento:")
print(f" - Device: {device}")
print(f" - Epochs: {config.epochs}")
print(f" - Batch size: {config.batch_size}")
print(f" - Learning rate: {config.learning_rate}")
print(f" - Mixed precision: {config.mixed_precision}")
# Initialize model
self.model = NEBULA_HRM_Sudoku_v04(
grid_size=9,
device=device,
use_rtx_optimization=True,
use_mixed_precision=config.mixed_precision
)
# Setup optimizer
self.optimizer = optim.AdamW(
self.model.parameters(),
lr=config.learning_rate,
weight_decay=config.weight_decay,
betas=(0.9, 0.999)
)
# Learning rate scheduler
self.scheduler = ReduceLROnPlateau(
self.optimizer,
mode='min',
factor=0.5,
patience=5
)
        # Mixed precision scaler (only meaningful when a CUDA device is available)
        if config.mixed_precision and torch.cuda.is_available():
try:
# Try new API first
from torch.amp import GradScaler
self.scaler = GradScaler('cuda')
print(f" - Mixed precision: Enabled (new API)")
except ImportError:
# Fallback to old API
from torch.cuda.amp import GradScaler
self.scaler = GradScaler()
print(f" - Mixed precision: Enabled (legacy API)")
else:
self.scaler = None
print(f" - Mixed precision: Disabled")
# Training state
self.current_epoch = 0
self.best_validation_loss = float('inf')
self.best_model_state = None
self.training_history = {
'train_loss': [],
'val_loss': [],
'train_accuracy': [],
'val_accuracy': [],
'constraint_violations': [],
            'learning_rate': [],
            'epoch_times': []
        }
self.patience_counter = 0
# Create checkpoint directory
self.checkpoint_dir = "nebula_checkpoints"
os.makedirs(self.checkpoint_dir, exist_ok=True)
def compute_constraint_schedule(self, epoch: int) -> float:
"""Compute constraint weight scheduling"""
progress = epoch / self.config.epochs
weight = self.config.constraint_weight_start + (
self.config.constraint_weight_end - self.config.constraint_weight_start
) * progress
return weight
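    # Worked example (using the defaults above: epochs=50, start=2.0, end=5.0):
    #   epoch 0  -> 2.0 + (5.0 - 2.0) * (0 / 50)  = 2.0
    #   epoch 25 -> 2.0 + (5.0 - 2.0) * (25 / 50) = 3.5
    #   epoch 49 -> 2.0 + (5.0 - 2.0) * (49 / 50) = 4.94
    # The end value is only approached, never reached, because `epoch` stays below `epochs`.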
def compute_accuracy(self, logits: torch.Tensor, targets: torch.Tensor,
input_mask: torch.Tensor) -> float:
"""Compute accuracy solo en celdas que necesitan predicción"""
predictions = torch.argmax(logits, dim=-1)
        # Mask: only evaluate cells where the input was 0 (empty)
eval_mask = (input_mask == 0) & (targets > 0)
if eval_mask.sum() == 0:
return 0.0
correct = (predictions == targets) & eval_mask
accuracy = correct.sum().item() / eval_mask.sum().item()
return accuracy
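    # Toy example (sketch): with targets = [[5, 3]], input_mask = [[0, 3]] (only the first
    # cell was blank) and argmax predictions = [[5, 9]], eval_mask selects just the first
    # cell, so accuracy = 1/1 = 1.0; the pre-filled second cell is ignored.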
def train_epoch(self, dataset: NEBULASudokuDataset) -> Dict[str, float]:
"""Train single epoch"""
self.model.train()
epoch_loss = 0.0
epoch_accuracy = 0.0
epoch_ce_loss = 0.0
epoch_constraint_loss = 0.0
epoch_distillation_loss = 0.0
num_batches = 0
# Dynamic constraint weight
constraint_weight = self.compute_constraint_schedule(self.current_epoch)
# Training loop
steps_per_epoch = max(1, dataset.num_samples // self.config.batch_size)
for step in range(steps_per_epoch):
# Generate fresh batch
inputs, targets = dataset.generate_batch(self.config.batch_size)
self.optimizer.zero_grad()
if self.scaler is not None:
# Mixed precision training
with torch.cuda.amp.autocast():
outputs = self.model(inputs)
loss_dict = self.model.compute_loss(
outputs, targets,
constraint_weight=constraint_weight,
distillation_weight=self.config.distillation_weight
)
total_loss = loss_dict['total_loss']
# Scaled backward pass
self.scaler.scale(total_loss).backward()
# Gradient clipping
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.gradient_clip_norm)
# Optimizer step
self.scaler.step(self.optimizer)
self.scaler.update()
else:
# Standard precision training
outputs = self.model(inputs)
loss_dict = self.model.compute_loss(
outputs, targets,
constraint_weight=constraint_weight,
distillation_weight=self.config.distillation_weight
)
total_loss = loss_dict['total_loss']
# Backward pass
total_loss.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.gradient_clip_norm)
# Optimizer step
self.optimizer.step()
# Accumulate metrics
with torch.no_grad():
accuracy = self.compute_accuracy(outputs['logits'], targets, inputs)
epoch_loss += total_loss.item()
epoch_accuracy += accuracy
epoch_ce_loss += loss_dict['ce_loss'].item()
epoch_constraint_loss += loss_dict['constraint_loss'].item()
epoch_distillation_loss += loss_dict['distillation_loss'].item()
num_batches += 1
# Progress logging
if (step + 1) % max(1, steps_per_epoch // 10) == 0:
print(f" Step {step+1}/{steps_per_epoch}: Loss={total_loss.item():.4f}, Acc={accuracy:.4f}")
# Average metrics
return {
'loss': epoch_loss / num_batches,
'accuracy': epoch_accuracy / num_batches,
'ce_loss': epoch_ce_loss / num_batches,
'constraint_loss': epoch_constraint_loss / num_batches,
'distillation_loss': epoch_distillation_loss / num_batches,
'constraint_weight': constraint_weight
}
def validate_epoch(self, dataset: NEBULASudokuDataset) -> Dict[str, float]:
"""Validation epoch"""
self.model.eval()
val_loss = 0.0
val_accuracy = 0.0
val_constraint_violations = 0.0
num_batches = 0
# Validation batches
        val_steps = max(1, dataset.num_samples // self.config.batch_size)
with torch.no_grad():
for step in range(val_steps):
inputs, targets = dataset.generate_batch(self.config.batch_size)
if self.scaler is not None:
with torch.cuda.amp.autocast():
outputs = self.model(inputs)
loss_dict = self.model.compute_loss(outputs, targets)
else:
outputs = self.model(inputs)
loss_dict = self.model.compute_loss(outputs, targets)
accuracy = self.compute_accuracy(outputs['logits'], targets, inputs)
val_loss += loss_dict['total_loss'].item()
val_accuracy += accuracy
val_constraint_violations += outputs['constraint_violations'].sum().item()
num_batches += 1
return {
'loss': val_loss / num_batches,
'accuracy': val_accuracy / num_batches,
'constraint_violations': val_constraint_violations / num_batches
}
def save_checkpoint(self, epoch: int, is_best: bool = False):
"""Save model checkpoint"""
checkpoint = {
'epoch': epoch,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
'training_history': self.training_history,
'config': self.config,
'best_validation_loss': self.best_validation_loss
}
if self.scaler is not None:
checkpoint['scaler_state_dict'] = self.scaler.state_dict()
# Save regular checkpoint
checkpoint_path = os.path.join(self.checkpoint_dir, f"nebula_v04_epoch_{epoch}.pt")
torch.save(checkpoint, checkpoint_path)
# Save best model
if is_best:
best_path = os.path.join(self.checkpoint_dir, "nebula_v04_best.pt")
torch.save(checkpoint, best_path)
print(f" Best model saved at epoch {epoch}")
def train(self, num_training_samples: int = 10000) -> Dict[str, List]:
"""
        MAIN TRAINING LOOP
        Full training with early stopping and validation
"""
print(f"\n{'='*80}")
print(f"NEBULA v0.4 TRAINING INICIADO")
print(f"{'='*80}")
print(f"Training samples: {num_training_samples}")
print(f"Validation split: {self.config.validation_split}")
print(f"Model parameters: {self.model.count_parameters():,}")
# Create datasets
train_dataset = NEBULASudokuDataset(
num_samples=int(num_training_samples * (1 - self.config.validation_split)),
mask_rate=0.65,
device=self.device
)
val_dataset = NEBULASudokuDataset(
num_samples=int(num_training_samples * self.config.validation_split),
mask_rate=0.65,
device=self.device
)
print(f"Train dataset: {train_dataset.num_samples} samples")
print(f"Val dataset: {val_dataset.num_samples} samples")
# Training loop
for epoch in range(self.config.epochs):
self.current_epoch = epoch
epoch_start_time = time.time()
print(f"\nEpoch {epoch+1}/{self.config.epochs}")
print("-" * 50)
# Training
train_metrics = self.train_epoch(train_dataset)
# Validation
val_metrics = self.validate_epoch(val_dataset)
# Update scheduler
self.scheduler.step(val_metrics['loss'])
# Record metrics
self.training_history['train_loss'].append(train_metrics['loss'])
self.training_history['val_loss'].append(val_metrics['loss'])
self.training_history['train_accuracy'].append(train_metrics['accuracy'])
self.training_history['val_accuracy'].append(val_metrics['accuracy'])
self.training_history['constraint_violations'].append(val_metrics['constraint_violations'])
self.training_history['learning_rate'].append(self.optimizer.param_groups[0]['lr'])
# Timing
            epoch_time = time.time() - epoch_start_time
            self.training_history['epoch_times'].append(epoch_time)
# Logging
print(f"Train Loss: {train_metrics['loss']:.6f}, Train Acc: {train_metrics['accuracy']:.4f}")
print(f"Val Loss: {val_metrics['loss']:.6f}, Val Acc: {val_metrics['accuracy']:.4f}")
print(f"Constraint Violations: {val_metrics['constraint_violations']:.2f}")
print(f"Constraint Weight: {train_metrics['constraint_weight']:.2f}")
print(f"Learning Rate: {self.optimizer.param_groups[0]['lr']:.6f}")
print(f"Epoch Time: {epoch_time:.1f}s")
# Early stopping check
is_best = val_metrics['loss'] < self.best_validation_loss
if is_best:
self.best_validation_loss = val_metrics['loss']
                # Clone tensors so subsequent training steps do not overwrite the snapshot
                self.best_model_state = {k: v.detach().cpu().clone() for k, v in self.model.state_dict().items()}
self.patience_counter = 0
else:
self.patience_counter += 1
# Save checkpoint
if (epoch + 1) % self.config.checkpoint_every == 0:
self.save_checkpoint(epoch + 1, is_best)
# Early stopping
if self.patience_counter >= self.config.early_stopping_patience:
print(f"\nEarly stopping at epoch {epoch+1} (patience={self.config.early_stopping_patience})")
break
# Load best model
if self.best_model_state is not None:
self.model.load_state_dict(self.best_model_state)
print(f"\nLoaded best model (val_loss={self.best_validation_loss:.6f})")
# Final save
self.save_checkpoint(self.current_epoch + 1, True)
print(f"\n{'='*80}")
print(f"NEBULA v0.4 TRAINING COMPLETADO")
print(f"{'='*80}")
print(f"Best validation loss: {self.best_validation_loss:.6f}")
print(f"Total training time: {sum(self.training_history.get('epoch_times', [0])):.1f}s")
return self.training_history
def main():
"""Main training execution"""
print("NEBULA v0.4 TRAINING SYSTEM")
print("Equipo NEBULA: Francisco Angulo de Lafuente y Ángel")
print("Paso a paso, sin prisa, con calma")
# Training configuration
config = TrainingConfig(
        epochs=30,        # Reasonable for an initial training run
        batch_size=16,    # Balanced for an RTX 3090
learning_rate=1e-3,
constraint_weight_start=1.0,
constraint_weight_end=3.0,
distillation_weight=0.2,
early_stopping_patience=8,
mixed_precision=True
)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
try:
# Initialize trainer
trainer = NEBULATrainer(config, device)
# Start training
training_history = trainer.train(num_training_samples=5000) # Initial training
# Save training history
with open('nebula_v04_training_history.json', 'w') as f:
json.dump(training_history, f, indent=2)
print("\nTRAINING SUCCESSFUL")
print("Model ready para benchmark testing")
except Exception as e:
print(f"\nTRAINING ERROR: {e}")
import traceback
traceback.print_exc()
return False
return True
if __name__ == "__main__":
success = main()
if success:
print("NEBULA v0.4 trained successfully - Ready para benchmarking!")
else:
print("Training failed - Debug required")