import pandas as pd
import os
import sys
import importlib
import shutil
import logging
import tempfile
from typing import Iterable, List, Optional, Sequence, Union, Callable
from ..problem import Problem
from ..objective import AmiciObjective, AmiciObjectBuilder, AggregatedObjective
from ..predict import AmiciPredictor, PredictionResult
from ..predict.constants import CONDITION_SEP
from ..objective.priors import NegLogParameterPriors, \
get_parameter_prior_dict
try:
import petab
from petab.C import PREEQUILIBRATION_CONDITION_ID, SIMULATION_CONDITION_ID
import amici
import amici.petab_import
import amici.petab_objective
import amici.parameter_mapping
except ImportError:
pass
logger = logging.getLogger(__name__)
[docs]class PetabImporter(AmiciObjectBuilder):
MODEL_BASE_DIR = "amici_models"
[docs] def __init__(self,
petab_problem: 'petab.Problem',
output_folder: str = None,
model_name: str = None):
"""
petab_problem:
Managing access to the model and data.
output_folder:
Folder to contain the amici model. Defaults to
'./amici_models/{model_name}'.
model_name:
Name of the model, which will in particular be the name of the
compiled model python module.
"""
self.petab_problem = petab_problem
if output_folder is None:
output_folder = _find_output_folder_name(
self.petab_problem,
model_name=model_name,
)
self.output_folder = output_folder
if model_name is None:
model_name = _find_model_name(self.output_folder)
self.model_name = model_name
[docs] @staticmethod
def from_yaml(yaml_config: Union[dict, str],
output_folder: str = None,
model_name: str = None) -> 'PetabImporter':
"""
Simplified constructor using a petab yaml file.
"""
petab_problem = petab.Problem.from_yaml(yaml_config)
return PetabImporter(
petab_problem=petab_problem,
output_folder=output_folder,
model_name=model_name)
[docs] def create_model(self,
force_compile: bool = False,
**kwargs) -> 'amici.Model':
"""
Import amici model. If necessary or force_compile is True, compile
first.
Parameters
----------
force_compile:
If False, the model is compiled only if the output folder does not
exist yet. If True, the output folder is deleted and the model
(re-)compiled in either case.
.. warning::
If `force_compile`, then an existing folder of that name will
be deleted.
kwargs: Extra arguments passed to amici.SbmlImporter.sbml2amici
"""
# courtesy check whether target is folder
if os.path.exists(self.output_folder) \
and not os.path.isdir(self.output_folder):
raise AssertionError(
f"Refusing to remove {self.output_folder} for model "
f"compilation: Not a folder.")
# add module to path
if self.output_folder not in sys.path:
sys.path.insert(0, self.output_folder)
# compile
if self._must_compile(force_compile):
logger.info(f"Compiling amici model to folder "
f"{self.output_folder}.")
self.compile_model(**kwargs)
else:
logger.info(f"Using existing amici model in folder "
f"{self.output_folder}.")
return self._create_model()
def _create_model(self) -> 'amici.Model':
"""
No checks, no compilation, just load the model module and return
the model.
"""
# load moduĺe
module = amici.import_model_module(module_name=self.model_name,
module_path=self.output_folder)
model = module.getModel()
return model
def _must_compile(self, force_compile: bool):
"""
Check whether the model needs to be compiled first.
"""
# asked by user
if force_compile:
return True
# folder does not exist
if not os.path.exists(self.output_folder) or \
not os.listdir(self.output_folder):
return True
# try to import (in particular checks version)
try:
# importing will already raise an exception if version wrong
importlib.import_module(self.model_name)
except ModuleNotFoundError:
return True
# no need to (re-)compile
return False
[docs] def compile_model(self, **kwargs):
"""
Compile the model. If the output folder exists already, it is first
deleted.
Parameters
----------
kwargs: Extra arguments passed to `amici.SbmlImporter.sbml2amici`.
"""
# delete output directory
if os.path.exists(self.output_folder):
shutil.rmtree(self.output_folder)
amici.petab_import.import_model(
sbml_model=self.petab_problem.sbml_model,
condition_table=self.petab_problem.condition_df,
observable_table=self.petab_problem.observable_df,
model_name=self.model_name,
model_output_dir=self.output_folder,
**kwargs)
[docs] def create_solver(self, model: 'amici.Model' = None) -> 'amici.Solver':
"""
Return model solver.
"""
# create model
if model is None:
model = self.create_model()
solver = model.getSolver()
return solver
[docs] def create_edatas(
self,
model: 'amici.Model' = None,
simulation_conditions=None
) -> List['amici.ExpData']:
"""
Create list of amici.ExpData objects.
"""
# create model
if model is None:
model = self.create_model()
return amici.petab_objective.create_edatas(
amici_model=model,
petab_problem=self.petab_problem,
simulation_conditions=simulation_conditions)
[docs] def create_objective(
self,
model: 'amici.Model' = None,
solver: 'amici.Solver' = None,
edatas: Sequence['amici.ExpData'] = None,
force_compile: bool = False,
**kwargs
) -> AmiciObjective:
"""Create a :class:`pypesto.AmiciObjective`.
Parameters
----------
model:
The AMICI model.
solver:
The AMICI solver.
edatas:
The experimental data in AMICI format.
force_compile:
Whether to force-compile the model if not passed.
**kwargs:
Additional arguments passed on to the objective.
Returns
-------
objective:
A :class:`pypesto.AmiciObjective` for the model and the data.
"""
# get simulation conditions
simulation_conditions = petab.get_simulation_conditions(
self.petab_problem.measurement_df)
# create model
if model is None:
model = self.create_model(force_compile=force_compile)
# create solver
if solver is None:
solver = self.create_solver(model)
# create conditions and edatas from measurement data
if edatas is None:
edatas = self.create_edatas(
model=model,
simulation_conditions=simulation_conditions)
parameter_mapping = amici.petab_objective.create_parameter_mapping(
petab_problem=self.petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=True,
amici_model=model)
par_ids = self.petab_problem.x_ids
# fill in dummy parameters (this is needed since some objective
# initialization e.g. checks for preeq parameters)
problem_parameters = {key: val for key, val in zip(
self.petab_problem.x_ids,
self.petab_problem.x_nominal_scaled)}
amici.parameter_mapping.fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=True,
parameter_mapping=parameter_mapping,
amici_model=model)
# create objective
obj = AmiciObjective(
amici_model=model, amici_solver=solver, edatas=edatas,
x_ids=par_ids, x_names=par_ids,
parameter_mapping=parameter_mapping,
amici_object_builder=self,
**kwargs)
return obj
[docs] def create_predictor(
self,
objective: AmiciObjective = None,
amici_output_fields: Sequence[str] = None,
post_processor: Union[Callable, None] = None,
post_processor_sensi: Union[Callable, None] = None,
post_processor_time: Union[Callable, None] = None,
max_chunk_size: Union[int, None] = None,
output_ids: Sequence[str] = None,
condition_ids: Sequence[str] = None,
) -> AmiciPredictor:
"""Create a :class:`pypesto.predict.AmiciPredictor`.
The `AmiciPredictor` facilitates generation of predictions from
parameter vectors.
Parameters
----------
objective:
An objective object, which will be used to get model simulations
amici_output_fields:
keys that exist in the return data object from AMICI, which should
be available for the post-processors
post_processor:
A callable function which applies postprocessing to the simulation
results. Default are the observables of the AMICI model.
This method takes a list of ndarrays (as returned in the field
['y'] of amici ReturnData objects) as input.
post_processor_sensi:
A callable function which applies postprocessing to the
sensitivities of the simulation results. Default are the
observable sensitivities of the AMICI model.
This method takes two lists of ndarrays (as returned in the
fields ['y'] and ['sy'] of amici ReturnData objects) as input.
post_processor_time:
A callable function which applies postprocessing to the timepoints
of the simulations. Default are the timepoints of the amici model.
This method takes a list of ndarrays (as returned in the field
['t'] of amici ReturnData objects) as input.
max_chunk_size:
In some cases, we don't want to compute all predictions at once
when calling the prediction function, as this might not fit into
the memory for large datasets and models.
Here, the user can specify a maximum number of conditions, which
should be simulated at a time.
Default is 0 meaning that all conditions will be simulated.
Other values are only applicable, if an output file is specified.
output_ids:
IDs of outputs, if post-processing is used
condition_ids:
IDs of conditions, if post-processing is used
Returns
-------
predictor:
A :class:`pypesto.predict.AmiciPredictor` for the model, using
the outputs of the AMICI model and the timepoints from the
PEtab data
"""
# if the user didn't pass an objective function, we create it first
if objective is None:
objective = self.create_objective()
# create a identifiers of preequilibration and simulation condition ids
# which can then be stored in the prediction result
edata_conditions = objective.amici_object_builder.petab_problem.\
get_simulation_conditions_from_measurement_df()
if PREEQUILIBRATION_CONDITION_ID not in list(edata_conditions.columns):
preeq_dummy = [''] * edata_conditions.shape[0]
edata_conditions[PREEQUILIBRATION_CONDITION_ID] = preeq_dummy
edata_conditions.drop_duplicates(inplace=True)
if condition_ids is None:
condition_ids = [
edata_conditions.loc[id, PREEQUILIBRATION_CONDITION_ID] +
CONDITION_SEP + edata_conditions.loc[id,
SIMULATION_CONDITION_ID]
for id in edata_conditions.index
]
# wrap around AmiciPredictor
predictor = AmiciPredictor(
amici_objective=objective,
amici_output_fields=amici_output_fields,
post_processor=post_processor,
post_processor_sensi=post_processor_sensi,
post_processor_time=post_processor_time,
max_chunk_size=max_chunk_size,
output_ids=output_ids,
condition_ids=condition_ids)
return predictor
[docs] def create_prior(self) -> NegLogParameterPriors:
"""
Creates a prior from the parameter table. Returns None, if no priors
are defined.
"""
prior_list = []
if petab.OBJECTIVE_PRIOR_TYPE in self.petab_problem.parameter_df:
for i, x_id in enumerate(self.petab_problem.x_ids):
prior_type_entry = self.petab_problem.\
parameter_df.loc[x_id, petab.OBJECTIVE_PRIOR_TYPE]
if (isinstance(prior_type_entry, str)
and prior_type_entry != petab.PARAMETER_SCALE_UNIFORM):
prior_params = [float(param) for param in
self.petab_problem.parameter_df.
loc[x_id, petab.OBJECTIVE_PRIOR_PARAMETERS]
.split(';')]
scale = self.petab_problem.\
parameter_df.loc[x_id, petab.PARAMETER_SCALE]
prior_list.append(
get_parameter_prior_dict(i,
prior_type_entry,
prior_params,
scale))
if len(prior_list):
return NegLogParameterPriors(prior_list)
else:
return None
[docs] def create_startpoint_method(self):
"""
Creates a startpoint method, if the PEtab problem specifies an
initializationPrior. Returns None, if no initializationPrior
is specified.
"""
if petab.INITIALIZATION_PRIOR_TYPE \
not in self.petab_problem.parameter_df:
return None
def startpoint_method(n_starts: int, **kwargs):
return petab.sample_parameter_startpoints(
self.petab_problem.parameter_df,
n_starts=n_starts)
return startpoint_method
[docs] def create_problem(
self, objective: AmiciObjective = None,
x_guesses: Optional[Iterable[float]] = None, **kwargs
) -> Problem:
"""Create a :class:`pypesto.Problem`.
Parameters
----------
objective:
Objective as created by `create_objective`.
x_guesses:
Guesses for the parameter values, shape (g, dim), where g denotes
the number of guesses. These are used as start points in the
optimization.
**kwargs:
Additional key word arguments passed on to the objective,
if not provided.
Returns
-------
problem:
A :class:`pypesto.Problem` for the objective.
"""
if objective is None:
objective = self.create_objective(**kwargs)
prior = self.create_prior()
if prior is not None:
objective = AggregatedObjective([objective, prior])
x_scales = \
[self.petab_problem.parameter_df.loc[x_id, petab.PARAMETER_SCALE]
for x_id in self.petab_problem.x_ids]
problem = Problem(
objective=objective,
lb=self.petab_problem.lb_scaled,
ub=self.petab_problem.ub_scaled,
x_fixed_indices=self.petab_problem.x_fixed_indices,
x_fixed_vals=self.petab_problem.x_nominal_fixed_scaled,
x_guesses=x_guesses,
startpoint_method=self.create_startpoint_method(),
x_names=self.petab_problem.x_ids,
x_scales=x_scales,
x_priors_defs=prior)
return problem
[docs] def rdatas_to_measurement_df(
self, rdatas: Sequence['amici.ReturnData'],
model: 'amici.Model' = None
) -> pd.DataFrame:
"""
Create a measurement dataframe in the petab format from
the passed `rdatas` and own information.
Parameters
----------
rdatas:
A list of rdatas as produced by
pypesto.AmiciObjective.__call__(x, return_dict=True)['rdatas'].
model:
The amici model.
Returns
-------
measurement_df:
A dataframe built from the rdatas in the format as in
self.petab_problem.measurement_df.
"""
# create model
if model is None:
model = self.create_model()
measurement_df = self.petab_problem.measurement_df
return amici.petab_objective.rdatas_to_measurement_df(
rdatas, model, measurement_df)
[docs] def rdatas_to_simulation_df(
self, rdatas: Sequence['amici.ReturnData'],
model: 'amici.Model' = None
) -> pd.DataFrame:
"""Same as `rdatas_to_measurement_df`, execpt a petab simulation
dataframe is created, i.e. the measurement column label is adjusted.
"""
return self.rdatas_to_measurement_df(rdatas, model).rename(
{petab.MEASUREMENT: petab.SIMULATION})
[docs] def prediction_to_petab_measurement_df(
self,
prediction: PredictionResult,
predictor: AmiciPredictor = None
) -> pd.DataFrame:
"""
If a PEtab problem is simulated without post-processing, then the
result can be cast into a PEtab measurement or simulation dataframe
Parameters
----------
prediction:
A prediction result as produced by an AmiciPredictor
predictor:
The AmiciPredictor function
Returns
-------
measurement_df:
A dataframe built from the rdatas in the format as in
self.petab_problem.measurement_df.
"""
# create rdata-like dicts from the prediction result
rdatas = []
for condition in prediction.conditions:
rdatas.append({'t': condition.timepoints,
'y': condition.output})
# add an AMICI model, if possible
model = None
if predictor is not None:
model = predictor.amici_objective.amici_model
return self.rdatas_to_measurement_df(rdatas, model)
[docs] def prediction_to_petab_simulation_df(
self,
prediction: PredictionResult,
predictor: AmiciPredictor = None
) -> pd.DataFrame:
"""Same as `prediction_to_petab_measurement_df`, except a PEtab
simulation dataframe is created, i.e. the measurement column label is
adjusted.
"""
return self.prediction_to_petab_measurement_df(
prediction, predictor).rename(
{petab.MEASUREMENT: petab.SIMULATION})
def _find_output_folder_name(
petab_problem: 'petab.Problem',
model_name: str,
) -> str:
"""
Find a name for storing the compiled amici model in. If available,
use the sbml model name from the `petab_problem` or the provided
`model_name` (latter is given priority), otherwise create a unique name.
The folder will be located in the `PetabImporter.MODEL_BASE_DIR`
subdirectory of the current directory.
"""
# check whether location for amici model is a file
if os.path.exists(PetabImporter.MODEL_BASE_DIR) and \
not os.path.isdir(PetabImporter.MODEL_BASE_DIR):
raise AssertionError(
f"{PetabImporter.MODEL_BASE_DIR} exists and is not a directory, "
f"thus cannot create a directory for the compiled amici model.")
# create base directory if non-existent
if not os.path.exists(PetabImporter.MODEL_BASE_DIR):
os.makedirs(PetabImporter.MODEL_BASE_DIR)
# try sbml model id
sbml_model_id = petab_problem.sbml_model.getId()
if model_name is not None:
sbml_model_id = model_name
if sbml_model_id:
output_folder = os.path.abspath(
os.path.join(PetabImporter.MODEL_BASE_DIR, sbml_model_id))
else:
# create random folder name
output_folder = os.path.abspath(
tempfile.mkdtemp(dir=PetabImporter.MODEL_BASE_DIR))
return output_folder
def _find_model_name(output_folder: str) -> str:
"""
Just re-use the last part of the output folder.
"""
return os.path.split(os.path.normpath(output_folder))[-1]