"""Definition of an optimal scaling parameter class."""
import numpy as np
import pandas as pd
from ...C import (
C_MATRIX,
CAT_LB,
CAT_UB,
CENSORED,
CENSORING_BOUNDS,
CENSORING_TYPES,
INNER_PARAMETER_BOUNDS,
INTERVAL_CENSORED,
LB_INDICES,
LEFT_CENSORED,
LIN,
MEASUREMENT_CATEGORY,
MEASUREMENT_TYPE,
NUM_CATEGORIES,
NUM_CONSTR_FULL,
NUM_DATAPOINTS,
NUM_INNER_PARAMS,
ORDINAL,
QUANTITATIVE_DATA,
QUANTITATIVE_IXS,
REDUCED,
RIGHT_CENSORED,
STANDARD,
SURROGATE_DATA,
TIME,
UB_INDICES,
W_DOT_MATRIX,
W_MATRIX,
InnerParameterType,
)
from ..base_problem import (
AmiciInnerProblem,
_get_timepoints_with_replicates,
ix_matrices_from_arrays,
)
from .parameter import OrdinalParameter
try:
import amici
import petab
from petab.C import OBSERVABLE_ID, PARAMETER_SEPARATOR
except ImportError:
pass
[docs]
class OrdinalProblem(AmiciInnerProblem):
r"""Inner optimization problem for ordinal or censored data.
The ordinal inner problem contains the following parameters: surrogate data,
lower bounds, and upper bounds. All parameters are optimized to minimize the
distance between the surrogate data and the simulated data while satisfying
the ordering constraints of the problem. Depending on the method, the problem
is re-formulated to reduce the number of parameters to estimate.
Attributes
----------
xs:
Mapping of (inner) parameter ID to ``InnerParameters``.
data:
Measurement data. One matrix (``num_timepoints`` x ``num_observables``)
per simulation condition. Missing observations as NaN.
edatas:
AMICI ``ExpData``\s for each simulation condition.
groups:
A dictionary of the groups of the subproblem.
method:
A string representing the method of the Optimal Scaling approach, either ``reduced`` or ``standard``.
"""
[docs]
def __init__(
self,
method: str,
**kwargs,
):
"""Construct."""
super().__init__(**kwargs)
self.groups = {}
self.method = method
self._initialize_groups()
def _initialize_groups(self) -> None:
"""Initialize the groups of the subproblem."""
self.groups = {}
for group in self.get_groups_for_xs(InnerParameterType.ORDINAL):
self.groups[group] = {}
xs = self.get_xs_for_group(group)
self.groups[group][NUM_CATEGORIES] = len({x.category for x in xs})
self.groups[group][NUM_DATAPOINTS] = sum(
[
sum([np.sum(ixs) for ixs in x.ixs])
for x in self.get_cat_ub_parameters_for_group(group)
]
)
self.groups[group][SURROGATE_DATA] = np.zeros(
self.groups[group][NUM_DATAPOINTS]
)
self.groups[group][NUM_INNER_PARAMS] = (
self.groups[group][NUM_DATAPOINTS]
+ 2 * self.groups[group][NUM_CATEGORIES]
)
self.groups[group][LB_INDICES] = list(
range(
self.groups[group][NUM_DATAPOINTS],
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
)
)
self.groups[group][UB_INDICES] = list(
range(
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
self.groups[group][NUM_INNER_PARAMS],
)
)
if all(x.censoring_type is not None for x in xs):
self.groups[group][MEASUREMENT_TYPE] = CENSORED
self.groups[group][
QUANTITATIVE_IXS
] = self.get_censored_group_quantitative_ixs(xs)
self.groups[group][QUANTITATIVE_DATA] = np.concatenate(
[
data_i[mask_i]
for data_i, mask_i in zip(
self.data, self.groups[group][QUANTITATIVE_IXS]
)
]
)
elif all(x.censoring_type is None for x in xs):
self.groups[group][MEASUREMENT_TYPE] = ORDINAL
self.groups[group][NUM_CONSTR_FULL] = (
2 * self.groups[group][NUM_DATAPOINTS]
+ 2 * self.groups[group][NUM_CATEGORIES]
)
self.groups[group][C_MATRIX] = self.initialize_c(group)
self.groups[group][W_MATRIX] = self.initialize_w(group)
self.groups[group][W_DOT_MATRIX] = self.initialize_w(group)
else:
raise ValueError(
"Censoring types of optimal scaling parameters of a group "
"have to either be all None, or all not None."
)
[docs]
def initialize(self) -> None:
"""Initialize the subproblem."""
# Initialize all parameter values.
for x in self.xs.values():
x.initialize()
# Initialize the groups.
for group in self.get_groups_for_xs(InnerParameterType.SPLINE):
self.groups[group][SURROGATE_DATA] = np.zeros(
self.groups[group][NUM_DATAPOINTS]
)
[docs]
@staticmethod
def from_petab_amici(
petab_problem: petab.Problem,
amici_model: "amici.Model",
edatas: list["amici.ExpData"],
method: str = None,
) -> "OrdinalProblem":
"""Construct the inner problem from the `petab_problem`."""
if not method:
method = REDUCED
return optimal_scaling_inner_problem_from_petab_problem(
petab_problem, amici_model, edatas, method
)
[docs]
def get_interpretable_x_ids(self) -> list[str]:
"""Get IDs of interpretable inner parameters.
There are no interpretable inner parameters for the ordinal problem.
"""
return []
[docs]
def get_groups_for_xs(self, inner_parameter_type: str) -> list[int]:
"""Get unique list of ``OptimalScalingParameter.group`` values."""
groups = [x.group for x in self.get_xs_for_type(inner_parameter_type)]
return list(set(groups))
[docs]
def get_xs_for_group(self, group: int) -> list[OrdinalParameter]:
r"""Get ``OptimalScalingParameter``\s that belong to the given group."""
return [x for x in self.xs.values() if x.group == group]
[docs]
def get_free_xs_for_group(self, group: int) -> list[OrdinalParameter]:
r"""Get ``OptimalScalingParameter``\s that are free and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.estimate is True
]
[docs]
def get_fixed_xs_for_group(self, group: int) -> list[OrdinalParameter]:
r"""Get ``OptimalScalingParameter``\s that are fixed and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.estimate is False
]
[docs]
def get_cat_ub_parameters_for_group(
self, group: int
) -> list[OrdinalParameter]:
r"""Get ``OptimalScalingParameter``\s that are category upper boundaries and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.inner_parameter_id[:6] == CAT_UB
]
[docs]
def get_cat_lb_parameters_for_group(
self, group: int
) -> list[OrdinalParameter]:
r"""Get ``OptimalScalingParameter``\s that are category lower boundaries and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.inner_parameter_id[:6] == CAT_LB
]
[docs]
def initialize_c(self, group: int) -> np.ndarray:
"""Initialize the constraints matrix for the group.
The structure of the constraints matrix is the following: Each row C_i of the matrix C
represents one optimization constraint as C_i * xi + d(theta, sim) <= 0, where xi is the
vector of inner paramters (surrogate data, lower bounds, upper bounds)^T, and d is the
vector of minimal category interval ranges and gaps.
First `self.groups[group][NUM_DATAPOINTS]` rows constrain the surrogate data to stay
larger than lower category bounds.
Then another `self.groups[group][NUM_DATAPOINTS]` rows constrain the surrogate data to
stay smaller than upper category bounds.
Then `self.groups[group][NUM_CATEGORIES]` rows constrain the ordering of the categories.
And lastly, the remaining `self.groups[group][NUM_CATEGORIES]` constrain the lower
bound to be smaller than the upper bound for each category.
"""
constr = np.zeros(
[
self.groups[group][NUM_CONSTR_FULL],
self.groups[group][NUM_INNER_PARAMS],
]
)
data_idx = 0
# Iterate over categories.
for cat_idx, category in enumerate(
self.get_cat_ub_parameters_for_group(group)
):
num_data_in_cat = int(
np.sum(
[
np.sum(category.ixs[idx])
for idx in range(len(category.ixs))
]
)
)
# Constrain the surrogate data of this category to stay within it.
for _ in range(num_data_in_cat):
# lb - y_surr <= 0
constr[data_idx, data_idx] = -1
constr[
data_idx, cat_idx + self.groups[group][NUM_DATAPOINTS]
] = 1
# y_surr - ub <= 0
constr[
data_idx + self.groups[group][NUM_DATAPOINTS], data_idx
] = 1
constr[
data_idx + self.groups[group][NUM_DATAPOINTS],
cat_idx
+ self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
] = -1
data_idx += 1
# Constrain the ordering wrt. neighbouring categories, i.e. ub_i - lb_{i+1} <= 0.
if cat_idx == 0:
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS] + cat_idx,
] = -1
else:
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS] + cat_idx,
] = -1
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
+ cat_idx
- 1,
] = 1
# Constrain upper bound to be larger than lower bound, i.e. lb_i - ub_i <= 0.
constr[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] # - 1
+ cat_idx,
self.groups[group][LB_INDICES][cat_idx],
] = 1
constr[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] # - 1
+ cat_idx,
self.groups[group][UB_INDICES][cat_idx],
] = -1
return constr
[docs]
def initialize_w(self, group: int) -> np.ndarray:
"""Initialize the weight matrix for the group."""
weights = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS]),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return weights
[docs]
def get_w(self, group: int, y_sim_all: np.ndarray) -> np.ndarray:
"""Return the weight matrix of the group."""
weights = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS])
/ (np.sum(np.abs(y_sim_all)) + 1e-8),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return weights
[docs]
def get_wdot(
self, group: int, y_sim_all: np.ndarray, sy_all: np.ndarray
) -> np.ndarray:
"""Return the derivative of the weight matrix of a group with respect to an outer parameter."""
w_dot = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS])
* (
-1
* np.sum(sy_all)
/ ((np.sum(np.abs(y_sim_all)) + 1e-8) ** 2)
),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return w_dot
[docs]
def get_d(
self,
group,
xs: list[OrdinalParameter],
y_sim_all: np.ndarray,
eps: float,
) -> np.ndarray:
"""Return vector of minimal gaps and ranges."""
max_simulation = np.nanmax(y_sim_all)
interval_range = max_simulation / (2 * len(xs) + 1)
interval_gap = max_simulation / (4 * (len(xs) - 1) + 1)
d = np.zeros(self.groups[group][NUM_CONSTR_FULL])
d[
2 * self.groups[group][NUM_DATAPOINTS] + 1 : 2
* self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
] = interval_gap + eps
d[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] :
] = interval_range
return d
[docs]
def get_dd_dtheta(
self,
group: int,
xs: list[OrdinalParameter],
y_sim_all: np.ndarray,
sy_all: np.ndarray,
) -> np.ndarray:
"""Return the derivative of vector of minimal gaps and ranges with respect to an outer parameter."""
max_sim_idx = np.argmax(y_sim_all)
max_sy = sy_all[max_sim_idx]
dd_dtheta = np.zeros(self.groups[group][NUM_CONSTR_FULL])
dinterval_range_dtheta = max_sy / (2 * len(xs) + 1)
dinterval_gap_dtheta = max_sy / (4 * (len(xs) - 1) + 1)
dd_dtheta[
2 * self.groups[group][NUM_DATAPOINTS] + 1 : 2
* self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
] = dinterval_gap_dtheta
dd_dtheta[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] :
] = dinterval_range_dtheta
return dd_dtheta
[docs]
def get_censored_group_quantitative_ixs(
self, xs: list[OrdinalParameter]
) -> list[np.ndarray]:
r"""Return a list of boolean masks indicating which data points are quantitative.
For a given group with censored data, return a list of boolean masks indicating
which data points are not censored, and therefore quantitative.
Parameters
----------
xs:
List of ``OptimalScalingParameter``\s of a group with censored data.
Returns
-------
quantitative_ixs:
List of boolean masks indicating which data points are quantitative.
"""
# Initialize boolean masks with False and find corresponding observable index.
quantitative_ixs = [np.full(ixs_i.shape, False) for ixs_i in xs[0].ixs]
observable_index = xs[0].group - 1
# Set to True all datapoints of the corresponding observable.
if np.ndim(quantitative_ixs) == 2:
quantitative_ixs = [
np.full(ixs_i.shape, True) for ixs_i in xs[0].ixs
]
else:
for quantitative_ixs_i in quantitative_ixs:
quantitative_ixs_i[:, observable_index] = True
# Set to False for all censored datapoints.
for x in xs:
for ixs_i, quantitative_ixs_i in zip(x.ixs, quantitative_ixs):
quantitative_ixs_i[ixs_i] = False
return quantitative_ixs
[docs]
def get_inner_parameter_dictionary(self) -> dict:
"""Return a dictionary with inner parameter ids and their values."""
inner_par_dict = {}
for x_id, x in self.xs.items():
inner_par_dict[x_id] = x.value
return inner_par_dict
def optimal_scaling_inner_problem_from_petab_problem(
petab_problem: petab.Problem,
amici_model: "amici.Model",
edatas: list["amici.ExpData"],
method: str,
):
"""Construct the inner problem from the `petab_problem`."""
# inner parameters
inner_parameters = optimal_scaling_inner_parameters_from_measurement_df(
petab_problem.measurement_df, method, amici_model
)
# used indices for all measurement specific parameters
ixs = optimal_scaling_ixs_for_measurement_specific_parameters(
petab_problem, amici_model, inner_parameters
)
# transform experimental data
data = [amici.numpy.ExpDataView(edata)["observedData"] for edata in edatas]
# matrixify
ix_matrices = ix_matrices_from_arrays(ixs, data)
# assign matrices
for par in inner_parameters:
par.ixs = ix_matrices[par.inner_parameter_id]
return OrdinalProblem(
xs=inner_parameters,
data=data,
edatas=edatas,
method=method,
)
def optimal_scaling_inner_parameters_from_measurement_df(
df: pd.DataFrame,
method: str,
amici_model: "amici.Model",
) -> list[OrdinalParameter]:
"""Create list of inner free parameters from PEtab measurement table dependent on the method provided."""
df = df.reset_index()
observable_ids = amici_model.getObservableIds()
estimate = get_estimate_for_method(method)
par_types = [CAT_LB, CAT_UB]
inner_parameters = []
lb, ub = INNER_PARAMETER_BOUNDS[InnerParameterType.ORDINAL].values()
for observable_idx, observable_id in enumerate(observable_ids):
group = observable_idx + 1
observable_df = df[df[OBSERVABLE_ID] == observable_id]
if all(observable_df[MEASUREMENT_TYPE] == ORDINAL):
# Add optimal scaling parameters for ordinal measurements.
for par_type, par_estimate in zip(par_types, estimate):
for _, row in observable_df.iterrows():
par_id = f"{par_type}_{observable_id}_{row[MEASUREMENT_TYPE]}_{int(row[MEASUREMENT_CATEGORY])}"
# Create only one set of bound parameters per category of a group.
if par_id not in [
inner_par.inner_parameter_id
for inner_par in inner_parameters
]:
inner_parameters.append(
OrdinalParameter(
inner_parameter_id=par_id,
inner_parameter_type=InnerParameterType.ORDINAL,
scale=LIN,
lb=lb,
ub=ub,
observable_id=observable_id,
category=int(row[MEASUREMENT_CATEGORY]),
group=group,
estimate=par_estimate,
)
)
elif any(observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)):
# Get df with only censored measurements.
censored_df = observable_df.loc[
observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)
]
# Check for unique values in the CENSORING_BOUNDS column and
# order them with resect to the first float value in the string.
unique_censoring_bounds = sorted(
censored_df[CENSORING_BOUNDS].unique(),
key=lambda x: float(str(x).split(PARAMETER_SEPARATOR)[0]),
)
for par_type in par_types:
for _, row in censored_df.iterrows():
category = int(
unique_censoring_bounds.index(row[CENSORING_BOUNDS])
+ 1
)
par_id = f"{par_type}_{observable_id}_{row[MEASUREMENT_TYPE]}_{category}"
# Create only one set of bound parameters per category of a group.
if par_id not in [
inner_par.inner_parameter_id
for inner_par in inner_parameters
]:
inner_parameters.append(
OrdinalParameter(
inner_parameter_id=par_id,
inner_parameter_type=InnerParameterType.ORDINAL,
scale=LIN,
lb=lb,
ub=ub,
observable_id=observable_id,
category=category,
group=group,
estimate=False,
censoring_type=row[MEASUREMENT_TYPE],
)
)
_add_value_to_censored_bound_parameter(
inner_parameters[-1], row, par_type
)
inner_parameters.sort(key=lambda x: (x.group, x.category))
return inner_parameters
def get_estimate_for_method(method: str) -> tuple[bool, bool]:
"""Return which inner parameters to estimate dependent on the method provided."""
estimate_ub = True
estimate_lb = False
if method == STANDARD:
estimate_lb = True
return estimate_lb, estimate_ub
def optimal_scaling_ixs_for_measurement_specific_parameters(
petab_problem: "petab.Problem",
amici_model: "amici.Model",
inner_parameters: list[OrdinalParameter],
) -> dict[str, list[tuple[int, int, int]]]:
"""Create mapping of parameters to measurements.
Returns
-------
A dictionary mapping parameter ID to a list of
`(condition index, time index, observable index)` tuples in which this
output parameter is used. For each condition, the time index refers to
a sorted list of non-unique time points for which there are measurements.
"""
ixs_for_par = {}
observable_ids = amici_model.getObservableIds()
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
measurement_df = petab_problem.measurement_df
# Get unique censoring bounds for each observable.
unique_censoring_bounds_per_observable = {}
if CENSORING_BOUNDS in measurement_df.columns:
for observable_id in observable_ids:
observable_df = measurement_df[
measurement_df[OBSERVABLE_ID] == observable_id
]
censored_observable_df = observable_df.loc[
observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)
]
unique_censoring_bounds_per_observable[observable_id] = sorted(
censored_observable_df[CENSORING_BOUNDS].unique(),
key=lambda x: float(str(x).split(PARAMETER_SEPARATOR)[0]),
)
for condition_ix, condition in simulation_conditions.iterrows():
# measurement table for current condition
df_for_condition = petab.get_rows_for_condition(
measurement_df=measurement_df, condition=condition
)
# unique sorted list of timepoints
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# non-unique sorted list of timepoints
timepoints_w_reps = _get_timepoints_with_replicates(
measurement_df=df_for_condition
)
for time in timepoints:
# subselect measurements for time `time`
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(
measurement[OBSERVABLE_ID]
)
# as the time indices have to account for replicates, we need
# to track which time indices have already been assigned for
# the current observable
if observable_ix in time_ix_for_obs_ix:
# a replicate
time_ix_for_obs_ix[observable_ix] += 1
else:
# the first measurement for this `(observable, timepoint)`
time_ix_for_obs_ix[observable_ix] = time_ix_0
time_w_reps_ix = time_ix_for_obs_ix[observable_ix]
inner_par_ids_for_meas = get_inner_par_ids_for_measurement(
measurement,
inner_parameters,
unique_censoring_bounds_per_observable,
)
# try to insert if hierarchical parameter
if inner_par_ids_for_meas:
for override in inner_par_ids_for_meas:
ixs_for_par.setdefault(override, []).append(
(condition_ix, time_w_reps_ix, observable_ix)
)
return ixs_for_par
def get_inner_par_ids_for_measurement(
measurement: dict,
inner_parameters: list[OrdinalParameter],
unique_censoring_bounds_per_observable: dict[str, list[float]],
):
"""Return inner parameter ids of parameters which are related to the measurement."""
if measurement[MEASUREMENT_TYPE] == ORDINAL:
return [
inner_par.inner_parameter_id
for inner_par in inner_parameters
if inner_par.category == measurement[MEASUREMENT_CATEGORY]
and inner_par.observable_id == measurement[OBSERVABLE_ID]
]
elif measurement[MEASUREMENT_TYPE] in [
LEFT_CENSORED,
INTERVAL_CENSORED,
RIGHT_CENSORED,
]:
unique_censoring_bounds = unique_censoring_bounds_per_observable[
measurement[OBSERVABLE_ID]
]
measurement_category = int(
unique_censoring_bounds.index(measurement[CENSORING_BOUNDS]) + 1
)
return [
inner_par.inner_parameter_id
for inner_par in inner_parameters
if inner_par.observable_id == measurement[OBSERVABLE_ID]
and inner_par.category == measurement_category
]
def _add_value_to_censored_bound_parameter(
inner_parameter: OrdinalParameter,
row: pd.Series,
par_type: str,
) -> None:
if row[MEASUREMENT_TYPE] == LEFT_CENSORED and par_type == CAT_LB:
inner_parameter.value = 0.0
elif row[MEASUREMENT_TYPE] == LEFT_CENSORED and par_type == CAT_UB:
inner_parameter.value = float(row[CENSORING_BOUNDS])
elif row[MEASUREMENT_TYPE] == RIGHT_CENSORED and par_type == CAT_LB:
inner_parameter.value = float(row[CENSORING_BOUNDS])
elif row[MEASUREMENT_TYPE] == RIGHT_CENSORED and par_type == CAT_UB:
inner_parameter.value = np.inf
elif row[MEASUREMENT_TYPE] == INTERVAL_CENSORED and par_type == CAT_LB:
inner_parameter.value = float(
row[CENSORING_BOUNDS].split(PARAMETER_SEPARATOR)[0]
)
elif row[MEASUREMENT_TYPE] == INTERVAL_CENSORED and par_type == CAT_UB:
inner_parameter.value = float(
row[CENSORING_BOUNDS].split(PARAMETER_SEPARATOR)[1]
)