# MIT License
#
# Copyright (c) 2024 DALabNOVA
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
SLIM_GSGP Class for Evolutionary Computation using PyTorch.
"""
import random
import time
import numpy as np
import torch
from slim_gsgp.algorithms.GP.representations.tree import Tree as GP_Tree
from slim_gsgp.algorithms.GSGP.representations.tree import Tree
from slim_gsgp.algorithms.SLIM_GSGP.representations.individual import Individual
from slim_gsgp.algorithms.SLIM_GSGP.representations.population import Population
from slim_gsgp.utils.diversity import gsgp_pop_div_from_vectors
from slim_gsgp.utils.logger import logger
from slim_gsgp.utils.utils import verbose_reporter
[docs]
class SLIM_GSGP:
def __init__(
self,
pi_init,
initializer,
selector,
inflate_mutator,
deflate_mutator,
ms,
crossover,
find_elit_func,
p_m=1,
p_xo=0,
p_inflate=0.3,
p_deflate=0.7,
pop_size=100,
seed=0,
operator="sum",
copy_parent=True,
two_trees=True,
settings_dict=None,
):
"""
Initialize the SLIM_GSGP algorithm with given parameters.
Parameters
----------
pi_init : dict
Dictionary with all the parameters needed for candidate solutions initialization.
initializer : Callable
Function to initialize the population.
selector : Callable
Function to select individuals.
inflate_mutator : Callable
Function for inflate mutation.
deflate_mutator : Callable
Function for deflate mutation.
ms : Callable
Mutation step function.
crossover : Callable
Crossover function.
find_elit_func : Callable
Function to find elite individuals.
p_m : float
Probability of mutation. Default is 1.
p_xo : float
Probability of crossover. Default is 0.
p_inflate : float
Probability of inflate mutation. Default is 0.3.
p_deflate : float
Probability of deflate mutation. Default is 0.7.
pop_size : int
Size of the population. Default is 100.
seed : int
Random seed for reproducibility. Default is 0.
operator : {'sum', 'prod'}
Operator to apply to the semantics, either "sum" or "prod". Default is "sum".
copy_parent : bool
Whether to copy the parent when mutation is not possible. Default is True.
two_trees : bool
Indicates if two trees are used. Default is True.
settings_dict : dict
Additional settings passed as a dictionary.
"""
self.pi_init = pi_init
self.selector = selector
self.p_m = p_m
self.p_inflate = p_inflate
self.p_deflate = p_deflate
self.crossover = crossover
self.inflate_mutator = inflate_mutator
self.deflate_mutator = deflate_mutator
self.ms = ms
self.p_xo = p_xo
self.initializer = initializer
self.pop_size = pop_size
self.seed = seed
self.operator = operator
self.copy_parent = copy_parent
self.two_trees = two_trees
self.settings_dict = settings_dict
self.find_elit_func = find_elit_func
Tree.FUNCTIONS = pi_init["FUNCTIONS"]
Tree.TERMINALS = pi_init["TERMINALS"]
Tree.CONSTANTS = pi_init["CONSTANTS"]
GP_Tree.FUNCTIONS = pi_init["FUNCTIONS"]
GP_Tree.TERMINALS = pi_init["TERMINALS"]
GP_Tree.CONSTANTS = pi_init["CONSTANTS"]
[docs]
def solve(
self,
X_train,
X_test,
y_train,
y_test,
curr_dataset,
run_info,
n_iter=20,
elitism=True,
log=0,
verbose=0,
test_elite=False,
log_path=None,
ffunction=None,
max_depth=17,
n_elites=1,
reconstruct=True,
n_jobs=1):
"""
Solve the optimization problem using SLIM_GSGP.
Parameters
----------
X_train : array-like
Training input data.
X_test : array-like
Testing input data.
y_train : array-like
Training output data.
y_test : array-like
Testing output data.
curr_dataset : str or int
Identifier for the current dataset.
run_info : dict
Information about the current run.
n_iter : int
Number of iterations. Default is 20.
elitism : bool
Whether elitism is used during evolution. Default is True.
log : int or str
Logging level (e.g., 0 for no logging, 1 for basic, etc.). Default is 0.
verbose : int
Verbosity level for logging outputs. Default is 0.
test_elite : bool
Whether elite individuals should be tested. Default is False.
log_path : str
File path for saving log outputs. Default is None.
ffunction : function
Fitness function used to evaluate individuals. Default is None.
max_depth : int
Maximum depth for the trees. Default is 17.
n_elites : int
Number of elite individuals to retain during selection. Default is True.
reconstruct : bool
Indicates if reconstruction of the solution is needed. Default is True.
n_jobs : int
Maximum number of concurrently running jobs for joblib parallelization. Default is 1.
"""
if test_elite and (X_test is None or y_test is None):
raise Exception('If test_elite is True you need to provide a test dataset')
# setting the seeds
torch.manual_seed(self.seed)
np.random.seed(self.seed)
random.seed(self.seed)
# starting time count
start = time.time()
# creating the initial population
population = Population(
[
Individual(
collection=[
Tree(
tree,
train_semantics=None,
test_semantics=None,
reconstruct=True,
)
],
train_semantics=None,
test_semantics=None,
reconstruct=True,
)
for tree in self.initializer(**self.pi_init)
]
)
# calculating initial population semantics
population.calculate_semantics(X_train)
# evaluating the initial population
population.evaluate(ffunction, y=y_train, operator=self.operator, n_jobs=n_jobs)
end = time.time()
# setting up the elite(s)
self.elites, self.elite = self.find_elit_func(population, n_elites)
# calculating the testing semantics and the elite's testing fitness if test_elite is true
if test_elite:
population.calculate_semantics(X_test, testing=True)
self.elite.evaluate(
ffunction, y=y_test, testing=True, operator=self.operator
)
# logging the results based on the log level
if log != 0:
if log == 2:
gen_diversity = (
gsgp_pop_div_from_vectors(
torch.stack(
[
torch.sum(ind.train_semantics, dim=0)
for ind in population.population
]
),
)
if self.operator == "sum"
else gsgp_pop_div_from_vectors(
torch.stack(
[
torch.prod(ind.train_semantics, dim=0)
for ind in population.population
]
)
)
)
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
float(gen_diversity),
np.std(population.fit),
log,
]
elif log == 3:
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
" ".join([str(ind.nodes_count) for ind in population.population]),
" ".join([str(f) for f in population.fit]),
log,
]
elif log == 4:
gen_diversity = (
gsgp_pop_div_from_vectors(
torch.stack(
[
torch.sum(ind.train_semantics, dim=0)
for ind in population.population
]
),
)
if self.operator == "sum"
else gsgp_pop_div_from_vectors(
torch.stack(
[
torch.prod(ind.train_semantics, dim=0)
for ind in population.population
]
)
)
)
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
float(gen_diversity),
np.std(population.fit),
" ".join([str(ind.nodes_count) for ind in population.population]),
" ".join([str(f) for f in population.fit]),
log,
]
else:
add_info = [self.elite.test_fitness, self.elite.nodes_count, log]
logger(
log_path,
0,
self.elite.fitness,
end - start,
float(population.nodes_count),
additional_infos=add_info,
run_info=run_info,
seed=self.seed,
)
# displaying the results on console if verbose level is more than 0
if verbose != 0:
verbose_reporter(
curr_dataset,
0,
self.elite.fitness,
self.elite.test_fitness,
end - start,
self.elite.nodes_count,
)
# begining the evolution process
for it in range(1, n_iter + 1, 1):
# starting an empty offspring population
offs_pop, start = [], time.time()
# adding the elite to the offspring population, if applicable
if elitism:
offs_pop.extend(self.elites)
# filling the offspring population
while len(offs_pop) < self.pop_size:
# choosing between crossover and mutation
if random.random() < self.p_xo:
p1, p2 = self.selector(population), self.selector(population)
while p1 == p2:
# choosing parents
p1, p2 = self.selector(population), self.selector(population)
pass # future work on slim_gsgp implementations should invent crossover
else:
# so, mutation was selected. Now deflation or inflation is selected.
if random.random() < self.p_deflate:
# selecting the parent to deflate
p1 = self.selector(population)
# if the parent has only one block, it cannot be deflated
if p1.size == 1:
# if copy parent is set to true, the parent who cannot be deflated will be copied as the offspring
if self.copy_parent:
off1 = Individual(
collection=p1.collection if reconstruct else None,
train_semantics=p1.train_semantics,
test_semantics=p1.test_semantics,
reconstruct=reconstruct,
)
(
off1.nodes_collection,
off1.nodes_count,
off1.depth_collection,
off1.depth,
off1.size,
) = (
p1.nodes_collection,
p1.nodes_count,
p1.depth_collection,
p1.depth,
p1.size,
)
else:
# if we choose to not copy the parent, we inflate it instead
ms_ = self.ms()
off1 = self.inflate_mutator(
p1,
ms_,
X_train,
max_depth=self.pi_init["init_depth"],
p_c=self.pi_init["p_c"],
X_test=X_test,
reconstruct=reconstruct,
)
else:
# if the size of the parent is more than 1, normal deflation can occur
off1 = self.deflate_mutator(p1, reconstruct=reconstruct)
# inflation mutation was selected
else:
# selecting a parent to inflate
p1 = self.selector(population)
# determining the random mutation step
ms_ = self.ms()
# if the chosen parent is already at maximum depth and therefore cannot be inflated
if max_depth is not None and p1.depth == max_depth:
# if copy parent is set to true, the parent who cannot be inflated will be copied as the offspring
if self.copy_parent:
off1 = Individual(
collection=p1.collection if reconstruct else None,
train_semantics=p1.train_semantics,
test_semantics=p1.test_semantics,
reconstruct=reconstruct,
)
(
off1.nodes_collection,
off1.nodes_count,
off1.depth_collection,
off1.depth,
off1.size,
) = (
p1.nodes_collection,
p1.nodes_count,
p1.depth_collection,
p1.depth,
p1.size,
)
# if copy parent is false, the parent is deflated instead of inflated
else:
off1 = self.deflate_mutator(p1, reconstruct=reconstruct)
# so the chosen individual can be normally inflated
else:
off1 = self.inflate_mutator(
p1,
ms_,
X_train,
max_depth=self.pi_init["init_depth"],
p_c=self.pi_init["p_c"],
X_test=X_test,
reconstruct=reconstruct,
)
# if offspring resulting from inflation exceedes the max depth
if max_depth is not None and off1.depth > max_depth:
# if copy parent is set to true, the offspring is discarded and the parent is chosen instead
if self.copy_parent:
off1 = Individual(
collection=p1.collection if reconstruct else None,
train_semantics=p1.train_semantics,
test_semantics=p1.test_semantics,
reconstruct=reconstruct,
)
(
off1.nodes_collection,
off1.nodes_count,
off1.depth_collection,
off1.depth,
off1.size,
) = (
p1.nodes_collection,
p1.nodes_count,
p1.depth_collection,
p1.depth,
p1.size,
)
else:
# otherwise, deflate the parent
off1 = self.deflate_mutator(p1, reconstruct=reconstruct)
# adding the new offspring to the offspring population
offs_pop.append(off1)
# removing any excess individuals from the offspring population
if len(offs_pop) > population.size:
offs_pop = offs_pop[: population.size]
# turning the offspring population into a Population
offs_pop = Population(offs_pop)
# calculating the offspring population semantics
offs_pop.calculate_semantics(X_train)
# evaluating the offspring population
offs_pop.evaluate(ffunction, y=y_train, operator=self.operator, n_jobs=n_jobs)
# replacing the current population with the offspring population P = P'
population = offs_pop
self.population = population
end = time.time()
# setting the new elite(s)
self.elites, self.elite = self.find_elit_func(population, n_elites)
# calculating the testing semantics and the elite's testing fitness if test_elite is true
if test_elite:
self.elite.calculate_semantics(X_test, testing=True)
self.elite.evaluate(
ffunction, y=y_test, testing=True, operator=self.operator
)
# logging the results based on the log level
if log != 0:
if log == 2:
gen_diversity = (
gsgp_pop_div_from_vectors(
torch.stack(
[
torch.sum(ind.train_semantics, dim=0)
for ind in population.population
]
),
)
if self.operator == "sum"
else gsgp_pop_div_from_vectors(
torch.stack(
[
torch.prod(ind.train_semantics, dim=0)
for ind in population.population
]
)
)
)
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
float(gen_diversity),
np.std(population.fit),
log,
]
elif log == 3:
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
" ".join(
[str(ind.nodes_count) for ind in population.population]
),
" ".join([str(f) for f in population.fit]),
log,
]
elif log == 4:
gen_diversity = (
gsgp_pop_div_from_vectors(
torch.stack(
[
torch.sum(ind.train_semantics, dim=0)
for ind in population.population
]
),
)
if self.operator == "sum"
else gsgp_pop_div_from_vectors(
torch.stack(
[
torch.prod(ind.train_semantics, dim=0)
for ind in population.population
]
)
)
)
add_info = [
self.elite.test_fitness,
self.elite.nodes_count,
float(gen_diversity),
np.std(population.fit),
" ".join(
[str(ind.nodes_count) for ind in population.population]
),
" ".join([str(f) for f in population.fit]),
log,
]
else:
add_info = [self.elite.test_fitness, self.elite.nodes_count, log]
logger(
log_path,
it,
self.elite.fitness,
end - start,
float(population.nodes_count),
additional_infos=add_info,
run_info=run_info,
seed=self.seed,
)
# displaying the results on console if verbose level is more than 0
if verbose != 0:
verbose_reporter(
run_info[-1],
it,
self.elite.fitness,
self.elite.test_fitness,
end - start,
self.elite.nodes_count,
)