Source code for pypesto.hierarchical.spline_approximation.problem

from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

from ...C import (
    CURRENT_SIMULATION,
    DATAPOINTS,
    EXPDATA_MASK,
    INNER_PARAMETER_BOUNDS,
    LIN,
    MAX_DATAPOINT,
    MEASUREMENT_TYPE,
    MIN_DATAPOINT,
    N_SPLINE_PARS,
    NOISE_PARAMETERS,
    NONLINEAR_MONOTONE,
    NUM_DATAPOINTS,
    SPLINE_PAR_TYPE,
    TIME,
    InnerParameterType,
)
from ..problem import (
    InnerProblem,
    _get_timepoints_with_replicates,
    ix_matrices_from_arrays,
)
from .parameter import SplineInnerParameter

try:
    import amici
    import petab
    from petab.C import OBSERVABLE_ID
except ImportError:
    pass


[docs]class SplineInnerProblem(InnerProblem): """Inner optimization problem for spline approximation. Attributes ---------- xs: Mapping of (inner) parameter ID to ``InnerParameters``. data: Measurement data. One matrix (`num_timepoints` x `num_observables`) per simulation condition. Missing observations as NaN. groups: A dictionary of the groups of the subproblem. spline_ratio: The ratio of the number of spline inner parameters and number of measurements for each group. """
[docs] def __init__( self, xs: List[SplineInnerParameter], data: List[np.ndarray], spline_ratio: float = 0.5, ): """Construct.""" super().__init__(xs, data) self.spline_ratio = spline_ratio if spline_ratio <= 0: raise ValueError("Spline ratio must be a positive float.") self._initialize_groups()
def _initialize_groups(self) -> None: """Initialize the groups of the subproblem.""" self.groups = {} for group in self.get_groups_for_xs(InnerParameterType.SPLINE): xs = self.get_xs_for_group(group) self.groups[group] = {} self.groups[group][N_SPLINE_PARS] = len({x.index for x in xs}) self.groups[group][DATAPOINTS] = self.get_measurements_for_group( group ) self.groups[group][NUM_DATAPOINTS] = len( self.groups[group][DATAPOINTS] ) self.groups[group][MIN_DATAPOINT] = np.min( self.groups[group][DATAPOINTS] ) self.groups[group][MAX_DATAPOINT] = np.max( self.groups[group][DATAPOINTS] ) self.groups[group][EXPDATA_MASK] = xs[0].ixs self.groups[group][CURRENT_SIMULATION] = np.zeros( self.groups[group][NUM_DATAPOINTS] ) self.groups[group][NOISE_PARAMETERS] = np.zeros( self.groups[group][NUM_DATAPOINTS] )
[docs] @staticmethod def from_petab_amici( petab_problem: petab.Problem, amici_model: 'amici.Model', edatas: List['amici.ExpData'], spline_ratio: float = None, ) -> 'SplineInnerProblem': """Construct the inner problem from the `petab_problem`.""" if spline_ratio is None: spline_ratio = get_default_options() return spline_inner_problem_from_petab_problem( petab_problem, amici_model, edatas, spline_ratio )
[docs] def get_groups_for_xs(self, inner_parameter_type: str) -> List[int]: """Get unique list of ``SplineParameter.group`` values.""" groups = [x.group for x in self.get_xs_for_type(inner_parameter_type)] return list(set(groups))
[docs] def get_xs_for_group(self, group: int) -> List[SplineInnerParameter]: r"""Get ``SplineParameter``\s that belong to the given group.""" return [x for x in self.xs.values() if x.group == group]
[docs] def get_free_xs_for_group(self, group: int) -> List[SplineInnerParameter]: r"""Get ``SplineParameter``\s that are free and belong to the given group.""" return [ x for x in self.xs.values() if x.group == group and x.estimate is True ]
[docs] def get_fixed_xs_for_group(self, group: int) -> List[SplineInnerParameter]: r"""Get ``SplineParameter``\s that are fixed and belong to the given group.""" return [ x for x in self.xs.values() if x.group == group and x.estimate is False ]
[docs] def get_inner_parameter_dictionary(self) -> Dict: """Get a dictionary with all inner parameter ids and their values.""" inner_par_dict = {} for x_id, x in self.xs.items(): inner_par_dict[x_id] = x.value return inner_par_dict
[docs] def get_measurements_for_group(self, gr) -> np.ndarray: """Get measurements for a group.""" # Taking the ixs of first inner parameter since # all of them are the same for the same group. ixs = self.get_xs_for_group(gr)[0].ixs return np.concatenate( [ self.data[condition_index][ixs[condition_index]] for condition_index in range(len(ixs)) ] )
def get_default_options() -> Dict: """Return the default spline problem options dictionary.""" spline_ratio = 1 / 2 return spline_ratio def spline_inner_problem_from_petab_problem( petab_problem: petab.Problem, amici_model: 'amici.Model', edatas: List['amici.ExpData'], spline_ratio: float = None, ): """Construct the inner problem from the `petab_problem`.""" if spline_ratio is None: spline_ratio = get_default_options() elif spline_ratio <= 0: raise ValueError("Spline ratio must be a positive float.") # inner parameters inner_parameters = spline_inner_parameters_from_measurement_df( petab_problem.measurement_df, spline_ratio, amici_model ) # used indices for all measurement specific parameters ixs = spline_ixs_for_measurement_specific_parameters( petab_problem, amici_model, inner_parameters ) # transform experimental data edatas = [ amici.numpy.ExpDataView(edata)['observedData'] for edata in edatas ] # matrixify ix_matrices = ix_matrices_from_arrays(ixs, edatas) # assign matrices for par in inner_parameters: par.ixs = ix_matrices[par.inner_parameter_id] return SplineInnerProblem( inner_parameters, edatas, spline_ratio, ) def spline_inner_parameters_from_measurement_df( df: pd.DataFrame, spline_ratio: float, amici_model: 'amici.Model', ) -> List[SplineInnerParameter]: """Create list of inner free parameters from PEtab measurement table.""" df = df.reset_index() observable_ids = amici_model.getObservableIds() par_type = SPLINE_PAR_TYPE estimate = True lb, ub = INNER_PARAMETER_BOUNDS[InnerParameterType.SPLINE].values() inner_parameters = [] # Select the nonlinear monotone measurements. df = df[df[MEASUREMENT_TYPE] == NONLINEAR_MONOTONE] # Iterate over groups. for observable_id in observable_ids: group = observable_ids.index(observable_id) + 1 df_for_group = df[df[OBSERVABLE_ID] == observable_id] n_spline_parameters = int(np.ceil(len(df_for_group) * spline_ratio)) # Create n_spline_parameters number of spline inner parameters. for par_index in range(n_spline_parameters): par_id = f'{par_type}_{observable_id}_{group}_{par_index+1}' inner_parameters.append( SplineInnerParameter( inner_parameter_id=par_id, inner_parameter_type=InnerParameterType.SPLINE, scale=LIN, lb=lb, ub=ub, observable_id=observable_id, group=group, index=par_index + 1, estimate=estimate, ) ) inner_parameters.sort(key=lambda x: (x.group, x.index)) return inner_parameters def spline_ixs_for_measurement_specific_parameters( petab_problem: 'petab.Problem', amici_model: 'amici.Model', inner_parameters: List[SplineInnerParameter], ) -> Dict[str, List[Tuple[int, int, int]]]: """Create mapping of parameters to measurements. Returns ------- A dictionary mapping parameter ID to a list of `(condition index, time index, observable index)` tuples in which this output parameter is used. For each condition, the time index refers to a sorted list of non-unique time points for which there are measurements. """ ixs_for_par = {} observable_ids = amici_model.getObservableIds() simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) for condition_ix, condition in simulation_conditions.iterrows(): # measurement table for current condition df_for_condition = petab.get_rows_for_condition( measurement_df=petab_problem.measurement_df, condition=condition ) # unique sorted list of timepoints timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # non-unique sorted list of timepoints timepoints_w_reps = _get_timepoints_with_replicates( measurement_df=df_for_condition ) for time in timepoints: # subselect measurements for time `time` df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index( measurement[OBSERVABLE_ID] ) # as the time indices have to account for replicates, we need # to track which time indices have already been assigned for # the current observable if observable_ix in time_ix_for_obs_ix: # a replicate time_ix_for_obs_ix[observable_ix] += 1 else: # the first measurement for this `(observable, timepoint)` time_ix_for_obs_ix[observable_ix] = time_ix_0 time_w_reps_ix = time_ix_for_obs_ix[observable_ix] inner_par_ids_for_meas = ( get_spline_inner_par_ids_for_measurement( measurement, inner_parameters ) ) # try to insert if hierarchical parameter for override in inner_par_ids_for_meas: ixs_for_par.setdefault(override, []).append( (condition_ix, time_w_reps_ix, observable_ix) ) return ixs_for_par def get_spline_inner_par_ids_for_measurement( measurement: Dict, inner_parameters: List[SplineInnerParameter], ): """Return inner parameter ids of parameters which are related to the measurement.""" return [ inner_par.inner_parameter_id for inner_par in inner_parameters if inner_par.observable_id == measurement[OBSERVABLE_ID] ]