import copy
from typing import Callable, List, Literal, Tuple, Union
import numpy as np
from ..problem import Problem
from ..result import ProfilerResult
from .options import ProfileOptions
__all__ = ['next_guess', 'fixed_step', 'adaptive_step']
[docs]
def next_guess(
    x: np.ndarray,
    par_index: int,
    par_direction: Literal[1, -1],
    profile_options: ProfileOptions,
    update_type: Literal[
        'fixed_step',
        'adaptive_step_order_0',
        'adaptive_step_order_1',
        'adaptive_step_regression',
    ],
    current_profile: ProfilerResult,
    problem: Problem,
    global_opt: float,
) -> np.ndarray:
    """
    Propose the initial guess for the next profile point.

    Dispatches to :func:`fixed_step` or :func:`adaptive_step`, depending
    on ``update_type``.

    Parameters
    ----------
    x:
        The current position of the profiler.
    par_index:
        The index of the parameter of the current profile.
    par_direction:
        The direction, in which the profiling is done (``1`` or ``-1``).
    profile_options:
        Various options applied to the profile optimization.
    update_type:
        Type of update for the next profile point:
        ``fixed_step`` (see :func:`fixed_step`), or one of the adaptive
        variants ``adaptive_step_order_0``, ``adaptive_step_order_1``,
        ``adaptive_step_regression`` (see :func:`adaptive_step`).
    current_profile:
        The profile which should be computed.
    problem:
        The problem to be solved.
    global_opt:
        Log-posterior value of the global optimum.

    Returns
    -------
    next_guess:
        The next initial guess as base for the next profile point.
    """
    if update_type == 'fixed_step':
        return fixed_step(
            x, par_index, par_direction, profile_options, problem
        )

    # map the adaptive update types onto their extrapolation order;
    # np.nan requests the regression-based proposal
    adaptive_orders = {
        'adaptive_step_order_0': 0,
        'adaptive_step_order_1': 1,
        'adaptive_step_regression': np.nan,
    }
    if update_type not in adaptive_orders:
        raise ValueError(
            f'Unsupported `update_type` {update_type} for `next_guess`.'
        )

    return adaptive_step(
        x,
        par_index,
        par_direction,
        profile_options,
        current_profile,
        problem,
        global_opt,
        adaptive_orders[update_type],
    )
[docs]
def fixed_step(
    x: np.ndarray,
    par_index: int,
    par_direction: Literal[1, -1],
    options: ProfileOptions,
    problem: Problem,
) -> np.ndarray:
    """Propose the next profile point with a constant step size.

    The step is ``options.default_step_size`` along the profiled
    parameter; it is shortened if it would cross a parameter bound.

    Parameters
    ----------
    x:
        The current position of the profiler, size `dim_full`.
    par_index:
        The index of the parameter of the current profile.
    par_direction:
        The direction, in which the profiling is done (``1`` or ``-1``).
    options:
        Various options applied to the profile optimization.
    problem:
        The problem to be solved.

    Returns
    -------
    x_new:
        The updated parameter vector, of size `dim_full`.
    """
    step = par_direction * options.default_step_size
    proposed = x[par_index] + step

    # shorten the step so the proposal stays within the parameter bounds
    if par_direction == -1 and proposed < problem.lb_full[par_index]:
        step = problem.lb_full[par_index] - x[par_index]
    elif par_direction == 1 and proposed > problem.ub_full[par_index]:
        step = problem.ub_full[par_index] - x[par_index]

    # return a fresh vector; the input is left untouched
    delta = np.zeros(len(x))
    delta[par_index] = step
    return x + delta
