Source code for pypesto.ensemble.ensemble

from functools import partial
from typing import Callable, Dict, Sequence, Tuple

import numpy as np
import pandas as pd

from .. import Result
from ..engine import (
    Engine,
    MultiProcessEngine,
    MultiThreadEngine,
    SingleCoreEngine,
)
from ..predict import (
    PredictionConditionResult,
    PredictionResult,
)
from ..sample import geweke_test
from .task import EnsembleTask
from .constants import (PREDICTOR, PREDICTION_ID, PREDICTION_RESULTS,
                        PREDICTION_ARRAYS, PREDICTION_SUMMARY, OUTPUT,
                        OUTPUT_SENSI, TIMEPOINTS, X_VECTOR, NX, X_NAMES,
                        NVECTORS, VECTOR_TAGS, PREDICTIONS, MODE_FUN,
                        EnsembleType, ENSEMBLE_TYPE, MEAN, MEDIAN,
                        STANDARD_DEVIATION, SUMMARY, LOWER_BOUND,
                        UPPER_BOUND, get_percentile_label)


class EnsemblePrediction:
    """
    An ensemble prediction consists of an ensemble, i.e., a set of parameter
    vectors and their identifiers such as a sample, and a prediction
    function. It can be attached to an ensemble-type object.
    """
    def __init__(self,
                 predictor: Callable[[Sequence], PredictionResult],
                 prediction_id: str = None,
                 prediction_results: Sequence[PredictionResult] = None,
                 lower_bound: Sequence[np.ndarray] = None,
                 upper_bound: Sequence[np.ndarray] = None):
        """
        Constructor.

        Parameters
        ----------
        predictor:
            Prediction function, e.g., an AmiciPredictor, which takes a
            parameter vector as input and outputs a PredictionResult object.
        prediction_id:
            Identifier for the predictions.
        prediction_results:
            List of prediction results.
        lower_bound:
            Array of potential lower bounds for the predictions. Should have
            the same shape as the output of the predictions, i.e., a list of
            numpy arrays (one list entry per condition), with each array
            having the shape n_timepoints x n_outputs for its condition.
        upper_bound:
            Array of potential upper bounds for the predictions.
        """
        self.predictor = predictor
        self.prediction_id = prediction_id
        self.prediction_results = prediction_results
        if prediction_results is None:
            self.prediction_results = []

        # handle bounds
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound

        self.prediction_arrays = None
        self.prediction_summary = {MEAN: None,
                                   STANDARD_DEVIATION: None,
                                   MEDIAN: None}
    def __iter__(self):
        """
        __iter__ makes the instances of the class iterable objects, allowing
        functions such as ``dict`` to be applied to them.
        """
        yield PREDICTOR, self.predictor
        yield PREDICTION_ID, self.prediction_id
        yield PREDICTION_RESULTS, self.prediction_results
        yield PREDICTION_ARRAYS, self.prediction_arrays
        yield PREDICTION_SUMMARY, \
            {i_key: dict(self.prediction_summary[i_key])
             for i_key in self.prediction_summary.keys()}
        yield LOWER_BOUND, self.lower_bound
        yield UPPER_BOUND, self.upper_bound
    def condense_to_arrays(self):
        """
        Reshape the prediction results into arrays and add them as a member
        to the EnsemblePrediction object. This is meant to be used only if
        all conditions of a prediction have the same observables, as is often
        the case for large-scale data sets taken from online databases or
        similar.
        """
        # prepare for storing results over all predictions
        output = []
        output_sensi = []
        timepoints = []

        for result in self.prediction_results:
            # stack outputs, output sensitivities and timepoints to one array
            # use first element as dummy, to see if outputs have been computed
            if result.conditions[0].output is not None:
                output.append(np.concatenate(
                    [cond.output for cond in result.conditions], axis=0))
            else:
                output = None

            if result.conditions[0].output_sensi is not None:
                output_sensi.append(np.concatenate(
                    [cond.output_sensi for cond in result.conditions],
                    axis=0))
            else:
                output_sensi = None

            timepoints.append(np.concatenate(
                [cond.timepoints for cond in result.conditions], axis=0))

        # stack results in third dimension
        if output is not None:
            output = np.stack(output, axis=2)
        if output_sensi is not None:
            output_sensi = np.stack(output_sensi, axis=3)

        # formulate as dict
        self.prediction_arrays = {
            OUTPUT: output,
            OUTPUT_SENSI: output_sensi,
            TIMEPOINTS: np.stack(timepoints, axis=-1)
        }
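    # Usage sketch (illustration only, not part of the class): assuming a
    # populated `ensemble_prediction` (hypothetical name) whose conditions
    # all share the same observables,
    #
    #     ensemble_prediction.condense_to_arrays()
    #     arrays = ensemble_prediction.prediction_arrays
    #     # arrays[OUTPUT]: n_timepoints x n_outputs x n_vectors
    #     # arrays[TIMEPOINTS]: n_timepoints x n_vectors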
    def compute_summary(self,
                        percentiles_list: Sequence[int] = (5, 20, 80, 95)
                        ) -> Dict:
        """
        Compute the mean, the median, the standard deviation and possibly
        percentiles from the ensemble prediction results. Those summary
        results are added as a data member to the EnsemblePrediction object.

        Parameters
        ----------
        percentiles_list:
            List or tuple of percent numbers for the percentiles

        Returns
        -------
        summary:
            dictionary of prediction results with the keys mean, std,
            median, percentiles, ...
        """
        # check if prediction results are available
        if not self.prediction_results:
            raise ArithmeticError('Cannot compute summary statistics from '
                                  'empty prediction results.')
        n_conditions = len(self.prediction_results[0].conditions)

        def _stack_outputs(ic: int):
            """
            Group outputs for different parameter vectors of one ensemble
            together, if they belong to the same simulation condition, and
            stack them in one array.
            """
            # Were outputs computed?
            if self.prediction_results[0].conditions[ic].output is None:
                return None
            # stack predictions
            output_list = [prediction.conditions[ic].output
                           for prediction in self.prediction_results]
            # stack into one numpy array
            return np.stack(output_list, axis=-1)

        def _stack_outputs_sensi(ic: int):
            """
            Group output sensitivities for different parameter vectors of one
            ensemble together, if they belong to the same simulation
            condition, and stack them in one array.
            """
            # Were output sensitivities computed?
            if self.prediction_results[0].conditions[ic].output_sensi is None:
                return None
            # stack predictions
            output_sensi_list = [prediction.conditions[ic].output_sensi
                                 for prediction in self.prediction_results]
            # stack into one numpy array
            return np.stack(output_sensi_list, axis=-1)

        def _compute_summary(tmp_array, percentiles_list):
            """
            Compute mean, standard deviation, median, and requested
            percentiles for a set of stacked simulations.
            """
            summary = {}
            summary[MEAN] = np.mean(tmp_array, axis=-1)
            summary[STANDARD_DEVIATION] = np.std(tmp_array, axis=-1)
            summary[MEDIAN] = np.median(tmp_array, axis=-1)
            for perc in percentiles_list:
                summary[get_percentile_label(perc)] = \
                    np.percentile(tmp_array, perc, axis=-1)
            return summary

        # preallocate for results
        cond_lists = {MEAN: [], STANDARD_DEVIATION: [], MEDIAN: []}
        for perc in percentiles_list:
            cond_lists[get_percentile_label(perc)] = []

        # iterate over conditions, compute summary
        for i_cond in range(n_conditions):
            # use some short hand
            current_cond = self.prediction_results[0].conditions[i_cond]

            # create a temporary array with all the outputs needed and wanted
            tmp_output = _stack_outputs(i_cond)
            tmp_output_sensi = _stack_outputs_sensi(i_cond)

            # handle outputs
            if tmp_output is not None:
                output_summary = _compute_summary(tmp_output,
                                                  percentiles_list)
            else:
                output_summary = {i_key: None for i_key in cond_lists.keys()}

            # handle output sensitivities
            if tmp_output_sensi is not None:
                output_sensi_summary = _compute_summary(tmp_output_sensi,
                                                        percentiles_list)
            else:
                output_sensi_summary = {i_key: None
                                        for i_key in cond_lists.keys()}

            # create some PredictionConditionResult to have an easier creation
            # of PredictionResults for the summaries later on
            for i_key in cond_lists.keys():
                cond_lists[i_key].append(
                    PredictionConditionResult(
                        timepoints=current_cond.timepoints,
                        output=output_summary[i_key],
                        output_sensi=output_sensi_summary[i_key],
                        output_ids=current_cond.output_ids
                    )
                )

        self.prediction_summary = {
            i_key: PredictionResult(
                conditions=cond_lists[i_key],
                condition_ids=self.prediction_results[0].condition_ids,
                comment=str(i_key))
            for i_key in cond_lists.keys()
        }

        # also return the object
        return self.prediction_summary
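# Usage sketch (illustration only): summarizing an existing
# `ensemble_prediction` (hypothetical name). The returned summary maps
# statistic labels to PredictionResult objects.
#
#     summary = ensemble_prediction.compute_summary(percentiles_list=(5, 95))
#     median_result = summary[MEDIAN]
#     p95_result = summary[get_percentile_label(95)]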
class Ensemble:
    """
    An ensemble is a wrapper around a numpy array. It comes with some
    convenience functionality: it allows parameter values to be mapped to
    the correct parameters via identifiers, it simplifies computing summaries
    of the parameter vectors (mean, standard deviation, median, percentiles),
    and it can store predictions made by pyPESTO, such that the parameter
    ensemble and the predictions are linked to each other.
    """
    def __init__(self,
                 x_vectors: np.ndarray,
                 x_names: Sequence[str] = None,
                 vector_tags: Sequence[Tuple[int, int]] = None,
                 ensemble_type: EnsembleType = None,
                 predictions: Sequence[EnsemblePrediction] = None,
                 lower_bound: np.ndarray = None,
                 upper_bound: np.ndarray = None):
        """
        Constructor.

        Parameters
        ----------
        x_vectors:
            parameter vectors of the ensemble, in the format
            n_parameter x n_vectors
        x_names:
            Names or identifiers of the parameters
        vector_tags:
            Additional tag, which adds information about the parameter
            vectors, of the form (optimization_run, optimization_step) if the
            ensemble is created from an optimization result, or
            (sampling_chain, sampling_step) if the ensemble is created from a
            sampling result.
        ensemble_type:
            Type of ensemble: Ensemble (default), sample, or
            unprocessed_chain. Samples are meant to be representative,
            ensembles can be any set of parameter vectors, and unprocessed
            chains still contain their burn-in.
        predictions:
            List of EnsemblePrediction objects
        lower_bound:
            array of potential lower bounds for the parameters
        upper_bound:
            array of potential upper bounds for the parameters
        """
        # Do we have a representative sample or just a random ensemble?
        self.ensemble_type = EnsembleType.ensemble
        if ensemble_type is not None:
            self.ensemble_type = ensemble_type

        # handle parameter vectors and sizes
        self.x_vectors = x_vectors
        self.n_x = x_vectors.shape[0]
        self.n_vectors = x_vectors.shape[1]
        self.vector_tags = vector_tags
        self.summary = None

        # store bounds
        self.lower_bound = np.full((self.n_x,), np.nan)
        if lower_bound is not None:
            if np.array(lower_bound).size == 1:
                self.lower_bound = np.full((x_vectors.shape[0],),
                                           lower_bound)
            else:
                self.lower_bound = lower_bound
        self.upper_bound = np.full((self.n_x,), np.nan)
        if upper_bound is not None:
            if np.array(upper_bound).size == 1:
                self.upper_bound = np.full((x_vectors.shape[0],),
                                           upper_bound)
            else:
                self.upper_bound = upper_bound

        # handle parameter names
        if x_names is not None:
            self.x_names = x_names
        else:
            self.x_names = [f'x_{ix}' for ix in range(self.n_x)]

        # Do we have predictions for this ensemble?
        self.predictions = []
        if predictions is not None:
            self.predictions = predictions
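    # Construction sketch (illustration only, all values made up): a random
    # ensemble of 100 vectors for 3 parameters, with box bounds [-2, 2].
    #
    #     x_vectors = np.random.uniform(-2, 2, size=(3, 100))
    #     ensemble = Ensemble(
    #         x_vectors,
    #         x_names=['k1', 'k2', 'k3'],
    #         lower_bound=np.full(3, -2.),
    #         upper_bound=np.full(3, 2.),
    #     )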
    @staticmethod
    def from_sample(
            result: Result,
            remove_burn_in: bool = True,
            chain_slice: slice = None,
            **kwargs,
    ):
        """Construct an ensemble from a sample.

        Parameters
        ----------
        result:
            A pyPESTO result that contains a sample result.
        remove_burn_in:
            Exclude parameter vectors from the ensemble if they are in the
            "burn-in".
        chain_slice:
            Subset the chain with a slice. Any "burn-in" removal occurs
            first.

        Returns
        -------
        The ensemble.
        """
        x_vectors = result.sample_result.trace_x[0]
        if remove_burn_in:
            if result.sample_result.burn_in is None:
                geweke_test(result)
            burn_in = result.sample_result.burn_in
            x_vectors = x_vectors[burn_in:]
        if chain_slice is not None:
            x_vectors = x_vectors[chain_slice]
        x_vectors = x_vectors.T
        return Ensemble(x_vectors, **kwargs)
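    # Usage sketch (illustration only): building an ensemble from a sampling
    # result, keeping every 10th vector after burn-in removal. `result` is a
    # hypothetical pyPESTO Result with a sample result attached.
    #
    #     ensemble = Ensemble.from_sample(
    #         result,
    #         remove_burn_in=True,
    #         chain_slice=slice(None, None, 10),
    #         ensemble_type=EnsembleType.sample,
    #     )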
    def __iter__(self):
        """
        __iter__ makes the instances of the class iterable objects, allowing
        functions such as ``dict`` to be applied to them.
        """
        yield X_VECTOR, self.x_vectors
        yield NX, self.n_x
        yield X_NAMES, self.x_names
        yield NVECTORS, self.n_vectors
        yield VECTOR_TAGS, self.vector_tags
        yield ENSEMBLE_TYPE, self.ensemble_type
        yield PREDICTIONS, self.predictions
        yield SUMMARY, self.summary
        yield LOWER_BOUND, self.lower_bound
        yield UPPER_BOUND, self.upper_bound

    def _map_parameters_by_objective(
            self,
            predictor: Callable,
            default_value: float = None,
    ):
        """
        The parameters of the ensemble don't need to have the same ordering
        as in the predictor. This function maps them onto each other.
        """
        # create short hands
        parameter_ids_objective = predictor.amici_objective.x_names
        parameter_ids_ensemble = self.x_names

        # map, and fill with `default_value` if not found and `default_value`
        # is specified.
        mapping = []
        for parameter_id_objective in parameter_ids_objective:
            if parameter_id_objective in parameter_ids_ensemble:
                mapping.append(
                    parameter_ids_ensemble.index(parameter_id_objective)
                )
            elif default_value is not None:
                mapping.append(default_value)
        return mapping
    def predict(
            self,
            predictor: Callable,
            prediction_id: str = None,
            sensi_orders: Tuple = (0,),
            default_value: float = None,
            mode: str = MODE_FUN,
            engine: Engine = None,
            progress_bar: bool = True,
    ) -> EnsemblePrediction:
        """
        Convenience function to run predictions for a full ensemble:
        The user hands over a predictor function and settings, and all
        results are grouped as an EnsemblePrediction for the whole ensemble.

        Parameters
        ----------
        predictor:
            Prediction function, e.g., an AmiciPredictor
        prediction_id:
            Identifier for the predictions
        sensi_orders:
            Specifies which sensitivities to compute, e.g. (0, 1) -> fval,
            grad
        default_value:
            If parameters are needed in the mapping which are not found in
            the parameter source, it can make sense to fill them up with
            this default value (e.g. `np.nan`) in some cases (to be used
            with caution though).
        mode:
            Whether to compute function values or residuals.
        engine:
            Parallelization engine. Defaults to sequential execution on a
            `SingleCoreEngine`.
        progress_bar:
            Whether to display a progress bar.

        Returns
        -------
        The prediction of the ensemble.
        """
        if engine is None:
            engine = SingleCoreEngine()

        # Vectors are chunked to improve parallelization performance.
        n_chunks = self.n_vectors  # Default is no chunking.
        if isinstance(engine, MultiProcessEngine):
            n_chunks = engine.n_procs
        if isinstance(engine, MultiThreadEngine):
            n_chunks = engine.n_threads
        chunks = [
            (
                (chunk_i + 0) * int(np.floor(self.n_vectors / n_chunks)),
                (chunk_i + 1) * int(np.floor(self.n_vectors / n_chunks)),
            )
            for chunk_i in range(n_chunks)
        ]
        # The last chunk should contain any remaining vectors that were
        # skipped due to the `floor` method.
        chunks[-1] = (chunks[-1][0], self.n_vectors)

        # Get the correct parameter mapping.
        mapping = self._map_parameters_by_objective(
            predictor,
            default_value=default_value,
        )

        # Set up the tasks with the prediction method and chunked vectors.
        method = partial(predictor, sensi_orders=sensi_orders, mode=mode)
        tasks = [
            EnsembleTask(
                method=method,
                vectors=self.x_vectors[mapping, chunk_start:chunk_end],
                id=chunk_i,
            )
            for chunk_i, (chunk_start, chunk_end) in enumerate(chunks)
        ]

        # Execute tasks and flatten chunked results.
        prediction_results = [
            prediction_result
            for prediction_chunk in engine.execute(
                tasks, progress_bar=progress_bar)
            for prediction_result in prediction_chunk
        ]

        return EnsemblePrediction(
            predictor=predictor,
            prediction_id=prediction_id,
            prediction_results=prediction_results,
        )
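    # Usage sketch (illustration only): running predictions for the whole
    # ensemble on four processes; `predictor` is a hypothetical
    # AmiciPredictor-like callable. With a MultiProcessEngine, the parameter
    # vectors are split into one chunk per process.
    #
    #     ensemble_prediction = ensemble.predict(
    #         predictor,
    #         prediction_id='my_prediction',
    #         engine=MultiProcessEngine(n_procs=4),
    #     )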
    def compute_summary(self,
                        percentiles_list: Sequence[int] = (5, 20, 80, 95)):
        """
        Compute the mean, the median, the standard deviation and possibly
        percentiles for the parameters of the ensemble. Those summary results
        are added as a data member to the Ensemble object.

        Parameters
        ----------
        percentiles_list:
            List or tuple of percent numbers for the percentiles

        Returns
        -------
        summary:
            Dict with mean, std, median, and percentiles of parameter vectors
        """
        # compute summaries based on parameters
        summary = {MEAN: np.mean(self.x_vectors, axis=1),
                   STANDARD_DEVIATION: np.std(self.x_vectors, axis=1),
                   MEDIAN: np.median(self.x_vectors, axis=1)}
        for perc in percentiles_list:
            summary[get_percentile_label(perc)] = \
                np.percentile(self.x_vectors, perc, axis=1)

        # store and return results
        self.summary = summary
        return summary
    def check_identifiability(self) -> pd.DataFrame:
        """
        Use ensemble mean and standard deviation to assess (in a rudimentary
        way) whether or not parameters are identifiable. Returns a DataFrame
        with tuples, which specify whether or not the lower and the upper
        bounds are violated.

        Returns
        -------
        parameter_identifiability:
            DataFrame indicating parameter identifiability based on mean
            plus/minus standard deviations and parameter bounds
        """
        # Recompute the summary, maybe the ensemble object has been changed.
        self.compute_summary()

        # check identifiability for each parameter
        parameter_identifiability = []
        for ix, x_name in enumerate(self.x_names):
            # define some short hands
            lb = self.lower_bound[ix]
            ub = self.upper_bound[ix]
            mean = self.summary[MEAN][ix]
            std = self.summary[STANDARD_DEVIATION][ix]
            median = self.summary[MEDIAN][ix]
            perc_list = [int(i_key[11:]) for i_key in self.summary.keys()
                         if i_key[0:4] == 'perc']
            perc_lower = [perc for perc in perc_list if perc < 50]
            perc_upper = [perc for perc in perc_list if perc > 50]

            # create dict of identifiability
            tmp_identifiability = {
                'parameterId': x_name,
                'lowerBound': lb,
                'upperBound': ub,
                'ensemble_mean': mean,
                'ensemble_std': std,
                'ensemble_median': median,
                'within lb: 1 std': lb < mean - std,
                'within ub: 1 std': ub > mean + std,
                'within lb: 2 std': lb < mean - 2 * std,
                'within ub: 2 std': ub > mean + 2 * std,
                'within lb: 3 std': lb < mean - 3 * std,
                'within ub: 3 std': ub > mean + 3 * std,
            }
            # handle percentiles
            for perc in perc_lower:
                tmp_identifiability[f'within lb: perc {perc}'] = \
                    lb < self.summary[get_percentile_label(perc)][ix]
            for perc in perc_upper:
                tmp_identifiability[f'within ub: perc {perc}'] = \
                    ub > self.summary[get_percentile_label(perc)][ix]

            parameter_identifiability.append(tmp_identifiability)

        # create DataFrame
        parameter_identifiability = pd.DataFrame(parameter_identifiability)
        parameter_identifiability.index = \
            parameter_identifiability['parameterId']

        return parameter_identifiability
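# Usage sketch (illustration only): a rudimentary identifiability check on a
# hypothetical `ensemble` with finite bounds. A True entry means the bound is
# not violated by the respective interval around the ensemble mean.
#
#     df = ensemble.check_identifiability()
#     identifiable = df[df['within lb: 1 std'] & df['within ub: 1 std']]
#     print(identifiable['parameterId'].tolist())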