import warnings
from typing import Dict, List
import numpy as np
from scipy.optimize import minimize
from ...C import (
CURRENT_SIMULATION,
DATAPOINTS,
EXPDATA_MASK,
MAX_DATAPOINT,
MIN_DATAPOINT,
MIN_DIFF_FACTOR,
MIN_SIM_RANGE,
N_SPLINE_PARS,
NOISE_PARAMETERS,
NUM_DATAPOINTS,
SCIPY_FUN,
SCIPY_SUCCESS,
SCIPY_X,
InnerParameterType,
)
from ..solver import InnerSolver
from .parameter import SplineInnerParameter
from .problem import SplineInnerProblem
try:
from amici.parameter_mapping import ParameterMapping
except ImportError:
pass
[docs]class SplineInnerSolver(InnerSolver):
"""Solver of the inner subproblem of spline approximation for nonlinear-monotone data.
Options
-------
min_diff_factor:
Determines the minimum difference between two consecutive spline
as ``min_diff_factor * (measurement_range) / n_spline_pars``.
Default is 1/2.
"""
[docs] def __init__(self, options: Dict = None):
self.options = {
**self.get_default_options(),
**(options or {}),
}
self.validate_options()
[docs] def validate_options(self):
"""Validate the current options dictionary."""
if type(self.options[MIN_DIFF_FACTOR]) is not float:
raise TypeError(f"{MIN_DIFF_FACTOR} must be of type float.")
elif self.options[MIN_DIFF_FACTOR] < 0:
raise ValueError(f"{MIN_DIFF_FACTOR} must be greater than zero.")
for key in self.options:
if key not in self.get_default_options():
raise ValueError(f"Unknown SplineInnerSolver option {key}.")
[docs] def solve(
self,
problem: SplineInnerProblem,
sim: List[np.ndarray],
sigma: List[np.ndarray],
) -> list:
"""Get results for every group (inner optimization problem).
Parameters
----------
problem:
InnerProblem from pyPESTO hierarchical.
sim:
Simulations from AMICI.
sigma:
List of sigmas from AMICI.
Returns
-------
List of optimization results of the inner subproblem.
"""
inner_optimization_results = []
for group in problem.get_groups_for_xs(InnerParameterType.SPLINE):
group_dict = problem.groups[group]
group_dict[NOISE_PARAMETERS] = extract_expdata_using_mask(
expdata=sigma, mask=group_dict[EXPDATA_MASK]
)
group_dict[CURRENT_SIMULATION] = extract_expdata_using_mask(
expdata=sim, mask=group_dict[EXPDATA_MASK]
)
inner_optimization_results_per_group = self._optimize_spline(
inner_parameters=problem.get_free_xs_for_group(group),
group_dict=group_dict,
)
inner_optimization_results.append(
inner_optimization_results_per_group
)
save_inner_parameters_to_inner_problem(
inner_parameters=problem.get_xs_for_group(group),
s=inner_optimization_results_per_group[SCIPY_X],
)
return inner_optimization_results
[docs] @staticmethod
def calculate_obj_function(x_inner_opt: list):
"""Calculate the inner objective function value.
Calculates the inner objective function value from a list of inner
optimization results returned from `_optimize_spline`.
Parameters
----------
x_inner_opt:
List of optimization results
Returns
-------
Inner objective function value.
"""
if False in (
x_inner_opt[idx][SCIPY_SUCCESS] for idx in range(len(x_inner_opt))
):
obj = np.inf
warnings.warn("Inner optimization failed.")
else:
obj = np.sum(
[
x_inner_opt[idx][SCIPY_FUN]
for idx in range(len(x_inner_opt))
]
)
return obj
[docs] def calculate_gradients(
self,
problem: SplineInnerProblem,
x_inner_opt: List[Dict],
sim: List[np.ndarray],
sigma: List[np.ndarray],
sy: List[np.ndarray],
parameter_mapping: ParameterMapping,
par_opt_ids: List,
par_sim_ids: List,
snllh: Dict,
):
"""Calculate gradients of the inner objective function.
Calculates gradients of the objective function with respect to outer
(dynamical) parameters.
Parameters
----------
problem:
Optimal scaling inner problem.
x_inner_opt:
List of optimization results of the inner subproblem.
sim:
Model simulations.
sigma:
Model noise parameters.
sy:
Model sensitivities.
parameter_mapping:
Mapping of optimization to simulation parameters.
par_opt_ids:
Ids of outer otimization parameters.
par_sim_ids:
Ids of outer simulation parameters, includes fixed parameters.
snllh:
Empty dictionary with optimization parameters as keys.
Returns
-------
Filled in snllh dictionary with objective function gradients.
"""
already_calculated = set()
for condition_map_sim_var in [
cond_par_map.map_sim_var for cond_par_map in parameter_mapping
]:
for par_sim, par_opt in condition_map_sim_var.items():
if (
not isinstance(par_opt, str)
or par_opt in already_calculated
):
continue
# Current fix for scaling/offset parameters in models.
elif par_sim.startswith('observableParameter'):
continue
else:
already_calculated.add(par_opt)
par_sim_idx = par_sim_ids.index(par_sim)
par_opt_idx = par_opt_ids.index(par_opt)
grad = 0.0
sy_for_outer_parameter = [
sy_cond[:, par_sim_idx, :] for sy_cond in sy
]
for group_idx, group in enumerate(
problem.get_groups_for_xs(InnerParameterType.SPLINE)
):
# Get the reformulated spline parameters
s = np.asarray(x_inner_opt[group_idx][SCIPY_X])
group_dict = problem.groups[group]
measurements = group_dict[DATAPOINTS]
sigma = group_dict[NOISE_PARAMETERS]
sim_all = group_dict[CURRENT_SIMULATION]
N = group_dict[N_SPLINE_PARS]
K = group_dict[NUM_DATAPOINTS]
sy_all = extract_expdata_using_mask(
expdata=sy_for_outer_parameter,
mask=group_dict[EXPDATA_MASK],
)
delta_c, c, n = self._rescale_spline_bases(
sim_all=sim_all, N=N, K=K
)
delta_c_dot, c_dot = calculate_spline_bases_gradient(
sim_all=sim_all, sy_all=sy_all, N=N
)
C = np.diag(-np.ones(N))
# For the reformulated problem, mu can be calculated
# as the inner gradient at the optimal point s.
mu = calculate_inner_gradient(
s=s,
sim_all=sim_all,
measurements=measurements,
sigma=sigma,
N=N,
delta_c=delta_c,
c=c,
n=n,
)
min_meas = group_dict[MIN_DATAPOINT]
max_meas = group_dict[MAX_DATAPOINT]
min_diff = self._get_minimal_difference(
measurement_range=max_meas - min_meas,
N=N,
min_diff_factor=self.options[MIN_DIFF_FACTOR],
)
# Calculate df_ds term only if mu is not all 0
if np.any(mu):
s_dot = calculate_ds_dtheta(
sim_all=sim_all,
sy_all=sy_all,
measurements=measurements,
sigma=sigma,
s=s,
C=C,
mu=mu,
N=N,
delta_c=delta_c,
delta_c_dot=delta_c_dot,
c=c,
c_dot=c_dot,
n=n,
min_diff=min_diff,
)
df_ds = mu
grad += df_ds.dot(s_dot)
# Let's calculate the df_dyk term now:
df_dyk = calculate_df_dyk(
sim_all=sim_all,
sy_all=sy_all,
measurements=measurements,
sigma=sigma,
s=s,
N=N,
delta_c=delta_c,
delta_c_dot=delta_c_dot,
c=c,
c_dot=c_dot,
n=n,
)
grad += df_dyk
snllh[par_opt_idx] = grad
return snllh
[docs] @staticmethod
def get_default_options() -> Dict:
"""Return default options for solving the inner problem."""
options = {
MIN_DIFF_FACTOR: 1 / 2,
}
return options
def _optimize_spline(
self,
inner_parameters: List[SplineInnerParameter],
group_dict: Dict,
):
"""Run optimization for the inner problem.
Parameters
----------
inner_parameters:
The spline inner parameters.
group_dict:
The group dictionary.
"""
group_measurements = group_dict[DATAPOINTS]
group_noise_parameters = group_dict[NOISE_PARAMETERS]
current_group_simulation = group_dict[CURRENT_SIMULATION]
n_datapoints = group_dict[NUM_DATAPOINTS]
n_spline_pars = group_dict[N_SPLINE_PARS]
(
distance_between_bases,
spline_bases,
intervals_per_sim,
) = self._rescale_spline_bases(
sim_all=current_group_simulation, N=n_spline_pars, K=n_datapoints
)
min_meas = group_dict[MIN_DATAPOINT]
max_meas = group_dict[MAX_DATAPOINT]
min_diff = self._get_minimal_difference(
measurement_range=max_meas - min_meas,
N=n_spline_pars,
min_diff_factor=self.options[MIN_DIFF_FACTOR],
)
inner_options = self._get_inner_optimization_options(
inner_parameters=inner_parameters,
N=n_spline_pars,
min_meas=min_meas,
max_meas=max_meas,
min_diff=min_diff,
)
def objective_function_wrapper(x):
return calculate_objective_function(
s=x,
sim_all=current_group_simulation,
measurements=group_measurements,
sigma=group_noise_parameters,
N=n_spline_pars,
delta_c=distance_between_bases,
c=spline_bases,
n=intervals_per_sim,
)
def inner_gradient_wrapper(x):
return calculate_inner_gradient(
s=x,
sim_all=current_group_simulation,
measurements=group_measurements,
sigma=group_noise_parameters,
N=n_spline_pars,
delta_c=distance_between_bases,
c=spline_bases,
n=intervals_per_sim,
)
results = minimize(
objective_function_wrapper,
jac=inner_gradient_wrapper,
**inner_options,
)
return results
def _rescale_spline_bases(self, sim_all: np.ndarray, N: int, K: int):
"""Rescale the spline bases.
Before the optimization of the spline parameters, we have to fix the
spline bases to some values. We choose to scale them to the current
simulation. In case of simulations that are very close to each other,
we choose to scale closely around the average value of the simulations,
to avoid numerical problems (as we often divide by delta_c).
Parameters
----------
sim_all:
The current simulation.
N:
The number of spline parameters.
K:
The number of simulations.
Returns
-------
distance_between_bases:
The distance between the spline bases.
spline_bases:
The rescaled spline bases.
intervals_per_sim:
List of indices of intervals each simulation belongs to.
"""
min_idx = np.argmin(sim_all)
max_idx = np.argmax(sim_all)
min_all = sim_all[min_idx]
max_all = sim_all[max_idx]
n = np.ones(K)
# In case the simulation are very close to each other
# or even collapse into a single point.
if max_all - min_all < MIN_SIM_RANGE:
average_value = (max_all + min_all) / 2
delta_c = MIN_SIM_RANGE / (N - 1)
if average_value < (MIN_SIM_RANGE / 2):
c = np.linspace(0, MIN_SIM_RANGE, N)
else:
c = np.linspace(
average_value - (MIN_SIM_RANGE / 2),
average_value + (MIN_SIM_RANGE / 2),
N,
)
# Set the n(k) values for the simulations
for i in range(len(sim_all)):
n[i] = np.ceil((sim_all[i] - c[0]) / delta_c) + 1
if n[i] > N:
n[i] = N
warnings.warn(
"Interval for a simulation has been set to a larger value than the number of spline parameters."
)
# In case the simulations are sufficiently apart:
else:
delta_c = (max_all - min_all) / (N - 1)
c = np.linspace(min_all, max_all, N)
for i in range(len(sim_all)):
if i == max_idx:
n[i] = N
elif i == min_idx:
n[i] = 1
else:
n[i] = np.ceil((sim_all[i] - c[0]) / delta_c) + 1
if n[i] > N:
n[i] = N
n = n.astype(int)
return delta_c, c, n
def _get_minimal_difference(
self,
measurement_range: float,
N: int,
min_diff_factor: float,
):
"""Return minimal parameter difference for spline parameters."""
return min_diff_factor * measurement_range / N
def _get_inner_optimization_options(
self,
inner_parameters: List[SplineInnerParameter],
N: int,
min_meas: float,
max_meas: float,
min_diff: float,
) -> Dict:
"""Return default options for scipy optimizer.
Returns inner subproblem optimization options including startpoint
and optimization bounds or constraints, dependent on solver method.
Parameters
----------
inner_parameters:
Inner parameters of the spline group.
N:
Number of spline parameters.
min_meas:
Minimal measurement value.
max_meas:
Maximal measurement value.
min_diff:
Minimal difference between spline parameters.
"""
range_all = max_meas - min_meas
constraint_min_diff = np.full(N, min_diff)
constraint_min_diff[0] = 0
last_opt_values = np.asarray([x.value for x in inner_parameters])
if (last_opt_values > 0).any():
x0 = last_opt_values
# In case this is the first inner optimization, initialize the
# spline parameters to a linear function with a symmetric 60%
# larger range than the measurement range.
else:
x0 = np.full(
N,
(
max_meas
+ 0.3 * range_all
- np.max([min_meas - 0.3 * range_all, 0])
)
/ (N - 1),
)
x0[0] = np.max([min_meas - 0.3 * range_all, 0])
from scipy.optimize import Bounds
inner_options = {
"x0": x0,
"method": "L-BFGS-B",
"options": {"ftol": 1e-10, "disp": None},
"bounds": Bounds(lb=constraint_min_diff),
}
return inner_options
def calculate_objective_function(
s: np.ndarray,
sim_all: np.ndarray,
measurements: np.ndarray,
sigma: np.ndarray,
N: int,
delta_c: float,
c: np.ndarray,
n: np.ndarray,
):
"""Objective function for reformulated inner spline problem."""
obj = 0
for y_k, z_k, sigma_k, n_k in zip(sim_all, measurements, sigma, n):
i = n_k - 1
sum_s = 0
sum_s = np.sum(s[:i])
if i == 0:
obj += (1 / sigma_k**2) * (z_k - s[i]) ** 2
elif i == N:
obj += (1 / sigma_k**2) * (z_k - sum_s) ** 2
else:
obj += (1 / sigma_k**2) * (
z_k - (y_k - c[i - 1]) * s[i] / delta_c - sum_s
) ** 2
obj = obj / 2
return obj
def get_spline_mapped_simulations(
s: np.ndarray,
sim_all: np.ndarray,
N: int,
delta_c: float,
c: np.ndarray,
n: np.ndarray,
):
"""Return model simulations mapped using the approximation spline."""
mapped_simulations = np.zeros(len(sim_all))
xi = np.zeros(len(s))
for i in range(len(s)):
xi[i] = np.sum(s[: i + 1])
for y_k, n_k, index in zip(sim_all, n, range(len(sim_all))):
interval_index = n_k - 1
if interval_index == 0 or interval_index == N:
mapped_simulations[index] = xi[interval_index]
else:
mapped_simulations[index] = (y_k - c[interval_index - 1]) * (
xi[interval_index] - xi[interval_index - 1]
) / delta_c + xi[interval_index - 1]
return mapped_simulations
def calculate_inner_gradient(
s: np.ndarray,
sim_all: np.ndarray,
measurements: np.ndarray,
sigma: np.ndarray,
N: int,
delta_c: float,
c: np.ndarray,
n: np.ndarray,
):
"""Gradient of the objective function for the reformulated inner spline problem."""
gradient = np.zeros(N)
for y_k, z_k, sigma_k, n_k in zip(sim_all, measurements, sigma, n):
weight_k = 1 / sigma_k**2
sum_s = 0
i = n_k - 1 # just the iterator to go over the Jacobian array
sum_s = np.sum(s[:i])
if i == 0:
gradient[i] += weight_k * (s[i] - z_k)
elif i == N:
for j in range(i):
gradient[j] += weight_k * (sum_s - z_k)
else:
gradient[i] += (
weight_k
* ((y_k - c[i - 1]) * s[i] / delta_c + sum_s - z_k)
* (y_k - c[i - 1])
/ delta_c
)
gradient[:i] += np.full(
i,
weight_k * ((y_k - c[i - 1]) * s[i] / delta_c + sum_s - z_k),
)
return gradient
def calculate_inner_hessian(
s: np.ndarray,
sim_all: np.ndarray,
sigma: np.ndarray,
N: int,
delta_c: float,
c: np.ndarray,
n: np.ndarray,
):
"""Calculate the hessian of the objective function for the reformulated inner problem."""
hessian = np.zeros((N, N))
for y_k, sigma_k, n_k in zip(sim_all, sigma, n):
sum_s = 0
i = n_k - 1 # just the iterator to go over the Hessian matrix
for j in range(i):
sum_s += s[j]
hessian[i][i] += (1 / sigma_k**2) * ((y_k - c[i - 1]) / delta_c) ** 2
for j in range(i):
hessian[i][j] += (1 / sigma_k**2) * ((y_k - c[i - 1]) / delta_c)
hessian[j][i] += (1 / sigma_k**2) * ((y_k - c[i - 1]) / delta_c)
for h in range(i):
hessian[j][h] += 1 / sigma_k**2
return hessian
def calculate_ds_dtheta(
sim_all: np.ndarray,
sy_all: np.ndarray,
measurements: np.ndarray,
sigma: np.ndarray,
s: np.ndarray,
C: np.ndarray,
mu: np.ndarray,
N: int,
delta_c: float,
delta_c_dot: float,
c: np.ndarray,
c_dot: np.ndarray,
n: np.ndarray,
min_diff: float,
):
"""Calculate derivatives of reformulated spline parameters with respect to outer parameter.
Calculates the derivative of reformulated spline parameters s with respect to the
dynamical parameter theta. Firstly, we calculate the derivative of the
first two equations of the necessary optimality conditions of the
optimization problem with inequality constraints. Then we solve the linear
system to obtain the derivatives.
"""
Jacobian_derivative = np.zeros((N, N))
rhs = np.zeros(2 * N)
for y_k, z_k, y_dot_k, sigma_k, n_k in zip(
sim_all, measurements, sy_all, sigma, n
):
i = n_k - 1 # just the iterator to go over the Jacobian matrix
weight_k = 1 / sigma_k**2
sum_s = 0
sum_s = np.sum(s[:i])
# calculate the Jacobian derivative:
if i == 0:
Jacobian_derivative[i][i] += weight_k
elif i == N:
Jacobian_derivative = Jacobian_derivative + np.full(
(N, N), weight_k
)
else:
Jacobian_derivative[i][i] += (
weight_k * (y_k - c[i - 1]) ** 2 / delta_c**2
)
rhs[i] += (
weight_k
* (2 * (y_k - c[i - 1]) / delta_c * s[i] + sum_s - z_k)
* (
(y_dot_k - c_dot[i - 1]) * delta_c
- (y_k - c[i - 1]) * delta_c_dot
)
/ delta_c**2
)
if i > 0:
Jacobian_derivative[i, :i] += np.full(
i, weight_k * (y_k - c[i - 1]) / delta_c
)
Jacobian_derivative[:i, i] += np.full(
i, weight_k * (y_k - c[i - 1]) / delta_c
)
rhs[:i] += np.full(
i,
weight_k
* (
(y_dot_k - c_dot[i - 1]) * delta_c
- (y_k - c[i - 1]) * delta_c_dot
)
* s[i]
/ delta_c**2,
)
Jacobian_derivative[:i, :i] += np.full((i, i), weight_k)
from scipy import linalg
constraint_min_diff = np.diag(np.full(N, min_diff))
constraint_min_diff[0][0] = 0
lhs = np.block(
[
[Jacobian_derivative, C],
[-np.diag(mu), constraint_min_diff - np.diag(s)],
]
)
ds_dtheta = linalg.lstsq(lhs, rhs, lapack_driver="gelsy")
return ds_dtheta[0][:N]
def calculate_df_dyk(
sim_all: np.ndarray,
sy_all: np.ndarray,
measurements: np.ndarray,
sigma: np.ndarray,
s: np.ndarray,
N: int,
delta_c: float,
delta_c_dot: float,
c: np.ndarray,
c_dot: np.ndarray,
n: np.ndarray,
):
"""Calculate the derivative of the objective function for one group with respect to the simulations."""
df_dyk = 0
for y_k, z_k, y_dot_k, sigma_k, n_k in zip(
sim_all, measurements, sy_all, sigma, n
):
i = n_k - 1
sum_s = np.sum(s[:i])
if i > 0 and i < N:
df_dyk += (
(1 / sigma_k**2)
* ((y_k - c[i - 1]) * s[i] / delta_c + sum_s - z_k)
* s[i]
* (
(y_dot_k - c_dot[i - 1]) * delta_c
- (y_k - c[i - 1]) * delta_c_dot
)
/ delta_c**2
)
return df_dyk
def calculate_spline_bases_gradient(
sim_all: np.ndarray, sy_all: np.ndarray, N: int
):
"""Calculate gradient of the rescaled spline bases."""
min_idx = np.argmin(sim_all)
max_idx = np.argmax(sim_all)
if sim_all[max_idx] - sim_all[min_idx] < MIN_SIM_RANGE:
delta_c_dot = 0
c_dot = np.full(N, (sy_all[max_idx] - sy_all[min_idx]) / 2)
else:
delta_c_dot = (sy_all[max_idx] - sy_all[min_idx]) / (N - 1)
c_dot = np.linspace(sy_all[min_idx], sy_all[max_idx], N)
return delta_c_dot, c_dot
def extract_expdata_using_mask(
expdata: List[np.ndarray], mask: List[np.ndarray]
):
"""Extract data from expdata list of arrays for the given mask."""
return np.concatenate(
[
expdata[condition_index][mask[condition_index]]
for condition_index in range(len(mask))
]
)
def save_inner_parameters_to_inner_problem(
inner_parameters,
s,
) -> None:
"""Save inner parameter values to the inner subproblem.
Calculates the non-reformulated inner spline parameters from
the reformulated inner spline parameters and saves them to
the inner subproblem.
Parameters
----------
inner_parameters : list
List of inner parameters.
s : np.ndarray
Reformulated inner spline parameters.
"""
xi = np.zeros(len(s))
for i in range(len(s)):
xi[i] = np.sum(s[: i + 1])
for idx in range(len(inner_parameters)):
inner_parameters[idx].value = xi[idx]
def get_monotonicity_measure(measurement, simulation):
"""Get monotonicity measure by calculating inversions.
Calculate the number of inversions in the simulation data
with respect to the measurement data.
Parameters
----------
measurement : np.ndarray
Measurement data.
simulation : np.ndarray
Simulation data.
Returns
-------
inversions : int
Number of inversions.
"""
if len(measurement) != len(simulation):
raise ValueError(
"Measurement and simulation data must have the same length."
)
ordered_simulation = [
x
for _, x in sorted(
zip(measurement, simulation), key=lambda pair: pair[0]
)
]
ordered_measurement = sorted(simulation)
inversions = 0
for i in range(len(ordered_simulation)):
for j in range(i + 1, len(ordered_simulation)):
if ordered_simulation[i] > ordered_simulation[j]:
inversions += 1
elif (
ordered_simulation[i] == ordered_simulation[j]
and ordered_measurement[i] != ordered_measurement[j]
):
inversions += 1
return inversions