Source code for desdeo_problem.problem.Problem

"""Desdeo-problem related definitions.

This file has following classes:
    ProblemError
    EvaluationResults
    ProblemBase
    ScalarMOProblem  -- to be deprecated
    ScalarDataProblem -- to be deprecated
    MOProblem
    DataProblem
    ExperimentalProblem
    IOPISProblem
    DiscreteDataProblem
"""
from abc import ABC, abstractmethod

# , TypedDict coming in py3.8
from functools import reduce
from operator import iadd
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union
from warnings import warn

import numpy as np
import pandas as pd

from desdeo_problem.problem.Constraint import ScalarConstraint
from desdeo_problem.problem.Objective import (
    VectorDataObjective,
    VectorObjective,
    _ScalarDataObjective,
    _ScalarObjective,
    ScalarObjective,
    ScalarDataObjective,
)
from desdeo_problem.surrogatemodels.SurrogateModels import BaseRegressor
from desdeo_problem.problem.Variable import Variable


[docs]class ProblemError(Exception): """Raised when an error related to the Problem class is encountered. """
# TODO consider replacing namedtuple with attr.s for validation purposes.
[docs]class EvaluationResults(NamedTuple): """The return object of <problem>.evaluate methods. Attributes: objectives (np.ndarray): The objective function values for each input vector. fitness (np.ndarray): Equal to objective values if objective is to be minimized. Multiplied by (-1) if objective to be maximized. constraints (Union[None, np.ndarray]): The constraint values of the problem corresponding each input vector. uncertainity (Union[None, np.ndarray]): The uncertainity in the objective values. """ objectives: np.ndarray fitness: np.ndarray constraints: Union[None, np.ndarray] = None uncertainity: Union[None, np.ndarray] = None def __str__(self): """Textual output of attributes. Returns: str: The textual output of attributes """ prnt_msg = ( "Evaluation Results Object \n" f"Objective values are: \n{self.objectives}\n" f"Constraint violation values are: \n{self.constraints}\n" f"Fitness values are: \n{self.fitness}\n" f"Uncertainity values are: \n{self.uncertainity}\n" ) return prnt_msg
[docs]class ProblemBase(ABC): """The base class for the problems. All other problem classes should be derived from this. Attributes: nadir (np.ndarray): Nadir values for the problem, initiated = None ideal (np.ndarray): Ideal values for the problem, initiated = None nadir_fitness (np.ndarray): Fitness values for nadir, initiated = None ideal_fitness (np.ndarray): Fitness values for ideal, initiated = None __n_of_objectives (int): Number of objectives, initiated = 0 __n_of_variables (int): Number of variables, initiated = 0 __decision_vectors (np.ndarray): Array of decision variable vectors, initiated = None __objective_vectors (np.ndarray): Array of objective variable vectors, initiated = None """ def __init__(self): self.nadir: np.ndarray = None self.ideal: np.ndarray = None self.nadir_fitness: np.ndarray = None self.ideal_fitness: np.ndarray = None self.__n_of_objectives: int = 0 self.__n_of_variables: int = 0 self.__decision_vectors: np.ndarray = None self.__objective_vectors: np.ndarray = None @property def n_of_objectives(self) -> int: """Property, returns the number of objectives. Returns: int: The number of objectives """ return self.__n_of_objectives @n_of_objectives.setter def n_of_objectives(self, val: int): """Setter, the number of objectives. Arguments: val (int): the number of objectives """ self.__n_of_objectives = val @property def n_of_variables(self) -> int: """Property, returns the number of variables. Returns: int: The number of variables """ return self.__n_of_variables @n_of_variables.setter def n_of_variables(self, val: int): """Setter, the number of variables. Arguments: val (int): the number of variables """ self.__n_of_variables = val @property def decision_vectors(self) -> np.ndarray: """Property, returns the decision variable vectors array. Returns: np.ndarray: decision vector array """ return self.__decision_vectors @decision_vectors.setter def decision_vectors(self, val: np.ndarray): """Setter, the decision variable vector array. Arguments: val (np.ndarray): the decision vector array """ self.__decision_vectors = val
[docs] @abstractmethod def get_variable_bounds(self) -> Union[None, np.ndarray]: """Abstract method to get variable bounds""" pass
[docs] @abstractmethod def evaluate( self, decision_vectors: np.ndarray, use_surrogate: bool = False ) -> EvaluationResults: """Abstract method to evaluate problem. Evaluates the problem using an ensemble of input vectors. Uses surrogate models if available. Otherwise, it uses the true evaluator. Arguments: decision_vectors (np.ndarray): An array of decision variable input vectors. use_surrogate (bool): A bool to control whether to use the true, potentially expensive function or a surrogate model to evaluate the objectives. Returns: (Dict): Dict with the following keys: 'objectives' (np.ndarray): The objective function values for each input vector. 'constraints' (Union[np.ndarray, None]): The constraint values of the problem corresponding each input vector. 'fitness' (np.ndarray): Equal to objective values if objective is to be minimized. Multiplied by (-1) if objective to be maximized. 'uncertainity' (Union[np.ndarray, None]): The uncertainity in the objective values. """
[docs] @abstractmethod def evaluate_constraint_values(self) -> Optional[np.ndarray]: """Abstract method to evaluate constraint values. Evaluate just the constraint function values using the attributes decision_vectors and objective_vectors Note: Currently not supported by ScalarMOProblem """
# TODO: Depreciate. Use MO problem in the future
[docs]class ScalarMOProblem(ProblemBase): """A multiobjective optimization problem. To be depreciated. A multiobjective optimization problem with user defined objective functions, constraints and variables. The objectives each return a single scalar. Arguments: objectives (List[ScalarObjective]): A list containing the objectives of the problem. variables (List[Variable]): A list containing the variables of the problem. constraints (List[ScalarConstraint]): A list containing the constraints of the problem. If no constraints exist, None may be supllied as the value. nadir (Optional[np.ndarray]): The nadir point of the problem. ideal (Optional[np.ndarray]): The ideal point of the problem. Attributes: __n_of_objectives (int): The number of objectives in the problem. __n_of_variables (int): The number of variables in the problem. __n_of_constraints (int): The number of constraints in the problem. __nadir (np.ndarray): The nadir point of the problem. __ideal (np.ndarray): The ideal point of the problem. __objectives (List[ScalarObjective]): A list containing the objectives of the problem. __constraints (List[ScalarConstraint]): A list conatining the constraints of the problem. Raises: ProblemError: Ill formed nadir and/or ideal vectors are supplied. """ def __init__( self, objectives: List[ScalarObjective], variables: List[Variable], constraints: List[ScalarConstraint], nadir: Optional[np.ndarray] = None, ideal: Optional[np.ndarray] = None, ) -> None: super().__init__() self.__objectives: List[ScalarObjective] = objectives self.__variables: List[Variable] = variables self.__constraints: List[ScalarConstraint] = constraints self.__n_of_objectives: int = len(self.objectives) self.__n_of_variables: int = len(self.variables) if self.constraints is not None: self.__n_of_constraints: int = len(self.constraints) else: self.__n_of_constraints = 0 # Nadir vector must be the same size as the number of objectives if nadir is not None: if len(nadir) != self.n_of_objectives: msg = ( "The length of the nadir vector does not match the" "number of objectives: Length nadir {}, number of " "objectives {}." ).format(len(nadir), self.n_of_objectives) raise ProblemError(msg) # Ideal vector must be the same size as the number of objectives if ideal is not None: if len(ideal) != self.n_of_objectives: msg = ( "The length of the ideal vector does not match the" "number of objectives: Length ideal {}, number of " "objectives {}." ).format(len(ideal), self.n_of_objectives) raise ProblemError(msg) # Nadir and ideal vectors must match in size if nadir is not None and ideal is not None: if len(nadir) != len(ideal): msg = ( "The length of the nadir and ideal point don't match:" " length of nadir {}, length of ideal {}." ).format(len(nadir), len(ideal)) raise ProblemError(msg) self.__nadir = nadir self.__ideal = ideal # Multiplier to convert maximization to minimization max_multiplier = np.asarray([1, -1]) to_maximize = [objective.maximize for objective in objectives] to_maximize = sum(to_maximize, []) # To flatten the list to_maximize = np.asarray(to_maximize) * 1 # Convert to zeros and ones self._max_multiplier = max_multiplier[to_maximize] @property def n_of_constraints(self) -> int: """Property: the number of constraints. Returns: int: the number of constraints. """ return self.__n_of_constraints @n_of_constraints.setter def n_of_constraints(self, val: int): """Setter: the number of constraints. Arguments: int: the number of constraints. """ self.__n_of_constraints = val @property def objectives(self) -> List[ScalarObjective]: """Property: the list of objectives. Returns: List[ScalarObjective]: the list of objectives """ return self.__objectives @objectives.setter def objectives(self, val: List[ScalarObjective]): """Setter: the list of objectives. Arguments: val (List[ScalarObjective]): the list of objectives. """ self.__objectives = val @property def variables(self) -> List[Variable]: """Property: the list of problem variables. Returns: List[_ScalarObjective]: the list of problem variables """ return self.__variables @variables.setter def variables(self, val: List[Variable]): """Setter: the list of variables. Arguments: val (List[_ScalarObjective]): the list of variables. """ self.__variables = val @property def constraints(self) -> List[ScalarConstraint]: """Property: the list of constraints. Returns: List[_ScalarObjective]: the list of constraints """ return self.__constraints @constraints.setter def constraints(self, val: List[ScalarConstraint]): """Setter: the list of constraints. Arguments: val (List[_ScalarObjective]): the list of constraints. """ self.__constraints = val @property def n_of_objectives(self) -> int: """Property: the number of objectives. Returns: int: the number of objectives. """ return self.__n_of_objectives @n_of_objectives.setter def n_of_objectives(self, val: int): """Setter: the number of objectives. Arguments: int: the number of objectives. """ self.__n_of_objectives = val @property def n_of_variables(self) -> int: """Property: the number of variables. Returns: int: the number of variables. """ return self.__n_of_variables @n_of_variables.setter def n_of_variables(self, val: int): """Setter: the number of variables. Arguments: int: the number of variables. """ self.__n_of_variables = val @property def nadir(self) -> np.ndarray: """Property: the nadir point of the problem. Returns: np.ndarray: the nadir point of the problem. """ return self.__nadir @nadir.setter def nadir(self, val: np.ndarray): """Setter: the nadir point of the problem. Arguments: val (np.ndarray): The nadir point of the problem. """ self.__nadir = val @property def ideal(self) -> np.ndarray: """Property: the ideal point of the problem. Returns: np.ndarray: the ideal point of the problem. """ return self.__ideal @ideal.setter def ideal(self, val: np.ndarray): """Setter: the ideal point of the problem. Arguments: val (np.ndarray): The ideal point of the problem. """ self.__ideal = val
[docs] def get_variable_bounds(self) -> Union[np.ndarray, None]: """Get the variable bounds. Return the upper and lower bounds of each decision variable present in the problem as a 2D numpy array. The first column corresponds to the lower bounds of each variable, and the second column to the upper bound. Returns: np.ndarray: Lower and upper bounds of each variable as a 2D numpy array. If undefined variables, return None instead. """ if self.variables is not None: bounds = np.ndarray((self.n_of_variables, 2)) for ind, var in enumerate(self.variables): bounds[ind] = np.array(var.get_bounds()) return bounds else: return None
[docs] def get_variable_names(self) -> List[str]: """Get variable names. Return the variable names of the variables present in the problem in the order they were added. Returns: List[str]: Names of the variables in the order they were added. """ return [var.name for var in self.variables]
[docs] def get_objective_names(self) -> List[str]: """Get objective names. Return the names of the objectives present in the problem in the order they were added. Returns: List[str]: Names of the objectives in the order they were added. """ return [obj.name for obj in self.objectives]
[docs] def get_uncertainty_names(self) -> List[str]: """Return the names of the objectives present in the problem in the order they were added. Returns: List[str]: Names of the objectives in the order they were added. """ return [unc.name for unc in self.objectives]
[docs] def get_variable_lower_bounds(self) -> np.ndarray: """Get variable lower bounds. Return the lower bounds of each variable as a list. The order of the bounds follows the order the variables were added to the problem. Returns: np.ndarray: An array with the lower bounds of the variables. """ return np.array([var.get_bounds()[0] for var in self.variables])
[docs] def get_variable_upper_bounds(self) -> np.ndarray: """Get variable upper bounds. Return the upper bounds of each variable as a list. The order of the bounds follows the order the variables were added to the problem. Returns: np.ndarray: An array with the upper bounds of the variables. """ return np.array([var.get_bounds()[1] for var in self.variables])
[docs] def evaluate( self, decision_vectors: np.ndarray, use_surrogate: bool = False ) -> EvaluationResults: """Evaluates the problem using an ensemble of input vectors. Arguments: decision_vectors (np.ndarray): An 2D array of decision variable input vectors. Each column represent the values of each decision variable. Returns: Tuple[np.ndarray, Union[None, np.ndarray]]: If constraint are defined, returns the objective vector values and corresponding constraint values. Or, if no constraints are defined, returns just the objective vector values with None as the constraint values. Raises: ProblemError: The decision_vectors have wrong dimensions. """ # Reshape decision_vectors with single row to work with the code if use_surrogate is True: raise NotImplementedError( "Surrogates not yet supported in this class. " "Use the '''DataProblem''' class instead." ) shape = np.shape(decision_vectors) if len(shape) == 1: decision_vectors = np.reshape(decision_vectors, (1, shape[0])) (n_rows, n_cols) = np.shape(decision_vectors) if n_cols != self.n_of_variables: msg = ( "The length of the input vectors does not match the number " "of variables in the problem: Input vector length {}, " "number of variables {}." ).format(n_cols, self.n_of_variables) raise ProblemError(msg) objective_vectors: np.ndarray = np.ndarray( (n_rows, self.n_of_objectives), dtype=float ) # ??? Use np.zeros instead of this? uncertainity: np.ndarray = np.ndarray( (n_rows, self.n_of_objectives), dtype=float ) # ??? Use np.zeros instead of this? if self.n_of_constraints > 0: constraint_values: np.ndarray = np.ndarray( (n_rows, self.n_of_constraints), dtype=float ) else: constraint_values = None # Calculate the objective values for (col_i, objective) in enumerate(self.objectives): results = objective.evaluate(decision_vectors) objective_vectors[:, col_i] = results.objectives uncertainity[:, col_i] = results.uncertainity # Calculate fitness, which is always to be minimized fitness = objective_vectors * self._max_multiplier # Calculate the constraint values if constraint_values is not None: for (col_i, constraint) in enumerate(self.constraints): constraint_values[:, col_i] = np.array( constraint.evaluate(decision_vectors, objective_vectors) ) return EvaluationResults( objective_vectors, fitness, constraint_values, uncertainity )
[docs] def evaluate_constraint_values(self) -> Optional[np.ndarray]: """Evaluate constraint values. Evaluate just the constraint function values using the attributes decision_vectors and objective_vectors Raises: NotImplementedError Note: Currently not supported by ScalarMOProblem """ raise NotImplementedError("Not implemented for ScalarMOProblem")
# TODO: Depreciate. Use data problem in the future
[docs]class ScalarDataProblem(ProblemBase): """A problem class for case where the data is pre-computed. To be depreciated Defines a problem with pre-computed data representing a multiobjective optimization problem with scalar valued objective functions. Arguments: decision_vectors (np.ndarray): A 2D vector of decision_vectors. Each row represents a solution with the value for each decision_vectors defined on the columns. objective_vectors (np.ndarray): A 2D vector of objective function values. Each row represents one objective vector with the values for the invidual objective functions defined on the columns. Attributes: decision_vectors (np.ndarray): See args objective_vectors (np.ndarray): See args __epsilon (float): A small floating point number to shift the bounds of the variables. See, get_variable_bounds, default value 1e-6 __constraints (List[ScalarConstraint]): A list of defined constraints. nadir (np.ndarray): The nadir point of the problem. ideal (np.ndarray): The ideal point of the problem. __model_exists (bool): is there a model for this problem Note: It is assumed that the decision_vectors and objectives follow a direct one-to-one mapping, i.e., the objective values on the ith row in 'objectives' should represent the solution of the multiobjective problem when evaluated with the decision_vectors on the ith row in 'decision_vectors'. """ def __init__(self, decision_vectors: np.ndarray, objective_vectors: np.ndarray): super().__init__() self.decision_vectors: np.ndarray = decision_vectors self.objective_vectors: np.ndarray = objective_vectors # epsilon is used when computing the bounds. We don't want to exclude # any of the solutions that contain border values. # See get_variable_bounds self.__epsilon: float = 1e-6 # Used to indicate if a model has been built to represent the model. # Used in the evaluation. self.__model_exists: bool = False self.__constraints: List[ScalarConstraint] = [] try: self.n_of_variables = self.decision_vectors.shape[1] except IndexError as e: msg = ( "Check the variable dimensions. Is it a 2D array? " "Encountered '{}'".format(str(e)) ) raise ProblemError(msg) try: self.n_of_objectives = self.objective_vectors.shape[1] except IndexError as e: msg = ( "Check the objective dimensions. Is it a 2D array? " "Encountered '{}'".format(str(e)) ) raise ProblemError(msg) self.nadir = np.max(self.objective_vectors, axis=0) self.ideal = np.min(self.objective_vectors, axis=0) @property def epsilon(self) -> float: """Property: epsilon. Return: float: epsilon value (for shifting the bounds of variables) """ return self.__epsilon @epsilon.setter def epsilon(self, val: float): """Setter: epsilon. Argument: val (float): epsilon value (for shifting the bounds of variables.) """ self.__epsilon = val @property def constraints(self) -> List[ScalarConstraint]: """Property: Constraints. Return: List[ScalarConstraint]: list of the defined constraints """ return self.__constraints @constraints.setter def constraints(self, val: List[ScalarConstraint]): """Setter: Constraints. Argument: val (List[ScalarConstraint]): list of the constraints. """ self.__constraints = val
[docs] def get_variable_bounds(self): """Get the variable bounds. Returns: np.array[float]: The variable bounds in a stack. The epsilon value will be added to the upper bounds and substracted from the lower bounds to return closed bounds. Note: If self.epsilon is zero, the bounds will represent an open range. """ return np.stack( ( np.min(self.decision_vectors, axis=0) - self.epsilon, np.max(self.decision_vectors, axis=0) + self.epsilon, ), axis=1, )
[docs] def evaluate_constraint_values(self) -> Optional[np.ndarray]: """Evaluate the constraint values. Evaluate the constraint values for each defined constraint. A positive value indicates that a constraint is adhered to, a negative value indicates a violated constraint. Returns: Optional[np.ndarray]: A 2D array with each row representing the constraint values for different objective vectors. One column for each constraint. If no constraint function are defined, returns None. """ if len(self.constraints) == 0: return None constraint_values = np.zeros( (len(self.objective_vectors), len(self.constraints)) ) for ind, con in enumerate(self.constraints): constraint_values[:, ind] = con.evaluate( self.decision_vectors, self.objective_vectors ) return constraint_values
[docs] def evaluate(self, decision_vectors: np.ndarray) -> np.ndarray: """Evaluate the values of the objectives at the given decision. Evaluate the values of the objectives corresponding to the decision decision_vectors. Args: decision_vectors (np.ndarray): A 2D array with the decision decision_vectors to be evaluated on each row. Returns: nd.ndarray: A 2D array with the objective values corresponding to each decision vectors on the rows. Note: At the moment, this function just maps the given decision decision_vectors to the closest decision variable present (using an L2 distance) in the problem and returns the corresponsing objective vector. """ if not self.__model_exists: idx = np.unravel_index( np.linalg.norm( self.decision_vectors - decision_vectors, axis=1 ).argmin(), self.objective_vectors.shape, order="F", )[0] else: msg = "Models not implemented yet for data based problems." raise NotImplementedError(msg) return (self.objective_vectors[idx],)
[docs]class MOProblem(ProblemBase): """An user defined multiobjective optimization problem. A multiobjective optimization problem with user defined objective functions, constraints, and variables. Arguments: objectives (List[Union[ScalarObjective, VectorObjective]]): A list containing the objectives of the problem. variables (List[Variable]): A list containing the variables of the problem. constraints (List[ScalarConstraint]): A list of the constraints of the problem. nadir (Optional[np.ndarray], optional): Nadir point of the problem. Defaults to None. ideal (Optional[np.ndarray], optional): Ideal point of the problem. Defaults to None. Attributes: __objectives (List[Union[ScalarObjective, VectorObjective]]): A list containing the objectives of the problem. __variables (List[Variable]): A list containing the variables of the problem. __constraints (List[ScalarConstraint]): A list of the constraints of the problem. __nadir (Optional[np.ndarray], optional): Nadir point of the problem. Defaults to None. __ideal (Optional[np.ndarray], optional): Ideal point of the problem. Defaults to None. __n_of_variables (int): The number of variables __n_of_objectives (int): The number of objectives Raises: ProblemError: If ideal or nadir vectors are not the same size as number of objectives. """ # TODO: use_surrogate : Union[bool, List[bool]] def __init__( self, objectives: List[Union[ScalarObjective, VectorObjective]], variables: List[Variable], constraints: List[ScalarConstraint] = None, nadir: Optional[np.ndarray] = None, ideal: Optional[np.ndarray] = None, ): super().__init__() self.__objectives: List[Union[ScalarObjective, VectorObjective]] = objectives self.__variables: List[Variable] = variables self.__constraints: List[ScalarConstraint] = constraints self.__n_of_variables: int = len(self.variables) self.__n_of_objectives: int = sum( map(self.number_of_objectives, self.__objectives) ) if self.constraints is not None: self.__n_of_constraints: int = len(self.constraints) else: self.__n_of_constraints = 0 # Multiplier to convert maximization to minimization max_multiplier = np.asarray([1, -1]) to_maximize = [objective.maximize for objective in objectives] # Does not work # to_maximize = sum(to_maximize, []) # To flatten the list to_maximize = ( np.hstack(to_maximize) * 1 ) # To flatten list and convert to zeros and ones # to_maximize = np.asarray(to_maximize) * 1 # Convert to zeros and ones self._max_multiplier = max_multiplier[to_maximize] self.nadir_fitness = np.full(self.__n_of_objectives, np.inf, dtype=float) self.nadir = self.nadir_fitness * self._max_multiplier self.ideal_fitness = np.full(self.__n_of_objectives, np.inf, dtype=float) self.ideal = self.ideal_fitness * self._max_multiplier # Nadir vector must be the same size as the number of objectives if nadir is not None: if len(nadir) != self.n_of_objectives: msg = ( "The length of the nadir vector does not match the" "number of objectives: Length nadir {}, number of " "objectives {}." ).format(len(nadir), self.n_of_objectives) raise ProblemError(msg) self.nadir = nadir # Ideal vector must be the same size as the number of objectives if ideal is not None: if len(ideal) != self.n_of_objectives: msg = ( "The length of the ideal vector does not match the" "number of objectives: Length ideal {}, number of " "objectives {}." ).format(len(ideal), self.n_of_objectives) raise ProblemError(msg) self.ideal = ideal self.nadir_fitness = self.nadir * self._max_multiplier self.ideal_fitness = self.ideal * self._max_multiplier # Objective and variable names self.objective_names = self.get_objective_names() self.variable_names = self.get_variable_names() self.fitness_names = self.objective_names @property def n_of_constraints(self) -> int: """Property: number of constraints. Returns: int: Number of constraints """ return self.__n_of_constraints @n_of_constraints.setter def n_of_constraints(self, val: int): """Setter: number of constraints. Arguments: val (int): number of constraints """ self.__n_of_constraints = val @property def objectives(self) -> List[ScalarObjective]: """Property: list of objectives. Returns: List[ScalarObjective]: list of objectives """ return self.__objectives @objectives.setter def objectives(self, val: List[ScalarObjective]): """Setter: set list of objectives. Arguments: val (List[ScalarObjective]): List of objectives """ self.__objectives = val @property def variables(self) -> List[Variable]: """Property: List of variables Returns: List[Variable]: list of variables """ return self.__variables @variables.setter def variables(self, val: List[Variable]): """Setter: set list of variables. Arguments: val (List[Variable]): list of variables """ self.__variables = val @property def constraints(self) -> List[ScalarConstraint]: """Property: list of constraints. Returns: List[ScalarConstraint]: list of constraints """ return self.__constraints @constraints.setter def constraints(self, val: List[ScalarConstraint]): """Setter: list of constraints. Arguments: val (List[ScalarConstraint]): list of constraints """ self.__constraints = val @property def n_of_objectives(self) -> int: """Property: number of objectives. Returns: int: number of objectives """ return self.__n_of_objectives @n_of_objectives.setter def n_of_objectives(self, val: int): """Setter: number of objectives. Arguments: val (int): number of objectives """ self.__n_of_objectives = val @property def n_of_fitnesses(self) -> int: """Property: number of dimensions of the fitness matrix. May be different than the number of objectives in inherited classes. Returns: int: number of fitness dimensions. """ return self.__n_of_objectives @property def n_of_variables(self) -> int: """Property: number of variables. Returns: int: Number of variables. """ return self.__n_of_variables @n_of_variables.setter def n_of_variables(self, val: int): """Setter: number of variables. Arguments: val (int): number of variables """ self.__n_of_variables = val
[docs] @staticmethod def number_of_objectives( obj_instance: Union[ScalarObjective, VectorObjective] ) -> int: """Return the number of objectives in the given obj_instance. Arguments: obj_instance (Union[ScalarObjective, VectorObjective]): An instance of one of the objective classes Raises: ProblemError: Raised when obj_instance is not an instance of the supported classes Returns: int: Number of objectives in obj_instance """ if isinstance(obj_instance, _ScalarObjective): return 1 elif isinstance(obj_instance, ScalarObjective): return 1 elif isinstance(obj_instance, VectorObjective): return obj_instance.n_of_objectives else: msg = "Supported objective types: ScalarObjective and VectorObjective" raise ProblemError(msg)
[docs] def get_variable_bounds(self) -> Union[np.ndarray, None]: """Get variable bounds. Return the upper and lower bounds of each decision variable present in the problem as a 2D numpy array. The first column corresponds to the lower bounds of each variable, and the second column to the upper bound. Returns: np.ndarray: Lower and upper bounds of each variable as a 2D numpy array. If undefined variables, return None instead. """ if self.variables is not None: bounds = np.ndarray((self.n_of_variables, 2)) for ind, var in enumerate(self.variables): bounds[ind] = np.array(var.get_bounds()) return bounds else: return None
[docs] def get_variable_names(self) -> List[str]: """Get variable names. Return the variable names of the variables present in the problem in the order they were added. Returns: List[str]: Names of the variables in the order they were added. """ return [var.name for var in self.variables]
[docs] def get_objective_names(self) -> List[str]: """Get objective names. Return the names of the objectives present in the problem in the order they were added. Returns: List[str]: Names of the objectives in the order they were added. """ obj_list = [[(obj.name)] for obj in self.objectives] return reduce(iadd, obj_list, [])
# TODO: add get_uncertainty_names() for uncertainty values
[docs] def get_variable_lower_bounds(self) -> np.ndarray: """Get variable lower bounds. Return the lower bounds of each variable as a list. The order of the bounds follows the order the variables were added to the problem. Returns: np.ndarray: An array with the lower bounds of the variables. """ return np.array([var.get_bounds()[0] for var in self.variables])
[docs] def get_variable_upper_bounds(self) -> np.ndarray: """Get variable upper bounds. Return the upper bounds of each variable as a list. The order of the bounds follows the order the variables were added to the problem. Returns: np.ndarray: An array with the upper bounds of the variables. """ return np.array([var.get_bounds()[1] for var in self.variables])
[docs] def evaluate( self, decision_vectors: np.ndarray, use_surrogate: bool = False ) -> EvaluationResults: """Evaluates the problem using an ensemble of input vectors. Arguments: decision_vectors (np.ndarray): An 2D array of decision variable input vectors. Each column represent the values of each decision variable. use_surrogate (bool): A bool to control whether to use the true, potentially expensive function or a surrogate model to evaluate the objectives. Returns: Tuple[np.ndarray, Union[None, np.ndarray]]: If constraint are defined, returns the objective vector values and corresponding constraint values. Or, if no constraints are defined, returns just the objective vector values with None as the constraint values. Raises: ProblemError: The decision_vectors have wrong dimensions. ValueError: If decision_vectors violate the lower or upper bounds. """ # Reshape decision_vectors with single row to work with the code shape = np.shape(decision_vectors) if len(shape) == 1: decision_vectors = np.reshape(decision_vectors, (1, shape[0])) # Checking bounds; warn if bounds are breached. if np.any(self.get_variable_lower_bounds() > decision_vectors): warn("Some decision variable values violate lower bounds") if np.any(self.get_variable_upper_bounds() < decision_vectors): warn("Some decision variable values violate upper bounds") (n_rows, n_cols) = np.shape(decision_vectors) if n_cols != self.n_of_variables: msg = ( "The length of the input vectors does not match the number " "of variables in the problem: Input vector length {}, " "number of variables {}." ).format(n_cols, self.n_of_variables) raise ProblemError(msg) objective_vectors, uncertainity = self.evaluate_objectives( decision_vectors, use_surrogate=use_surrogate ) constraint_values = self.evaluate_constraint_values( decision_vectors, objective_vectors ) # Calculate fitness, which is always to be minimized fitness = self.evaluate_fitness(objective_vectors) # Update ideal values self.update_ideal(objective_vectors, fitness) return EvaluationResults( objective_vectors, fitness, constraint_values, uncertainity )
[docs] def evaluate_objectives( self, decision_vectors: np.ndarray, use_surrogate: bool = False ) -> Tuple[np.ndarray]: """Evaluate objective values of the problem Arguments: decision_vectors (np.ndarray): An 2D array of decision variable input vectors. Each column represent the values of each decision variable. use_surrogate (bool): A bool to control whether to use the true, potentially expensive function or a surrogate model to evaluate the objectives. Returns: Tuple[np.ndarray]: Objective vector values with their uncertainty. """ (n_rows, n_cols) = np.shape(decision_vectors) objective_vectors: np.ndarray = np.ndarray( (n_rows, self.n_of_objectives), dtype=float ) uncertainity: np.ndarray = np.ndarray( (n_rows, self.n_of_objectives), dtype=float ) obj_column = 0 for objective in self.objectives: elem_in_curr_obj = self.number_of_objectives(objective) if elem_in_curr_obj == 1: results = objective.evaluate(decision_vectors, use_surrogate) objective_vectors[:, obj_column] = results.objectives uncertainity[:, obj_column] = results.uncertainity elif elem_in_curr_obj > 1: # results = list(map(objective.evaluate, decision_vectors)) results = objective.evaluate(decision_vectors, use_surrogate) objective_vectors[ :, obj_column : obj_column + elem_in_curr_obj ] = results.objectives uncertainity[ :, obj_column : obj_column + elem_in_curr_obj ] = results.uncertainity obj_column = obj_column + elem_in_curr_obj return (objective_vectors, uncertainity)
[docs] def evaluate_constraint_values( self, decision_vectors: np.ndarray, objective_vectors: np.ndarray ) -> Optional[np.ndarray]: """Evaluate constraint values Evaluate just the constraint function values using the attributes decision_vectors and objective_vectors Arguments: decision_vectors (np.ndarray): An 2D array of decision variable input vectors. Each column represent the values of each decision variable. use_surrogate (bool): A bool to control whether to use the true, potentially expensive function or a surrogate model to evaluate the objectives. Returns: Optional[np.ndarray]: if there are constraints, then this returns np.ndarray of constraint values, else returns None Raises: NotImplementedError Note: Currently not supported by ScalarMOProblem """ if self.n_of_constraints == 0: return None (n_rows, n_cols) = np.shape(decision_vectors) constraint_values: np.ndarray = np.ndarray( (n_rows, self.n_of_constraints), dtype=float ) for (col_i, constraint) in enumerate(self.constraints): constraint_values[:, col_i] = np.array( constraint.evaluate(decision_vectors, objective_vectors) ) return constraint_values
[docs] def evaluate_fitness(self, objective_vectors: np.ndarray) -> np.ndarray: """Evaluate fitness of the objectives. Arguments: objective_vectors (np.ndarray): objective vector array Returns: np.ndarray: fitness of each objective vector """ return objective_vectors * self._max_multiplier
[docs] def update_ideal(self, objective_vectors: np.ndarray, fitness: np.ndarray): """Update the ideal vector Arguments: objective_vectors (np.ndarray): Objective vectors fitness (np.ndarray): fittness of objective vectors. """ self.ideal_fitness = np.amin(np.vstack((self.ideal_fitness, fitness)), axis=0) self.ideal = self.ideal_fitness * self._max_multiplier
# TODO: Make this the "main" Problem class?
[docs]class DataProblem(MOProblem): """A class for a data based problem. A problem class for data-based problem. This supports surrogate modelling. Data should be given in the form of a pandas dataframe. Arguments: data (pd.DataFrame): The input data. This will be used for training the model. variable_names (List[str]): Names of the variables in the dataframe provided. objective_names (List[str]): Names of the objectices in the dataframe provided. bounds (pd.DataFrame, optional): A pandas DataFrame containing the upper and lower bounds of the decision variables. Column names have to be same as variable_names. Row names have to be "lower_bound" and "upper_bound". objectives (List[Union[ScalarDataObjective,VectorDataObjective,]], optional): Objective instances, currently not supported. Defaults to None. variables (List[Variable], optional): Variable instances. Defaults to None. Currently not supported. constraints (List[ScalarConstraint], optional): Constraint instances. Defaults to None, which means that there are no constraints. nadir (Optional[np.ndarray], optional): Nadir of the problem. Defaults to None. ideal (Optional[np.ndarray], optional): Ideal of the problem. Defaults to None. Raises: ProblemError: When input data is not a dataframe. ProblemError: When given objective or variable names are not in dataframe column NotImplementedError: When objective instances are passed NotImplementedError: When variable instances are passed """ def __init__( self, data: pd.DataFrame, variable_names: List[str], objective_names: List[str], bounds: pd.DataFrame = None, maximize: pd.DataFrame = None, objectives: List[Union[ScalarDataObjective, VectorDataObjective]] = None, variables: List[Variable] = None, constraints: List[ScalarConstraint] = None, nadir: Optional[np.ndarray] = None, ideal: Optional[np.ndarray] = None, ): if not isinstance(data, pd.DataFrame): msg = "Please provide data in the pandas dataframe format" raise ProblemError(msg) if not all(obj in data.columns for obj in objective_names): msg = "Provided objective names not found in provided dataframe columns" raise ProblemError(msg) if not all(var in data.columns for var in variable_names): msg = "Provided variable names not found in provided dataframe columns" raise ProblemError(msg) if bounds is not None: if not all(var in variable_names for var in bounds.columns): msg = "A mismatch in the variable names in the bounds dataframe" raise ProblemError(msg) bounds_row_names = ["lower_bound", "upper_bound"] if not all(row_name in bounds_row_names for row_name in bounds.index): msg = ( f"'bounds' should contain the following indices: {bounds_row_names}" ) raise ProblemError(msg) if maximize is not None: if not all(obj in objective_names for obj in maximize.columns): msg = "maximize DataFrame should only contain objectives" raise ProblemError(msg) if not all(obj in maximize.columns for obj in objective_names): msg = "All objectives should be in the maximize DataFrame" raise ProblemError(msg) if maximize is None: # Default to minimize maximize = pd.DataFrame(columns=objective_names, index=[0]) maximize[:] = False # TODO: Implement the rest if objectives is not None: msg = "Support for custom objectives objects not implemented yet" raise NotImplementedError(msg) if variables is not None: msg = "Support for custom variables objects not implemented yet" raise NotImplementedError(msg) if objectives is None: objectives = [] for obj in objective_names: objectives.append( ScalarDataObjective( data=data[variable_names + [obj]], name=obj, maximize=maximize[obj].tolist(), ) ) if variables is None: variables = [] for var in variable_names: initial_value = data[var].mean(axis=0) if bounds is None: lower_bound = data[var].min(axis=0) upper_bound = data[var].max(axis=0) else: lower_bound = bounds[var]["lower_bound"] upper_bound = bounds[var]["upper_bound"] variables.append( Variable( name=var, initial_value=initial_value, lower_bound=lower_bound, upper_bound=upper_bound, ) ) super().__init__(objectives, variables, constraints)
[docs] def train( self, models: Union[BaseRegressor, List[BaseRegressor]], model_parameters: Union[Dict, List[Dict]] = None, index: List[int] = None, data: pd.DataFrame = None, ): """Train surrogate models for all the objectives. The models should have a fit method and a predict method. The predict method should return predicted values as well as uncertainity value (even if they are none.) Arguments: models (Union[BaseRegressor, List[BaseRegressor]]): The class for the surrogate modelling algorithm. models_parameters: Dict or List[Dict] The parameters for the regressors. Should be a dict if a single regressor is provided. If a list of regressors is provided, the parameters should be in a list of dicts, same length as the list of regressors(= number of objs). index (List[int], optional): The indices of the samples to be used for training the surrogate model. If no values are proveded, all samples are used. data (pd.DataFrame, optional): Use this argument if some external data is to be used for training. Defaults to None. Raises: ProblemError: If VectorDataObjective is used as one of the objective instances. They are not supported yet. """ if not isinstance(models, list): models = [models] * len(self.get_objective_names()) model_parameters = [model_parameters] * len(self.get_objective_names()) elif len(models) == 1: models = models * len(self.get_objective_names()) for model, model_params, name in zip( models, model_parameters, self.get_objective_names() ): self.train_one_objective(name, model, model_params, index, data)
[docs] def train_one_objective( self, name: str, model: BaseRegressor, model_parameters: Dict, index: List[int] = None, data: pd.DataFrame = None, ): """Train one objective at a time, otherwise same is the train method. Arguments: name (str): Name of the objective to be trained. model (BaseRegressor): The class for the surrogate modelling algorithm. model_parameters (Dict): **model_parameters is passed to the model when initialized. index (List[int], optional): The indices of the samples to be used for training the surrogate model. If no values are proveded, all samples are used. data (pd.DataFrame, optional): Use this argument if some external data is to be used for training. Defaults to None. Raises: ProblemError: If name is not in the list of objective names. ProblemError: If VectorDataObjective is used as one of the objective instances. They are not supported yet. """ if name not in self.get_objective_names(): raise ProblemError( f'"{name}" not found in the list of' f"original objective names: {self.get_objective_names()}" ) obj_index = self.get_objective_names().index(name) if isinstance(self.objectives[obj_index], _ScalarDataObjective): self.objectives[obj_index].train(model, model_parameters, index, data) if isinstance(self.objectives[obj_index], ScalarDataObjective): self.objectives[obj_index].train(model, model_parameters, index, data) else: msg = "Support for VectorDataObjective not supported yet" raise ProblemError(msg)
[docs]class ExperimentalProblem(MOProblem): """A problem class for data-based problem. This supports surrogate modelling. Data should be given in the form of a pandas dataframe. self.archive is created to save the true function evaluations and update the surrogate models if needed. self.archive updates whenever a true function evaluation happens. Arguments: data (pd.DataFrame): The input data. This will be used for training the model. variable_names (List[str]): Names of the variables in the dataframe provided. objective_names (List[str]): Names of the objectices in the dataframe provided. objectives (List[Union[ScalarDataObjective,VectorDataObjective,]], optional): Objective instances, currently not supported. Defaults to None. variables (List[Variable], optional): Variable instances. Defaults to None. Currently not supported. constraints (List[ScalarConstraint], optional): Constraint instances. Defaults to None, which means that there are no constraints. nadir (Optional[np.ndarray], optional): Nadir of the problem. Defaults to None. ideal (Optional[np.ndarray], optional): Ideal of the problem. Defaults to None. Raises: ProblemError: When input data is not a dataframe. ProblemError: When given objective or variable names are not in dataframe column NotImplementedError: When objective instances are passed NotImplementedError: When variable instances are passed Note: not properly implemented! """ def __init__( self, variable_names: List[str], objective_names: List[str], uncertainity_names: List[str], evaluators: Union[None, List[Callable]] = None, dimensions_data: pd.DataFrame = None, data: pd.DataFrame = None, objective_functions: List[Tuple[List[str], Callable]] = None, constraints: List[Tuple[List[str], Callable]] = None, ): # TODO: add the archiving here for true evaluations. self.uncertainity_names = uncertainity_names # will be removed later if not isinstance(data, pd.DataFrame): msg = "Please provide data in the pandas dataframe format" raise ProblemError(msg) if not all(obj in data.columns for obj in objective_names): msg = "Provided objective names not found in provided dataframe columns" raise ProblemError(msg) if not all(var in data.columns for var in variable_names): msg = "Provided variable names not found in provided dataframe columns" raise ProblemError(msg) # TODO: Implement the rest objectives = [] self.archive = ( data ) # this is for model management to archive the solutions and decision variables # check if evaluator is NOne in that case make a list of nones and the lenght of the list is the same as obj_names # check if evaluator is the same lenght as obj_names if not rais a problem error for obj, evaluator in zip(objective_names, evaluators): objectives.append( ScalarDataObjective( data=data[variable_names + [obj]], name=obj, evaluator=evaluator ) ) variables = [] for var in variable_names: initial_value = data[var].mean(axis=0) lower_bound = data[var].min(axis=0) upper_bound = data[var].max(axis=0) variables.append( Variable( name=var, initial_value=initial_value, lower_bound=lower_bound, upper_bound=upper_bound, ) ) super().__init__(objectives, variables, constraints)
[docs] def train( self, models: Union[BaseRegressor, List[BaseRegressor]], model_parameters: Union[Dict, List[Dict]] = None, index: List[int] = None, data: pd.DataFrame = None, ): """Train surrogate models for all the objectives. The models should have a fit method and a predict method. The predict method should return predicted values as well as uncertainity value (even if they are none.) Arguments: models (Union[BaseRegressor, List[BaseRegressor]]): The class for the surrogate modelling algorithm. models_parameters: Dict or List[Dict] The parameters for the regressors. Should be a dict if a single regressor is provided. If a list of regressors is provided, the parameters should be in a list of dicts, same length as the list of regressors(= number of objs). index (List[int], optional): The indices of the samples to be used for training the surrogate model. If no values are proveded, all samples are used. data (pd.DataFrame, optional): Use this argument if some external data is to be used for training. Defaults to None. Raises: ProblemError: If VectorDataObjective is used as one of the objective instances. They are not supported yet. """ data = self.archive # for updating the data after updating the surrogates if not isinstance(models, list): models = [models] * len(self.get_objective_names()) model_parameters = [model_parameters] * len(self.get_objective_names()) elif len(models) == 1: models = models * len(self.get_objective_names()) for model, model_params, name in zip( models, model_parameters, self.get_objective_names() ): self.train_one_objective(name, model, model_params, index, data)
[docs] def train_one_objective( self, name: str, model: BaseRegressor, model_parameters: Dict, index: List[int] = None, data: pd.DataFrame = None, ): """Train one objective at a time, otherwise same is the train method. Arguments: name (str): Name of the objective to be trained. model (BaseRegressor): The class for the surrogate modelling algorithm. model_parameters (Dict): **model_parameters is passed to the model when initialized. index (List[int], optional): The indices of the samples to be used for training the surrogate model. If no values are proveded, all samples are used. data (pd.DataFrame, optional): Use this argument if some external data is to be used for training. Defaults to None. Raises: ProblemError: If name is not in the list of objective names. ProblemError: If VectorDataObjective is used as one of the objective instances. They are not supported yet. """ if name not in self.get_objective_names(): raise ProblemError( f'"{name}" not found in the list of' f"original objective names: {self.get_objective_names()}" ) obj_index = self.get_objective_names().index(name) if isinstance(self.objectives[obj_index], _ScalarDataObjective): self.objectives[obj_index].train(model, model_parameters, index, data) elif isinstance(self.objectives[obj_index], ScalarDataObjective): self.objectives[obj_index].train(model, model_parameters, index, data) else: msg = "Support for VectorDataObjective not supported yet" raise ProblemError(msg)
[docs] def evaluate( self, decision_vectors: np.ndarray, use_surrogate: bool ) -> EvaluationResults: names = np.hstack((self.variable_names, self.objective_names)) new_results = super().evaluate(decision_vectors, use_surrogate=use_surrogate) # The following is for achiving solutions with true function evaluations if use_surrogate == False: new_samples = np.hstack((decision_vectors, new_results.objectives)) new_data = pd.DataFrame(data=new_samples, columns=names) self.archive = self.archive.append(new_data) return new_results
[docs]class DiscreteDataProblem: """A problem class for data-based problems with discrete values. These data values are computed representing a set of non-dominated points. Arguments: data (pd.DataFrame): The input data. variable_names (List[str]): Names of the variables in the dataframe provided. objective_names (List[str]): Names of the objectices in the dataframe provided. nadir (np.ndarray): Nadir of the problem. ideal (np.ndarray): Ideal of the problem. """ def __init__( self, data: pd.DataFrame, variable_names: List[str], objective_names: List[str], ideal: np.ndarray, nadir: np.ndarray, ): self.decision_variables = data[variable_names].values self.variable_names = variable_names self.objectives = data[objective_names].values self.objective_names = objective_names self.ideal = ideal self.nadir = nadir self.n_of_objectives = len(objective_names)
[docs] def find_closest(self, x: np.ndarray) -> int: """Find closest point in data to x. Given a vector of decision variables, finds the closest point in the given data and returns its index. A simple euclidean distance is used. Arguments: x (np.ndarray): A 1D vector containing decision variables. Returns: int: The index of the closest point in the data computed for x. """ return np.argmin(np.linalg.norm(x - self.decision_variables, axis=1))