# MIT License
#
# Copyright (c) 2024 DALabNOVA
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Geometric Semantic Genetic Programming (GSGP) module.
"""
import random
import time
import numpy as np
import torch
from slim_gsgp.algorithms.GP.representations.tree import Tree as GP_Tree
from slim_gsgp.algorithms.GSGP.representations.population import Population
from slim_gsgp.algorithms.GSGP.representations.tree import Tree
from slim_gsgp.algorithms.GSGP.representations.tree_utils import (
nested_depth_calculator, nested_nodes_calculator)
from slim_gsgp.utils.diversity import gsgp_pop_div_from_vectors
from slim_gsgp.utils.logger import logger
from slim_gsgp.utils.utils import get_random_tree, verbose_reporter
[docs]
class GSGP:
"""
Geometric Semantic Genetic Programming class.
"""
def __init__(
self,
pi_init,
initializer,
selector,
mutator,
ms,
crossover,
find_elit_func,
p_m=0.8,
p_xo=0.2,
pop_size=100,
seed=0,
settings_dict=None,
):
"""
Initialize the GSGP algorithm.
Parameters
----------
pi_init : dict
Dictionary with all the parameters needed for candidate solutions initialization.
initializer : callable
Function to initialize the population.
selector : callable
Function to select individuals.
mutator : callable
Function to mutate individuals.
ms : callable
Function to determine mutation step.
crossover : callable
Function to perform crossover between individuals.
find_elit_func : callable
Function to find elite individuals.
p_m : float, optional
Probability of mutation. Defaults to 0.8.
p_xo : float, optional
Probability of crossover. Defaults to 0.2.
pop_size : int, optional
Size of the population. Defaults to 100.
seed : int, optional
Seed for random number generation. Defaults to 0.
settings_dict : dict, optional
Additional settings dictionary. Defaults to None.
"""
self.pi_init = pi_init
self.selector = selector
self.p_m = p_m
self.crossover = crossover
self.mutator = mutator
self.ms = ms
self.p_xo = p_xo
self.initializer = initializer
self.pop_size = pop_size
self.seed = seed
self.settings_dict = settings_dict
self.find_elit_func = find_elit_func
Tree.FUNCTIONS = pi_init["FUNCTIONS"]
Tree.TERMINALS = pi_init["TERMINALS"]
Tree.CONSTANTS = pi_init["CONSTANTS"]
GP_Tree.FUNCTIONS = pi_init["FUNCTIONS"]
GP_Tree.TERMINALS = pi_init["TERMINALS"]
GP_Tree.CONSTANTS = pi_init["CONSTANTS"]
[docs]
def solve(
self,
X_train,
X_test,
y_train,
y_test,
curr_dataset,
n_iter=20,
elitism=True,
log=0,
verbose=0,
test_elite=False,
log_path=None,
run_info=None,
ffunction=None,
reconstruct=False,
n_elites=1,
n_jobs=1
):
"""
Execute the GSGP algorithm.
Parameters
----------
X_train : torch.Tensor
Training data features.
X_test : torch.Tensor
Test data features.
y_train : torch.Tensor
Training data labels.
y_test : torch.Tensor
Test data labels.
curr_dataset : str
Current dataset name.
n_iter : int
Number of iterations.
elitism : bool
Whether to use elitism.
log : int
Logging level. Default is 0.
verbose : int
Verbosity level. Default is 0.
test_elite : bool
Whether to evaluate elite individuals on test data. Default is False.
log_path : str
Path to save logs. Default is None.
run_info : list
Information about the current run. Default is None.
ffunction : callable
Fitness function. Default is None.
reconstruct : bool
Whether to reconstruct trees. Default is False.
n_elites : int
Number of elites. Default is 1.
n_jobs : int
The maximum number of jobs for parallel processing. Default is 1.
"""
# setting the seeds
torch.manual_seed(self.seed)
np.random.seed(self.seed)
random.seed(self.seed)
# starting the time computation
start = time.time()
# creating the initial population
population = Population(
[
Tree(
structure=tree,
train_semantics=None,
test_semantics=None,
reconstruct=True,
)
for tree in self.initializer(**self.pi_init)
]
)
# calculating the semantics for the initial population
population.calculate_semantics(X_train)
# calculating the testing semantics, if applicable
if test_elite:
population.calculate_semantics(X_test, testing=True)
# evaluating the initial population
population.evaluate(ffunction, y=y_train, n_jobs=n_jobs)
end = time.time()
# obtaining the elite(s)
self.elites, self.elite = self.find_elit_func(population, n_elites)
# testing the elite, if applicable
if test_elite:
self.elite.evaluate(ffunction, y=y_test, testing=True)
# logging the intial results, if the result level is != 0
if log != 0:
if log == 2:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
gsgp_pop_div_from_vectors(
torch.stack(
[
(
ind.train_semantics
if ind.train_semantics.shape != torch.Size([])
else ind.train_semantics.repeat(len(X_train))
)
for ind in population.population
]
)
),
np.std(population.fit),
log,
]
elif log == 3:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
" ".join([str(ind.nodes_count) for ind in population.population]),
" ".join([str(f) for f in population.fit]),
log,
]
elif log == 4:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
gsgp_pop_div_from_vectors(
torch.stack(
[
(
ind.train_semantics
if ind.train_semantics.shape != torch.Size([])
else ind.train_semantics.repeat(len(X_train))
)
for ind in population.population
]
)
),
np.std(population.fit),
" ".join([str(ind.nodes) for ind in population.population]),
" ".join([str(f) for f in population.fit]),
log,
]
else:
add_info = [self.elite.test_fitness, self.elite.nodes, log]
# logging the results
logger(
log_path,
0,
self.elite.fitness,
end - start,
float(population.nodes_count),
additional_infos=add_info,
run_info=run_info,
seed=self.seed,
)
# displaying the results on console, if applicable
if verbose != 0:
verbose_reporter(
curr_dataset,
0,
self.elite.fitness,
self.elite.test_fitness,
end - start,
self.elite.nodes,
)
# EVOLUTIONARY PROCESS
for it in range(1, n_iter + 1, 1):
# creating the empty offpsring population
offs_pop, start = [], time.time()
# adding the elite(s) to the offspring population
if elitism:
offs_pop.extend(self.elites)
# filling the offspring population
while len(offs_pop) < self.pop_size:
# if choosing xover
if random.random() < self.p_xo:
p1, p2 = self.selector(population), self.selector(population)
# assuring the parents are different
while p1 == p2:
p1, p2 = self.selector(population), self.selector(population)
# getting the random tree
r_tree = get_random_tree(
max_depth=self.pi_init["init_depth"],
FUNCTIONS=self.pi_init["FUNCTIONS"],
TERMINALS=self.pi_init["TERMINALS"],
CONSTANTS=self.pi_init["CONSTANTS"],
inputs=X_train,
logistic=True,
p_c=self.pi_init["p_c"],
)
# getting the testing semantics of the random trees, if applicable
if test_elite:
r_tree.calculate_semantics(X_test, testing=True, logistic=True)
# getting the offspring
offs1 = Tree(
structure=(
[self.crossover, p1, p2, r_tree] if reconstruct else None
),
train_semantics=self.crossover(p1, p2, r_tree, testing=False),
test_semantics=(
self.crossover(p1, p2, r_tree, testing=True)
if test_elite
else None
),
reconstruct=reconstruct,
)
# if reconstruct = False, calculate the nodes and the depth of the offspring
if not reconstruct:
offs1.nodes = nested_nodes_calculator(
self.crossover, [p1.nodes, p2.nodes, r_tree.nodes]
)
offs1.depth = nested_depth_calculator(
self.crossover, [p1.depth, p2.depth, r_tree.depth]
)
# adding the offspring to the offspring population
offs_pop.append(offs1)
# if mutation
else:
# selecting the parent
p1 = self.selector(population)
# determining the mutation step
ms_ = self.ms()
# seeing if two random trees are needed or only one
if self.mutator.__name__ in [
"standard_geometric_mutation",
"product_two_trees_geometric_mutation",
]:
r_tree1 = get_random_tree(
max_depth=self.pi_init["init_depth"],
FUNCTIONS=self.pi_init["FUNCTIONS"],
TERMINALS=self.pi_init["TERMINALS"],
CONSTANTS=self.pi_init["CONSTANTS"],
inputs=X_train,
p_c=self.pi_init["p_c"],
)
r_tree2 = get_random_tree(
max_depth=self.pi_init["init_depth"],
FUNCTIONS=self.pi_init["FUNCTIONS"],
TERMINALS=self.pi_init["TERMINALS"],
CONSTANTS=self.pi_init["CONSTANTS"],
inputs=X_train,
p_c=self.pi_init["p_c"],
)
# storing the mutation trees
mutation_trees = [r_tree1, r_tree2]
# calculating the testing semantics of the random tree(s), if applicable
if test_elite:
[
rt.calculate_semantics(
X_test, testing=True, logistic=True
)
for rt in mutation_trees
]
# getting only one random tree
else:
r_tree1 = get_random_tree(
max_depth=self.pi_init["init_depth"],
FUNCTIONS=self.pi_init["FUNCTIONS"],
TERMINALS=self.pi_init["TERMINALS"],
CONSTANTS=self.pi_init["CONSTANTS"],
inputs=X_train,
logistic=False,
p_c=self.pi_init["p_c"],
)
# storing the mutation tree
mutation_trees = [r_tree1]
# calculating the testing semantics of the random tree, if applicable
if test_elite:
r_tree1.calculate_semantics(
X_test, testing=True, logistic=False
)
offs1 = Tree(
structure=(
[self.mutator, p1, *mutation_trees, ms_]
if reconstruct
else None
),
train_semantics=self.mutator(
p1, *mutation_trees, ms_, testing=False
),
test_semantics=(
self.mutator(p1, *mutation_trees, ms_, testing=True)
if test_elite
else None
),
reconstruct=reconstruct,
)
# adding the offspring to the population
offs_pop.append(offs1)
# if reconstruct = False, calculate the nodes and the depth of the offspring
if not reconstruct:
offs1.nodes = nested_nodes_calculator(
self.mutator,
[p1.nodes, *[rt.nodes for rt in mutation_trees]],
)
offs1.depth = nested_depth_calculator(
self.mutator,
[p1.depth, *[rt.depth for rt in mutation_trees]],
)
# making sure the offspring population contains population.size amount of individuals
if len(offs_pop) > population.size:
offs_pop = offs_pop[: population.size]
# turning the offspring population into a Population
offs_pop = Population(offs_pop)
# evaluating the offspring population
offs_pop.evaluate(ffunction, y=y_train, n_jobs=n_jobs)
# replacing the population with the offspring population (P = P')
population = offs_pop
end = time.time()
# replacing the elites
self.elites, self.elite = self.find_elit_func(population, n_elites)
# testing elite, if applicable
if test_elite:
self.elite.evaluate(ffunction, y=y_test, testing=True)
# logging the results if log !=0
if log != 0:
if log == 2:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
gsgp_pop_div_from_vectors(
torch.stack(
[
(
ind.train_semantics
if ind.train_semantics.shape != torch.Size([])
else ind.train_semantics.repeat(len(X_train))
)
for ind in population.population
]
)
),
np.std(population.fit),
log,
]
elif log == 3:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
" ".join(
[str(ind.nodes_count) for ind in population.population]
),
" ".join([str(f) for f in population.fit]),
log,
]
elif log == 4:
add_info = [
self.elite.test_fitness,
self.elite.nodes,
gsgp_pop_div_from_vectors(
torch.stack(
[
(
ind.train_semantics
if ind.train_semantics.shape != torch.Size([])
else ind.train_semantics.repeat(len(X_train))
)
for ind in population.population
]
)
),
np.std(population.fit),
" ".join([str(ind.nodes) for ind in population.population]),
" ".join([str(f) for f in population.fit]),
log,
]
else:
add_info = [self.elite.test_fitness, self.elite.nodes, log]
# logging the results
logger(
log_path,
it,
self.elite.fitness,
end - start,
float(population.nodes_count),
additional_infos=add_info,
run_info=run_info,
seed=self.seed,
)
# displaying the results on console, if applicable
if verbose != 0:
verbose_reporter(
run_info[-1],
it,
self.elite.fitness,
self.elite.test_fitness,
end - start,
self.elite.nodes,
)