|
import os
|
|
import subprocess
|
|
import sys
|
|
import argparse
|
|
import random
|
|
import logging
|
|
from datetime import datetime
|
|
import json
|
|
from typing import List, Tuple, Dict, Any
|
|
|
|
import numpy as np
|
|
import tensorflow as tf
|
|
from tensorflow.keras.models import Sequential, load_model, clone_model
|
|
from tensorflow.keras.layers import Dense, Input
|
|
from tensorflow.keras.optimizers import Adam
|
|
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
|
|
import matplotlib.pyplot as plt
|
|
from scipy.stats import kendalltau
|
|
|
|
|
|
DEFAULT_SEQ_LENGTH = 10
|
|
DEFAULT_POP_SIZE = 50
|
|
DEFAULT_GENERATIONS = 50
|
|
DEFAULT_MUTATION_RATE = 0.4
|
|
DEFAULT_WEIGHT_MUT_RATE = 0.8
|
|
DEFAULT_ACTIVATION_MUT_RATE = 0.2
|
|
DEFAULT_MUTATION_STRENGTH = 0.1
|
|
DEFAULT_TOURNAMENT_SIZE = 5
|
|
DEFAULT_ELITISM_COUNT = 2
|
|
DEFAULT_EPOCHS_FINAL_TRAIN = 100
|
|
DEFAULT_BATCH_SIZE = 64
|
|
|
|
|
|
def setup_logging(log_dir: str, log_level=logging.INFO) -> None:
|
|
"""Configures logging to file and console."""
|
|
log_filename = os.path.join(log_dir, 'evolution.log')
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(log_filename),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
|
|
|
|
def check_gpu() -> bool:
|
|
"""Checks for GPU availability and sets memory growth."""
|
|
gpus = tf.config.list_physical_devices('GPU')
|
|
if gpus:
|
|
try:
|
|
|
|
for gpu in gpus:
|
|
tf.config.experimental.set_memory_growth(gpu, True)
|
|
logical_gpus = tf.config.list_logical_devices('GPU')
|
|
logging.info(f"{len(gpus)} Physical GPUs, {len(logical_gpus)} Logical GPUs found.")
|
|
logging.info(f"Using GPU: {gpus[0].name}")
|
|
return True
|
|
except RuntimeError as e:
|
|
|
|
logging.error(f"Error setting memory growth: {e}")
|
|
return False
|
|
else:
|
|
logging.warning("GPU not found. Using CPU.")
|
|
return False
|
|
|
|
|
|
def generate_data(num_samples: int, seq_length: int) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""Generates random sequences and their sorted versions."""
|
|
logging.info(f"Generating {num_samples} samples with sequence length {seq_length}...")
|
|
X = np.random.rand(num_samples, seq_length) * 100
|
|
y = np.sort(X, axis=1)
|
|
logging.info("Data generation complete.")
|
|
return X, y
|
|
|
|
|
|
def create_individual(seq_length: int) -> Sequential:
|
|
"""Creates a Keras Sequential model with random architecture."""
|
|
model = Sequential(name=f"model_random_{random.randint(1000, 9999)}")
|
|
num_hidden_layers = random.randint(1, 4)
|
|
neurons_per_layer = [random.randint(8, 64) for _ in range(num_hidden_layers)]
|
|
activations = [random.choice(['relu', 'tanh', 'sigmoid']) for _ in range(num_hidden_layers)]
|
|
|
|
|
|
model.add(Input(shape=(seq_length,)))
|
|
|
|
|
|
for i in range(num_hidden_layers):
|
|
model.add(Dense(neurons_per_layer[i], activation=activations[i]))
|
|
|
|
|
|
model.add(Dense(seq_length, activation='linear'))
|
|
|
|
|
|
|
|
model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
|
|
return model
|
|
|
|
@tf.function
|
|
def get_predictions(model: Sequential, X: np.ndarray, batch_size: int) -> tf.Tensor:
|
|
"""Gets model predictions using tf.function."""
|
|
return model(X, training=False)
|
|
|
|
def calculate_fitness(individual: Sequential, X: np.ndarray, y: np.ndarray, batch_size: int) -> float:
|
|
"""Calculates fitness based on inverse MSE. Handles potential errors."""
|
|
try:
|
|
|
|
X_tf = tf.cast(X, tf.float32)
|
|
y_tf = tf.cast(y, tf.float32)
|
|
|
|
|
|
y_pred_tf = get_predictions(individual, X_tf, batch_size)
|
|
|
|
|
|
mse = tf.reduce_mean(tf.square(y_tf - y_pred_tf))
|
|
mse_val = mse.numpy()
|
|
|
|
|
|
fitness_score = 1.0 / (mse_val + 1e-8)
|
|
|
|
|
|
if not np.isfinite(fitness_score):
|
|
logging.warning(f"Non-finite fitness detected ({fitness_score}) for model {individual.name}. Assigning low fitness.")
|
|
return 1e-8
|
|
|
|
return float(fitness_score)
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error during fitness calculation for model {individual.name}: {e}", exc_info=True)
|
|
return 1e-8
|
|
|
|
|
|
def mutate_individual(individual: Sequential, weight_mut_rate: float, act_mut_rate: float, mut_strength: float) -> Sequential:
|
|
"""Applies mutations (weight perturbation, activation change) to an individual."""
|
|
mutated_model = clone_model(individual)
|
|
mutated_model.set_weights(individual.get_weights())
|
|
|
|
mutated = False
|
|
|
|
if random.random() < weight_mut_rate:
|
|
mutated = True
|
|
for layer in mutated_model.layers:
|
|
if isinstance(layer, Dense):
|
|
weights_biases = layer.get_weights()
|
|
new_weights_biases = []
|
|
for wb in weights_biases:
|
|
noise = np.random.normal(0, mut_strength, wb.shape)
|
|
new_weights_biases.append(wb + noise)
|
|
if new_weights_biases:
|
|
layer.set_weights(new_weights_biases)
|
|
|
|
|
|
|
|
if random.random() < act_mut_rate:
|
|
|
|
dense_layers = [layer for layer in mutated_model.layers if isinstance(layer, Dense)]
|
|
if len(dense_layers) > 1:
|
|
mutated = True
|
|
layer_to_mutate = random.choice(dense_layers[:-1])
|
|
current_activation = layer_to_mutate.get_config().get('activation', 'linear')
|
|
possible_activations = ['relu', 'tanh', 'sigmoid']
|
|
if current_activation in possible_activations:
|
|
possible_activations.remove(current_activation)
|
|
new_activation = random.choice(possible_activations)
|
|
|
|
|
|
|
|
config = mutated_model.get_config()
|
|
for layer_config in config['layers']:
|
|
if layer_config['config']['name'] == layer_to_mutate.name:
|
|
layer_config['config']['activation'] = new_activation
|
|
|
|
break
|
|
|
|
|
|
|
|
try:
|
|
mutated_model_new_act = Sequential.from_config(config)
|
|
mutated_model_new_act.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
|
|
mutated_model = mutated_model_new_act
|
|
except Exception as e:
|
|
logging.error(f"Error rebuilding model after activation mutation for {mutated_model.name}: {e}")
|
|
|
|
|
|
|
|
|
|
if mutated:
|
|
mutated_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
|
|
mutated_model._name = f"mutated_{individual.name}"
|
|
|
|
return mutated_model
|
|
|
|
|
|
def tournament_selection(population: List[Sequential], fitness_scores: List[float], k: int) -> Sequential:
|
|
"""Selects the best individual from a randomly chosen tournament group."""
|
|
tournament_indices = random.sample(range(len(population)), k)
|
|
tournament_fitness = [fitness_scores[i] for i in tournament_indices]
|
|
winner_index_in_tournament = np.argmax(tournament_fitness)
|
|
winner_original_index = tournament_indices[winner_index_in_tournament]
|
|
return population[winner_original_index]
|
|
|
|
def evolve_population(population: List[Sequential], X: np.ndarray, y: np.ndarray, generations: int,
|
|
mutation_rate: float, weight_mut_rate: float, act_mut_rate: float, mut_strength: float,
|
|
tournament_size: int, elitism_count: int, batch_size: int) -> Tuple[Sequential, List[float], List[float]]:
|
|
"""Runs the evolutionary process."""
|
|
best_fitness_history = []
|
|
avg_fitness_history = []
|
|
best_model_overall = None
|
|
best_fitness_overall = -1.0
|
|
|
|
for gen in range(generations):
|
|
|
|
fitness_scores = [calculate_fitness(ind, X, y, batch_size) for ind in population]
|
|
|
|
|
|
current_best_idx = np.argmax(fitness_scores)
|
|
current_best_fitness = fitness_scores[current_best_idx]
|
|
if current_best_fitness > best_fitness_overall:
|
|
best_fitness_overall = current_best_fitness
|
|
|
|
best_model_overall = clone_model(population[current_best_idx])
|
|
best_model_overall.set_weights(population[current_best_idx].get_weights())
|
|
best_model_overall.compile(optimizer=Adam(), loss='mse')
|
|
logging.info(f"Generation {gen+1}: New overall best fitness: {best_fitness_overall:.4f}")
|
|
|
|
|
|
avg_fitness = np.mean(fitness_scores)
|
|
best_fitness_history.append(current_best_fitness)
|
|
avg_fitness_history.append(avg_fitness)
|
|
|
|
logging.info(f"Generation {gen+1}/{generations} - Best Fitness: {current_best_fitness:.4f}, Avg Fitness: {avg_fitness:.4f}")
|
|
|
|
new_population = []
|
|
|
|
|
|
if elitism_count > 0:
|
|
elite_indices = np.argsort(fitness_scores)[-elitism_count:]
|
|
for idx in elite_indices:
|
|
|
|
elite_clone = clone_model(population[idx])
|
|
elite_clone.set_weights(population[idx].get_weights())
|
|
elite_clone.compile(optimizer=Adam(), loss='mse')
|
|
new_population.append(elite_clone)
|
|
|
|
|
|
|
|
while len(new_population) < len(population):
|
|
|
|
parent = tournament_selection(population, fitness_scores, tournament_size)
|
|
|
|
|
|
child = parent
|
|
if random.random() < mutation_rate:
|
|
|
|
parent_clone = clone_model(parent)
|
|
parent_clone.set_weights(parent.get_weights())
|
|
parent_clone.compile(optimizer=Adam(), loss='mse')
|
|
child = mutate_individual(parent_clone, weight_mut_rate, act_mut_rate, mut_strength)
|
|
else:
|
|
|
|
child = clone_model(parent)
|
|
child.set_weights(parent.get_weights())
|
|
child.compile(optimizer=Adam(), loss='mse')
|
|
|
|
|
|
new_population.append(child)
|
|
|
|
population = new_population[:len(population)]
|
|
|
|
if best_model_overall is None:
|
|
best_idx = np.argmax([calculate_fitness(ind, X, y, batch_size) for ind in population])
|
|
best_model_overall = population[best_idx]
|
|
|
|
return best_model_overall, best_fitness_history, avg_fitness_history
|
|
|
|
|
|
|
|
def plot_fitness_history(history_best: List[float], history_avg: List[float], output_dir: str) -> None:
|
|
"""Plots and saves the fitness history."""
|
|
plt.figure(figsize=(12, 6))
|
|
plt.plot(history_best, label="Best Fitness per Generation", marker='o', linestyle='-')
|
|
plt.plot(history_avg, label="Average Fitness per Generation", marker='x', linestyle='--')
|
|
plt.xlabel("Generation")
|
|
plt.ylabel("Fitness Score (1 / MSE)")
|
|
plt.title("Evolutionary Process Fitness History")
|
|
plt.legend()
|
|
plt.grid(True)
|
|
plt.tight_layout()
|
|
plot_path = os.path.join(output_dir, "fitness_history.png")
|
|
plt.savefig(plot_path)
|
|
plt.close()
|
|
logging.info(f"Fitness history plot saved to {plot_path}")
|
|
|
|
|
|
def evaluate_model(model: Sequential, X_test: np.ndarray, y_test: np.ndarray, batch_size: int) -> Dict[str, float]:
|
|
"""Evaluates the final model on the test set."""
|
|
logging.info("Evaluating final model on test data...")
|
|
y_pred = model.predict(X_test, batch_size=batch_size, verbose=0)
|
|
test_mse = np.mean(np.square(y_test - y_pred))
|
|
logging.info(f"Final Test MSE: {test_mse:.6f}")
|
|
|
|
|
|
sample_size = min(100, X_test.shape[0])
|
|
taus = []
|
|
indices = np.random.choice(X_test.shape[0], sample_size, replace=False)
|
|
for i in indices:
|
|
tau, _ = kendalltau(y_test[i], y_pred[i])
|
|
if not np.isnan(tau):
|
|
taus.append(tau)
|
|
avg_kendall_tau = np.mean(taus) if taus else 0.0
|
|
logging.info(f"Average Kendall's Tau (on {sample_size} samples): {avg_kendall_tau:.4f}")
|
|
|
|
return {
|
|
"test_mse": float(test_mse),
|
|
"avg_kendall_tau": float(avg_kendall_tau)
|
|
}
|
|
|
|
|
|
def run_pipeline(args: argparse.Namespace):
|
|
"""Executes the complete neuroevolution pipeline."""
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
output_dir = os.path.join(args.output_base_dir, f"evorun_{timestamp}")
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
|
setup_logging(output_dir)
|
|
logging.info(f"Starting EvoNet Pipeline Run: {timestamp}")
|
|
logging.info(f"Output directory: {output_dir}")
|
|
|
|
|
|
logging.info("Configuration:")
|
|
args_dict = vars(args)
|
|
for k, v in args_dict.items():
|
|
logging.info(f" {k}: {v}")
|
|
|
|
config_path = os.path.join(output_dir, "config.json")
|
|
with open(config_path, 'w') as f:
|
|
json.dump(args_dict, f, indent=4)
|
|
logging.info(f"Configuration saved to {config_path}")
|
|
|
|
|
|
|
|
random.seed(args.seed)
|
|
np.random.seed(args.seed)
|
|
tf.random.set_seed(args.seed)
|
|
logging.info(f"Using random seed: {args.seed}")
|
|
|
|
|
|
check_gpu()
|
|
|
|
|
|
X_train, y_train = generate_data(args.train_samples, args.seq_length)
|
|
X_test, y_test = generate_data(args.test_samples, args.seq_length)
|
|
|
|
|
|
logging.info(f"Initializing population of {args.pop_size} individuals...")
|
|
population = [create_individual(args.seq_length) for _ in range(args.pop_size)]
|
|
logging.info("Population initialized.")
|
|
|
|
|
|
logging.info(f"Starting evolution for {args.generations} generations...")
|
|
best_model_unevolved, best_fitness_hist, avg_fitness_hist = evolve_population(
|
|
population, X_train, y_train, args.generations,
|
|
args.mutation_rate, args.weight_mut_rate, args.activation_mut_rate, args.mutation_strength,
|
|
args.tournament_size, args.elitism_count, args.batch_size
|
|
)
|
|
logging.info("Evolution complete.")
|
|
|
|
|
|
history_path = os.path.join(output_dir, "fitness_history.csv")
|
|
history_data = np.array([best_fitness_hist, avg_fitness_hist]).T
|
|
np.savetxt(history_path, history_data, delimiter=',', header='BestFitness,AvgFitness', comments='')
|
|
logging.info(f"Fitness history data saved to {history_path}")
|
|
|
|
|
|
plot_fitness_history(best_fitness_hist, avg_fitness_hist, output_dir)
|
|
|
|
|
|
logging.info("Starting final training of the best evolved model...")
|
|
|
|
final_model = clone_model(best_model_unevolved)
|
|
final_model.set_weights(best_model_unevolved.get_weights())
|
|
|
|
final_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])
|
|
|
|
|
|
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
|
|
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6, verbose=1)
|
|
|
|
|
|
history = final_model.fit(
|
|
X_train, y_train,
|
|
epochs=args.epochs_final_train,
|
|
batch_size=args.batch_size,
|
|
validation_split=0.2,
|
|
callbacks=[early_stopping, reduce_lr],
|
|
verbose=2
|
|
)
|
|
logging.info("Final training complete.")
|
|
|
|
|
|
final_metrics = evaluate_model(final_model, X_test, y_test, args.batch_size)
|
|
|
|
|
|
model_path = os.path.join(output_dir, "best_evolved_model_trained.keras")
|
|
final_model.save(model_path)
|
|
logging.info(f"Final trained model saved to {model_path}")
|
|
|
|
|
|
results = {
|
|
"config": args_dict,
|
|
"final_evaluation": final_metrics,
|
|
"evolution_summary": {
|
|
"best_fitness_overall": best_fitness_hist[-1] if best_fitness_hist else None,
|
|
"avg_fitness_final_gen": avg_fitness_hist[-1] if avg_fitness_hist else None,
|
|
},
|
|
"training_history": history.history
|
|
}
|
|
results_path = os.path.join(output_dir, "final_results.json")
|
|
|
|
for key in results['training_history']:
|
|
results['training_history'][key] = [float(v) for v in results['training_history'][key]]
|
|
|
|
with open(results_path, 'w') as f:
|
|
json.dump(results, f, indent=4)
|
|
logging.info(f"Final results saved to {results_path}")
|
|
logging.info("Pipeline finished successfully!")
|
|
|
|
|
|
|
|
def parse_arguments() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="EvoNet: Neuroevolution for Sorting Task")
|
|
|
|
|
|
parser.add_argument('--output_base_dir', type=str, default=os.path.join(os.getcwd(), "evonet_runs"),
|
|
help='Base directory to store run results.')
|
|
|
|
|
|
parser.add_argument('--seq_length', type=int, default=DEFAULT_SEQ_LENGTH,
|
|
help='Length of the sequences to sort.')
|
|
parser.add_argument('--train_samples', type=int, default=5000, help='Number of training samples.')
|
|
parser.add_argument('--test_samples', type=int, default=1000, help='Number of test samples.')
|
|
|
|
|
|
parser.add_argument('--pop_size', type=int, default=DEFAULT_POP_SIZE, help='Population size.')
|
|
parser.add_argument('--generations', type=int, default=DEFAULT_GENERATIONS, help='Number of generations.')
|
|
parser.add_argument('--mutation_rate', type=float, default=DEFAULT_MUTATION_RATE,
|
|
help='Overall probability of mutating an individual.')
|
|
parser.add_argument('--weight_mut_rate', type=float, default=DEFAULT_WEIGHT_MUT_RATE,
|
|
help='Probability of weight perturbation if mutation occurs.')
|
|
parser.add_argument('--activation_mut_rate', type=float, default=DEFAULT_ACTIVATION_MUT_RATE,
|
|
help='Probability of activation change if mutation occurs.')
|
|
parser.add_argument('--mutation_strength', type=float, default=DEFAULT_MUTATION_STRENGTH,
|
|
help='Standard deviation of Gaussian noise for weight mutation.')
|
|
parser.add_argument('--tournament_size', type=int, default=DEFAULT_TOURNAMENT_SIZE,
|
|
help='Number of individuals participating in tournament selection.')
|
|
parser.add_argument('--elitism_count', type=int, default=DEFAULT_ELITISM_COUNT,
|
|
help='Number of best individuals to carry over directly.')
|
|
|
|
|
|
parser.add_argument('--batch_size', type=int, default=DEFAULT_BATCH_SIZE, help='Batch size for predictions and training.')
|
|
parser.add_argument('--epochs_final_train', type=int, default=DEFAULT_EPOCHS_FINAL_TRAIN,
|
|
help='Max epochs for final training of the best model.')
|
|
|
|
|
|
parser.add_argument('--seed', type=int, default=None, help='Random seed for reproducibility (default: random).')
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
if args.seed is None:
|
|
args.seed = random.randint(0, 2**32 - 1)
|
|
|
|
return args
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
cli_args = parse_arguments()
|
|
|
|
|
|
os.makedirs(cli_args.output_base_dir, exist_ok=True)
|
|
|
|
|
|
try:
|
|
run_pipeline(cli_args)
|
|
except Exception as e:
|
|
|
|
|
|
print(f"FATAL ERROR in pipeline execution: {e}", file=sys.stderr)
|
|
|
|
if logging.getLogger().hasHandlers():
|
|
logging.critical("FATAL ERROR in pipeline execution:", exc_info=True)
|
|
else:
|
|
import traceback
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
sys.exit(1) |