Source code for pypesto.petab.importer

"""Contains the PetabImporter class."""

from __future__ import annotations

import logging
import os
import shutil
import sys
import tempfile
import warnings
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from functools import partial
from importlib.metadata import version
from typing import (
    Any,
    Callable,
)

import numpy as np
import pandas as pd

from ..C import (
    CENSORED,
    CENSORING_TYPES,
    CONDITION_SEP,
    MEASUREMENT_TYPE,
    MODE_FUN,
    MODE_RES,
    ORDINAL,
    ORDINAL_OPTIONS,
    PARAMETER_TYPE,
    RELATIVE,
    SEMIQUANTITATIVE,
    SPLINE_APPROXIMATION_OPTIONS,
    InnerParameterType,
)
from ..hierarchical.inner_calculator_collector import InnerCalculatorCollector
from ..objective import AggregatedObjective, AmiciObjective
from ..objective.amici import AmiciObjectBuilder
from ..objective.priors import NegLogParameterPriors, get_parameter_prior_dict
from ..predict import AmiciPredictor
from ..problem import HierarchicalProblem, Problem
from ..result import PredictionResult
from ..startpoint import CheckedStartpoints, StartpointMethod

try:
    import amici
    import amici.petab
    import amici.petab.conditions
    import amici.petab.parameter_mapping
    import amici.petab.simulations
    import petab
    from amici.petab.import_helpers import check_model
    from petab.C import (
        ESTIMATE,
        NOISE_PARAMETERS,
        OBSERVABLE_ID,
        PREEQUILIBRATION_CONDITION_ID,
        SIMULATION_CONDITION_ID,
    )
    from petab.models import MODEL_TYPE_SBML
except ImportError:
    amici = None

logger = logging.getLogger(__name__)