[docs]
def adaptive_step(
    x: np.ndarray,
    par_index: int,
    par_direction: Literal[1, -1],
    options: ProfileOptions,
    current_profile: ProfilerResult,
    problem: Problem,
    global_opt: float,
    order: int = 1,
) -> np.ndarray:
    """Group of more complex methods for point proposal.

    Step size is automatically computed by a line search algorithm (hence:
    adaptive).

    Parameters
    ----------
    x:
        The current position of the profiler, size `dim_full`.
    par_index:
        The index of the parameter of the current profile.
    par_direction:
        The direction, in which the profiling is done (1 or -1).
    options:
        Various options applied to the profile optimization.
    current_profile:
        The profile which should be computed.
    problem:
        The problem to be solved.
    global_opt:
        log-posterior value of the global optimum.
    order:
        Specifies the precise algorithm for extrapolation: can be ``0`` (
        just one parameter is updated), ``1`` (last two points used to
        extrapolate all parameters), and ``np.nan`` (indicates that a more
        complex regression should be used).

    Returns
    -------
    x_new:
        The updated parameter vector, of size `dim_full`.
    """

    # restrict step proposal to minimum and maximum step size
    def clip_to_minmax(step_size_proposal):
        return clip(
            step_size_proposal, options.min_step_size, options.max_step_size
        )

    # restrict step proposal to the parameter bounds
    def clip_to_bounds(step_proposal):
        return clip(step_proposal, problem.lb_full, problem.ub_full)

    # number of profile points computed so far (1 means: first step)
    n_profile_points = len(current_profile.fval_path)

    # NOTE: mutates shared problem state — the profiled parameter is fixed
    # to its current value so later objective calls see it as constant
    problem.fix_parameters(par_index, x[par_index])

    # Get update directions and first step size guesses from the history
    (
        step_size_guess,
        delta_x_dir,
        reg_par,
        delta_obj_value,
    ) = handle_profile_history(
        x,
        par_index,
        par_direction,
        n_profile_points,
        global_opt,
        order,
        current_profile,
        problem,
        options,
    )

    # check whether we must make a minimum step anyway, since we're close to
    # the next bound: if even the minimum step would overshoot, step exactly
    # onto the bound and return immediately
    min_delta_x = x[par_index] + par_direction * options.min_step_size
    if par_direction == -1 and (min_delta_x < problem.lb_full[par_index]):
        step_length = problem.lb_full[par_index] - x[par_index]
        return x + step_length * delta_x_dir
    elif par_direction == 1 and (min_delta_x > problem.ub_full[par_index]):
        step_length = problem.ub_full[par_index] - x[par_index]
        return x + step_length * delta_x_dir

    # parameter extrapolation function: proposes a full parameter vector
    # for a given step length along the profiled parameter
    def par_extrapol(step_length):
        # Do we have enough points to do a regression?
        if np.isnan(order) and n_profile_points > 2:
            x_step_tmp = []
            # loop over parameters, extrapolate each one
            for i_par in range(problem.dim_full):
                if i_par == par_index:
                    # if we meet the profiling parameter, just increase,
                    # don't extrapolate
                    x_step_tmp.append(
                        x[par_index] + step_length * par_direction
                    )
                elif i_par in problem.x_fixed_indices:
                    # common fixed parameter: will be ignored anyway later
                    x_step_tmp.append(np.nan)
                else:
                    # extrapolate by evaluating the regression polynomial
                    # (coefficients computed in handle_profile_history)
                    cur_par_extrapol = np.poly1d(reg_par[i_par])
                    x_step_tmp.append(
                        cur_par_extrapol(
                            x[par_index] + step_length * par_direction
                        )
                    )
            x_step = np.array(x_step_tmp)
        else:
            # if we do simple extrapolation
            x_step = x + step_length * delta_x_dir
        return clip_to_bounds(x_step)

    # compute proposal
    next_x = par_extrapol(step_size_guess)

    # next start point has to be searched
    # compute the next objective value which we aim for
    # NOTE(review): heuristic target — log-offset from delta_ratio_max plus a
    # damped copy of the distance to the global optimum; confirm against the
    # profiling literature before changing
    next_obj_target = (
        -np.log(1.0 - options.delta_ratio_max)
        + options.magic_factor_obj_value * delta_obj_value
        + current_profile.fval_path[-1]
    )

    # compute objective at the guessed point
    problem.fix_parameters(par_index, next_x[par_index])
    next_obj = problem.objective(problem.get_reduced_vector(next_x))

    # iterate until good step size is found: shrink the step if the guessed
    # objective already exceeds the target, otherwise grow it
    return do_line_search(
        next_x,
        step_size_guess,
        "decrease" if next_obj_target < next_obj else "increase",
        par_extrapol,
        next_obj,
        next_obj_target,
        clip_to_minmax,
        clip_to_bounds,
        par_index,
        problem,
        options,
    )
