# Source code for pypesto.select.postprocessors

"""Process a model selection :class:`ModelProblem` after calibration."""

from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from petab_select.constants import ESTIMATE, TYPE_PATH, Criterion

from .. import store, visualize
from .model_problem import TYPE_POSTPROCESSOR, ModelProblem

__all__ = [
    "model_id_binary_postprocessor",
    "multi_postprocessor",
    "report_postprocessor",
    "save_postprocessor",
    "waterfall_plot_postprocessor",
]


def multi_postprocessor(
    problem: ModelProblem,
    postprocessors: list[TYPE_POSTPROCESSOR] = None,
):
    """Combine multiple postprocessors into a single postprocessor.

    See :meth:`save_postprocessor` for usage hints.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    postprocessors:
        A list of postprocessors, which will be sequentially applied to the
        optimized model ``problem``. Defaults to ``None``, in which case
        nothing is done.
    """
    # Treat a missing list as "nothing to do" instead of raising a
    # ``TypeError`` by iterating over ``None`` (the declared default).
    for postprocessor in postprocessors or []:
        postprocessor(problem)
def waterfall_plot_postprocessor(
    problem: ModelProblem,
    output_path: TYPE_PATH = ".",
):
    """Produce a waterfall plot.

    See :meth:`save_postprocessor` for usage hints and argument
    documentation.
    """
    visualize.waterfall(problem.minimize_result)
    plot_output_path = Path(output_path) / (problem.model.model_hash + ".png")
    plt.savefig(str(plot_output_path))
    # Close the figure after saving; otherwise one open figure accumulates
    # per calibrated model, leaking memory over a long model selection run.
    plt.close()
def save_postprocessor(
    problem: ModelProblem,
    output_path: TYPE_PATH = ".",
    use_model_hash: bool = False,
):
    """Save the parameter estimation result to disk.

    Postprocessors must accept exactly one argument (the optimized model
    problem), so bind the output folder beforehand, e.g. with
    :func:`functools.partial`:

    .. code-block:: python

        from functools import partial
        output_path = 'results'
        pp = partial(save_postprocessor, output_path=output_path)
        selector = pypesto.select.ModelSelector(
            problem=problem,
            model_postprocessor=pp,
        )

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    output_path:
        The location where output will be stored.
    use_model_hash:
        Whether the filename should use the model hash. Defaults to
        ``False``, in which case the model ID is used instead.
    """
    # Pick the filename stem: hash or ID, depending on the caller's choice.
    filename_stem = (
        problem.model.get_hash() if use_model_hash else problem.model.model_id
    )
    result_filepath = Path(output_path) / (filename_stem + ".hdf5")
    store.write_result(problem.minimize_result, result_filepath)
def model_id_binary_postprocessor(problem: ModelProblem):
    """Change a PEtab Select model ID to a binary string.

    Changes the model ID in-place to be a string like ``M_ijk``, where
    ``i``, ``j``, ``k``, etc. are ``1`` if the parameter in that position
    is estimated, or ``0`` if the parameter is fixed.

    To ensure that other postprocessors (e.g. :func:`report_postprocessor`)
    use this new model ID, when in use with a :func:`multi_postprocessor`,
    ensure this is before the other postprocessors in the
    ``postprocessors`` argument of :func:`multi_postprocessor`.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    """
    # One bit per parameter, in the model's parameter order.
    bits = (
        "1" if value == ESTIMATE else "0"
        for value in problem.model.parameters.values()
    )
    problem.model.model_id = "M_" + "".join(bits)
def report_postprocessor(
    problem: ModelProblem,
    output_filepath: TYPE_PATH,
    criteria: list[Criterion] = None,
):
    """Create a TSV table of model selection results.

    Each call appends one row for the given model; the header row is
    written only when the report file is new or empty.

    Parameters
    ----------
    problem:
        A model selection :class:`ModelProblem` that has been optimized.
    output_filepath:
        The file path where the report will be saved.
    criteria:
        The criteria that will be in the report. Defaults to nllh, AIC,
        AICc, and BIC.
    """
    output_filepath = Path(output_filepath)
    # Only write the header if the file doesn't yet exist or is empty.
    write_header = (
        not output_filepath.exists() or output_filepath.stat().st_size == 0
    )

    if criteria is None:
        criteria = [
            Criterion.NLLH,
            Criterion.AIC,
            Criterion.AICC,
            Criterion.BIC,
        ]

    optimize_result = problem.minimize_result.optimize_result
    start_optimization_times = optimize_result.time

    # Collect (column name, value) pairs in output order.
    columns: list[tuple[str, str]] = [
        ("model_id", problem.model.model_id),
        ("total_time", str(sum(start_optimization_times))),
    ]
    columns.extend(
        (criterion.value, str(problem.model.get_criterion(criterion)))
        for criterion in criteria
    )
    # Arbitrary convergence criterion: starts whose final objective value
    # lies within 0.1 of the best start's value.
    n_converged = (
        np.array(optimize_result.fval) < (optimize_result.list[0].fval + 0.1)
    ).sum()
    columns.append(("n_converged", str(n_converged)))
    columns.extend(
        (f"start_time_{start_index}", str(start_time))
        for start_index, start_time in enumerate(start_optimization_times)
    )

    header, row = zip(*columns)
    with open(output_filepath, "a+") as f:
        if write_header:
            f.write("\t".join(header) + "\n")
        f.write("\t".join(row) + "\n")