[docs] class PetabImporter(AmiciObjectBuilder): """ Importer for PEtab files. Create an :class:`amici.amici.Model`, an :class:`pypesto.objective.AmiciObjective` or a :class:`pypesto.problem.Problem` from PEtab files. The created objective function is a negative log-likelihood function and can thus be negative. The actual form of the likelihood depends on the noise model specified in the provided PEtab problem. For more information, see the `PEtab documentation <https://petab.readthedocs.io/en/latest/documentation_data_format.html#noise-distributions>`_. """ # noqa MODEL_BASE_DIR = f"amici_models/{version('amici') if amici else ''}"
[docs] def __init__( self, petab_problem: petab.Problem, output_folder: str = None, model_name: str = None, validate_petab: bool = True, validate_petab_hierarchical: bool = True, hierarchical: bool = False, inner_options: dict = None, ): """Initialize importer. Parameters ---------- petab_problem: Managing access to the model and data. output_folder: Folder to contain the amici model. Defaults to './amici_models/{model_name}'. model_name: Name of the model, which will in particular be the name of the compiled model python module. validate_petab: Flag indicating if the PEtab problem shall be validated. validate_petab_hierarchical: Flag indicating if the PEtab problem shall be validated in terms of pyPESTO's hierarchical optimization implementation. hierarchical: Whether to use hierarchical optimization or not, in case the underlying PEtab problem has parameters marked for hierarchical optimization (non-empty `parameterType` column in the PEtab parameter table). Required for ordinal, censored and semiquantitative data. inner_options: Options for the inner problems and solvers. If not provided, default options will be used. """ self.petab_problem = petab_problem self._hierarchical = hierarchical self._non_quantitative_data_types = ( get_petab_non_quantitative_data_types(petab_problem) ) if self._non_quantitative_data_types is None and hierarchical: raise ValueError( "Hierarchical optimization enabled, but no non-quantitative " "data types specified. Specify non-quantitative data types " "or disable hierarchical optimization." ) if ( self._non_quantitative_data_types is not None and any( data_type in self._non_quantitative_data_types for data_type in [ORDINAL, CENSORED, SEMIQUANTITATIVE] ) and not self._hierarchical ): raise ValueError( "Ordinal, censored and semiquantitative data require " "hierarchical optimization to be enabled.", ) self.inner_options = inner_options if self.inner_options is None: self.inner_options = {} self.validate_inner_options() self.validate_petab = validate_petab if self.validate_petab: if petab.lint_problem(petab_problem): raise ValueError("Invalid PEtab problem.") if self._hierarchical and validate_petab_hierarchical: from ..hierarchical.petab import ( validate_hierarchical_petab_problem, ) validate_hierarchical_petab_problem(petab_problem) if output_folder is None: output_folder = _find_output_folder_name( self.petab_problem, model_name=model_name, ) self.output_folder = output_folder if model_name is None: model_name = _find_model_name(self.output_folder) self.model_name = model_name
[docs] @staticmethod def from_yaml( yaml_config: dict | str, output_folder: str = None, model_name: str = None, ) -> PetabImporter: """Simplified constructor using a petab yaml file.""" petab_problem = petab.Problem.from_yaml(yaml_config) return PetabImporter( petab_problem=petab_problem, output_folder=output_folder, model_name=model_name, )
[docs] def validate_inner_options(self): """Validate the inner options.""" for key in self.inner_options: if key not in ORDINAL_OPTIONS + SPLINE_APPROXIMATION_OPTIONS: raise ValueError(f"Unknown inner option {key}.")
[docs] def check_gradients( self, *args, rtol: float = 1e-2, atol: float = 1e-3, mode: str | list[str] = None, multi_eps=None, **kwargs, ) -> bool: """ Check if gradients match finite differences (FDs). Parameters ---------- rtol: relative error tolerance atol: absolute error tolerance mode: function values or residuals objAbsoluteTolerance: absolute tolerance in sensitivity calculation objRelativeTolerance: relative tolerance in sensitivity calculation multi_eps: multiple test step width for FDs Returns ------- match: Whether gradients match FDs (True) or not (False) """ par = np.asarray(self.petab_problem.x_nominal_scaled) problem = self.create_problem() objective = problem.objective free_indices = par[problem.x_free_indices] dfs = [] modes = [] if mode is None: modes = [MODE_FUN, MODE_RES] else: modes = [mode] if multi_eps is None: multi_eps = np.array([10 ** (-i) for i in range(3, 9)]) for mode in modes: try: dfs.append( objective.check_grad_multi_eps( free_indices, *args, **kwargs, mode=mode, multi_eps=multi_eps, ) ) except (RuntimeError, ValueError): # Might happen in case PEtab problem not well defined or # fails for specified tolerances in forward sensitivities return False return all( any( [ np.all( (mode_df.rel_err.values < rtol) | (mode_df.abs_err.values < atol) ), ] ) for mode_df in dfs )
[docs] def create_model( self, force_compile: bool = False, verbose: bool = True, **kwargs, ) -> amici.Model: """ Import amici model. Parameters ---------- force_compile: If False, the model is compiled only if the output folder does not exist yet. If True, the output folder is deleted and the model (re-)compiled in either case. .. warning:: If `force_compile`, then an existing folder of that name will be deleted. verbose: Passed to AMICI's model compilation. If True, the compilation progress is printed. kwargs: Extra arguments passed to amici.SbmlImporter.sbml2amici """ # courtesy check whether target is folder if os.path.exists(self.output_folder) and not os.path.isdir( self.output_folder ): raise AssertionError( f"Refusing to remove {self.output_folder} for model " f"compilation: Not a folder." ) # add module to path if self.output_folder not in sys.path: sys.path.insert(0, self.output_folder) # compile if self._must_compile(force_compile): logger.info( f"Compiling amici model to folder " f"{self.output_folder}." ) if self.petab_problem.model.type_id == MODEL_TYPE_SBML: self.compile_model( validate=self.validate_petab, verbose=verbose, **kwargs, ) else: self.compile_model(verbose=verbose, **kwargs) else: logger.debug( f"Using existing amici model in folder " f"{self.output_folder}." ) return self._create_model()
def _create_model(self) -> amici.Model: """Load model module and return the model, no checks/compilation.""" # load moduĺe module = amici.import_model_module( module_name=self.model_name, module_path=self.output_folder ) model = module.getModel() check_model( amici_model=model, petab_problem=self.petab_problem, ) return model def _must_compile(self, force_compile: bool): """Check whether the model needs to be compiled first.""" # asked by user if force_compile: return True # folder does not exist if not os.path.exists(self.output_folder) or not os.listdir( self.output_folder ): return True # try to import (in particular checks version) try: # importing will already raise an exception if version wrong amici.import_model_module(self.model_name, self.output_folder) except ModuleNotFoundError: return True except amici.AmiciVersionError as e: logger.info( "amici model will be re-imported due to version " f"mismatch: {e}" ) return True # no need to (re-)compile return False
[docs] def compile_model(self, **kwargs): """ Compile the model. If the output folder exists already, it is first deleted. Parameters ---------- kwargs: Extra arguments passed to :meth:`amici.sbml_import.SbmlImporter.sbml2amici` or :func:`amici.pysb_import.pysb2amici`. """ # delete output directory if os.path.exists(self.output_folder): shutil.rmtree(self.output_folder) amici.petab.import_petab_problem( petab_problem=self.petab_problem, model_name=self.model_name, model_output_dir=self.output_folder, **kwargs, )
[docs] def create_solver( self, model: amici.Model = None, verbose: bool = True, ) -> amici.Solver: """Return model solver.""" # create model if model is None: model = self.create_model(verbose=verbose) solver = model.getSolver() return solver
[docs] def create_edatas( self, model: amici.Model = None, simulation_conditions=None, verbose: bool = True, ) -> list[amici.ExpData]: """Create list of :class:`amici.amici.ExpData` objects.""" # create model if model is None: model = self.create_model(verbose=verbose) return amici.petab.conditions.create_edatas( amici_model=model, petab_problem=self.petab_problem, simulation_conditions=simulation_conditions, )
[docs] def create_objective( self, model: amici.Model = None, solver: amici.Solver = None, edatas: Sequence[amici.ExpData] = None, force_compile: bool = False, verbose: bool = True, **kwargs, ) -> AmiciObjective: """Create a :class:`pypesto.objective.AmiciObjective`. Parameters ---------- model: The AMICI model. solver: The AMICI solver. edatas: The experimental data in AMICI format. force_compile: Whether to force-compile the model if not passed. verbose: Passed to AMICI's model compilation. If True, the compilation progress is printed. **kwargs: Additional arguments passed on to the objective. In case of ordinal or semiquantitative measurements, ``inner_options`` can optionally be passed here. If none are given, ``inner_options`` given to the importer constructor (or inner defaults) will be chosen. Returns ------- A :class:`pypesto.objective.AmiciObjective` for the model and the data. """ # get simulation conditions simulation_conditions = petab.get_simulation_conditions( self.petab_problem.measurement_df ) # create model if model is None: model = self.create_model( force_compile=force_compile, verbose=verbose ) # create solver if solver is None: solver = self.create_solver(model) # create conditions and edatas from measurement data if edatas is None: edatas = self.create_edatas( model=model, simulation_conditions=simulation_conditions ) parameter_mapping = ( amici.petab.parameter_mapping.create_parameter_mapping( petab_problem=self.petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=True, amici_model=model, fill_fixed_parameters=False, ) ) par_ids = self.petab_problem.x_ids # fill in dummy parameters (this is needed since some objective # initialization e.g. checks for preeq parameters) problem_parameters = dict( zip(self.petab_problem.x_ids, self.petab_problem.x_nominal_scaled) ) amici.petab.conditions.fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=True, parameter_mapping=parameter_mapping, amici_model=model, ) calculator = None amici_reporting = None if ( self._non_quantitative_data_types is not None and self._hierarchical ): inner_options = kwargs.pop("inner_options", None) inner_options = ( inner_options if inner_options is not None else self.inner_options ) calculator = InnerCalculatorCollector( self._non_quantitative_data_types, self.petab_problem, model, edatas, inner_options, ) amici_reporting = amici.RDataReporting.full # FIXME: currently not supported with hierarchical if "guess_steadystate" in kwargs and kwargs["guess_steadystate"]: warnings.warn( "`guess_steadystate` not supported with hierarchical " "optimization. Disabling `guess_steadystate`.", stacklevel=1, ) kwargs["guess_steadystate"] = False inner_parameter_ids = calculator.get_inner_par_ids() par_ids = [x for x in par_ids if x not in inner_parameter_ids] max_sensi_order = kwargs.get("max_sensi_order", None) if ( self._non_quantitative_data_types is not None and any( data_type in self._non_quantitative_data_types for data_type in [ORDINAL, CENSORED, SEMIQUANTITATIVE] ) and max_sensi_order is not None and max_sensi_order > 1 ): raise ValueError( "Ordinal, censored and semiquantitative data cannot be " "used with second order sensitivities. Use a up to first order " "method or disable ordinal, censored and semiquantitative " ) # create objective obj = AmiciObjective( amici_model=model, amici_solver=solver, edatas=edatas, x_ids=par_ids, x_names=par_ids, parameter_mapping=parameter_mapping, amici_object_builder=self, calculator=calculator, amici_reporting=amici_reporting, **kwargs, ) return obj
[docs] def create_predictor( self, objective: AmiciObjective = None, amici_output_fields: Sequence[str] = None, post_processor: Callable | None = None, post_processor_sensi: Callable | None = None, post_processor_time: Callable | None = None, max_chunk_size: int | None = None, output_ids: Sequence[str] = None, condition_ids: Sequence[str] = None, ) -> AmiciPredictor: """Create a :class:`pypesto.predict.AmiciPredictor`. The `AmiciPredictor` facilitates generation of predictions from parameter vectors. Parameters ---------- objective: An objective object, which will be used to get model simulations amici_output_fields: keys that exist in the return data object from AMICI, which should be available for the post-processors post_processor: A callable function which applies postprocessing to the simulation results. Default are the observables of the AMICI model. This method takes a list of ndarrays (as returned in the field ['y'] of amici ReturnData objects) as input. post_processor_sensi: A callable function which applies postprocessing to the sensitivities of the simulation results. Default are the observable sensitivities of the AMICI model. This method takes two lists of ndarrays (as returned in the fields ['y'] and ['sy'] of amici ReturnData objects) as input. post_processor_time: A callable function which applies postprocessing to the timepoints of the simulations. Default are the timepoints of the amici model. This method takes a list of ndarrays (as returned in the field ['t'] of amici ReturnData objects) as input. max_chunk_size: In some cases, we don't want to compute all predictions at once when calling the prediction function, as this might not fit into the memory for large datasets and models. Here, the user can specify a maximum number of conditions, which should be simulated at a time. Default is 0 meaning that all conditions will be simulated. Other values are only applicable, if an output file is specified. output_ids: IDs of outputs, if post-processing is used condition_ids: IDs of conditions, if post-processing is used Returns ------- A :class:`pypesto.predict.AmiciPredictor` for the model, using the outputs of the AMICI model and the timepoints from the PEtab data. """ # if the user didn't pass an objective function, we create it first if objective is None: objective = self.create_objective() # create a identifiers of preequilibration and simulation condition ids # which can then be stored in the prediction result edata_conditions = objective.amici_object_builder.petab_problem.get_simulation_conditions_from_measurement_df() if PREEQUILIBRATION_CONDITION_ID not in list(edata_conditions.columns): preeq_dummy = [""] * edata_conditions.shape[0] edata_conditions[PREEQUILIBRATION_CONDITION_ID] = preeq_dummy edata_conditions.drop_duplicates(inplace=True) if condition_ids is None: condition_ids = [ edata_conditions.loc[id, PREEQUILIBRATION_CONDITION_ID] + CONDITION_SEP + edata_conditions.loc[id, SIMULATION_CONDITION_ID] for id in edata_conditions.index ] # wrap around AmiciPredictor predictor = AmiciPredictor( amici_objective=objective, amici_output_fields=amici_output_fields, post_processor=post_processor, post_processor_sensi=post_processor_sensi, post_processor_time=post_processor_time, max_chunk_size=max_chunk_size, output_ids=output_ids, condition_ids=condition_ids, ) return predictor
[docs] def create_prior(self) -> NegLogParameterPriors | None: """ Create a prior from the parameter table. Returns None, if no priors are defined. """ prior_list = [] if petab.OBJECTIVE_PRIOR_TYPE in self.petab_problem.parameter_df: for i, x_id in enumerate(self.petab_problem.x_ids): prior_type_entry = self.petab_problem.parameter_df.loc[ x_id, petab.OBJECTIVE_PRIOR_TYPE ] if ( isinstance(prior_type_entry, str) and prior_type_entry != petab.PARAMETER_SCALE_UNIFORM ): prior_params = [ float(param) for param in self.petab_problem.parameter_df.loc[ x_id, petab.OBJECTIVE_PRIOR_PARAMETERS ].split(";") ] scale = self.petab_problem.parameter_df.loc[ x_id, petab.PARAMETER_SCALE ] prior_list.append( get_parameter_prior_dict( i, prior_type_entry, prior_params, scale ) ) if len(prior_list): return NegLogParameterPriors(prior_list) else: return None
[docs] def create_startpoint_method(self, **kwargs) -> StartpointMethod: """Create a startpoint method. Parameters ---------- **kwargs: Additional keyword arguments passed on to :meth:`pypesto.startpoint.FunctionStartpoints.__init__`. """ return PetabStartpoints(petab_problem=self.petab_problem, **kwargs)
[docs] def create_problem( self, objective: AmiciObjective = None, x_guesses: Iterable[float] | None = None, problem_kwargs: dict[str, Any] = None, startpoint_kwargs: dict[str, Any] = None, **kwargs, ) -> Problem: """Create a :class:`pypesto.problem.Problem`. Parameters ---------- objective: Objective as created by :meth:`create_objective`. x_guesses: Guesses for the parameter values, shape (g, dim), where g denotes the number of guesses. These are used as start points in the optimization. problem_kwargs: Passed to :meth:`pypesto.problem.Problem.__init__`. startpoint_kwargs: Keyword arguments forwarded to :meth:`PetabImporter.create_startpoint_method`. **kwargs: Additional key word arguments passed on to the objective, if not provided. Returns ------- A :class:`pypesto.problem.Problem` for the objective. """ if objective is None: objective = self.create_objective(**kwargs) x_fixed_indices = self.petab_problem.x_fixed_indices x_fixed_vals = self.petab_problem.x_nominal_fixed_scaled x_ids = self.petab_problem.x_ids lb = self.petab_problem.lb_scaled ub = self.petab_problem.ub_scaled # Raise error if the correct calculator is not used. if self._hierarchical: if not isinstance(objective.calculator, InnerCalculatorCollector): raise AssertionError( f"If hierarchical optimization is enabled, the `calculator` attribute of the `objective` has to be {InnerCalculatorCollector} and not {objective.calculator}." ) # In case of hierarchical optimization, parameters estimated in the # inner subproblem are removed from the outer problem if self._hierarchical: inner_parameter_ids = objective.calculator.get_inner_par_ids() lb = [b for x, b in zip(x_ids, lb) if x not in inner_parameter_ids] ub = [b for x, b in zip(x_ids, ub) if x not in inner_parameter_ids] x_ids = [x for x in x_ids if x not in inner_parameter_ids] x_fixed_indices = list( map(x_ids.index, self.petab_problem.x_fixed_ids) ) x_scales = [ self.petab_problem.parameter_df.loc[x_id, petab.PARAMETER_SCALE] for x_id in x_ids ] if problem_kwargs is None: problem_kwargs = {} if startpoint_kwargs is None: startpoint_kwargs = {} prior = self.create_prior() if prior is not None: if self._hierarchical: raise NotImplementedError( "Hierarchical optimization in combination with priors " "is not yet supported." ) objective = AggregatedObjective([objective, prior]) if self._hierarchical: problem_class = HierarchicalProblem else: problem_class = Problem problem = problem_class( objective=objective, lb=lb, ub=ub, x_fixed_indices=x_fixed_indices, x_fixed_vals=x_fixed_vals, x_guesses=x_guesses, x_names=x_ids, x_scales=x_scales, x_priors_defs=prior, startpoint_method=self.create_startpoint_method( **startpoint_kwargs ), **problem_kwargs, ) return problem
[docs] def rdatas_to_measurement_df( self, rdatas: Sequence[amici.ReturnData], model: amici.Model = None, verbose: bool = True, ) -> pd.DataFrame: """ Create a measurement dataframe in the petab format. Parameters ---------- rdatas: A list of rdatas as produced by ``pypesto.AmiciObjective.__call__(x, return_dict=True)['rdatas']``. model: The amici model. verbose: Passed to AMICI's model compilation. If True, the compilation progress is printed. Returns ------- A dataframe built from the rdatas in the format as in ``self.petab_problem.measurement_df``. """ # create model if model is None: model = self.create_model(verbose=verbose) measurement_df = self.petab_problem.measurement_df return amici.petab.simulations.rdatas_to_measurement_df( rdatas, model, measurement_df )
[docs] def rdatas_to_simulation_df( self, rdatas: Sequence[amici.ReturnData], model: amici.Model = None, ) -> pd.DataFrame: """ See :meth:`rdatas_to_measurement_df`. Except a petab simulation dataframe is created, i.e. the measurement column label is adjusted. """ return self.rdatas_to_measurement_df(rdatas, model).rename( columns={petab.MEASUREMENT: petab.SIMULATION} )
[docs] def prediction_to_petab_measurement_df( self, prediction: PredictionResult, predictor: AmiciPredictor = None, ) -> pd.DataFrame: """ Cast prediction into a dataframe. If a PEtab problem is simulated without post-processing, then the result can be cast into a PEtab measurement or simulation dataframe Parameters ---------- prediction: A prediction result as produced by an :class:`pypesto.predict.AmiciPredictor`. predictor: The :class:`pypesto.predict.AmiciPredictor` instance. Returns ------- A dataframe built from the rdatas in the format as in ``self.petab_problem.measurement_df``. """ # create rdata-like dicts from the prediction result @dataclass class FakeRData: ts: np.ndarray y: np.ndarray rdatas = [ FakeRData(ts=condition.timepoints, y=condition.output) for condition in prediction.conditions ] # add an AMICI model, if possible model = None if predictor is not None: model = predictor.amici_objective.amici_model return self.rdatas_to_measurement_df(rdatas, model)
[docs] def prediction_to_petab_simulation_df( self, prediction: PredictionResult, predictor: AmiciPredictor = None, ) -> pd.DataFrame: """ See :meth:`prediction_to_petab_measurement_df`. Except a PEtab simulation dataframe is created, i.e. the measurement column label is adjusted. """ return self.prediction_to_petab_measurement_df( prediction, predictor ).rename(columns={petab.MEASUREMENT: petab.SIMULATION})
def _find_output_folder_name( petab_problem: petab.Problem, model_name: str, ) -> str: """ Find a name for storing the compiled amici model in. If available, use the model name from the ``petab_problem`` or the provided ``model_name`` (latter is given priority), otherwise create a unique name. The folder will be located in the :obj:`PetabImporter.MODEL_BASE_DIR` subdirectory of the current directory. """ # check whether location for amici model is a file if os.path.exists(PetabImporter.MODEL_BASE_DIR) and not os.path.isdir( PetabImporter.MODEL_BASE_DIR ): raise AssertionError( f"{PetabImporter.MODEL_BASE_DIR} exists and is not a directory, " f"thus cannot create a directory for the compiled amici model." ) # create base directory if non-existent if not os.path.exists(PetabImporter.MODEL_BASE_DIR): os.makedirs(PetabImporter.MODEL_BASE_DIR) # try model id model_id = petab_problem.model.model_id if model_name is not None: model_id = model_name if model_id: output_folder = os.path.abspath( os.path.join(PetabImporter.MODEL_BASE_DIR, model_id) ) else: # create random folder name output_folder = os.path.abspath( tempfile.mkdtemp(dir=PetabImporter.MODEL_BASE_DIR) ) return output_folder def _find_model_name(output_folder: str) -> str: """Just re-use the last part of the output folder.""" return os.path.split(os.path.normpath(output_folder))[-1] def get_petab_non_quantitative_data_types( petab_problem: petab.Problem, ) -> set[str]: """ Get the data types from the PEtab problem. Parameters ---------- petab_problem: The PEtab problem. Returns ------- data_types: A list of the data types. """ non_quantitative_data_types = set() caught_observables = set() # For ordinal, censored and semiquantitative data, search # for the corresponding data types in the measurement table meas_df = petab_problem.measurement_df if MEASUREMENT_TYPE in meas_df.columns: petab_data_types = meas_df[MEASUREMENT_TYPE].unique() for data_type in [ORDINAL, SEMIQUANTITATIVE] + CENSORING_TYPES: if data_type in petab_data_types: non_quantitative_data_types.add( CENSORED if data_type in CENSORING_TYPES else data_type ) caught_observables.update( set( meas_df[meas_df[MEASUREMENT_TYPE] == data_type][ OBSERVABLE_ID ] ) ) # For relative data, search for parameters to estimate with # a scaling/offset/sigma parameter type if PARAMETER_TYPE in petab_problem.parameter_df.columns: # get the df with non-nan parameter types par_df = petab_problem.parameter_df[ petab_problem.parameter_df[PARAMETER_TYPE].notna() ] for par_id, row in par_df.iterrows(): if not row[ESTIMATE]: continue if row[PARAMETER_TYPE] in [ InnerParameterType.SCALING, InnerParameterType.OFFSET, ]: non_quantitative_data_types.add(RELATIVE) # For sigma parameters, we need to check if they belong # to an observable with a non-quantitative data type elif row[PARAMETER_TYPE] == InnerParameterType.SIGMA: corresponding_observables = set( meas_df[meas_df[NOISE_PARAMETERS] == par_id][OBSERVABLE_ID] ) if not (corresponding_observables & caught_observables): non_quantitative_data_types.add(RELATIVE) # TODO this can be made much shorter if the relative measurements # are also specified in the measurement table, but that would require # changing the PEtab format of a lot of benchmark models. if len(non_quantitative_data_types) == 0: return None return non_quantitative_data_types class PetabStartpoints(CheckedStartpoints): """Startpoint method for PEtab problems. Samples optimization startpoints from the distributions defined in the provided PEtab problem. The PEtab-problem is copied. """ def __init__(self, petab_problem: petab.Problem, **kwargs): super().__init__(**kwargs) self._parameter_df = petab_problem.parameter_df.copy() self._priors: list[tuple] | None = None self._free_ids: list[str] | None = None def _setup( self, pypesto_problem: Problem, ): """Update priors if necessary. Check if ``problem.x_free_indices`` changed since last call, and if so, get the corresponding priors from PEtab. """ current_free_ids = np.asarray(pypesto_problem.x_names)[ pypesto_problem.x_free_indices ] if ( self._priors is not None and len(current_free_ids) == len(self._free_ids) and np.all(current_free_ids == self._free_ids) ): # no need to update return # update priors self._free_ids = current_free_ids id_to_prior = dict( zip( self._parameter_df.index[self._parameter_df[ESTIMATE] == 1], petab.parameters.get_priors_from_df( self._parameter_df, mode=petab.INITIALIZATION ), ) ) self._priors = list(map(id_to_prior.__getitem__, current_free_ids)) def __call__( self, n_starts: int, problem: Problem, ) -> np.ndarray: """Call the startpoint method.""" # Update the list of priors if needed self._setup(pypesto_problem=problem) return super().__call__(n_starts, problem) def sample( self, n_starts: int, lb: np.ndarray, ub: np.ndarray, ) -> np.ndarray: """Actual startpoint sampling. Must only be called through `self.__call__` to ensure that the list of priors matches the currently free parameters in the :class:`pypesto.Problem`. """ sampler = partial(petab.sample_from_prior, n_starts=n_starts) startpoints = list(map(sampler, self._priors)) return np.array(startpoints).T