# MIT License
#
# Copyright (c) 2024 DALabNOVA
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import math
import random
import numpy as np
import torch
from slim_gsgp.algorithms.GP.representations.tree_utils import (create_full_random_tree,
create_grow_random_tree)
from slim_gsgp.algorithms.GSGP.representations.tree import Tree
from sklearn.metrics import root_mean_squared_error
[docs]
def protected_div(x1, x2):
    """Divide x1 by x2 element-wise, guarding against near-zero denominators.

    Wherever the absolute value of the denominator is not strictly greater
    than 0.001, the quotient is replaced by the constant 1.0 (the numerator
    is ignored at those positions). Elsewhere the regular quotient
    ``x1 / x2`` is returned.

    Parameters
    ----------
    x1 : torch.Tensor
        The numerator.
    x2 : torch.Tensor
        The denominator.

    Returns
    -------
    torch.Tensor
        Result of protected division between x1 and x2.
    """
    # Denominators with magnitude at or below this threshold count as zero.
    threshold = 0.001
    # Fallback value, created with the denominator's dtype and device.
    fallback = torch.tensor(1.0, dtype=x2.dtype, device=x2.device)
    return torch.where(torch.abs(x2) > threshold, torch.div(x1, x2), fallback)
[docs]
def mean_(x1, x2):
    """
    Return the element-wise average of two tensors.

    Parameters
    ----------
    x1 : torch.Tensor
        The first tensor.
    x2 : torch.Tensor
        The second tensor.

    Returns
    -------
    torch.Tensor
        Half of the element-wise sum of x1 and x2.
    """
    total = torch.add(x1, x2)
    return torch.div(total, 2)
[docs]
def train_test_split(X, y, p_test=0.3, shuffle=True, indices_only=False, seed=0):
    """Split X and y tensors into train and test subsets.

    The first ``floor(p_test * n)`` positions of the (optionally shuffled)
    index vector form the test partition; the remaining positions form the
    training partition.

    Parameters
    ----------
    X : torch.Tensor
        Input data instances.
    y : torch.Tensor
        Target vector.
    p_test : float (default=0.3)
        The proportion of the dataset to include in the test split.
    shuffle : bool (default=True)
        Whether to shuffle the data before splitting.
    indices_only : bool (default=False)
        Whether to return only the indices representing training and test
        partition.
    seed : int (default=0)
        The seed passed to ``torch.manual_seed``.

    Returns
    -------
    tuple
        ``(train_indices, test_indices)`` when ``indices_only`` is True,
        otherwise ``(X_train, X_test, y_train, y_test)``.
    """
    # Note: this seeds torch's global random number generator.
    torch.manual_seed(seed)

    n_samples = X.shape[0]
    indices = torch.randperm(n_samples) if shuffle else torch.arange(0, n_samples, 1)

    split = int(math.floor(p_test * n_samples))
    test_indices = indices[:split]
    train_indices = indices[split:]

    if indices_only:
        return train_indices, test_indices

    return X[train_indices], X[test_indices], y[train_indices], y[test_indices]
[docs]
def tensor_dimensioned_sum(dim):
    """
    Build a function that sums a tensor along a fixed dimension.

    Parameters
    ----------
    dim : int
        The dimension to sum over.

    Returns
    -------
    function
        A one-argument function that applies ``torch.sum`` to its input
        over the dimension captured here.
    """

    def tensor_sum(input):
        # `dim` is captured from the enclosing call.
        return torch.sum(input, dim=dim)

    return tensor_sum
[docs]
def verbose_reporter(
    dataset, generation, pop_val_fitness, pop_test_fitness, timing, nodes
):
    """
    Prints a formatted report of generation, fitness values, timing, and node count.

    One table row is printed per call. When ``generation`` is 0 the row is
    preceded by a title, a rule, the column header and a second rule.

    Parameters
    ----------
    dataset : object
        Value shown in the "Dataset" column (rendered with ``str``).
    generation : int
        Current generation number.
    pop_val_fitness : float
        Fitness value shown in the "Train Fitness" column (converted with
        ``float`` before printing).
    pop_test_fitness : float or None
        Population's test fitness value; ``None`` is printed as "None".
    timing : float
        Time taken for the process.
    nodes : int
        Count of nodes in the population.

    Returns
    -------
    None
        Outputs a formatted report to the console.
    """
    test_text = "None" if pop_test_fitness is None else str(float(pop_test_fitness))

    # Each cell is (left padding, text, minimum text width). Text longer than
    # its width is printed in full and simply pushes the separator right.
    cells = (
        (5, str(dataset), 20),
        (7, str(generation), 7),
        (3, str(float(pop_val_fitness)), 20),
        (3, test_text, 23),
        (3, str(timing), 21),
        (6, str(nodes), 12),
    )
    row = "|" + "".join(
        " " * pad + text.ljust(width) + "|" for pad, text, width in cells
    )

    if generation == 0:
        rule = "-----------------------------------------------------------------------------------------------------------------------------------------"
        print("Verbose Reporter")
        print(rule)
        print(
            "| Dataset | Generation | Train Fitness | Test Fitness | "
            "Timing | Nodes |"
        )
        print(rule)

    print(row)