def handle_profile_history(
    x: np.ndarray,
    par_index: int,
    par_direction: int,
    n_profile_points: int,
    global_opt: float,
    order: int,
    current_profile: ProfilerResult,
    problem: Problem,
    options: ProfileOptions,
) -> Tuple:
    """Compute the very first step direction update guesses.

    Check whether enough steps have been taken for applying regression,
    computes regression or simple extrapolation.
    """
    # default update direction: move only along the profiled parameter
    delta_x_dir = np.zeros(len(x))
    delta_x_dir[par_index] = par_direction
    reg_par = None

    # very first step along this profile: fall back to the default step size
    if n_profile_points == 1:
        return options.default_step_size, delta_x_dir, reg_par, 0.0

    # reuse the length of the previous step as initial guess
    x_path = current_profile.x_path
    step_size_guess = np.abs(
        x_path[par_index, -1] - x_path[par_index, -2]
    )
    delta_obj_value = current_profile.fval_path[-1] - global_opt

    if order == 1 or (np.isnan(order) and n_profile_points < 3):
        # first-order extrapolation from the last two profile points
        delta_x_dir = (x_path[:, -1] - x_path[:, -2]) / step_size_guess
    elif np.isnan(order):
        # enough points available: fit regression polynomials instead
        reg_par = get_reg_polynomial(
            n_profile_points, par_index, current_profile, problem, options
        )

    return step_size_guess, delta_x_dir, reg_par, delta_obj_value
def get_reg_polynomial(
    n_profile_points: int,
    par_index: int,
    current_profile: ProfilerResult,
    problem: Problem,
    options: ProfileOptions,
) -> List[float]:
    """Compute the regression polynomial.

    Used to step proposal extrapolation from the last profile points.
    Returns one entry per parameter in `dim_full`: ``np.nan`` for fixed
    parameters, otherwise the polynomial coefficients (highest power first,
    as returned by :func:`numpy.polyfit`).
    """
    # determine interpolation order: at most half the available points,
    # capped by the user-specified maximum
    reg_max_order = np.floor(n_profile_points / 2)
    reg_order = np.min([reg_max_order, options.reg_order])
    reg_points = np.min([n_profile_points, options.reg_points])

    # set up matrix of regression parameters
    reg_par = []
    for i_par in range(problem.dim_full):
        if i_par in problem.x_fixed_indices:
            # if we meet the current profiling parameter or a fixed parameter,
            # there is nothing to do, so pass a np.nan
            # (the profiled parameter itself was fixed by the caller)
            reg_par.append(np.nan)
        else:
            # Do polynomial interpolation of profile path
            # Determine rank of polynomial interpolation
            # NOTE(review): the slice -1:-reg_points:-1 yields only
            # reg_points - 1 points (newest first) — confirm the off-by-one
            # is intended
            regression_tmp = np.polyfit(
                current_profile.x_path[par_index, -1:-reg_points:-1],
                current_profile.x_path[i_par, -1:-reg_points:-1],
                reg_order,
                full=True,
            )

            # Decrease rank if interpolation problem is ill-conditioned
            # NOTE(review): regression_tmp[2] is the rank of the fit's
            # coefficient matrix; lowering reg_order here also affects all
            # subsequent parameters in this loop — verify that is intended
            if regression_tmp[2] < reg_order:
                reg_order = regression_tmp[2]
                # NOTE(review): the retry uses the slice -reg_points:-1
                # (oldest first, excludes the newest point) — a different
                # window than the first fit above; verify
                regression_tmp = np.polyfit(
                    current_profile.x_path[par_index, -reg_points:-1],
                    current_profile.x_path[i_par, -reg_points:-1],
                    int(reg_order),
                    full=True,
                )

            # add to regression parameters (coefficient vector only)
            reg_par.append(regression_tmp[0])

    return reg_par
