Source code for pypesto.objective.roadrunner.roadrunner_calculator

"""RoadRunner calculator for PEtab problems.

Handles all RoadRunner.simulate calls, calculates likelihoods and residuals.
"""
import numbers
from collections.abc import Sequence
from typing import Optional

import numpy as np
import petab
import roadrunner
from petab.parameter_mapping import ParMappingDictQuadruple

from ...C import (
    FVAL,
    MODE_FUN,
    MODE_RES,
    RES,
    ROADRUNNER_LLH,
    ROADRUNNER_SIMULATION,
    TIME,
    ModeType,
)
from .utils import (
    ExpData,
    SolverOptions,
    simulation_to_measurement_df,
    unscale_parameters,
)

LLH_TYPES = {
    "lin_normal": lambda measurement, simulation, sigma: -0.5
    * (
        np.log(2 * np.pi * (sigma**2))
        + ((measurement - simulation) / sigma) ** 2
    ),
    "log_normal": lambda measurement, simulation, sigma: -0.5
    * (
        np.log(2 * np.pi * (sigma**2) * (measurement**2))
        + ((np.log(measurement) - np.log(simulation)) / sigma) ** 2
    ),
    "log10_normal": lambda measurement, simulation, sigma: -0.5
    * (
        np.log(2 * np.pi * (sigma**2) * (measurement**2) * np.log(10) ** 2)
        + ((np.log10(measurement) - np.log10(simulation)) / sigma) ** 2
    ),
    "lin_laplace": lambda measurement, simulation, sigma: -np.log(2 * sigma)
    - (np.abs(measurement - simulation) / sigma),
    "log_laplace": lambda measurement, simulation, sigma: -np.log(
        2 * sigma * simulation
    )
    - (np.abs(np.log(measurement) - np.log(simulation)) / sigma),
    "log10_laplace": lambda measurement, simulation, sigma: -np.log(
        2 * sigma * simulation * np.log(10)
    )
    - (np.abs(np.log10(measurement) - np.log10(simulation)) / sigma),
}
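
# A minimal worked example of how these entries evaluate, with illustrative
# numbers (not taken from any PEtab problem): for the linear-scale normal
# model with measurement 1.0, simulation 1.2 and sigma 0.1,
#
#     LLH_TYPES["lin_normal"](1.0, 1.2, 0.1)
#     # == -0.5 * (np.log(2 * np.pi * 0.1**2) + ((1.0 - 1.2) / 0.1) ** 2)
#     # ~= -0.6164
#
# Each entry returns the log-density itself; the sign flip to a negative
# log-likelihood happens later, in RoadRunnerCalculator.__call__.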


class RoadRunnerCalculator:
    """Class to handle RoadRunner simulation and obtain objective value."""
    def __call__(
        self,
        x_dct: dict,
        # TODO: sensi_order support
        mode: ModeType,
        roadrunner_instance: roadrunner.RoadRunner,
        edatas: list[ExpData],
        x_ids: Sequence[str],
        parameter_mapping: list[ParMappingDictQuadruple],
        petab_problem: petab.Problem,
        solver_options: Optional[SolverOptions],
    ):
        """Perform the RoadRunner call and obtain objective function values.

        Parameters
        ----------
        x_dct:
            Parameter dictionary.
        mode:
            Mode of the call.
        roadrunner_instance:
            RoadRunner instance.
        edatas:
            List of ExpData.
        x_ids:
            Sequence of parameter IDs.
        parameter_mapping:
            Parameter mapping.
        petab_problem:
            PEtab problem.
        solver_options:
            Solver options of the roadrunner instance integrator. These
            will modify the roadrunner instance in place.

        Returns
        -------
        Dictionary of objective function values and simulation results;
        the content depends on the mode.
        """
        # sanity check that edatas and conditions are consistent
        if len(edatas) != len(parameter_mapping):
            raise ValueError(
                "Number of edatas does not match number of conditions."
            )
        if solver_options is None:
            solver_options = SolverOptions()
        # apply solver options
        solver_options.apply_to_roadrunner(roadrunner_instance)
        simulation_results = {}
        llh_tot = 0
        for edata, mapping_per_condition in zip(edatas, parameter_mapping):
            sim_res, llh = self.simulate_per_condition(
                x_dct, roadrunner_instance, edata, mapping_per_condition
            )
            simulation_results[edata.condition_id] = sim_res
            llh_tot += llh

        if mode == MODE_FUN:
            return {
                FVAL: -llh_tot,
                ROADRUNNER_SIMULATION: simulation_results,
                ROADRUNNER_LLH: llh_tot,
            }
        if mode == MODE_RES:
            # TODO: speed up by not using pandas
            simulation_df = simulation_to_measurement_df(
                simulation_results, petab_problem.measurement_df
            )
            res_df = petab.calculate_residuals(
                petab_problem.measurement_df,
                simulation_df,
                petab_problem.observable_df,
                petab_problem.parameter_df,
            )
            return {
                RES: res_df,
                ROADRUNNER_SIMULATION: simulation_results,
                FVAL: -llh_tot,
            }
    def simulate_per_condition(
        self,
        x_dct: dict,
        roadrunner_instance: roadrunner.RoadRunner,
        edata: ExpData,
        parameter_mapping_per_condition: ParMappingDictQuadruple,
    ) -> tuple[np.ndarray, float]:
        """Simulate the model for a single condition.

        Parameters
        ----------
        x_dct:
            Parameter dictionary.
        roadrunner_instance:
            RoadRunner instance.
        edata:
            ExpData of a single condition.
        parameter_mapping_per_condition:
            Parameter mapping for a single condition.

        Returns
        -------
        Tuple of the simulation results as a numpy array and the
        log-likelihood.
        """
        # get timepoints and outputs to simulate
        timepoints = list(edata.timepoints)
        if timepoints[0] != 0.0:
            timepoints = [0.0] + timepoints
        if len(timepoints) == 1:
            timepoints = [0.0] + timepoints
        observables_ids = edata.get_observable_ids()

        # steady state pre-equilibration setup
        steady_state_calculations = False
        state_variables = roadrunner_instance.model.getFloatingSpeciesIds()
        # some states might be hidden as parameters with rate rules
        rate_rule_ids = roadrunner_instance.getRateRuleIds()
        state_variables += [
            rate_rule_id
            for rate_rule_id in rate_rule_ids
            if rate_rule_id not in state_variables
        ]

        # obs_ss = []  # TODO: add them to return values with info
        state_ss = []
        # if the pre-equilibration mapping and scaling (first and third
        # entries) are not empty, we need to pre-equilibrate the model
        if (
            parameter_mapping_per_condition[0]
            and parameter_mapping_per_condition[2]
        ):
            steady_state_calculations = True
            roadrunner_instance.conservedMoietyAnalysis = True
            self.fill_in_parameters(
                x_dct,
                roadrunner_instance,
                parameter_mapping_per_condition,
                preeq=True,
            )
            # allow simulation to reach steady state
            roadrunner_instance.getSteadyStateSolver().setValue(
                "allow_presimulation", True
            )
            # steady state output = observables + state variables
            steady_state_selections = observables_ids + state_variables
            roadrunner_instance.steadyStateSelections = steady_state_selections
            steady_state = roadrunner_instance.getSteadyStateValuesNamedArray()
            # we split the steady state into observables and state variables
            # obs_ss = steady_state[:, : len(observables_ids)].flatten()
            state_ss = steady_state[:, len(observables_ids) :].flatten()
            # turn off conserved moiety analysis
            roadrunner_instance.conservedMoietyAnalysis = False
            # reset the model
            roadrunner_instance.reset()

        # set parameters
        par_map = self.fill_in_parameters(
            x_dct, roadrunner_instance, parameter_mapping_per_condition
        )

        # if steady state calculations are required, set state variables
        if steady_state_calculations:
            roadrunner_instance.setValues(state_variables, state_ss)
            # fill in overridden species
            self.fill_in_parameters(
                x_dct,
                roadrunner_instance,
                parameter_mapping_per_condition,
                filling_mode="only_species",
            )

        sim_res = roadrunner_instance.simulate(
            times=timepoints, selections=[TIME] + observables_ids
        )

        llhs = calculate_llh(sim_res, edata, par_map, roadrunner_instance)

        # reset the model
        roadrunner_instance.reset()
        return sim_res, llhs
    def fill_in_parameters(
        self,
        problem_parameters: dict,
        roadrunner_instance: Optional[roadrunner.RoadRunner] = None,
        parameter_mapping: Optional[ParMappingDictQuadruple] = None,
        preeq: bool = False,
        filling_mode: Optional[str] = None,
    ) -> dict:
        """Fill in parameters into the roadrunner instance.

        Parameters
        ----------
        roadrunner_instance:
            RoadRunner instance to fill in parameters into.
        problem_parameters:
            Problem parameters as parameterId=>value dict. Only parameters
            included here will be set. Remaining parameters will be used
            as already set in the roadrunner instance.
        parameter_mapping:
            Parameter mapping for the current condition. Quadruple of
            dicts, where the first dict contains the parameter mapping for
            pre-equilibration, the second dict the parameter mapping for
            the simulation, and the third and fourth dicts the scaling
            factors for pre-equilibration and simulation, respectively.
        preeq:
            Whether to fill in parameters for pre-equilibration.
        filling_mode:
            Which parameters to fill in. If None or "all", all parameters
            are filled in. Other options are "only_parameters" and
            "only_species".

        Returns
        -------
        dict:
            Mapping of parameter IDs to values.
        """
        if filling_mode is None:
            filling_mode = "all"
        # check for valid filling modes
        if filling_mode not in ["all", "only_parameters", "only_species"]:
            raise ValueError(
                "Invalid filling mode. Choose from 'all', "
                "'only_parameters', 'only_species'."
            )
        # default: simulation condition mapping and scaling
        mapping = parameter_mapping[1]
        scaling = parameter_mapping[3]
        if preeq:
            # pre-equilibration mapping and scaling
            mapping = parameter_mapping[0]
            scaling = parameter_mapping[2]

        # the parameter mapping may contain parameter IDs as values;
        # these *must* be replaced

        def _get_par(model_par, val):
            """Get parameter value from problem_parameters and mapping.

            Replace parameter IDs in the mapping dicts by values from
            problem_parameters where necessary.
            """
            if isinstance(val, str):
                try:
                    # estimated parameter
                    return problem_parameters[val]
                except KeyError:
                    # condition table overrides must have been handled
                    # already, e.g. by the PEtab parameter mapping, but
                    # parameters from InitialAssignments may still be
                    # present.
                    if mapping[val] == model_par:
                        # prevent infinite recursion
                        raise
                    return _get_par(val, mapping[val])
            if model_par in problem_parameters:
                # user-provided
                return problem_parameters[model_par]
            # prevent nan-propagation in derivative
            if np.isnan(val):
                return 0.0
            # constant value
            return val

        mapping_values = {
            key: _get_par(key, val) for key, val in mapping.items()
        }
        # the parameters are assumed to be given on the scale defined in
        # the PEtab problem, so they need to be unscaled
        mapping_values = unscale_parameters(mapping_values, scaling)
        # separate the parameters into ones that overwrite species and others
        mapping_params = {}
        mapping_species = {}
        for key, value in mapping_values.items():
            if key in roadrunner_instance.model.getFloatingSpeciesIds():
                # values that originally were NaN are not set
                if isinstance(mapping[key], str) or not np.isnan(
                    mapping[key]
                ):
                    mapping_species[key] = float(value)
            else:
                mapping_params[key] = value

        if filling_mode == "only_parameters" or filling_mode == "all":
            # set parameters
            roadrunner_instance.setValues(mapping_params)
            # reset is necessary to apply the changes to initial assignments
            roadrunner_instance.reset()
        if filling_mode == "only_species" or filling_mode == "all":
            # set species
            roadrunner_instance.setValues(mapping_species)

        return mapping_values
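
# A hedged usage sketch: the names below (x_dct, rr_model, edatas,
# parameter_mapping, petab_problem) are placeholders for objects constructed
# elsewhere (e.g. by a pyPESTO objective wrapping this calculator); they are
# not built here. A single evaluation in function mode then looks roughly like
#
#     calculator = RoadRunnerCalculator()
#     result = calculator(
#         x_dct=x_dct,              # parameter_id -> value, on PEtab scale
#         mode=MODE_FUN,
#         roadrunner_instance=rr_model,
#         edatas=edatas,            # list[ExpData], one per condition
#         x_ids=list(x_dct.keys()),
#         parameter_mapping=parameter_mapping,
#         petab_problem=petab_problem,
#         solver_options=SolverOptions(),
#     )
#     fval = result[FVAL]           # negative log-likelihood
#
# Each ParMappingDictQuadruple in parameter_mapping is ordered as
# (pre-equilibration mapping, simulation mapping, pre-equilibration scaling,
# simulation scaling), which is how fill_in_parameters above indexes it.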
def calculate_llh(
    simulations: np.ndarray,
    edata: ExpData,
    parameter_mapping: dict,
    roadrunner_instance: roadrunner.RoadRunner,
) -> float:
    """Calculate the log-likelihood for a single condition.

    Parameters
    ----------
    simulations:
        Simulations of the condition.
    edata:
        ExpData of a single condition.
    parameter_mapping:
        Parameter mapping for the condition.
    roadrunner_instance:
        RoadRunner instance. Needed to retrieve noise formulae stored in
        the model.

    Returns
    -------
    float:
        Log-likelihood of the condition.
    """
    # if 0 is not in the timepoints, remove the first row of the simulation
    if 0.0 not in edata.timepoints:
        simulations = simulations[1:, :]
    if not np.array_equal(simulations[:, 0], edata.timepoints):
        raise ValueError(
            "Simulation and Measurement have different timepoints."
        )
    # check that simulation and measurement have the same dimensions
    if simulations.shape != edata.measurements.shape:
        raise ValueError(
            "Simulation and Measurement have different dimensions."
        )
    # we can now drop the timepoints
    simulations = simulations[:, 1:]
    measurements = edata.measurements[:, 1:]

    def _fill_in_noise_formula(noise_formula):
        """Fill in the noise formula."""
        if isinstance(noise_formula, numbers.Number):
            return float(noise_formula)
        # if it is not a number, it is assumed to be a string
        if noise_formula in parameter_mapping.keys():
            return parameter_mapping[noise_formula]
        # if the string starts with "noiseFormula_", it is saved in the model
        if noise_formula.startswith("noiseFormula_"):
            return roadrunner_instance.getValue(noise_formula)

    # replace noise formulae with actual values from the mapping
    noise_formulae = np.array(
        [_fill_in_noise_formula(formula) for formula in edata.noise_formulae]
    )
    # check that the number of noise entries matches the number of
    # simulated observables
    if noise_formulae.shape[0] != simulations.shape[1]:
        raise ValueError("Noise and Simulation have different dimensions.")

    # per observable, choose the llh function based on the noise distribution
    llhs = np.array(
        [
            LLH_TYPES[noise_dist](
                measurements[:, i], simulations[:, i], noise_formulae[i]
            )
            for i, noise_dist in enumerate(edata.noise_distributions)
        ]
    ).transpose()
    # check whether all nan values in llhs coincide with nan measurements
    if not np.all(np.isnan(llhs) == np.isnan(measurements)):
        return np.nan
    # sum over all observables and timepoints, ignoring nans
    llhs = np.nansum(llhs)
    return llhs
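
# Illustration of the noise handling in calculate_llh, with made-up IDs: a
# PEtab noise entry may be a plain number (used as-is), the ID of a noise
# parameter present in the condition's parameter mapping, or a
# "noiseFormula_..." expression stored in the RoadRunner model and queried
# via roadrunner_instance.getValue. For a mapping {"sigma_obsA": 0.1}, the
# inner helper _fill_in_noise_formula would resolve
#
#     0.2                 -> 0.2
#     "sigma_obsA"        -> 0.1
#     "noiseFormula_obsA" -> roadrunner_instance.getValue("noiseFormula_obsA")
#
# Any other string falls through and resolves to None.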