[docs]
def get_terminals(X):
    """
    Build the terminal set for a dataset.

    Parameters
    ----------
    X : (torch.Tensor)
        The data whose first row determines how many terminals are created
        (one per element of ``X[0]``).

    Returns
    -------
    dict
        Mapping from the names ``"x0"``, ``"x1"``, ... to the matching
        positional index.
    """
    n_features = len(X[0])
    terminals = {}
    for column in range(n_features):
        terminals[f"x{column}"] = column
    return terminals
[docs]
def get_best_min(population, n_elites):
    """
    Get the best individuals from the population with the minimum fitness.

    Parameters
    ----------
    population : Population
        The population of individuals. Its ``fit`` attribute holds the
        fitness values and its ``population`` attribute the individuals,
        in matching order.
    n_elites : int
        Number of elites to return. If it is greater than or equal to the
        number of individuals, the whole population is returned as elites.

    Returns
    -------
    list
        The list of elite individuals (not sorted by fitness).
    Individual
        Best individual from the elites.
    """
    if n_elites > 1:
        # np.argpartition requires kth < len(fit). Clamping keeps the call
        # valid when as many (or more) elites as individuals are requested;
        # for smaller n_elites the kth value is unchanged.
        kth = min(n_elites, len(population.fit) - 1)
        idx = np.argpartition(population.fit, kth)
        elites = [population.population[i] for i in idx[:n_elites]]
        return elites, elites[np.argmin([elite.fitness for elite in elites])]

    elite = population.population[np.argmin(population.fit)]
    return [elite], elite
[docs]
def get_best_max(population, n_elites):
    """
    Select the individuals with the highest fitness from the population.

    Parameters
    ----------
    population : Population
        The population of individuals. Its ``fit`` attribute holds the
        fitness values and its ``population`` attribute the individuals.
    n_elites : int
        Number of elites to return.

    Returns
    -------
    list
        The list of elite individuals (not sorted by fitness).
    Individual
        Best individual from the elites.
    """
    if n_elites <= 1:
        top = population.population[np.argmax(population.fit)]
        return [top], top

    # After partitioning, the last n_elites positions hold the largest values.
    order = np.argpartition(population.fit, -n_elites)
    elites = [population.population[pos] for pos in order[-n_elites:]]
    best_pos = np.argmax([member.fitness for member in elites])
    return elites, elites[best_pos]
[docs]
def get_random_tree(
    max_depth,
    FUNCTIONS,
    TERMINALS,
    CONSTANTS,
    inputs,
    p_c=0.3,
    grow_probability=1,
    logistic=True,
):
    """
    Create a random tree with the grow or the full method and compute its semantics.

    Parameters
    ----------
    max_depth : int
        Maximum depth of the tree.
    FUNCTIONS : dict
        Dictionary of functions.
    TERMINALS : dict
        Dictionary of terminals.
    CONSTANTS : dict
        Dictionary of constants.
    inputs : torch.Tensor
        Input tensor for calculating semantics.
    p_c : float, default=0.3
        Probability of choosing a constant.
    grow_probability : float, default=1
        Probability of using the grow method; otherwise the full method
        is used.
    logistic : bool, default=True
        Forwarded to ``Tree.calculate_semantics``.

    Returns
    -------
    Tree
        The generated random tree, with ``calculate_semantics`` already
        called on ``inputs`` (``testing=False``).
    """
    # A single random draw decides which construction method is used.
    use_grow = random.random() < grow_probability
    builder = create_grow_random_tree if use_grow else create_full_random_tree
    tree_structure = builder(max_depth, FUNCTIONS, TERMINALS, CONSTANTS, p_c)

    tree = Tree(
        structure=tree_structure,
        train_semantics=None,
        test_semantics=None,
        reconstruct=True,
    )
    tree.calculate_semantics(inputs, testing=False, logistic=logistic)
    return tree
