# Source code for pypesto.select.postprocessors

"""Process a model selection :class:`ModelProblem` after calibration."""
from pathlib import Path
from typing import List

import matplotlib.pyplot as plt
import numpy as np
from petab_select.constants import ESTIMATE, TYPE_PATH, Criterion

from .. import store, visualize
from .model_problem import TYPE_POSTPROCESSOR, ModelProblem

# Public API of this module: the postprocessors available for model selection.
__all__ = [
    'model_id_binary_postprocessor',
    'multi_postprocessor',
    'report_postprocessor',
    'save_postprocessor',
    'waterfall_plot_postprocessor',
]


def multi_postprocessor(
    problem: ModelProblem,
    postprocessors: List[TYPE_POSTPROCESSOR] = None,
):
    """Combine multiple postprocessors into a single postprocessor.

    See :meth:`save_postprocessor` for usage hints.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    postprocessors:
        A list of postprocessors, which will be sequentially applied to the
        optimized model ``problem``. Defaults to ``None``, in which case no
        postprocessing is performed.
    """
    # Guard the `None` default: iterating over `None` would raise a
    # `TypeError`, so a missing list means "nothing to do".
    if postprocessors is None:
        return
    for postprocessor in postprocessors:
        postprocessor(problem)
def waterfall_plot_postprocessor(
    problem: ModelProblem,
    output_path: TYPE_PATH = ".",
):
    """Produce a waterfall plot.

    The plot is saved as ``<model hash>.png`` inside ``output_path``.

    See :meth:`save_postprocessor` for usage hints and argument
    documentation.
    """
    visualize.waterfall(problem.minimize_result)
    plot_output_path = Path(output_path) / (problem.model.model_hash + ".png")
    plt.savefig(str(plot_output_path))
    # Close the figure after saving: model selection calibrates many models,
    # and leaving every waterfall figure open accumulates memory.
    plt.close()
def save_postprocessor(
    problem: ModelProblem,
    output_path: TYPE_PATH = ".",
    use_model_hash: bool = False,
):
    """Save the parameter estimation result.

    When used, first set the output folder for results, e.g. with
    :func:`functools.partial`. This is because postprocessors should take
    only a single parameter: an optimized model.

    .. code-block:: python

       from functools import partial
       output_path = 'results'
       pp = partial(save_postprocessor, output_path=output_path)
       selector = pypesto.select.ModelSelector(
           problem=problem,
           model_postprocessor=pp,
       )

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    output_path:
        The location where output will be stored.
    use_model_hash:
        Whether the filename should use the model hash. Defaults to
        ``False``, in which case the model ID is used instead.
    """
    # Pick the filename stem: the model hash if requested, otherwise the
    # model ID.
    filename_stem = (
        problem.model.get_hash() if use_model_hash else problem.model.model_id
    )
    store.write_result(
        problem.minimize_result,
        Path(output_path) / (filename_stem + ".hdf5"),
    )
def model_id_binary_postprocessor(problem: ModelProblem):
    """Change a PEtab Select model ID to a binary string.

    Changes the model ID in-place to be a string like ``M_ijk``, where
    ``i``, ``j``, ``k``, etc. are ``1`` if the parameter in that position is
    estimated, or ``0`` if the parameter is fixed.

    To ensure that other postprocessors (e.g. :func:`report_postprocessor`)
    use this new model ID, when in use with a
    :func:`multi_postprocessor`, ensure this is before the other
    postprocessors in the ``postprocessors`` argument of
    :func:`multi_postprocessor`.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    """
    # One binary digit per parameter, in the model's parameter order.
    digits = (
        "1" if parameter_value == ESTIMATE else "0"
        for parameter_value in problem.model.parameters.values()
    )
    problem.model.model_id = "M_" + "".join(digits)
def report_postprocessor(
    problem: ModelProblem,
    output_filepath: TYPE_PATH,
    criteria: List[Criterion] = None,
):
    """Create a TSV table of model selection results.

    One row per optimized model is appended to the report file.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    output_filepath:
        The file path where the report will be saved.
    criteria:
        The criteria that will be in the report. Defaults to nllh, AIC,
        AICc, and BIC.
    """
    output_filepath = Path(output_filepath)
    # The header row is written only once: when the report file does not
    # exist yet, or exists but is empty.
    write_header = (
        not output_filepath.exists() or output_filepath.stat().st_size == 0
    )

    if criteria is None:
        criteria = [
            Criterion.NLLH,
            Criterion.AIC,
            Criterion.AICC,
            Criterion.BIC,
        ]

    start_optimization_times = problem.minimize_result.optimize_result.time

    # Collect the row as (column name, value) pairs, in output order.
    columns = [
        ('model_id', problem.model.model_id),
        ('total_time', str(sum(start_optimization_times))),
    ]
    for criterion in criteria:
        columns.append(
            (criterion.value, str(problem.model.get_criterion(criterion)))
        )

    # Arbitrary convergence criterion: count starts whose objective value is
    # within 0.1 of the best start.
    fvals = np.array(problem.minimize_result.optimize_result.fval)
    best_fval = problem.minimize_result.optimize_result.list[0].fval
    columns.append(('n_converged', str((fvals < best_fval + 0.1).sum())))

    # One column per optimization start, with its wall time.
    for start_index, start_optimization_time in enumerate(
        start_optimization_times
    ):
        columns.append(
            (f'start_time_{start_index}', str(start_optimization_time))
        )

    with open(output_filepath, 'a+') as f:
        if write_header:
            f.write('\t'.join(name for name, _ in columns) + '\n')
        f.write('\t'.join(value for _, value in columns) + '\n')