"""Inner optimization problem in hierarchical optimization."""
import copy
import logging
from typing import Union
import numpy as np
import pandas as pd
from .base_parameter import InnerParameter
try:
import amici
import petab
from petab.C import OBSERVABLE_ID, TIME
except ImportError:
pass
logger = logging.getLogger(__name__)
class InnerProblem:
    """
    Inner optimization problem in hierarchical optimization.

    Attributes
    ----------
    xs:
        Mapping of (inner) parameter ID to ``InnerParameters``.
    data:
        Measurement data. One matrix (`num_timepoints` x `num_observables`)
        per simulation condition. Missing observations as NaN.
    """

    def __init__(self, xs: list[InnerParameter], data: list[np.ndarray]):
        """Construct the inner problem.

        Parameters
        ----------
        xs:
            Inner parameters of this problem.
        data:
            Measurement data, one matrix per simulation condition.

        Raises
        ------
        ValueError
            If no inner parameters are provided.
        """
        self.xs: dict[str, InnerParameter] = {
            x.inner_parameter_id: x for x in xs
        }
        # deep-copy so that masking below does not mutate the caller's data
        self.data = copy.deepcopy(data)

        # create the joint mask of all inner problem parameters
        self.data_mask = [
            np.zeros_like(cond_data, dtype=bool) for cond_data in data
        ]
        for x in xs:
            for condition_ix, cond_mask in enumerate(self.data_mask):
                cond_mask[x.ixs[condition_ix]] = True

        # mask the data: an inner problem is aware of only the data it uses
        for i in range(len(self.data)):
            self.data[i][~self.data_mask[i]] = np.nan

        logger.debug(f"Created InnerProblem with ids {self.get_x_ids()}")

        if self.is_empty():
            raise ValueError(
                "There are no parameters in the inner problem of hierarchical "
                "optimization."
            )

    @staticmethod
    def from_petab_amici(
        petab_problem: "petab.Problem",
        amici_model: "amici.Model",
        edatas: list["amici.ExpData"],
    ) -> "InnerProblem":
        """Create an InnerProblem from a PEtab problem and AMICI objects."""
        # NOTE(review): as written this returns None despite the annotated
        # return type — the implementation appears to live elsewhere; confirm.

    def get_x_ids(self) -> list[str]:
        """Get IDs of inner parameters."""
        return list(self.xs.keys())

    def get_interpretable_x_ids(self) -> list[str]:
        """Get IDs of interpretable inner parameters.

        Interpretable parameters need to be easily interpretable by the user.
        Examples are scaling factors, offsets, or noise parameters. An example
        of non-interpretable inner parameters is the spline heights of spline
        approximation for semiquantitative data. It is challenging to interpret
        the meaning of these parameters based solely on their value.
        """
        # in the base problem, all inner parameters count as interpretable
        return list(self.xs.keys())

    def get_xs_for_type(
        self, inner_parameter_type: str
    ) -> list[InnerParameter]:
        """Get inner parameters of the given type."""
        return [
            x
            for x in self.xs.values()
            if x.inner_parameter_type == inner_parameter_type
        ]

    def get_dummy_values(self, scaled: bool) -> dict[str, float]:
        """
        Get dummy parameter values.

        Get parameter values to be used for simulation before their optimal
        values are computed.

        Parameters
        ----------
        scaled:
            Whether the parameters should be returned on parameter scale
            (``True``) or on linear scale (``False``).
        """
        return {
            x.inner_parameter_id: (
                scale_value(x.dummy_value, x.scale)
                if scaled
                else x.dummy_value
            )
            for x in self.xs.values()
        }

    def get_for_id(self, inner_parameter_id: str) -> InnerParameter:
        """Get InnerParameter for the given parameter ID.

        Raises
        ------
        KeyError
            If no parameter with the given ID exists.
        """
        try:
            return self.xs[inner_parameter_id]
        except KeyError:
            # previously this f-string interpolated the builtin ``id``
            # function instead of the requested parameter ID
            raise KeyError(
                f"Cannot find parameter with id {inner_parameter_id}."
            ) from None

    def is_empty(self) -> bool:
        """Check for emptiness.

        Returns
        -------
        ``True`` if there aren't any parameters associated with this problem,
        ``False`` otherwise.
        """
        return len(self.xs) == 0

    def get_bounds(self) -> tuple[np.ndarray, np.ndarray]:
        """Get bounds of inner parameters."""
        lb = np.asarray([x.lb for x in self.xs.values()])
        ub = np.asarray([x.ub for x in self.xs.values()])
        return lb, ub

    def get_interpretable_x_bounds(self) -> tuple[np.ndarray, np.ndarray]:
        """Get bounds of interpretable inner parameters."""
        interpretable_x_ids = self.get_interpretable_x_ids()
        # collect both bounds in a single pass over the parameters
        lb, ub = [], []
        for x in self.xs.values():
            if x.inner_parameter_id in interpretable_x_ids:
                lb.append(x.lb)
                ub.append(x.ub)
        return np.asarray(lb), np.asarray(ub)
