import streamlit as st
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import random

# Define a function to generate a dataset
def generate_dataset(task_id):
    X, y = make_classification(n_samples=100, n_features=10, n_informative=5, n_redundant=3, n_repeated=2, random_state=task_id)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=task_id)
    return X_train, X_test, y_train, y_test
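# Note: task_id seeds both make_classification and train_test_split, so each task
# gets a distinct but reproducible dataset and split.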
# Define a neural network class
class Net(keras.Model):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = keras.layers.Dense(20, activation='relu', input_shape=(10,))
        self.fc2 = keras.layers.Dense(10, activation='relu')
        # Softmax output so the 'sparse_categorical_crossentropy' loss (which expects
        # probabilities, not raw logits) is applied correctly.
        self.fc3 = keras.layers.Dense(2, activation='softmax')

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x
# Define a genetic algorithm class
class GeneticAlgorithm:
    def __init__(self, population_size, task_id):
        self.population_size = population_size
        self.task_id = task_id
        self.population = [Net() for _ in range(population_size)]

    def selection(self):
        X_train, X_test, y_train, y_test = generate_dataset(self.task_id)
        fitness = []
        for net in self.population:
            net.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            net.fit(X_train, y_train, epochs=10, verbose=0)
            loss, accuracy = net.evaluate(X_test, y_test, verbose=0)
            fitness.append(accuracy)
        if len(fitness) > 0:
            self.population = [self.population[i] for i in np.argsort(fitness)[-len(self.population)//2:]]
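    # Truncation selection: every network is trained briefly on the task's data and
    # scored by test accuracy; only the better-scoring half of the population survives.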
    def crossover(self):
        offspring = []
        for _ in range(len(self.population)//2):
            parent1, parent2 = random.sample(self.population, 2)
            child = Net()
            child.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            # Build the child so its layers have weights before set_weights is called
            child.build(input_shape=(None, 10))
            # Get the weights of the parent networks
            parent1_weights = parent1.get_weights()
            parent2_weights = parent2.get_weights()
            # Average the weights of the two parents
            child_weights = []
            for w1, w2 in zip(parent1_weights, parent2_weights):
                child_weights.append((w1 + w2) / 2)
            # Set the weights of the child network
            child.set_weights(child_weights)
            offspring.append(child)
        self.population += offspring
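    # The offspring are appended to the survivors, so the population partially regrows
    # after each selection step.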
    def mutation(self):
        for net in self.population:
            if random.random() < 0.1:
                weights = net.get_weights()
                new_weights = [np.array(w) + np.random.randn(*w.shape) * 0.1 for w in weights]
                net.set_weights(new_weights)
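    # Mutation: with probability 0.1 a network gets Gaussian noise (std 0.1) added to
    # every one of its weight tensors.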
# Streamlit app
st.title("Evolution of Sub-Models")

# Parameters
st.sidebar.header("Parameters")
population_size = st.sidebar.slider("Population size", 10, 100, 50)
num_tasks = st.sidebar.slider("Number of tasks", 1, 10, 5)
num_generations = st.sidebar.slider("Number of generations", 1, 100, 10)

# Run the evolution
if st.button("Run evolution"):
    gas = [GeneticAlgorithm(population_size, task_id) for task_id in range(num_tasks)]
    for generation in range(num_generations):
        for ga in gas:
            ga.selection()
            ga.crossover()
            ga.mutation()
        st.write(f"Generation {generation+1} complete")

    # Evaluate the final population
    final_accuracy = []
    for task_id, ga in enumerate(gas):
        X_train, X_test, y_train, y_test = generate_dataset(task_id)
        accuracy = []
        for net in ga.population:
            net.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            net.fit(X_train, y_train, epochs=10, verbose=0)
            loss, acc = net.evaluate(X_test, y_test, verbose=0)
            accuracy.append(acc)
        if len(accuracy) > 0:
            final_accuracy.append(np.mean(accuracy))
    # Trade populations between tasks
    for i in range(len(gas)):
        for j in range(i+1, len(gas)):
            ga1 = gas[i]
            ga2 = gas[j]
            population1 = ga1.population
            population2 = ga2.population
            # Don't trade more individuals than either population currently holds
            num_trade = min(int(0.1 * population_size), len(population1), len(population2))
            trade1 = random.sample(population1, num_trade)
            trade2 = random.sample(population2, num_trade)
            ga1.population = population1 + trade2
            ga2.population = population2 + trade1
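    # Trading copies sampled individuals in both directions without removing them from
    # their source population, so each population grows by num_trade networks.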
    # Evaluate the final population after trading
    final_accuracy_after_trade = []
    for task_id, ga in enumerate(gas):
        X_train, X_test, y_train, y_test = generate_dataset(task_id)
        accuracy = []
        for net in ga.population:
            net.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            net.fit(X_train, y_train, epochs=10, verbose=0)
            loss, acc = net.evaluate(X_test, y_test, verbose=0)
            accuracy.append(acc)
        if len(accuracy) > 0:
            final_accuracy_after_trade.append(np.mean(accuracy))

    if len(final_accuracy) > 0:
        st.write(f"Final accuracy: {np.mean(final_accuracy)}")
        st.write(f"Final accuracy after trading: {np.mean(final_accuracy_after_trade)}")
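        # Optional visualization sketch (an addition, not part of the original app):
        # compare per-task accuracy before and after trading. Assumes both lists hold
        # one entry per task, which is the case whenever the populations are non-empty.
        st.bar_chart({
            "before trading": final_accuracy,
            "after trading": final_accuracy_after_trade,
        })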