def do_line_search(
    next_x: np.ndarray,
    step_size_guess: float,
    direction: Literal['increase', 'decrease'],
    par_extrapol: Callable,
    next_obj: float,
    next_obj_target: float,
    clip_to_minmax: Callable,
    clip_to_bounds: Callable,
    par_index: int,
    problem: Problem,
    options: ProfileOptions,
) -> np.ndarray:
    """Perform the line search.

    Based on the objective function we want to reach, based on the current
    position in parameter space and on the first guess for the proposal.
    """
    # grow the step when the first guess undershot the target objective,
    # shrink it when the guess overshot
    if direction == 'increase':
        adapt_factor = options.step_size_factor
    else:
        adapt_factor = 1 / options.step_size_factor

    # loop until a suitable step size is found
    while True:
        # adapt the step size of the guess, keeping the previous proposal
        last_x = next_x.copy()
        step_size_guess = clip_to_minmax(step_size_guess * adapt_factor)
        next_x = clip_to_bounds(par_extrapol(step_size_guess))

        # step size hit its limit in the search direction: accept as is
        hit_limit = (
            step_size_guess == options.min_step_size
            if direction == 'decrease'
            else step_size_guess == options.max_step_size
        )
        if hit_limit:
            return next_x

        # evaluate the objective at the new proposal
        # (mutates shared problem state, like the caller does)
        problem.fix_parameters(par_index, next_x[par_index])
        last_obj = next_obj
        next_obj = problem.objective(problem.get_reduced_vector(next_x))

        # the target objective was crossed between the last two proposals:
        # interpolate between them for the final guess
        crossed = (
            next_obj_target >= next_obj
            if direction == 'decrease'
            else next_obj_target <= next_obj
        )
        if crossed:
            return next_x_interpolate(
                next_obj, last_obj, next_x, last_x, next_obj_target
            )
def next_x_interpolate(
    next_obj: float,
    last_obj: float,
    next_x: np.ndarray,
    last_x: np.ndarray,
    next_obj_target: float,
) -> np.ndarray:
    """Interpolate between the last two steps.

    Places the final guess on the line between ``last_x`` and ``next_x``,
    proportionally to where ``next_obj_target`` lies between the two
    objective values.

    NOTE(review): divides by ``|next_obj - last_obj|`` — callers must
    guarantee the two objective values differ.
    """
    obj_span = np.abs(next_obj - last_obj)
    shift = (next_x - last_x) * (np.abs(last_obj - next_obj_target) / obj_span)
    # fix final guess and return
    return last_x + shift
def clip(
    vector_guess: Union[float, np.ndarray],
    lower: Union[float, np.ndarray],
    upper: Union[float, np.ndarray],
) -> Union[float, np.ndarray]:
    """Restrict a scalar or a vector to given bounds.

    ``vector_guess`` is modified in-place if it is an array.

    Parameters
    ----------
    vector_guess:
        Scalar or 1-D array to clip. Scalars of any numeric type are
        supported (the previous ``isinstance(..., float)`` check rejected
        plain ``int`` and crashed in the element loop).
    lower, upper:
        Bounds; scalars for a scalar guess, arrays of matching length for
        an array guess.

    Returns
    -------
    The clipped scalar, or the same array object clipped in place.
    """
    if isinstance(vector_guess, np.ndarray):
        # vectorized in-place clip instead of a Python element loop;
        # preserves the documented in-place contract
        np.clip(vector_guess, lower, upper, out=vector_guess)
        return vector_guess
    # scalar path: works for float, int, and numpy scalar inputs
    return np.clip(vector_guess, lower, upper)