"""Definition of an optimal scaling parameter class."""
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from ...C import (
C_MATRIX,
CAT_LB,
CAT_UB,
CENSORED,
CENSORING_BOUNDS,
CENSORING_TYPES,
INNER_PARAMETER_BOUNDS,
INTERVAL_CENSORED,
LB_INDICES,
LEFT_CENSORED,
LIN,
MEASUREMENT_CATEGORY,
MEASUREMENT_TYPE,
NUM_CATEGORIES,
NUM_CONSTR_FULL,
NUM_DATAPOINTS,
NUM_INNER_PARAMS,
ORDINAL,
QUANTITATIVE_DATA,
QUANTITATIVE_IXS,
REDUCED,
RIGHT_CENSORED,
STANDARD,
SURROGATE_DATA,
TIME,
UB_INDICES,
W_DOT_MATRIX,
W_MATRIX,
InnerParameterType,
)
from ..problem import (
InnerProblem,
_get_timepoints_with_replicates,
ix_matrices_from_arrays,
)
from .parameter import OptimalScalingParameter
try:
import amici
import petab
from petab.C import OBSERVABLE_ID, PARAMETER_SEPARATOR
except ImportError:
pass
[docs]class OptimalScalingProblem(InnerProblem):
"""Inner optimization problem for optimal scaling.
Attributes
----------
xs:
Mapping of (inner) parameter ID to ``InnerParameters``.
data:
Measurement data. One matrix (`num_timepoints` x `num_observables`)
per simulation condition. Missing observations as NaN.
groups:
A dictionary of the groups of the subproblem.
method:
A string representing the method of the Optimal Scaling approach, either 'reduced' or 'standard'.
"""
[docs] def __init__(
self,
xs: List[OptimalScalingParameter],
data: List[np.ndarray],
method: str,
):
"""Construct."""
super().__init__(xs, data)
self.groups = {}
self.method = method
self._initialize_groups()
def _initialize_groups(self) -> None:
"""Initialize the groups of the subproblem."""
self.groups = {}
for group in self.get_groups_for_xs(
InnerParameterType.OPTIMAL_SCALING
):
self.groups[group] = {}
xs = self.get_xs_for_group(group)
self.groups[group][NUM_CATEGORIES] = len({x.category for x in xs})
self.groups[group][NUM_DATAPOINTS] = sum(
[
sum([np.sum(ixs) for ixs in x.ixs])
for x in self.get_cat_ub_parameters_for_group(group)
]
)
self.groups[group][SURROGATE_DATA] = np.zeros(
self.groups[group][NUM_DATAPOINTS]
)
self.groups[group][NUM_INNER_PARAMS] = (
self.groups[group][NUM_DATAPOINTS]
+ 2 * self.groups[group][NUM_CATEGORIES]
)
self.groups[group][LB_INDICES] = list(
range(
self.groups[group][NUM_DATAPOINTS],
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
)
)
self.groups[group][UB_INDICES] = list(
range(
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
self.groups[group][NUM_INNER_PARAMS],
)
)
if all(x.censoring_type is not None for x in xs):
self.groups[group][MEASUREMENT_TYPE] = CENSORED
self.groups[group][
QUANTITATIVE_IXS
] = self.get_censored_group_quantitative_ixs(xs)
self.groups[group][QUANTITATIVE_DATA] = np.concatenate(
[
data_i[mask_i]
for data_i, mask_i in zip(
self.data, self.groups[group][QUANTITATIVE_IXS]
)
]
)
elif all(x.censoring_type is None for x in xs):
self.groups[group][MEASUREMENT_TYPE] = ORDINAL
self.groups[group][NUM_CONSTR_FULL] = (
2 * self.groups[group][NUM_DATAPOINTS]
+ 2 * self.groups[group][NUM_CATEGORIES]
)
self.groups[group][C_MATRIX] = self.initialize_c(group)
self.groups[group][W_MATRIX] = self.initialize_w(group)
self.groups[group][W_DOT_MATRIX] = self.initialize_w(group)
else:
raise ValueError(
'Censoring types of optimal scaling parameters of a group '
'have to either be all None, or all not None.'
)
[docs] @staticmethod
def from_petab_amici(
petab_problem: petab.Problem,
amici_model: 'amici.Model',
edatas: List['amici.ExpData'],
method: str = None,
) -> 'OptimalScalingProblem':
"""Construct the inner problem from the `petab_problem`."""
if not method:
method = REDUCED
return optimal_scaling_inner_problem_from_petab_problem(
petab_problem, amici_model, edatas, method
)
[docs] def get_groups_for_xs(self, inner_parameter_type: str) -> List[int]:
"""Get unique list of ``OptimalScalingParameter.group`` values."""
groups = [x.group for x in self.get_xs_for_type(inner_parameter_type)]
return list(set(groups))
[docs] def get_xs_for_group(self, group: int) -> List[OptimalScalingParameter]:
r"""Get ``OptimalScalingParameter``\s that belong to the given group."""
return [x for x in self.xs.values() if x.group == group]
[docs] def get_free_xs_for_group(
self, group: int
) -> List[OptimalScalingParameter]:
r"""Get ``OptimalScalingParameter``\s that are free and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.estimate is True
]
[docs] def get_fixed_xs_for_group(
self, group: int
) -> List[OptimalScalingParameter]:
r"""Get ``OptimalScalingParameter``\s that are fixed and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.estimate is False
]
[docs] def get_cat_ub_parameters_for_group(
self, group: int
) -> List[OptimalScalingParameter]:
r"""Get ``OptimalScalingParameter``\s that are category upper boundaries and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.inner_parameter_id[:6] == CAT_UB
]
[docs] def get_cat_lb_parameters_for_group(
self, group: int
) -> List[OptimalScalingParameter]:
r"""Get ``OptimalScalingParameter``\s that are category lower boundaries and belong to the given group."""
return [
x
for x in self.xs.values()
if x.group == group and x.inner_parameter_id[:6] == CAT_LB
]
[docs] def initialize_c(self, group: int) -> np.ndarray:
"""Initialize the constraints matrix for the group.
The structure of the constraints matrix is the following: Each row C_i of the matrix C
represents one optimization constraint as C_i * xi + d(theta, sim) <= 0, where xi is the
vector of inner paramters (surrogate data, lower bounds, upper bounds)^T, and d is the
vector of minimal category interval ranges and gaps.
First `self.groups[group][NUM_DATAPOINTS]` rows constrain the surrogate data to stay
larger than lower category bounds.
Then another `self.groups[group][NUM_DATAPOINTS]` rows constrain the surrogate data to
stay smaller than upper category bounds.
Then `self.groups[group][NUM_CATEGORIES]` rows constrain the ordering of the categories.
And lastly, the remaining `self.groups[group][NUM_CATEGORIES]` constrain the lower
bound to be smaller than the upper bound for each category.
"""
constr = np.zeros(
[
self.groups[group][NUM_CONSTR_FULL],
self.groups[group][NUM_INNER_PARAMS],
]
)
data_idx = 0
# Iterate over categories.
for cat_idx, category in enumerate(
self.get_cat_ub_parameters_for_group(group)
):
num_data_in_cat = int(
np.sum(
[
np.sum(category.ixs[idx])
for idx in range(len(category.ixs))
]
)
)
# Constrain the surrogate data of this category to stay within it.
for _ in range(num_data_in_cat):
# lb - y_surr <= 0
constr[data_idx, data_idx] = -1
constr[
data_idx, cat_idx + self.groups[group][NUM_DATAPOINTS]
] = 1
# y_surr - ub <= 0
constr[
data_idx + self.groups[group][NUM_DATAPOINTS], data_idx
] = 1
constr[
data_idx + self.groups[group][NUM_DATAPOINTS],
cat_idx
+ self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES],
] = -1
data_idx += 1
# Constrain the ordering wrt. neighbouring categories, i.e. ub_i - lb_{i+1} <= 0.
if cat_idx == 0:
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS] + cat_idx,
] = -1
else:
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS] + cat_idx,
] = -1
constr[
2 * self.groups[group][NUM_DATAPOINTS] + cat_idx,
self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
+ cat_idx
- 1,
] = 1
# Constrain upper bound to be larger than lower bound, i.e. lb_i - ub_i <= 0.
constr[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] # - 1
+ cat_idx,
self.groups[group][LB_INDICES][cat_idx],
] = 1
constr[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] # - 1
+ cat_idx,
self.groups[group][UB_INDICES][cat_idx],
] = -1
return constr
[docs] def initialize_w(self, group: int) -> np.ndarray:
"""Initialize the weight matrix for the group."""
weights = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS]),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return weights
[docs] def get_w(self, group: int, y_sim_all: np.ndarray) -> np.ndarray:
"""Return the weight matrix of the group."""
weights = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS])
/ (np.sum(np.abs(y_sim_all)) + 1e-8),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return weights
[docs] def get_wdot(
self, group: int, y_sim_all: np.ndarray, sy_all: np.ndarray
) -> np.ndarray:
"""Return the derivative of the weight matrix of a group with respect to an outer parameter."""
w_dot = np.diag(
np.block(
[
np.ones(self.groups[group][NUM_DATAPOINTS])
* (
-1
* np.sum(sy_all)
/ ((np.sum(np.abs(y_sim_all)) + 1e-8) ** 2)
),
np.zeros(2 * self.groups[group][NUM_CATEGORIES]),
]
)
)
return w_dot
[docs] def get_d(
self,
group,
xs: List[OptimalScalingParameter],
y_sim_all: np.ndarray,
eps: float,
) -> np.ndarray:
"""Return vector of minimal gaps and ranges."""
max_simulation = np.nanmax(y_sim_all)
interval_range = max_simulation / (2 * len(xs) + 1)
interval_gap = max_simulation / (4 * (len(xs) - 1) + 1)
d = np.zeros(self.groups[group][NUM_CONSTR_FULL])
d[
2 * self.groups[group][NUM_DATAPOINTS]
+ 1 : 2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
] = (interval_gap + eps)
d[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] :
] = interval_range
return d
[docs] def get_dd_dtheta(
self,
group: int,
xs: List[OptimalScalingParameter],
y_sim_all: np.ndarray,
sy_all: np.ndarray,
) -> np.ndarray:
"""Return the derivative of vector of minimal gaps and ranges with respect to an outer parameter."""
max_sim_idx = np.argmax(y_sim_all)
max_sy = sy_all[max_sim_idx]
dd_dtheta = np.zeros(self.groups[group][NUM_CONSTR_FULL])
dinterval_range_dtheta = max_sy / (2 * len(xs) + 1)
dinterval_gap_dtheta = max_sy / (4 * (len(xs) - 1) + 1)
dd_dtheta[
2 * self.groups[group][NUM_DATAPOINTS]
+ 1 : 2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES]
] = dinterval_gap_dtheta
dd_dtheta[
2 * self.groups[group][NUM_DATAPOINTS]
+ self.groups[group][NUM_CATEGORIES] :
] = dinterval_range_dtheta
return dd_dtheta
[docs] def get_censored_group_quantitative_ixs(
self, xs: List[OptimalScalingParameter]
) -> List[np.ndarray]:
r"""Return a list of boolean masks indicating which data points are quantitative.
For a given group with censored data, return a list of boolean masks indicating
which data points are not censored, and therefore quantitative.
Parameters
----------
xs:
List of ``OptimalScalingParameter``\s of a group with censored data.
Returns
-------
quantitative_ixs:
List of boolean masks indicating which data points are quantitative.
"""
# Initialize boolean masks with False and find corresponding observable index.
quantitative_ixs = [np.full(ixs_i.shape, False) for ixs_i in xs[0].ixs]
observable_index = xs[0].group - 1
# Set to True all datapoints of the corresponding observable.
if np.ndim(quantitative_ixs) == 2:
quantitative_ixs = [
np.full(ixs_i.shape, True) for ixs_i in xs[0].ixs
]
else:
for quantitative_ixs_i in quantitative_ixs:
quantitative_ixs_i[:, observable_index] = True
# Set to False for all censored datapoints.
for x in xs:
for ixs_i, quantitative_ixs_i in zip(x.ixs, quantitative_ixs):
quantitative_ixs_i[ixs_i] = False
return quantitative_ixs
[docs] def get_inner_parameter_dictionary(self) -> Dict:
"""Return a dictionary with inner parameter ids and their values."""
inner_par_dict = {}
for x_id, x in self.xs.items():
inner_par_dict[x_id] = x.value
return inner_par_dict
def optimal_scaling_inner_problem_from_petab_problem(
petab_problem: petab.Problem,
amici_model: 'amici.Model',
edatas: List['amici.ExpData'],
method: str,
):
"""Construct the inner problem from the `petab_problem`."""
# inner parameters
inner_parameters = optimal_scaling_inner_parameters_from_measurement_df(
petab_problem.measurement_df, method, amici_model
)
# used indices for all measurement specific parameters
ixs = optimal_scaling_ixs_for_measurement_specific_parameters(
petab_problem, amici_model, inner_parameters
)
# transform experimental data
edatas = [
amici.numpy.ExpDataView(edata)['observedData'] for edata in edatas
]
# matrixify
ix_matrices = ix_matrices_from_arrays(ixs, edatas)
# assign matrices
for par in inner_parameters:
par.ixs = ix_matrices[par.inner_parameter_id]
return OptimalScalingProblem(
inner_parameters,
edatas,
method,
)
def optimal_scaling_inner_parameters_from_measurement_df(
df: pd.DataFrame,
method: str,
amici_model: 'amici.Model',
) -> List[OptimalScalingParameter]:
"""Create list of inner free parameters from PEtab measurement table dependent on the method provided."""
df = df.reset_index()
observable_ids = amici_model.getObservableIds()
estimate = get_estimate_for_method(method)
par_types = [CAT_LB, CAT_UB]
inner_parameters = []
lb, ub = INNER_PARAMETER_BOUNDS[
InnerParameterType.OPTIMAL_SCALING
].values()
for observable_idx, observable_id in enumerate(observable_ids):
group = observable_idx + 1
observable_df = df[df[OBSERVABLE_ID] == observable_id]
if all(observable_df[MEASUREMENT_TYPE] == ORDINAL):
# Add optimal scaling parameters for ordinal measurements.
for par_type, par_estimate in zip(par_types, estimate):
for _, row in observable_df.iterrows():
par_id = f'{par_type}_{observable_id}_{row[MEASUREMENT_TYPE]}_{int(row[MEASUREMENT_CATEGORY])}'
# Create only one set of bound parameters per category of a group.
if par_id not in [
inner_par.inner_parameter_id
for inner_par in inner_parameters
]:
inner_parameters.append(
OptimalScalingParameter(
inner_parameter_id=par_id,
inner_parameter_type=InnerParameterType.OPTIMAL_SCALING,
scale=LIN,
lb=lb,
ub=ub,
observable_id=observable_id,
category=int(row[MEASUREMENT_CATEGORY]),
group=group,
estimate=par_estimate,
)
)
elif any(observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)):
# Get df with only censored measurements.
censored_df = observable_df.loc[
observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)
]
# Check for unique values in the CENSORING_BOUNDS column and
# order them with resect to the first float value in the string.
unique_censoring_bounds = sorted(
censored_df[CENSORING_BOUNDS].unique(),
key=lambda x: float(str(x).split(PARAMETER_SEPARATOR)[0]),
)
for par_type in par_types:
for _, row in censored_df.iterrows():
category = int(
unique_censoring_bounds.index(row[CENSORING_BOUNDS])
+ 1
)
par_id = f'{par_type}_{observable_id}_{row[MEASUREMENT_TYPE]}_{category}'
# Create only one set of bound parameters per category of a group.
if par_id not in [
inner_par.inner_parameter_id
for inner_par in inner_parameters
]:
inner_parameters.append(
OptimalScalingParameter(
inner_parameter_id=par_id,
inner_parameter_type=InnerParameterType.OPTIMAL_SCALING,
scale=LIN,
lb=lb,
ub=ub,
observable_id=observable_id,
category=category,
group=group,
estimate=False,
censoring_type=row[MEASUREMENT_TYPE],
)
)
_add_value_to_censored_bound_parameter(
inner_parameters[-1], row, par_type
)
inner_parameters.sort(key=lambda x: (x.group, x.category))
return inner_parameters
def get_estimate_for_method(method: str) -> Tuple[bool, bool]:
"""Return which inner parameters to estimate dependent on the method provided."""
estimate_ub = True
estimate_lb = False
if method == STANDARD:
estimate_lb = True
return estimate_lb, estimate_ub
def optimal_scaling_ixs_for_measurement_specific_parameters(
petab_problem: 'petab.Problem',
amici_model: 'amici.Model',
inner_parameters: List[OptimalScalingParameter],
) -> Dict[str, List[Tuple[int, int, int]]]:
"""Create mapping of parameters to measurements.
Returns
-------
A dictionary mapping parameter ID to a list of
`(condition index, time index, observable index)` tuples in which this
output parameter is used. For each condition, the time index refers to
a sorted list of non-unique time points for which there are measurements.
"""
ixs_for_par = {}
observable_ids = amici_model.getObservableIds()
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
measurement_df = petab_problem.measurement_df
# Get unique censoring bounds for each observable.
unique_censoring_bounds_per_observable = {}
if CENSORING_BOUNDS in measurement_df.columns:
for observable_id in observable_ids:
observable_df = measurement_df[
measurement_df[OBSERVABLE_ID] == observable_id
]
censored_observable_df = observable_df.loc[
observable_df[MEASUREMENT_TYPE].isin(CENSORING_TYPES)
]
unique_censoring_bounds_per_observable[observable_id] = sorted(
censored_observable_df[CENSORING_BOUNDS].unique(),
key=lambda x: float(str(x).split(PARAMETER_SEPARATOR)[0]),
)
for condition_ix, condition in simulation_conditions.iterrows():
# measurement table for current condition
df_for_condition = petab.get_rows_for_condition(
measurement_df=measurement_df, condition=condition
)
# unique sorted list of timepoints
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# non-unique sorted list of timepoints
timepoints_w_reps = _get_timepoints_with_replicates(
measurement_df=df_for_condition
)
for time in timepoints:
# subselect measurements for time `time`
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(
measurement[OBSERVABLE_ID]
)
# as the time indices have to account for replicates, we need
# to track which time indices have already been assigned for
# the current observable
if observable_ix in time_ix_for_obs_ix:
# a replicate
time_ix_for_obs_ix[observable_ix] += 1
else:
# the first measurement for this `(observable, timepoint)`
time_ix_for_obs_ix[observable_ix] = time_ix_0
time_w_reps_ix = time_ix_for_obs_ix[observable_ix]
inner_par_ids_for_meas = get_inner_par_ids_for_measurement(
measurement,
inner_parameters,
unique_censoring_bounds_per_observable,
)
# try to insert if hierarchical parameter
if inner_par_ids_for_meas:
for override in inner_par_ids_for_meas:
ixs_for_par.setdefault(override, []).append(
(condition_ix, time_w_reps_ix, observable_ix)
)
return ixs_for_par
def get_inner_par_ids_for_measurement(
measurement: Dict,
inner_parameters: List[OptimalScalingParameter],
unique_censoring_bounds_per_observable: Dict[str, List[float]],
):
"""Return inner parameter ids of parameters which are related to the measurement."""
if measurement[MEASUREMENT_TYPE] == ORDINAL:
return [
inner_par.inner_parameter_id
for inner_par in inner_parameters
if inner_par.category == measurement[MEASUREMENT_CATEGORY]
and inner_par.observable_id == measurement[OBSERVABLE_ID]
]
elif measurement[MEASUREMENT_TYPE] in [
LEFT_CENSORED,
INTERVAL_CENSORED,
RIGHT_CENSORED,
]:
unique_censoring_bounds = unique_censoring_bounds_per_observable[
measurement[OBSERVABLE_ID]
]
measurement_category = int(
unique_censoring_bounds.index(measurement[CENSORING_BOUNDS]) + 1
)
return [
inner_par.inner_parameter_id
for inner_par in inner_parameters
if inner_par.observable_id == measurement[OBSERVABLE_ID]
and inner_par.category == measurement_category
]
def _add_value_to_censored_bound_parameter(
inner_parameter: OptimalScalingParameter,
row: pd.Series,
par_type: str,
) -> None:
if row[MEASUREMENT_TYPE] == LEFT_CENSORED and par_type == CAT_LB:
inner_parameter.value = 0.0
elif row[MEASUREMENT_TYPE] == LEFT_CENSORED and par_type == CAT_UB:
inner_parameter.value = float(row[CENSORING_BOUNDS])
elif row[MEASUREMENT_TYPE] == RIGHT_CENSORED and par_type == CAT_LB:
inner_parameter.value = float(row[CENSORING_BOUNDS])
elif row[MEASUREMENT_TYPE] == RIGHT_CENSORED and par_type == CAT_UB:
inner_parameter.value = np.inf
elif row[MEASUREMENT_TYPE] == INTERVAL_CENSORED and par_type == CAT_LB:
inner_parameter.value = float(
row[CENSORING_BOUNDS].split(PARAMETER_SEPARATOR)[0]
)
elif row[MEASUREMENT_TYPE] == INTERVAL_CENSORED and par_type == CAT_UB:
inner_parameter.value = float(
row[CENSORING_BOUNDS].split(PARAMETER_SEPARATOR)[1]
)