"""Importer for PEtab problems using RoadRunner.
Creates from a PEtab problem a roadrunner model, a roadrunner objective or a
pypesto problem with a roadrunner objective. The actual form of the likelihood
depends on the noise model specified in the provided PEtab problem.
"""
from __future__ import annotations
import numbers
import re
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import libsbml
import petab
import roadrunner
from petab.C import (
OBSERVABLE_FORMULA,
PREEQUILIBRATION_CONDITION_ID,
SIMULATION_CONDITION_ID,
)
from petab.models.sbml_model import SbmlModel
from petab.parameter_mapping import ParMappingDictQuadruple
from ...petab.importer import PetabStartpoints
from ...problem import Problem
from ...startpoint import StartpointMethod
from ..aggregated import AggregatedObjective
from ..priors import NegLogParameterPriors, get_parameter_prior_dict
from .road_runner import RoadRunnerObjective
from .roadrunner_calculator import RoadRunnerCalculator
from .utils import ExpData
[docs]
class PetabImporterRR:
"""
Importer for PEtab problems using RoadRunner.
Create a :class:`roadrunner.RoadRunner` instance,
a :class:`pypesto.objective.RoadRunnerObjective` or a
:class:`pypesto.problem.Problem` from PEtab files. The actual
form of the likelihood depends on the noise model specified in the provided PEtab problem.
For more information, see the
`PEtab documentation <https://petab.readthedocs.io/en/latest/documentation_data_format.html#noise-distributions>`_.
""" # noqa
[docs]
def __init__(
self, petab_problem: petab.Problem, validate_petab: bool = True
):
"""Initialize importer.
Parameters
----------
petab_problem:
Managing access to the model and data.
validate_petab:
Flag indicating if the PEtab problem shall be validated.
"""
self.petab_problem = petab_problem
if validate_petab:
if petab.lint_problem(petab_problem):
raise ValueError("Invalid PEtab problem.")
self.rr = roadrunner.RoadRunner()
[docs]
@staticmethod
def from_yaml(yaml_config: Path | str) -> PetabImporterRR:
"""Simplified constructor using a petab yaml file."""
petab_problem = petab.Problem.from_yaml(yaml_config)
return PetabImporterRR(petab_problem=petab_problem)
def _check_noise_formulae(
self,
edatas: list[ExpData] | None = None,
parameter_mapping: list[ParMappingDictQuadruple] | None = None,
):
"""Check if the noise formulae are valid.
Currently, only static values or singular parameters are supported.
Complex formulae are not supported.
"""
# check that parameter mapping is available
if parameter_mapping is None:
parameter_mapping = self.create_parameter_mapping()
# check that edatas are available
if edatas is None:
edatas = self.create_edatas()
# save formulae that need to be changed
to_change = []
# check that noise formulae are valid
for i_edata, (edata, par_map) in enumerate(
zip(edatas, parameter_mapping)
):
for j_formula, noise_formula in enumerate(edata.noise_formulae):
# constant values are allowed
if isinstance(noise_formula, numbers.Number):
continue
# single parameters are allowed
if noise_formula in par_map[1].keys():
continue
# extract the observable name via regex pattern
pattern = r"noiseParameter1_(.*?)($|\s)"
observable_name = re.search(pattern, noise_formula).group(1)
to_change.append((i_edata, j_formula, observable_name))
# change formulae
formulae_changed = []
for i_edata, j_formula, obs_name in to_change:
# assign new parameter, formula in RR and parameter into mapping
original_formula = edatas[i_edata].noise_formulae[j_formula]
edatas[i_edata].noise_formulae[
j_formula
] = f"noiseFormula_{obs_name}"
# different conditions will have the same noise formula
if (obs_name, original_formula) not in formulae_changed:
self.rr.addParameter(f"noiseFormula_{obs_name}", 0.0, False)
self.rr.addAssignmentRule(
f"noiseFormula_{obs_name}",
original_formula,
forceRegenerate=False,
)
self.rr.regenerateModel()
formulae_changed.append((obs_name, original_formula))
def _write_observables_to_model(self):
"""Write observables of petab problem to the model."""
# add all observables as species
for obs_id in self.petab_problem.observable_df.index:
self.rr.addParameter(obs_id, 0.0, False)
# extract all parameters from observable formulas
parameters = petab.get_output_parameters(
self.petab_problem.observable_df,
self.petab_problem.model,
noise=True,
observables=True,
)
# add all parameters to the model
for param_id in parameters:
self.rr.addParameter(param_id, 0.0, False)
formulae = self.petab_problem.observable_df[
OBSERVABLE_FORMULA
].to_dict()
# add all observable formulas as assignment rules
for obs_id, formula in formulae.items():
self.rr.addAssignmentRule(obs_id, formula, forceRegenerate=False)
# regenerate model to apply changes
self.rr.regenerateModel()
[docs]
def create_edatas(self) -> list[ExpData]:
"""Create a List of :class:`ExpData` objects from the PEtab problem."""
# Create Dataframes per condition
return ExpData.from_petab_problem(self.petab_problem)
[docs]
def fill_model(self):
"""Fill the RoadRunner model inplace from the PEtab problem.
Parameters
----------
return_model:
Flag indicating if the model should be returned.
"""
if not isinstance(self.petab_problem.model, SbmlModel):
raise ValueError(
"The model is not an SBML model. Using "
"RoadRunner as simulator requires an SBML model."
) # TODO: add Pysb support
if self.petab_problem.model.sbml_document:
sbml_document = self.petab_problem.model.sbml_document
elif self.petab_problem.model.sbml_model:
sbml_document = (
self.petab_problem.model.sbml_model.getSBMLDocument()
)
else:
raise ValueError("No SBML model found.")
sbml_writer = libsbml.SBMLWriter()
sbml_string = sbml_writer.writeSBMLToString(sbml_document)
self.rr.load(sbml_string)
self._write_observables_to_model()
[docs]
def create_parameter_mapping(self):
"""Create a parameter mapping from the PEtab problem."""
simulation_conditions = (
self.petab_problem.get_simulation_conditions_from_measurement_df()
)
mapping = petab.get_optimization_to_simulation_parameter_mapping(
condition_df=self.petab_problem.condition_df,
measurement_df=self.petab_problem.measurement_df,
parameter_df=self.petab_problem.parameter_df,
observable_df=self.petab_problem.observable_df,
model=self.petab_problem.model,
)
# check whether any species in the condition table are assigned
species = self.rr.model.getFloatingSpeciesIds()
# overrides in parameter table are handled already
overrides = [
specie
for specie in species
if specie in self.petab_problem.condition_df.columns
]
if not overrides:
return mapping
for (_, condition), mapping_per_condition in zip(
simulation_conditions.iterrows(), mapping
):
for override in overrides:
preeq_id = condition.get(PREEQUILIBRATION_CONDITION_ID)
sim_id = condition.get(SIMULATION_CONDITION_ID)
if preeq_id:
mapping_per_condition[0][
override
] = self.petab_problem.condition_df.loc[preeq_id, override]
mapping_per_condition[2][override] = "lin"
if sim_id:
mapping_per_condition[1][
override
] = self.petab_problem.condition_df.loc[sim_id, override]
mapping_per_condition[3][override] = "lin"
return mapping
[docs]
def create_objective(
self,
rr: roadrunner.RoadRunner | None = None,
edatas: ExpData | None = None,
) -> RoadRunnerObjective:
"""Create a :class:`pypesto.objective.RoadRunnerObjective`.
Parameters
----------
rr:
RoadRunner instance.
edatas:
ExpData object.
"""
roadrunner_instance = rr
if roadrunner_instance is None:
roadrunner_instance = self.rr
self.fill_model()
if edatas is None:
edatas = self.create_edatas()
parameter_mapping = self.create_parameter_mapping()
# get x_names
x_names = self.petab_problem.get_x_ids()
calculator = RoadRunnerCalculator()
# run the check for noise formulae
self._check_noise_formulae(edatas, parameter_mapping)
return RoadRunnerObjective(
rr=roadrunner_instance,
edatas=edatas,
parameter_mapping=parameter_mapping,
petab_problem=self.petab_problem,
calculator=calculator,
x_names=x_names,
)
[docs]
def create_prior(self) -> NegLogParameterPriors | None:
"""
Create a prior from the parameter table.
Returns None, if no priors are defined.
"""
prior_list = []
if petab.OBJECTIVE_PRIOR_TYPE not in self.petab_problem.parameter_df:
return None
for i, x_id in enumerate(self.petab_problem.x_ids):
prior_type_entry = self.petab_problem.parameter_df.loc[
x_id, petab.OBJECTIVE_PRIOR_TYPE
]
if (
isinstance(prior_type_entry, str)
and prior_type_entry != petab.PARAMETER_SCALE_UNIFORM
):
prior_params = [
float(param)
for param in self.petab_problem.parameter_df.loc[
x_id, petab.OBJECTIVE_PRIOR_PARAMETERS
].split(";")
]
scale = self.petab_problem.parameter_df.loc[
x_id, petab.PARAMETER_SCALE
]
prior_list.append(
get_parameter_prior_dict(
i, prior_type_entry, prior_params, scale
)
)
return NegLogParameterPriors(prior_list)
[docs]
def create_startpoint_method(self, **kwargs) -> StartpointMethod:
"""Create a startpoint method.
Parameters
----------
**kwargs:
Additional keyword arguments passed on to
:meth:`pypesto.startpoint.FunctionStartpoints.__init__`.
"""
return PetabStartpoints(petab_problem=self.petab_problem, **kwargs)
[docs]
def create_problem(
self,
objective: RoadRunnerObjective | None = None,
x_guesses: Iterable[float] | None = None,
problem_kwargs: dict[str, Any] | None = None,
startpoint_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> Problem:
"""Create a :class:`pypesto.problem.Problem`.
Parameters
----------
objective:
Objective as created by :meth:`create_objective`.
x_guesses:
Guesses for the parameter values, shape (g, dim), where g denotes
the number of guesses. These are used as start points in the
optimization.
problem_kwargs:
Passed to :meth:`pypesto.problem.Problem.__init__`.
startpoint_kwargs:
Keyword arguments forwarded to
:meth:`PetabImporter.create_startpoint_method`.
**kwargs:
Additional key word arguments passed on to the objective,
if not provided.
Returns
-------
A :class:`pypesto.problem.Problem` instance.
"""
if objective is None:
objective = self.create_objective(**kwargs)
x_fixed_indices = self.petab_problem.x_fixed_indices
x_fixed_vals = self.petab_problem.x_nominal_fixed_scaled
x_ids = self.petab_problem.x_ids
lb = self.petab_problem.lb_scaled
ub = self.petab_problem.ub_scaled
x_scales = [
self.petab_problem.parameter_df.loc[x_id, petab.PARAMETER_SCALE]
for x_id in x_ids
]
if problem_kwargs is None:
problem_kwargs = {}
if startpoint_kwargs is None:
startpoint_kwargs = {}
prior = self.create_prior()
if prior is not None:
objective = AggregatedObjective([objective, prior])
problem = Problem(
objective=objective,
lb=lb,
ub=ub,
x_fixed_indices=x_fixed_indices,
x_fixed_vals=x_fixed_vals,
x_guesses=x_guesses,
x_names=x_ids,
x_scales=x_scales,
x_priors_defs=prior,
startpoint_method=self.create_startpoint_method(
**startpoint_kwargs
),
copy_objective=False,
**problem_kwargs,
)
return problem