"""Inner optimization problem in hierarchical optimization."""
import logging
import numpy as np
import pandas as pd
from ...C import (
MEASUREMENT_TYPE,
PARAMETER_TYPE,
SEMIQUANTITATIVE,
InnerParameterType,
)
from ..base_parameter import InnerParameter
from ..base_problem import (
AmiciInnerProblem,
_get_timepoints_with_replicates,
ix_matrices_from_arrays,
)
try:
import amici
import petab
from petab.C import (
ESTIMATE,
LOWER_BOUND,
NOISE_PARAMETERS,
OBSERVABLE_ID,
OBSERVABLE_PARAMETERS,
PARAMETER_ID,
PARAMETER_SCALE,
TIME,
UPPER_BOUND,
)
except ImportError:
pass
logger = logging.getLogger(__name__)
[docs]
class RelativeInnerProblem(AmiciInnerProblem):
r"""Inner optimization problem for relative data with scaling/offset.
Attributes
----------
xs:
Mapping of (inner) parameter ID to ``InnerParameters``.
data:
Measurement data. One matrix (`num_timepoints` x `num_observables`)
per simulation condition. Missing observations as NaN.
edatas:
AMICI ``ExpData``\s for each simulation condition.
"""
[docs]
def __init__(self, **kwargs):
super().__init__(**kwargs)
[docs]
@staticmethod
def from_petab_amici(
petab_problem: "petab.Problem",
amici_model: "amici.Model",
edatas: list["amici.ExpData"],
) -> "RelativeInnerProblem":
"""Create an InnerProblem from a PEtab problem and AMICI objects."""
return inner_problem_from_petab_problem(
petab_problem, amici_model, edatas
)
[docs]
def check_edatas(self, edatas: list[amici.ExpData]) -> bool:
"""Check for consistency in data.
Currently only checks for the actual data values. e.g., timepoints are
not compared.
Parameters
----------
edatas:
A data set. Will be checked against the data set provided to the
constructor.
Returns
-------
Whether the data sets are consistent.
"""
# TODO replace but edata1==edata2 once this makes it into amici
# https://github.com/AMICI-dev/AMICI/issues/1880
data = [
amici.numpy.ExpDataView(edata)["observedData"] for edata in edatas
]
if len(self.data) != len(data):
return False
for data0, data1 in zip(self.data, data):
if not np.array_equal(data0, data1, equal_nan=True):
return False
return True
def inner_problem_from_petab_problem(
petab_problem: "petab.Problem",
amici_model: "amici.Model",
edatas: list["amici.ExpData"],
) -> AmiciInnerProblem:
"""
Create inner problem from PEtab problem.
Hierarchical optimization is a pypesto-specific PEtab extension.
"""
import amici
# inner parameters
inner_parameters = inner_parameters_from_parameter_df(
petab_problem.parameter_df, petab_problem.measurement_df
)
x_ids = [x.inner_parameter_id for x in inner_parameters]
# used indices for all measurement specific parameters
ixs = ixs_for_measurement_specific_parameters(
petab_problem, amici_model, x_ids
)
# transform experimental data
data = [amici.numpy.ExpDataView(edata)["observedData"] for edata in edatas]
# matrixify
ix_matrices = ix_matrices_from_arrays(ixs, data)
# assign matrices
for par in inner_parameters:
par.ixs = ix_matrices[par.inner_parameter_id]
par_group_types = {
tuple(obs_pars.split(";")): (
petab_problem.parameter_df.loc[obs_par, PARAMETER_TYPE]
for obs_par in obs_pars.split(";")
)
for (obs_id, obs_pars), _ in petab_problem.measurement_df.groupby(
[petab.OBSERVABLE_ID, petab.OBSERVABLE_PARAMETERS], dropna=True
)
if ";" in obs_pars # prefilter for at least 2 observable parameters
}
coupled_pars = {
group
for group, types in par_group_types.items()
if (
(InnerParameterType.SCALING in types)
and (InnerParameterType.OFFSET in types)
)
}
# Check each group is of length 2
for group in coupled_pars:
if len(group) != 2:
raise ValueError(
f"Expected exactly 2 parameters in group {group}: a scaling "
f"and an offset parameter."
)
id_to_par = {par.inner_parameter_id: par for par in inner_parameters}
# assign coupling
for par in inner_parameters:
if par.inner_parameter_type not in [
InnerParameterType.SCALING,
InnerParameterType.OFFSET,
]:
continue
for group in coupled_pars:
if par.inner_parameter_id in group:
coupled_parameter_id = group[
group.index(par.inner_parameter_id) - 1
]
par.coupled = id_to_par[coupled_parameter_id]
break
return AmiciInnerProblem(xs=inner_parameters, data=data, edatas=edatas)
def inner_parameters_from_parameter_df(
par_df: pd.DataFrame,
meas_df: pd.DataFrame,
) -> list[InnerParameter]:
"""
Create list of inner free parameters from PEtab parameter table.
Inner parameters are those that have a non-empty `parameterType` in the
PEtab problem.
"""
# create list of hierarchical parameters
par_df = par_df.reset_index()
for col in (PARAMETER_TYPE,):
if col not in par_df:
par_df[col] = None
parameters = []
for _, row in par_df.iterrows():
if not row[ESTIMATE]:
continue
if petab.is_empty(row[PARAMETER_TYPE]):
continue
# If a sigma parameter belongs to a semiquantitative
# observable, it is not a relative inner parameter.
if row[PARAMETER_TYPE] == InnerParameterType.SIGMA:
if MEASUREMENT_TYPE in meas_df.columns:
par_id = row[PARAMETER_ID]
corresponding_measurements = meas_df[
meas_df[NOISE_PARAMETERS] == par_id
]
if any(
corresponding_measurements[MEASUREMENT_TYPE]
== SEMIQUANTITATIVE
):
continue
parameters.append(
InnerParameter(
inner_parameter_id=row[PARAMETER_ID],
inner_parameter_type=row[PARAMETER_TYPE],
scale=row[PARAMETER_SCALE],
lb=row[LOWER_BOUND],
ub=row[UPPER_BOUND],
)
)
return parameters
def ixs_for_measurement_specific_parameters(
petab_problem: "petab.Problem",
amici_model: "amici.Model",
x_ids: list[str],
) -> dict[str, list[tuple[int, int, int]]]:
"""
Create mapping of parameters to measurements.
Returns
-------
A dictionary mapping parameter ID to a list of
`(condition index, time index, observable index)` tuples in which this
output parameter is used. For each condition, the time index refers to
a sorted list of non-unique time points for which there are measurements.
"""
ixs_for_par = {}
observable_ids = amici_model.getObservableIds()
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
for condition_ix, condition in simulation_conditions.iterrows():
# measurement table for current condition
df_for_condition = petab.get_rows_for_condition(
measurement_df=petab_problem.measurement_df, condition=condition
)
# unique sorted list of timepoints
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# non-unique sorted list of timepoints
timepoints_w_reps = _get_timepoints_with_replicates(
measurement_df=df_for_condition
)
for time in timepoints:
# subselect measurements for time `time`
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(
measurement[OBSERVABLE_ID]
)
# as the time indices have to account for replicates, we need
# to track which time indices have already been assigned for
# the current observable
if observable_ix in time_ix_for_obs_ix:
# a replicate
time_ix_for_obs_ix[observable_ix] += 1
else:
# the first measurement for this `(observable, timepoint)`
time_ix_for_obs_ix[observable_ix] = time_ix_0
time_w_reps_ix = time_ix_for_obs_ix[observable_ix]
observable_overrides = petab.split_parameter_replacement_list(
measurement.get(OBSERVABLE_PARAMETERS, None)
)
noise_overrides = petab.split_parameter_replacement_list(
measurement.get(NOISE_PARAMETERS, None)
)
# try to insert if hierarchical parameter
for override in observable_overrides + noise_overrides:
if override in x_ids:
ixs_for_par.setdefault(override, []).append(
(condition_ix, time_w_reps_ix, observable_ix)
)
return ixs_for_par