GP / gp1.py
woletee
this is the commit for adding the gp interface
ded89f7
import numpy as np
import sys, os, json
from deap import base, creator, gp, tools, algorithms
from dsl import *
import glob
from dsl import _objects
# Custom type definition (DEAP compatibility)
class GridList(list):
pass
# DEAP GP Setup
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
pset = gp.PrimitiveSetTyped("MAIN", [Grid], Grid)
# Basic Grid primitives (all your previously defined primitives)
pset.addPrimitive(ic_compress2, [Grid], Grid)
pset.addPrimitive(flipy, [Grid], Grid)
pset.addPrimitive(rot90, [Grid], Grid)
pset.addPrimitive(rot180, [Grid], Grid)
pset.addPrimitive(mirrorX, [Grid], Grid)
pset.addPrimitive(mirrorY, [Grid], Grid)
pset.addPrimitive(overlay, [Grid, Grid], Grid)
pset.addPrimitive(set_bg, [int, Grid], Grid)
pset.addPrimitive(ic_connectY, [Grid], Grid)
pset.addPrimitive(ic_connectX, [Grid], Grid)
pset.addPrimitive(ic_compress3, [Grid], Grid)
pset.addPrimitive(ic_erasecol, [int, Grid], Grid)
pset.addPrimitive(left_half, [Grid], Grid)
pset.addPrimitive(right_half, [Grid], Grid)
pset.addPrimitive(top_half, [Grid], Grid)
pset.addPrimitive(repeatX, [Grid], Grid)
pset.addPrimitive(flipx, [Grid], Grid)
pset.addPrimitive(setcol, [Colour, Grid], Grid)
pset.addPrimitive(ic_embed, [Grid, Grid], Grid)
pset.addPrimitive(rot270, [Grid], Grid)
# GridList-based primitives
pset.addPrimitive(ic_splitall, [Grid], GridList)
pset.addPrimitive(ic_composegrowing, [GridList], Grid)
pset.addPrimitive(lambda x: GridList([x]), [Grid], GridList, name="toGridList")
pset.addTerminal(GridList([]), GridList)
pset.addPrimitive(ic_pickunique, [GridList], Grid)
pset.addPrimitive(gravity_right, [Grid], Grid)
pset.addPrimitive(split8, [Grid], GridList)
pset.addPrimitive(ic_makeborder, [Grid], Grid)
pset.addPrimitive(ic_filtercol, [Colour, Grid], Grid)
pset.addPrimitive(ic_invert, [Grid], Grid)
pset.addPrimitive(logical_and, [Grid, Grid], Grid)
pset.addPrimitive(fillobj, [Colour, Grid], Grid)
pset.addPrimitive(topcol, [Grid], Colour)
pset.addPrimitive(rarestcol, [Grid], Colour)
pset.addPrimitive(gravity_down, [Grid], Grid)
pset.addPrimitive(pickcommon, [GridList], Grid)
pset.addPrimitive(swapxy, [Grid], Grid)
pset.addPrimitive(topcol, [Grid], Colour)
pset.addPrimitive(setcol, [Colour, Grid], Grid)
pset.addPrimitive(get_bg, [Grid], Colour)
pset.addPrimitive(rarestcol, [Grid], Colour)
pset.addPrimitive(ic_fill, [Colour, Grid], Grid)
pset.addPrimitive(ic_center, [Grid], Grid)
pset.addPrimitive(countToY, [Count, Colour], Grid)
pset.addPrimitive(countPixels, [Grid], Count)
pset.addPrimitive(ic_splitcols, [Grid], GridList)
pset.addPrimitive(grid_split, [Grid], GridList)
pset.addPrimitive(_objects, [Grid], GridList)
pset.addPrimitive(overlay, [Grid, Grid], Grid)
pset.addPrimitive(stack_no_crop, [GridList], Grid)
pset.addPrimitive(move_down, [Grid], Grid)
pset.addPrimitive(draw_line, [Grid, int], Grid)
pset.addPrimitive(draw_line_slant_up, [Grid, Grid], Grid)
pset.addPrimitive(draw_line_slant_down, [Grid], Grid)
pset.addPrimitive(rarestcol, [Grid], int)
pset.addPrimitive(lambda: 1, [], int, name="int_one")
# Integer terminals
for i in range(1, 10):
pset.addTerminal(i, int)
import operator # needed for operator.attrgetter
toolbox = base.Toolbox()
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
# Use leaf-biased crossover to avoid excessive tree growth
toolbox.register("mate", gp.cxOnePointLeafBiased, termpb=0.1)
# Mutation setup remains the same
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
# Explicitly limit tree height to avoid memory errors
MAX_TREE_HEIGHT = 17 # recommended limit
toolbox.decorate("mate", gp.staticLimit(operator.attrgetter("height"), MAX_TREE_HEIGHT))
toolbox.decorate("mutate", gp.staticLimit(operator.attrgetter("height"), MAX_TREE_HEIGHT))
# Population initialization (unchanged)
toolbox.register("population", tools.initRepeat, list,
lambda: creator.Individual(gp.genHalfAndHalf(pset, min_=1, max_=3)))
def evaluate_task(individual, task):
func = toolbox.compile(expr=individual)
total = 0
for example in task['train']:
inp = Grid(np.array(example["input"]))
tgt = Grid(np.array(example["output"]))
try:
out = func(inp)
if out.grid.shape == tgt.grid.shape:
total += np.sum(out.grid == tgt.grid)
except:
pass
return (total,)
toolbox.register("evaluate", evaluate_task)
# Folder containing your tasks
training_folder = "./training/"
task_files = glob.glob(training_folder + "*.json")
results = []
# Evaluate each task separately
for task_file in task_files:
task_name = os.path.basename(task_file)
print(f"Processing {task_name}")
with open(task_file, 'r') as f:
task = json.load(f)
# GP initialization per task
pop = toolbox.population(n=150)
hof = tools.HallOfFame(1)
for gen in range(250): # Adjust number of generations if needed
offspring = algorithms.varAnd(pop, toolbox, cxpb=0.5, mutpb=0.2)
fits = toolbox.map(lambda ind: toolbox.evaluate(ind, task), offspring)
for fit, ind in zip(fits, offspring):
ind.fitness.values = fit
hof.update(offspring)
pop = toolbox.select(offspring, k=len(pop))
# Evaluate best individual on the test set
best_ind = hof[0]
func = toolbox.compile(expr=best_ind)
correct = False
try:
for test_case in task["test"]:
test_inp = Grid(np.array(test_case["input"]))
expected_out = np.array(test_case["output"])
output_grid = func(test_inp).grid
if np.array_equal(output_grid, expected_out):
correct = True
else:
correct = False
break
except:
correct = False
results.append({
"task_name": task_name,
"best_program": str(best_ind),
"solution_found": correct
})
print(f"{task_name} completed. Solution found: {correct}")
# Save summary results to JSON
with open("tasks_results_summary.json", "w") as f:
json.dump(results, f, indent=2)
print("All tasks processed. Results saved to tasks_results_summary.json.")
# Put everything you already have here...
# And then add this at the bottom:
def run_task(task_path):
with open(task_path, 'r') as f:
task = json.load(f)
pop = toolbox.population(n=150)
hof = tools.HallOfFame(1)
for gen in range(250):
offspring = algorithms.varAnd(pop, toolbox, cxpb=0.5, mutpb=0.2)
fits = toolbox.map(lambda ind: toolbox.evaluate(ind, task), offspring)
for fit, ind in zip(fits, offspring):
ind.fitness.values = fit
hof.update(offspring)
pop = toolbox.select(offspring, k=len(pop))
best_ind = hof[0]
func = toolbox.compile(expr=best_ind)
# We'll just use the first test example for visual output
test_example = task["test"][0]
input_grid = np.array(test_example["input"])
target_grid = np.array(test_example["output"])
try:
output_grid = func(Grid(input_grid)).grid
correct = np.array_equal(output_grid, target_grid)
except:
output_grid = np.zeros_like(input_grid)
correct = False
return str(best_ind), correct, input_grid.tolist(), target_grid.tolist(), output_grid.tolist()