Source code for slim_gsgp.algorithms.GP.gp

# MIT License
#
# Copyright (c) 2024 DALabNOVA
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Genetic Programming (GP) module.
"""

import random
import time

import numpy as np
import torch
from slim_gsgp.algorithms.GP.representations.population import Population
from slim_gsgp.algorithms.GP.representations.tree import Tree
from slim_gsgp.utils.diversity import niche_entropy
from slim_gsgp.utils.logger import logger
from slim_gsgp.utils.utils import verbose_reporter


[docs] class GP: def __init__( self, pi_init, initializer, selector, mutator, crossover, find_elit_func, p_m=0.2, p_xo=0.8, pop_size=100, seed=0, settings_dict=None, ): """ Initialize the Genetic Programming algorithm. Parameters ---------- pi_init : dict Dictionary with all the parameters needed for candidate solutions initialization. initializer : Callable Function to initialize the population. selector : Callable Function to select individuals. mutator : Callable Function to mutate individuals. crossover : Callable Function to perform crossover between individuals. find_elit_func : Callable Function to find elite individuals. p_m : float, optional Probability of mutation. Default is 0.2. p_xo : float, optional Probability of crossover. Default is 0.8. pop_size : int, optional Size of the population. Default is 100. seed : int, optional Seed for random number generation. Default is 0. settings_dict : dict, optional Additional settings dictionary. """ self.pi_init = pi_init self.selector = selector self.p_m = p_m self.crossover = crossover self.mutator = mutator self.p_xo = p_xo self.initializer = initializer self.pop_size = pop_size self.seed = seed self.find_elit_func = find_elit_func self.settings_dict = settings_dict Tree.FUNCTIONS = pi_init["FUNCTIONS"] Tree.TERMINALS = pi_init["TERMINALS"] Tree.CONSTANTS = pi_init["CONSTANTS"]
[docs] def solve( self, X_train, X_test, y_train, y_test, curr_dataset, n_iter=20, elitism=True, log=0, verbose=0, test_elite=False, log_path=None, run_info=None, max_depth=None, ffunction=None, n_elites=1, depth_calculator=None, n_jobs = 1 ): """ Execute the Genetic Programming algorithm. Parameters ---------- X_train : torch.Tensor Training data features. X_test : torch.Tensor Test data features. y_train : torch.Tensor Training data labels. y_test : torch.Tensor Test data labels. curr_dataset : str Current dataset name. n_iter : int, optional Number of iterations. Default is 20. elitism : bool, optional Whether to use elitism. Default is True. log : int, optional Logging level. Default is 0. verbose : int, optional Verbosity level. Default is 0. test_elite : bool, optional Whether to evaluate elite individuals on test data. Default is False. log_path : str, optional Path to save logs. Default is None. run_info : list, optional Information about the current run. Default is None. max_depth : int, optional Maximum depth of the tree. Default is None. ffunction : function, optional Fitness function. Default is None. n_elites : int, optional Number of elites. Default is 1. depth_calculator : function, optional Function to calculate tree depth. Default is None. n_jobs : int, optional The number of jobs for parallel processing. Default is 1. """ # setting the seeds torch.manual_seed(self.seed) np.random.seed(self.seed) random.seed(self.seed) start = time.time() # Initialize the population population = Population( [Tree(tree) for tree in self.initializer(**self.pi_init)] ) # evaluating the intial population population.evaluate(ffunction, X=X_train, y=y_train, n_jobs=n_jobs) end = time.time() # getting the elite(s) from the initial population self.elites, self.elite = self.find_elit_func(population, n_elites) # testing the elite on testing data, if applicable if test_elite: self.elite.evaluate(ffunction, X=X_test, y=y_test, testing=True) # logging the results if the log level is not 0 if log != 0: self.log_generation( 0, population, end - start, log, log_path, run_info ) # displaying the results on console if verbose level is not 0 if verbose != 0: verbose_reporter( curr_dataset.split("load_")[-1], 0, self.elite.fitness, self.elite.test_fitness, end - start, self.elite.node_count, ) # EVOLUTIONARY PROCESS for it in range(1, n_iter + 1): # getting the offspring population offs_pop, start = self.evolve_population( population, ffunction, max_depth, depth_calculator, elitism, X_train, y_train, n_jobs=n_jobs ) # replacing the population with the offspring population (P = P') population = offs_pop end = time.time() # getting the new elite(s) self.elites, self.elite = self.find_elit_func(population, n_elites) # testing the elite on testing data, if applicable if test_elite: self.elite.evaluate(ffunction, X=X_test, y=y_test, testing=True) # logging the results if log != 0 if log != 0: self.log_generation( it, population, end - start, log, log_path, run_info ) # displaying the results on console if verbose != 0 if verbose != 0: verbose_reporter( run_info[-1], it, self.elite.fitness, self.elite.test_fitness, end - start, self.elite.node_count, )
[docs] def evolve_population( self, population, ffunction, max_depth, depth_calculator, elitism, X_train, y_train, n_jobs=1 ): """ Evolve the population for one iteration (generation). Parameters ---------- population : Population The current population of individuals to evolve. ffunction : function Fitness function used to evaluate individuals. max_depth : int Maximum allowable depth for trees in the population. depth_calculator : Callable Function used to calculate the depth of trees. elitism : bool Whether to use elitism, i.e., preserving the best individuals across generations. X_train : torch.Tensor Input training data features. y_train : torch.Tensor Target values for the training data. n_jobs : int, optional Number of parallel jobs to use with the joblib library (default is 1). Returns ------- Population The evolved population after one generation. float The start time of the evolution process. """ # creating an empty offspring population list offs_pop = [] start = time.time() # adding the elite(s) to the offspring population if elitism: offs_pop.extend(self.elites) # filling the offspring population while len(offs_pop) < self.pop_size: # choosing between crossover and mutation if random.random() < self.p_xo: # if crossover is selected # choose two parents p1, p2 = self.selector(population), self.selector(population) # make sure that the parents are different while p1 == p2: p1, p2 = self.selector(population), self.selector(population) # generate offspring from the chosen parents offs1, offs2 = self.crossover( p1.repr_, p2.repr_, tree1_n_nodes=p1.node_count, tree2_n_nodes=p2.node_count, ) offspring = [] # assuring the offspring do not exceed max_depth # assuring the offspring do not exceed max_depth if max_depth is not None: offspring = [] while len(offspring) == 0: d1 = depth_calculator(offs1) d2 = depth_calculator(offs2) # if at least one offspring fits, keep the valid ones (avoid repeated calculations) if d1 <= max_depth: offspring.append(offs1) if d2 <= max_depth: offspring.append(offs2) if len(offspring) > 0: break # both too deep, retry crossover offs1, offs2 = self.crossover( p1.repr_, p2.repr_, tree1_n_nodes=p1.node_count, tree2_n_nodes=p2.node_count, ) # if no valid offspring found after attempts, offspring remains empty and the loop continues else: offspring = [offs1, offs2] else: # if mutation was chosen # choosing a parent p1 = self.selector(population) # generating a mutated offspring from the parent offs1 = self.mutator(p1.repr_, num_of_nodes=p1.node_count) # making sure the offspring does not exceed max_depth if max_depth is not None: while depth_calculator(offs1) > max_depth: offs1 = self.mutator(p1.repr_, num_of_nodes=p1.node_count) # adding the offspring to a list, to be added to the offspring population offspring = [offs1] # adding the offspring as instances of Tree to the offspring population offs_pop.extend([Tree(child) for child in offspring]) # making sure the offspring population is of the same size as the population if len(offs_pop) > population.size: offs_pop = offs_pop[: population.size] # turning the offspring population into an instance of Population offs_pop = Population(offs_pop) # evaluating the offspring population offs_pop.evaluate(ffunction, X=X_train, y=y_train, n_jobs=n_jobs) # retuning the offspring population and the time control variable return offs_pop, start
[docs] def log_generation( self, generation, population, elapsed_time, log, log_path, run_info ): """ Log the results for the current generation. Args: generation (int): Current generation (iteration) number. population (Population): Current population. elapsed_time (float): Time taken for the process. log (int): Logging level. log_path (str): Path to save logs. run_info (list): Information about the current run. Returns: None """ if log == 2: add_info = [ self.elite.test_fitness, self.elite.node_count, float(niche_entropy([ind.repr_ for ind in population.population])), np.std(population.fit), log, ] elif log == 3: add_info = [ self.elite.test_fitness, self.elite.node_count, " ".join([str(ind.node_count) for ind in population.population]), " ".join([str(f) for f in population.fit]), log, ] elif log == 4: add_info = [ self.elite.test_fitness, self.elite.node_count, float(niche_entropy([ind.repr_ for ind in population.population])), np.std(population.fit), " ".join([str(ind.node_count) for ind in population.population]), " ".join([str(f) for f in population.fit]), log, ] else: add_info = [self.elite.test_fitness, self.elite.node_count, log] logger( log_path, generation, self.elite.fitness, elapsed_time, float(population.nodes_count), additional_infos=add_info, run_info=run_info, seed=self.seed, )