[docs]
def show_individual(tree, operator):
    """
    Render an individual as a string of blocks joined by an operator symbol.

    Parameters
    ----------
    tree : Tree
        The individual; its ``collection`` attribute is iterated to obtain
        the blocks.
    operator : str
        ``'sum'`` joins the blocks with "+"; any other value joins them
        with "*".

    Returns
    -------
    str
        The string representation of the individual's structure.
    """
    symbol = "*" if operator != "sum" else "+"

    def render(block):
        structure = block.structure
        # Tuple structures are printed as they are.
        if isinstance(structure, tuple):
            return str(structure)
        # Non-tuple structures of length 3 reference a single inner tree...
        if len(structure) == 3:
            return f"f({structure[1].structure})"
        # ...any other length is shown as a difference of two inner trees.
        return f"f({structure[1].structure} - {structure[2].structure})"

    return f" {symbol} ".join([render(block) for block in tree.collection])
[docs]
def gs_rmse(y_true, y_pred):
    """
    Compute the root mean squared error from an indexable prediction object.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : sequence
        Object whose element at index 0 holds the predicted values; only
        that element is used here.

    Returns
    -------
    float
        The root mean squared error between ``y_true`` and ``y_pred[0]``.
    """
    predictions = y_pred[0]
    return root_mean_squared_error(y_true, predictions)
[docs]
def gs_size(y_true, y_pred):
    """
    Return the size entry carried by an indexable prediction object.

    Parameters
    ----------
    y_true : array-like
        True values (unused; kept so the signature matches ``gs_rmse``).
    y_pred : sequence
        Object whose element at index 1 is returned.

    Returns
    -------
    int
        The value stored at ``y_pred[1]``.
    """
    size = y_pred[1]
    return size
[docs]
def check_slim_version(slim_version):
    """
    Validate the slim_gsgp version given as input by the user and return the
    matching values for the parameters op, sig and trees.

    Parameters
    ----------
    slim_version : str
        Name of the slim_gsgp version. One of "SLIM+SIG2", "SLIM*SIG2",
        "SLIM+ABS", "SLIM*ABS", "SLIM+SIG1" or "SLIM*SIG1".

    Returns
    -------
    op, sig, trees
        Parameters reflecting the kind of operation considered ("sum" or
        "mul"), the use of the sigmoid and the use of multiple trees.

    Raises
    ------
    ValueError
        If ``slim_version`` is not one of the supported names. ValueError
        is a subclass of Exception, so callers catching Exception still work.
    """
    configurations = {
        "SLIM+SIG2": ("sum", True, True),
        "SLIM*SIG2": ("mul", True, True),
        "SLIM+ABS": ("sum", False, False),
        "SLIM*ABS": ("mul", False, False),
        "SLIM+SIG1": ("sum", True, False),
        "SLIM*SIG1": ("mul", True, False),
    }
    try:
        return configurations[slim_version]
    except (KeyError, TypeError):
        # TypeError covers unhashable inputs, which are invalid names too.
        raise ValueError('Invalid SLIM configuration') from None
def _evaluate_slim_individual(individual, ffunction, y, testing=False, operator="sum"):
"""
Evaluate the individual using a fitness function.
Args:
ffunction: Fitness function to evaluate the individual.
y: Expected output (target) values as a torch tensor.
testing: Boolean indicating if the evaluation is for testing semantics.
operator: Operator to apply to the semantics ("sum" or "prod").
Returns:
None
"""
if operator == "sum":
operator = torch.sum
else:
operator = torch.prod
if testing:
individual.test_fitness = ffunction(
y,
torch.clamp(
operator(individual.test_semantics, dim=0),
-1000000000000.0,
1000000000000.0,
),
)
else:
individual.fitness = ffunction(
y,
torch.clamp(
operator(individual.train_semantics, dim=0),
-1000000000000.0,
1000000000000.0,
),
)
# if testing is false, return the value so that training parallelization has effect
return ffunction(
y,
torch.clamp(
operator(individual.train_semantics, dim=0),
-1000000000000.0,
1000000000000.0,
),
)