Source code for slim_gsgp.main_gsgp

# MIT License
#
# Copyright (c) 2024 DALabNOVA
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
This script runs the StandardGSGP algorithm on various datasets and configurations,
logging the results for further analysis.
"""

import uuid
import os
import warnings

from slim_gsgp.algorithms.GSGP.gsgp import GSGP
from slim_gsgp.config.gsgp_config import *
from slim_gsgp.utils.logger import log_settings
from slim_gsgp.utils.utils import get_terminals, validate_inputs, generate_random_uniform
from typing import Callable

[docs] def gsgp(X_train: torch.Tensor, y_train: torch.Tensor, X_test: torch.Tensor = None, y_test: torch.Tensor = None, dataset_name: str = None, pop_size: int = gsgp_parameters["pop_size"], n_iter: int = gsgp_solve_parameters["n_iter"], p_xo: float = gsgp_parameters["p_xo"], elitism: bool = gsgp_solve_parameters["elitism"], n_elites: int = gsgp_solve_parameters["n_elites"], init_depth: int = gsgp_pi_init["init_depth"], ms_lower: float = 0, ms_upper: float = 1, log_path: str = None, seed: int = gsgp_parameters["seed"], log_level: int = gsgp_solve_parameters["log"], verbose: int = gsgp_solve_parameters["verbose"], reconstruct: bool = gsgp_solve_parameters["reconstruct"], fitness_function: str = gsgp_solve_parameters["ffunction"], initializer: str = gsgp_parameters["initializer"], minimization: bool = True, prob_const: float = gsgp_pi_init["p_c"], tree_functions: list = list(FUNCTIONS.keys()), tree_constants: list = [float(key.replace("constant_", "").replace("_", "-")) for key in CONSTANTS], n_jobs: int = gsgp_solve_parameters["n_jobs"], tournament_size: int = 2, test_elite: bool = gsgp_solve_parameters["test_elite"]): """ Main function to execute the Standard GSGP algorithm on specified datasets Parameters ---------- X_train: (torch.Tensor) Training input data. y_train: (torch.Tensor) Training output data. X_test: (torch.Tensor), optional Testing input data. y_test: (torch.Tensor), optional Testing output data. dataset_name : str, optional Dataset name, for logging purposes pop_size : int, optional The population size for the genetic programming algorithm (default is 100). n_iter : int, optional The number of iterations for the genetic programming algorithm (default is 100). p_xo : float, optional The probability of crossover in the genetic programming algorithm. Must be a number between 0 and 1 (default is 0.8). elitism : bool, optional Indicate the presence or absence of elitism. n_elites : int, optional The number of elites. init_depth : int, optional The depth value for the initial GP trees population. ms_lower : float, optional Lower bound for mutation rates (default is 0). ms_upper : float, optional Upper bound for mutation rates (default is 1). log_path : str, optional The path where is created the log directory where results are saved. seed : int, optional Seed for the randomness log_level : int, optional Level of detail to utilize in logging. verbose : int, optional Level of detail to include in console output. reconstruct: bool, optional Whether to store the structure of individuals. More computationally expensive, but allows usage outside the algorithm. minimization : bool, optional If True, the objective is to minimize the fitness function. If False, maximize it (default is True). fitness_function : str, optional The fitness function used for evaluating individuals (default is from gp_solve_parameters). initializer : str, optional The strategy for initializing the population (e.g., "grow", "full", "rhh"). n_jobs : int, optional Number of parallel jobs to run (default is 1). prob_const : float, optional The probability of a constant being chosen rather than a terminal in trees creation (default: 0.2). tree_functions : list, optional List of allowed functions that can appear in the trees. Check documentation for the available functions. tree_constants : list, optional List of constants allowed to appear in the trees. tournament_size : int, optional Tournament size to utilize during selection. Only applicable if using tournament selection. (Default is 2) test_elite : bool, optional Whether to test the elite individual on the test set after each generation. Returns ------- Tree Returns the best individual at the last generation. """ # ================================ # Input Validation # ================================ # Setting the log_path if log_path is None: log_path = os.path.join(os.getcwd(), "log", "gsgp.csv") validate_inputs(X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test, pop_size=pop_size, n_iter=n_iter, elitism=elitism, n_elites=n_elites, init_depth=init_depth, log_path=log_path, prob_const=prob_const, tree_functions=tree_functions, tree_constants=tree_constants, log=log_level, verbose=verbose, minimization=minimization, n_jobs=n_jobs, test_elite=test_elite, fitness_function=fitness_function, initializer=initializer, tournament_size=tournament_size) if test_elite and (X_test is None or y_test is None): warnings.warn("If test_elite is True, a test dataset must be provided. test_elite has been set to False") test_elite = False if dataset_name is None: warnings.warn("No dataset name set. Using default value of dataset_1.") dataset_name = "dataset_1" # Checking that both ms bounds are numerical assert isinstance(ms_lower, (int, float)) and isinstance(ms_upper, (int, float)), \ "Both ms_lower and ms_upper must be either int or float" # If so, create the ms callable ms = generate_random_uniform(ms_lower, ms_upper) # assuring the p_xo is valid assert 0 <= p_xo <= 1, "p_xo must be a number between 0 and 1" # creating a list with the valid available fitness functions valid_fitnesses = list(fitness_function_options) # assuring the chosen fitness_function is valid assert fitness_function.lower() in fitness_function_options.keys(), \ "fitness function must be: "+f"{', '.join(valid_fitnesses[:-1])} or {valid_fitnesses[-1]}"\ if len(valid_fitnesses) > 1 else valid_fitnesses[0] # creating a list with the valid available initializers valid_initializers = list(initializer_options) # assuring the chosen initializer is valid assert initializer.lower() in initializer_options.keys(), \ "initializer must be " + f"{', '.join(valid_initializers[:-1])} or {valid_initializers[-1]}" \ if len(valid_initializers) > 1 else valid_initializers[0] # ================================ # Parameter Definition # ================================ # setting the number of elites to 0 if no elitism is used if not elitism: n_elites = 0 # getting a unique run id for the settings logging unique_run_id = uuid.uuid1() # setting the algorithm name to standard gsgp for logging algo_name = "StandardGSGP" # *************** GSGP_PI_INIT *************** # getting the terminals based on the training data TERMINALS = get_terminals(X_train) gsgp_pi_init["TERMINALS"] = TERMINALS try: gsgp_pi_init["FUNCTIONS"] = {key: FUNCTIONS[key] for key in tree_functions} except KeyError as e: valid_functions = list(FUNCTIONS) raise KeyError( "The available tree functions are: " + f"{', '.join(valid_functions[:-1])} or "f"{valid_functions[-1]}" if len(valid_functions) > 1 else valid_functions[0]) try: gsgp_pi_init['CONSTANTS'] = {f"constant_{str(n).replace('-', '_')}": lambda _, num=n: torch.tensor(num) for n in tree_constants} except KeyError as e: valid_constants = list(CONSTANTS) raise KeyError( "The available tree constants are: " + f"{', '.join(valid_constants[:-1])} or "f"{valid_constants[-1]}" if len(valid_constants) > 1 else valid_constants[0]) # setting up the configuration dictionaries based on the user given input gsgp_pi_init["init_pop_size"] = pop_size gsgp_pi_init["init_depth"] = init_depth gsgp_pi_init["p_c"] = prob_const # *************** GSGP_PARAMETERS *************** gsgp_parameters["p_xo"] = p_xo gsgp_parameters["p_m"] = 1 - gsgp_parameters["p_xo"] gsgp_parameters["pop_size"] = pop_size gsgp_parameters["ms"] = ms gsgp_parameters["seed"] = seed gsgp_parameters["initializer"] = initializer_options[initializer] if minimization: gsgp_parameters["selector"] = tournament_selection_min(tournament_size) gsgp_parameters["find_elit_func"] = get_best_min else: gsgp_parameters["selector"] = tournament_selection_max(tournament_size) gsgp_parameters["find_elit_func"] = get_best_max # *************** GSGP_SOLVE_PARAMETERS *************** # setting up the information of the run, for logging purposes gsgp_solve_parameters["run_info"] = [algo_name, unique_run_id, dataset_name] gsgp_solve_parameters["n_iter"] = n_iter gsgp_solve_parameters["log_path"] = log_path gsgp_solve_parameters["elitism"] = elitism gsgp_solve_parameters["n_elites"] = n_elites gsgp_solve_parameters["n_jobs"] = n_jobs gsgp_solve_parameters["test_elite"] = test_elite gsgp_solve_parameters["log"] = log_level gsgp_solve_parameters["verbose"] = verbose gsgp_solve_parameters["reconstruct"] = reconstruct gsgp_solve_parameters["ffunction"] = fitness_function_options[fitness_function] # ================================ # Running the Algorithm # ================================ optimizer = GSGP(pi_init=gsgp_pi_init, **gsgp_parameters) optimizer.solve( X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test, curr_dataset=dataset_name, **gsgp_solve_parameters, ) log_settings( path=log_path[:-4] + "_settings.csv", settings_dict=[gsgp_solve_parameters, gsgp_parameters, gsgp_pi_init, settings_dict], unique_run_id=unique_run_id, ) return optimizer.elite
if __name__ == "__main__": from slim_gsgp.datasets.data_loader import load_resid_build_sale_price from slim_gsgp.utils.utils import train_test_split X, y = load_resid_build_sale_price(X_y=True) X_train, X_test, y_train, y_test = train_test_split(X, y, p_test=0.4) X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, p_test=0.5) final_tree = gsgp(X_train=X_train, y_train=y_train, X_test=X_val, y_test=y_val, dataset_name='resid_build_sale_price', pop_size=100, n_iter=1000, log_path=os.path.join(os.getcwd(), "log", f"TESTING_GSGP.csv"), fitness_function="rmse", n_jobs=2) predictions = final_tree.predict(X_test) print(float(rmse(y_true=y_test, y_pred=predictions)))