Source code for pypesto.history.optimizer

"""Track optimal values during an optimization."""

import logging
from typing import Union

import numpy as np

from ..C import FVAL, GRAD, HESS, RES, SRES, ModeType, X
from ..util import allclose, is_none_or_nan, is_none_or_nan_array, isclose
from .base import HistoryBase, add_fun_from_res
from .util import ResultDict

logger = logging.getLogger(__name__)


class OptimizerHistory:
    """
    Optimizer objective call history.

    Container around a History object, additionally keeping track of optimal
    values.

    Attributes
    ----------
    fval0, fval_min:
        Initial and best function value found.
    x0, x_min:
        Initial and best parameters found.
    grad_min:
        Gradient for the best parameters.
    hess_min:
        Hessian (approximation) for the best parameters.
    res_min:
        Residuals for the best parameters.
    sres_min:
        Residual sensitivities for the best parameters.

    Parameters
    ----------
    history:
        History object to attach to this container. This history object
        implements the storage of the actual history.
    x0:
        Initial values for optimization.
    lb, ub:
        Lower and upper bound. Used for checking validity of optimal points.
    generate_from_history:
        If set to ``True``, the attributes of this container are filled based
        on the provided history. Defaults to ``False``.
    """

    # optimal point values
    MIN_KEYS = (X, *HistoryBase.RESULT_KEYS)
    def __init__(
        self,
        history: HistoryBase,
        x0: np.ndarray,
        lb: np.ndarray,
        ub: np.ndarray,
        generate_from_history: bool = False,
    ) -> None:
        self.history: HistoryBase = history

        # initial point
        self.fval0: Union[float, None] = None
        self.x0: np.ndarray = x0

        # bounds
        self.lb: np.ndarray = lb
        self.ub: np.ndarray = ub

        # minimum point
        self.fval_min: float = np.inf
        self.x_min: Union[np.ndarray, None] = None
        self.grad_min: Union[np.ndarray, None] = None
        self.hess_min: Union[np.ndarray, None] = None
        self.res_min: Union[np.ndarray, None] = None
        self.sres_min: Union[np.ndarray, None] = None

        if generate_from_history:
            self._maybe_compute_init_and_min_vals_from_trace()
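As a usage sketch: the container wraps any concrete history implementation. The snippet below assumes pypesto's in-memory ``MemoryHistory`` backend and the ``FVAL``/``MODE_FUN`` constants from ``pypesto.C``; the hand-written result dictionaries and numbers are illustrative, not authoritative.

import numpy as np

from pypesto.C import FVAL, MODE_FUN
from pypesto.history import MemoryHistory, OptimizerHistory

x0 = np.array([0.5, 0.5])
opt_history = OptimizerHistory(
    history=MemoryHistory(),  # assumed backend; any HistoryBase works
    x0=x0,
    lb=np.zeros(2),
    ub=np.ones(2),
)

# record two objective evaluations; the second one is better
opt_history.update(x0, sensi_orders=(0,), mode=MODE_FUN, result={FVAL: 3.0})
opt_history.update(
    np.array([0.2, 0.8]), sensi_orders=(0,), mode=MODE_FUN, result={FVAL: 1.0}
)
print(opt_history.fval0, opt_history.fval_min)  # 3.0 1.0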
    def update(
        self,
        x: np.ndarray,
        sensi_orders: tuple[int, ...],
        mode: ModeType,
        result: ResultDict,
    ) -> None:
        """Update history and best found value.

        Parameters
        ----------
        x:
            Current parameter vector.
        sensi_orders:
            Sensitivity orders to be evaluated.
        mode:
            Mode of the evaluation.
        result:
            Current result.
        """
        result = add_fun_from_res(result)
        self._update_vals(x, result)
        self.history.update(x, sensi_orders, mode, result)
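Sensitivities that an optimizer reports in a separate call at the same parameters are merged into the stored optimum by ``_update_vals`` (see below). Continuing the sketch above, with a made-up gradient and the ``GRAD`` key from ``pypesto.C``:

from pypesto.C import GRAD

x_best = np.array([0.2, 0.8])
# gradient arrives in a follow-up call at the unchanged best point
opt_history.update(
    x_best,
    sensi_orders=(0, 1),
    mode=MODE_FUN,
    result={FVAL: 1.0, GRAD: np.array([0.1, -0.3])},
)
print(opt_history.grad_min)  # [ 0.1 -0.3]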
    def finalize(
        self,
        message: Union[str, None] = None,
        exitflag: Union[int, None] = None,
    ):
        """
        Finalize history.

        Parameters
        ----------
        message:
            Optimizer message to be saved. Defaults to ``None``.
        exitflag:
            Optimizer exitflag to be saved. Defaults to ``None``.
        """
        self.history.finalize(message=message, exitflag=exitflag)

        # There can be entries in the history, e.g. for grad, that are not
        # recorded in ..._min, e.g. when evaluated before fval.
        # On the other hand, not all variables may be recorded in the history.
        # Thus, at the end we go over the history once and try to fill in
        # what is available.

        # check whether a useful history exists
        # TODO This could be solved more elegantly
        try:
            self.history.get_x_trace()
        except NotImplementedError:
            return

        # find optimal point
        result = self._get_optimal_point_from_history()

        fval = result[FVAL]
        if fval is None:
            # nothing to be improved
            return

        # check whether the history has a better point
        # (this should not really happen)
        if (
            fval < self.fval_min
            and not isclose(fval, self.fval_min)
            and not allclose(result[X], self.x_min)
        ):
            # issue a warning, as something may be wrong if this happens
            logger.warning(
                f"History has a better point {fval} than the current best "
                f"point {self.fval_min}."
            )
            # update everything
            for key in self.MIN_KEYS:
                setattr(self, f"{key}_min", result[key])

        # check whether the history has the same point
        if (
            isclose(fval, self.fval_min)
            and self.x_min is not None
            and allclose(result[X], self.x_min)
        ):
            # update only missing entries
            # (e.g. grad and hess may be recorded but not in history)
            for key in self.MIN_KEYS:
                if result[key] is not None:
                    # if getattr(self, f'{key}_min') is None:
                    setattr(self, f"{key}_min", result[key])
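At the end of a run, ``finalize`` closes the underlying history and back-fills the ``*_min`` attributes from the trace. Continuing the sketch above; the message and exitflag values are made up:

opt_history.finalize(message="xtol satisfied", exitflag=0)
print(opt_history.x_min, opt_history.fval_min)  # [0.2 0.8] 1.0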
    def _update_vals(self, x: np.ndarray, result: ResultDict) -> None:
        """Update initial and best function values."""
        # update initial point
        if is_none_or_nan(self.fval0) and np.array_equal(x, self.x0):
            self.fval0 = result.get(FVAL)

        # don't update optimal point if point is not admissible
        if not self._admissible(x):
            return

        # update if fval is better
        if (
            not is_none_or_nan(fval := result.get(FVAL))
            and fval < self.fval_min
        ):
            # need to update all values, as a better fval was found
            for key in HistoryBase.RESULT_KEYS:
                setattr(self, f"{key}_min", result.get(key))
            self.x_min = x
            return

        # Sometimes sensitivities are evaluated on subsequent calls. We can
        # identify this situation by checking that x hasn't changed.
        if self.x_min is not None and np.array_equal(self.x_min, x):
            for key in (GRAD, HESS, SRES):
                val_min = getattr(self, f"{key}_min", None)
                if is_none_or_nan_array(val_min) and not is_none_or_nan_array(
                    val := result.get(key)
                ):
                    setattr(self, f"{key}_min", val)

    def _maybe_compute_init_and_min_vals_from_trace(self) -> None:
        """Try to set initial and best function value from trace.

        .. note:: Only possible if history has a trace.
        """
        if not len(self.history):
            # nothing to be computed from an empty history
            return

        # some optimizers may evaluate hess+grad first to compute trust
        # region etc.
        for ix in range(len(self.history)):
            fval = self.history.get_fval_trace(ix)
            x = self.history.get_x_trace(ix)
            if not is_none_or_nan(fval) and allclose(x, self.x0):
                self.fval0 = float(fval)
                break

        # find best fval
        result = self._get_optimal_point_from_history()

        # assign values
        for key in OptimizerHistory.MIN_KEYS:
            setattr(self, f"{key}_min", result[key])

    def _admissible(self, x: np.ndarray) -> bool:
        """Check whether point `x` is admissible (i.e. within bounds).

        Parameters
        ----------
        x:
            A single parameter vector.

        Returns
        -------
        Whether the point fulfills the problem requirements.
        """
        return np.all(x <= self.ub) and np.all(x >= self.lb)

    def _get_optimal_point_from_history(self) -> ResultDict:
        """Extract optimal point from `self.history`."""
        result = {}

        # get indices of admissible trace entries
        # shape (n_sample, n_x)
        xs = np.asarray(self.history.get_x_trace())
        ixs_admit = [ix for ix, x in enumerate(xs) if self._admissible(x)]

        if len(ixs_admit) == 0:
            # no admissible indices
            return {key: None for key in OptimizerHistory.MIN_KEYS}

        # index of minimum of fval values
        ix_min = np.nanargmin(self.history.get_fval_trace(ixs_admit))
        # in case an ndarray is returned (multiple minimal values),
        # take the first occurrence
        if isinstance(ix_min, np.ndarray):
            ix_min = ix_min[0]
        # select index in original array
        ix_min = ixs_admit[ix_min]

        # fill in parameter and function value from that index
        for var in (X, FVAL, RES):
            val = getattr(self.history, f"get_{var}_trace")(ix_min)
            if val is not None and not np.all(np.isnan(val)):
                result[var] = val
                # convert to float if var is FVAL to be sure
                if var == FVAL:
                    result[var] = float(result[var])

        # derivatives may be evaluated at different indices, therefore
        # iterate over all and check whether any has the same parameter
        # and the desired field filled
        for var in (GRAD, HESS, SRES):
            for ix in range(len(self.history)):
                if not allclose(result[X], self.history.get_x_trace(ix)):
                    # different parameter
                    continue
                val = getattr(self.history, f"get_{var}_trace")(ix)
                if not is_none_or_nan_array(val):
                    result[var] = val
                    # successfully found
                    break

        # fill remaining keys with None
        for key in OptimizerHistory.MIN_KEYS:
            if key not in result:
                result[key] = None

        return result
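The core of ``_get_optimal_point_from_history`` is an admissibility filter followed by a NaN-aware argmin over the remaining trace entries. The same pattern, reduced to plain numpy as a minimal sketch (the function name and arrays are illustrative, not part of this module):

import numpy as np

def best_admissible(xs, fvals, lb, ub):
    """Return (x, fval) of the best in-bounds trace entry, or None."""
    # boolean mask of trace points within the box bounds
    admit = np.all((xs >= lb) & (xs <= ub), axis=1)
    ixs = np.flatnonzero(admit)
    if ixs.size == 0 or np.all(np.isnan(fvals[ixs])):
        return None
    # NaN-aware argmin over the admissible subset, mapped back
    ix = ixs[np.nanargmin(fvals[ixs])]
    return xs[ix], float(fvals[ix])

xs = np.array([[0.5, 0.5], [1.2, 0.3], [0.2, 0.8]])
fvals = np.array([3.0, 0.5, 1.0])
print(best_admissible(xs, fvals, lb=np.zeros(2), ub=np.ones(2)))
# -> (array([0.2, 0.8]), 1.0); the fval=0.5 entry is out of bounds and ignored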