class AmiciInnerProblem(InnerProblem):
    """
    Inner optimization problem in hierarchical optimization.

    For use with AMICI objects.

    Attributes
    ----------
    edatas:
        AMICI ``ExpDataView``s for each simulation condition.
    """

    def __init__(self, edatas: list[amici.ExpData], **kwargs):
        # NOTE(review): ``edatas`` is accepted but not stored here; the
        # measurement values this problem works with come via
        # ``kwargs['data']`` — confirm against callers.
        super().__init__(**kwargs)

    def check_edatas(self, edatas: list[amici.ExpData]) -> bool:
        """Check for consistency in data.

        Currently only checks for the actual data values. e.g., timepoints are
        not compared.

        Parameters
        ----------
        edatas:
            A data set. Will be checked against the data set provided to the
            constructor.

        Returns
        -------
        Whether the data sets are consistent.
        """
        # TODO replace by edata1==edata2 once this makes it into amici
        # https://github.com/AMICI-dev/AMICI/issues/1880
        data = [
            amici.numpy.ExpDataView(edata)["observedData"] for edata in edatas
        ]
        # a different number of conditions means the data sets cannot match
        if len(self.data) != len(data):
            return False
        # NaN-aware elementwise comparison, condition by condition
        for data0, data1 in zip(self.data, data):
            if not np.array_equal(data0, data1, equal_nan=True):
                return False
        return True
def scale_value_dict(
    dct: dict[str, float], problem: InnerProblem
) -> dict[str, float]:
    """Scale a value dictionary.

    Each value is transformed to the scale of the corresponding inner
    parameter looked up by its ID in ``problem``.
    """
    return {
        inner_id: scale_value(value, problem.get_for_id(inner_id).scale)
        for inner_id, value in dct.items()
    }
def scale_value(
    val: Union[float, np.ndarray], scale: str
) -> Union[float, np.ndarray]:
    """Transform a value from linear to the given parameter scale.

    Parameters
    ----------
    val:
        Value (or array of values) on linear scale.
    scale:
        Target scale; one of ``"lin"``, ``"log"``, ``"log10"``.

    Returns
    -------
    The value on the requested scale.

    Raises
    ------
    ValueError
        If ``scale`` is not recognized.
    """
    # note: the annotation previously used ``np.array`` (a factory function);
    # the array type is ``np.ndarray``
    if scale == "lin":
        return val
    if scale == "log":
        return np.log(val)
    if scale == "log10":
        return np.log10(val)
    raise ValueError(f"Scale {scale} not recognized.")
def ix_matrices_from_arrays(
    ixs: dict[str, list[tuple[int, int, int]]], edatas: list[np.ndarray]
) -> dict[str, list[np.ndarray]]:
    """
    Convert mapping of parameters to measurements to matrix form.

    Parameters
    ----------
    ixs:
        Mapping of parameter ID to ``(condition_ix, time_ix, observable_ix)``
        index tuples.
    edatas:
        Measurement matrices, one per simulation condition; only their shapes
        are used here.

    Returns
    -------
    A dictionary mapping parameter ID to a list of Boolean matrices, one per
    simulation condition. Therein, ``True`` indicates that the respective
    parameter is used for the model output at the respective timepoint,
    observable and condition index.
    """
    # ``inner_id`` instead of ``id`` to avoid shadowing the builtin
    ix_matrices = {
        inner_id: [np.zeros_like(edata, dtype=bool) for edata in edatas]
        for inner_id in ixs
    }
    for inner_id, arr in ixs.items():
        matrices = ix_matrices[inner_id]
        for condition_ix, time_ix, observable_ix in arr:
            matrices[condition_ix][time_ix, observable_ix] = True
    return ix_matrices
def _get_timepoints_with_replicates(
    measurement_df: pd.DataFrame,
) -> list[float]:
    """
    Get list of timepoints including replicate measurements.

    :param measurement_df:
        PEtab measurement table subset for a single condition.

    :return:
        Sorted list of timepoints, including multiple timepoints accounting
        for replicate measurements.
    """
    # create sorted list of all timepoints for which measurements exist
    timepoints = sorted(measurement_df[TIME].unique().astype(float))

    # find replicate numbers of time points
    timepoints_w_reps = []
    for time in timepoints:
        # subselect for time; use the TIME constant consistently instead of
        # attribute access (``measurement_df.time``)
        df_for_time = measurement_df[measurement_df[TIME] == time]
        # rep number is maximum over rep numbers for observables
        n_reps = max(df_for_time.groupby([OBSERVABLE_ID, TIME]).size())
        # append time point n_rep times
        timepoints_w_reps.extend([time] * n_reps)

    return timepoints_w_reps