import logging
from functools import partial
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from ..C import (
ENSEMBLE_TYPE,
HISTORY,
LOWER_BOUND,
MEAN,
MEDIAN,
MODE_FUN,
NVECTORS,
NX,
OUTPUT,
OUTPUT_SENSI,
PERCENTILE,
PREDICTION_ARRAYS,
PREDICTION_ID,
PREDICTION_RESULTS,
PREDICTION_SUMMARY,
PREDICTIONS,
PREDICTOR,
STANDARD_DEVIATION,
SUMMARY,
TIMEPOINTS,
UPPER_BOUND,
VECTOR_TAGS,
WEIGHTED_SIGMA,
X_NAMES,
X_VECTOR,
EnsembleType,
)
from ..engine import (
Engine,
MultiProcessEngine,
MultiThreadEngine,
SingleCoreEngine,
)
from ..objective import AmiciObjective
from ..result import PredictionConditionResult, PredictionResult, Result
from ..sample import geweke_test
from .task import EnsembleTask
logger = logging.getLogger(__name__)
[docs]class EnsemblePrediction:
"""
Class of ensemble prediction.
An ensemble prediction consists of an ensemble, i.e., a set of parameter
vectors and their identifiers such as a sample, and a prediction function.
It can be attached to a ensemble-type object
"""
[docs] def __init__(
self,
predictor: Optional[Callable[[Sequence], PredictionResult]] = None,
prediction_id: str = None,
prediction_results: Sequence[PredictionResult] = None,
lower_bound: Sequence[np.ndarray] = None,
upper_bound: Sequence[np.ndarray] = None):
"""
Initialize.
Parameters
----------
predictor:
Prediction function, e.g., an AmiciPredictor, which takes a
parameter vector as input and outputs a PredictionResult object
prediction_id:
Identifier for the predictions
prediction_results:
List of Prediction results
lower_bound:
Array of potential lower bounds for the predictions, should have
the same shape as the output of the predictions, i.e., a list of
numpy array (one list entry per condition), with the arrays having
the shape of n_timepoints x n_outputs for each condition.
upper_bound:
array of potential upper bounds for the parameters
"""
self.predictor = predictor
if predictor is None:
logger.info("This `EnsemblePrediction` has no predictor.")
self.prediction_id = prediction_id
self.prediction_results = prediction_results
if prediction_results is None:
self.prediction_results = []
# handle bounds, Not yet Implemented
if lower_bound is not None:
raise NotImplementedError
if upper_bound is not None:
raise NotImplementedError
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.prediction_arrays = None
self.prediction_summary = {MEAN: None,
STANDARD_DEVIATION: None,
MEDIAN: None,
WEIGHTED_SIGMA: None}
def __iter__(self):
"""
Make the instances of the class iterable objects.
Allows to apply functions such as __dict__ to them.
"""
yield PREDICTOR, self.predictor
yield PREDICTION_ID, self.prediction_id
yield PREDICTION_RESULTS, self.prediction_results
yield PREDICTION_ARRAYS, self.prediction_arrays
yield PREDICTION_SUMMARY, {i_key: dict(self.prediction_summary[i_key])
for i_key in self.prediction_summary.keys()}
yield LOWER_BOUND, self.lower_bound
yield UPPER_BOUND, self.upper_bound
[docs] def condense_to_arrays(self):
"""
Add prediction result to EnsemblePrediction object.
Reshape the prediction results to an array and add them as a
member to the EnsemblePrediction objects. It's meant to be used only
if all conditions of a prediction have the same observables, as this
is often the case for large-scale data sets taken from online
databases or similar.
"""
# prepare for storing results over all predictions
output = []
output_sensi = []
timepoints = []
for result in self.prediction_results:
# stack outputs, output sensitivities and timepoints to one array
# use first element as dummy, to see if outputs have been computed
if result.conditions[0].output is not None:
output.append(np.concatenate(
[cond.output for cond in result.conditions], axis=0))
else:
output = None
if result.conditions[0].output_sensi is not None:
output_sensi.append(np.concatenate(
[cond.output_sensi for cond in result.conditions], axis=0))
else:
output_sensi = None
timepoints.append(np.concatenate(
[cond.timepoints for cond in result.conditions], axis=0))
# stack results in third dimension
if output is not None:
output = np.stack(output, axis=2)
if output_sensi is not None:
output_sensi = np.stack(output_sensi, axis=3)
# formulate as dict
self.prediction_arrays = {
OUTPUT: output,
OUTPUT_SENSI: output_sensi,
TIMEPOINTS: np.stack(timepoints, axis=-1)
}
[docs] def compute_summary(self,
percentiles_list: Sequence[int] = (5, 20, 80, 95),
weighting: bool = False,
compute_weighted_sigma: bool = False
) -> Dict:
"""
Compute summary from the ensemble prediction results.
Summary includes the mean, the median, the standard deviation and
possibly percentiles. Those summary results are added as a data
member to the EnsemblePrediction object.
Parameters
----------
percentiles_list:
List or tuple of percent numbers for the percentiles
weighting:
Whether weights should be used for trajectory.
compute_weighted_sigma:
Whether weighted standard deviation of the ensemble mean trajectory
should be computed. Defaults to False.
Returns
-------
summary:
dictionary of predictions results with the keys mean, std, median,
percentiles, ...
"""
# check if prediction results are available
if not self.prediction_results:
raise ArithmeticError('Cannot compute summary statistics from '
'empty prediction results.')
# if weightings shall be used, check whether weights are there
if weighting:
if not self.prediction_results[0].conditions[0].output_weight:
raise ValueError('There are no weights in the '
'prediction results.')
n_conditions = len(self.prediction_results[0].conditions)
def _stack_outputs(ic: int):
"""
Stack outputs.
Group outputs for different parameter vectors of one ensemble
together, if they belong to the same simulation condition, and
stack them in one array.
"""
# Were outputs computed
if self.prediction_results[0].conditions[ic].output is None:
return None
# stack predictions
output_list = [prediction.conditions[ic].output
for prediction in self.prediction_results]
# stack into one numpy array
return np.stack(output_list, axis=-1)
def _stack_outputs_sensi(ic: int):
"""
Stack output sensitivities.
Group output sensitivities for different parameter vectors of one
ensemble together, if they belong to the same simulation condition,
and stack them in one array.
"""
# Were output sensitivitiess computed
if self.prediction_results[0].conditions[ic].output_sensi is None:
return None
# stack predictions
output_sensi_list = [prediction.conditions[ic].output_sensi
for prediction in self.prediction_results]
# stack into one numpy array
return np.stack(output_sensi_list, axis=-1)
def _stack_weights(ic: int) -> np.ndarray:
"""
Stack weights.
Group weights for different parameter vectors of one ensemble
together, if they belong to the same simulation condition, and
stack them in one array
Parameters
----------
ic: the condition number.
Returns
-------
The stacked weights.
"""
# Were outputs computed
if self.prediction_results[0].conditions[ic].output_weight is None:
return None
# stack predictions
output_weight_list = [prediction.conditions[ic].output_weight
for prediction in self.prediction_results]
# stack into one numpy array
return np.stack(output_weight_list, axis=-1)
def _stack_sigmas(ic: int):
"""
Stack sigmas.
Group sigmas for different parameter vectors of one ensemble
together, if they belong to the same simulation condition, and
stack them in one array.
"""
# Were outputs computed
if self.prediction_results[0].conditions[ic].output_sigmay is None:
return None
# stack predictions
output_sigmay_list = [prediction.conditions[ic].output_sigmay
for prediction in self.prediction_results]
# stack into one numpy array
return np.stack(output_sigmay_list, axis=-1)
def _compute_summary(tmp_array, percentiles_list, weights,
tmp_sigmas=None):
"""
Compute summary for a set of stacked simulations.
Summary includes means, standard deviation, median, and requested
percentiles.
"""
summary = {}
summary[MEAN] = np.average(tmp_array, axis=-1, weights=weights)
summary[STANDARD_DEVIATION] = np.sqrt(np.average(
(tmp_array.T-summary[MEAN].T).T**2,
axis=-1, weights=weights))
summary[MEDIAN] = np.median(tmp_array, axis=-1)
if tmp_sigmas is not None:
summary[WEIGHTED_SIGMA] = np.sqrt(np.average(
tmp_sigmas**2, axis=-1, weights=weights))
for perc in percentiles_list:
summary[get_percentile_label(perc)] = \
np.percentile(tmp_array, perc, axis=-1)
return summary
# preallocate for results
cond_lists = {MEAN: [], STANDARD_DEVIATION: [],
MEDIAN: []}
if compute_weighted_sigma:
cond_lists[WEIGHTED_SIGMA] = []
for perc in percentiles_list:
cond_lists[get_percentile_label(perc)] = []
# iterate over conditions, compute summary
for i_cond in range(n_conditions):
# use some short hand
current_cond = self.prediction_results[0].conditions[i_cond]
# create a temporary array with all the outputs needed and wanted
tmp_output = _stack_outputs(i_cond)
tmp_output_sensi = _stack_outputs_sensi(i_cond)
tmp_weights = np.ones(tmp_output.shape[-1])
if weighting:
# take exp() to get the likelihood values,
# as the weights would be the log likelihoods.
tmp_weights = np.exp(_stack_weights(i_cond))
tmp_sigmas = None
if compute_weighted_sigma:
tmp_sigmas = _stack_sigmas(i_cond)
# handle outputs
if tmp_output is not None:
output_summary = _compute_summary(tmp_output,
percentiles_list,
tmp_weights,
tmp_sigmas)
else:
output_summary = {i_key: None for i_key in cond_lists.keys()}
# handle output sensitivities
if tmp_output_sensi is not None:
output_sensi_summary = _compute_summary(tmp_output_sensi,
percentiles_list)
else:
output_sensi_summary = {i_key: None
for i_key in cond_lists.keys()}
# create some PredictionConditionResult to have an easier creation
# of PredictionResults for the summaries later on
for i_key in cond_lists.keys():
cond_lists[i_key].append(
PredictionConditionResult(
timepoints=current_cond.timepoints,
output=output_summary[i_key],
output_sensi=output_sensi_summary[i_key],
output_ids=current_cond.output_ids
)
)
self.prediction_summary = {i_key: PredictionResult(
conditions=cond_lists[i_key],
condition_ids=self.prediction_results[0].condition_ids,
comment=str(i_key))
for i_key in cond_lists.keys()
}
# also return the object
return self.prediction_summary
[docs] def compute_chi2(self, amici_objective: AmiciObjective):
"""
Compute the chi^2 error of the weighted mean trajectory.
Parameters
----------
amici_objective:
The objective function of the model,
the parameter ensemble was created from.
Returns
-------
The chi^2 error.
"""
if (self.prediction_summary[MEAN] is None) or (
self.prediction_summary[WEIGHTED_SIGMA] is None):
try:
self.compute_summary(
weighting=True,
compute_weighted_sigma=True)
except TypeError:
raise ValueError('Computing a summary failed.')
n_conditions = len(self.prediction_results[0].conditions)
chi_2 = []
for i_cond in range(n_conditions):
# get measurements and put into right form
y_meas = amici_objective.edatas[i_cond].getObservedData()
y_meas = np.array(y_meas)
# bring into shape (n_t,n_y)
y_meas = y_meas.reshape(
amici_objective.edatas[0].nt(),
amici_objective.edatas[0].nytrue()
)
mean_traj = \
self.prediction_summary[MEAN].conditions[i_cond].output
weighted_sigmas = \
self.prediction_summary[
WEIGHTED_SIGMA].conditions[i_cond].output
if y_meas.shape != mean_traj.shape:
raise ValueError('Shape of trajectory and shape '
'of measurements does not match.')
chi_2.append(np.nansum(((y_meas-mean_traj)/weighted_sigmas)**2))
return np.sum(chi_2)
[docs]class Ensemble:
"""
An ensemble is a wrapper around a numpy array.
It comes with some convenience functionality: It allows to map parameter
values via identifiers to the correct parameters, it allows to compute
summaries of the parameter vectors (mean, standard deviation, median,
percentiles) more easily, and it can store predictions made by pyPESTO,
such that the parameter ensemble and the predictions are linked to each
other.
"""
[docs] def __init__(self,
x_vectors: np.ndarray,
x_names: Sequence[str] = None,
vector_tags: Sequence[Tuple[int, int]] = None,
ensemble_type: EnsembleType = None,
predictions: Sequence[EnsemblePrediction] = None,
lower_bound: np.ndarray = None,
upper_bound: np.ndarray = None):
"""
Initialize.
Parameters
----------
x_vectors:
parameter vectors of the ensemble, in the format
n_parameter x n_vectors
x_names:
Names or identifiers of the parameters
vector_tags:
Additional tag, which adds information about the the parameter
vectors of the form (optimization_run, optimization_step) if the
ensemble is created from an optimization result or
(sampling_chain, sampling_step) if the ensemble is created from a
sampling result.
ensemble_type:
Type of ensemble: Ensemble (default), sample, or unprocessed_chain
Samples are meant to be representative, ensembles can be any
ensemble of parameters, and unprocessed chains still have burn-ins
predictions:
List of EnsemblePrediction objects
lower_bound:
array of potential lower bounds for the parameters
upper_bound:
array of potential upper bounds for the parameters
"""
# Do we have a representative sample or just random ensemble?
self.ensemble_type = EnsembleType.ensemble
if ensemble_type is not None:
self.ensemble_type = ensemble_type
# handle parameter vectors and sizes
self.x_vectors = x_vectors
self.n_x = x_vectors.shape[0]
self.n_vectors = x_vectors.shape[1]
self.vector_tags = vector_tags
self.summary = None
# store bounds
self.lower_bound = np.full((self.n_x,), np.nan)
if lower_bound is not None:
if np.array(lower_bound).size == 1:
self.lower_bound = np.full((x_vectors.shape[0],), lower_bound)
else:
self.lower_bound = lower_bound
self.upper_bound = np.full(self.n_x, np.nan)
if upper_bound is not None:
if np.array(upper_bound).size == 1:
self.upper_bound = np.full(x_vectors.shape[0], upper_bound)
else:
self.upper_bound = upper_bound
# handle parameter names
if x_names is not None:
self.x_names = x_names
else:
self.x_names = [f'x_{ix}' for ix in range(self.n_x)]
# Do we have predictions for this ensemble?
self.predictions = []
if predictions is not None:
self.predictions = predictions
[docs] @staticmethod
def from_sample(
result: Result,
remove_burn_in: bool = True,
chain_slice: slice = None,
x_names: Sequence[str] = None,
lower_bound: np.ndarray = None,
upper_bound: np.ndarray = None,
**kwargs,
):
"""
Construct an ensemble from a sample.
Parameters
----------
result:
A pyPESTO result that contains a sample result.
remove_burn_in:
Exclude parameter vectors from the ensemble if they are in the
"burn-in".
chain_slice:
Subset the chain with a slice. Any "burn-in" removal occurs first.
x_names:
Names or identifiers of the parameters
lower_bound:
array of potential lower bounds for the parameters
upper_bound:
array of potential upper bounds for the parameters
Returns
-------
The ensemble.
"""
x_vectors = result.sample_result.trace_x[0]
if x_names is None:
x_names = [result.problem.x_names[i]
for i in result.problem.x_free_indices]
if lower_bound is None:
lower_bound = result.problem.lb
if upper_bound is None:
upper_bound = result.problem.ub
if remove_burn_in:
if result.sample_result.burn_in is None:
geweke_test(result)
burn_in = result.sample_result.burn_in
x_vectors = x_vectors[burn_in:]
if chain_slice is not None:
x_vectors = x_vectors[chain_slice]
x_vectors = x_vectors.T
return Ensemble(x_vectors=x_vectors,
x_names=x_names,
lower_bound=lower_bound,
upper_bound=upper_bound,
**kwargs)
[docs] @staticmethod
def from_optimization_endpoints(
result: Result,
cutoff: float = np.inf,
max_size: int = np.inf,
**kwargs,
):
"""
Construct an ensemble from an optimization result.
Parameters
----------
result:
A pyPESTO result that contains an optimization result.
cutoff:
Exclude parameters from the optimization if the
nllh is higher than the `cutoff`.
max_size:
The maximum size the ensemble should be.
Returns
-------
The ensemble.
"""
x_vectors = []
vector_tags = []
x_names = [result.problem.x_names[i]
for i in result.problem.x_free_indices]
for start in result.optimize_result.list:
# add the parameters from the next start as long as we
# did not reach maximum size and the next value is still
# lower than the cutoff value
if start['fval'] <= cutoff and len(x_vectors) < max_size:
x_vectors.append(start['x'])
# the vector tag will be a -1 to indicate it is the last step
vector_tags.append((int(start['id']), -1))
else:
break
# print a warning if there are no vectors within the ensemble
if len(x_vectors) == 0:
raise ValueError('The ensemble does not contain any vectors. '
'Either the cutoff value was too small\n or the '
'result.optimize_result object might be empty.')
elif len(x_vectors) < max_size:
logger.info(f'The ensemble contains {len(x_vectors)} parameter '
'vectors, which is less than the maximum size.\nIf '
'you want to include more \nvectors, you can consider '
'raising the cutoff value or including parameters '
'from \nthe history with the `from_history` function.')
x_vectors = np.stack(x_vectors, axis=1)
return Ensemble(x_vectors=x_vectors,
x_names=x_names,
vector_tags=vector_tags,
lower_bound=result.problem.lb,
upper_bound=result.problem.ub,
**kwargs)
[docs] @staticmethod
def from_optimization_history(
result: Result,
cutoff: float = np.inf,
max_size: int = np.inf,
max_per_start: int = np.inf,
distribute: bool = True,
**kwargs,
):
"""
Construct an ensemble from the history of an optimization.
Parameters
----------
result:
A pyPESTO result that contains an optimization result
with history recorded.
cutoff:
Exclude parameters from the optimization if the nllh
is higher than the `cutoff`.
max_size:
The maximum size the ensemble should be.
max_per_start:
The maximum number of vectors to be included from a
single optimization start.
distribute:
Boolean flag, whether the best (False) values from the
start should be taken or whether the indices should be
more evenly distributed.
Returns
-------
The ensemble.
"""
if not result.optimize_result.list[0].history.options['trace_record']:
logger.warning('The optimize result has no trace. The Ensemble '
'will automatically be created through '
'from_optimization_endpoints().')
return Ensemble.from_optimization_endpoints(result=result,
cutoff=cutoff,
max_size=max_size,
**kwargs)
x_vectors = []
vector_tags = []
x_names = [result.problem.x_names[i]
for i in result.problem.x_free_indices]
lb = result.problem.lb
ub = result.problem.ub
# calculate the number of starts whose final nllh is below cutoff
n_starts = sum(start['fval'] <= cutoff
for start in result.optimize_result.list)
fval_trace = [
np.array(
result.optimize_result.list[i_ms][HISTORY].get_fval_trace()
)
for i_ms in range(n_starts)
]
x_trace = [
result.optimize_result.list[i_ms][HISTORY].get_x_trace()
for i_ms in range(n_starts)
]
# calculate the number of iterations included from each start
n_per_starts = entries_per_start(fval_traces=fval_trace,
cutoff=cutoff,
max_per_start=max_per_start,
max_size=max_size)
# determine x_vectors from each start
for start in range(n_starts):
indices = get_vector_indices(trace_start=fval_trace[start],
cutoff=cutoff,
n_vectors=n_per_starts[start],
distribute=distribute)
x_vectors.extend([x_trace[start][ind] for ind in indices])
vector_tags.extend([
(int(result.optimize_result.list[start]['id']), ind)
for ind in indices
])
# raise a `ValueError` if there are no vectors within the ensemble
if len(x_vectors) == 0:
raise ValueError('The ensemble does not contain any vectors. '
'Either the `cutoff` value was too \nsmall '
'or the `result.optimize_result` object might '
'be empty.')
x_vectors = np.stack(x_vectors, axis=1)
return Ensemble(x_vectors=x_vectors,
x_names=x_names,
vector_tags=vector_tags,
lower_bound=lb,
upper_bound=ub,
**kwargs)
def __iter__(self):
"""
Make the instances of the class iterable objects.
Allows to apply functions such as __dict__ to them.
"""
yield X_VECTOR, self.x_vectors
yield NX, self.n_x
yield X_NAMES, self.x_names
yield NVECTORS, self.n_vectors
yield VECTOR_TAGS, self.vector_tags
yield ENSEMBLE_TYPE, self.ensemble_type
yield PREDICTIONS, self.predictions
yield SUMMARY, self.summary
yield LOWER_BOUND, self.lower_bound
yield UPPER_BOUND, self.upper_bound
def _map_parameters_by_objective(
self,
predictor: Callable,
default_value: float = None,
):
"""
Create mapping for parameters from ensebmle to predictor.
The parameters of the ensemble don't need to have the same ordering as
in the predictor.
"""
# create short hands
parameter_ids_objective = predictor.amici_objective.x_names
parameter_ids_ensemble = self.x_names
# map, and fill with `default_value` if not found and `default_value`
# is specified.
mapping = []
for parameter_id_objective in parameter_ids_objective:
if parameter_id_objective in parameter_ids_ensemble:
mapping.append(
parameter_ids_ensemble.index(parameter_id_objective)
)
elif default_value is not None:
mapping.append(default_value)
return mapping
[docs] def predict(
self,
predictor: Callable,
prediction_id: str = None,
sensi_orders: Tuple = (0,),
default_value: float = None,
mode: str = MODE_FUN,
include_llh_weights: bool = False,
include_sigmay: bool = False,
engine: Engine = None,
progress_bar: bool = True
) -> EnsemblePrediction:
"""
Run predictions for a full ensemble.
User needs to hand over a predictor function and settings, then all
results are grouped as EnsemblePrediction for the whole ensemble
Parameters
----------
predictor:
Prediction function, e.g., an AmiciPredictor
prediction_id:
Identifier for the predictions
sensi_orders:
Specifies which sensitivities to compute, e.g. (0,1) -> fval, grad
default_value:
If parameters are needed in the mapping, which are not found in the
parameter source, it can make sense to fill them up with this
default value (e.g. `np.nan`) in some cases (to be used with
caution though).
mode:
Whether to compute function values or residuals.
include_llh_weights:
Whether to include weights in the output of the predictor.
include_sigmay:
Whether to include standard deviations in the output
of the predictor.
engine:
Parallelization engine. Defaults to sequential execution on a
`SingleCoreEngine`.
progress_bar:
Whether to display a progress bar.
Returns
-------
The prediction of the ensemble.
"""
if engine is None:
engine = SingleCoreEngine()
# Vectors are chunked to improve parallization performance.
n_chunks = self.n_vectors # Default is no chunking.
if isinstance(engine, MultiProcessEngine):
n_chunks = engine.n_procs
if isinstance(engine, MultiThreadEngine):
n_chunks = engine.n_threads
chunks = [
(
(chunk_i+0) * int(np.floor(self.n_vectors / n_chunks)),
(chunk_i+1) * int(np.floor(self.n_vectors / n_chunks)),
)
for chunk_i in range(n_chunks)
]
# Last chunk should contain any remaining vectors that may have
# been skipped due to the `floor` method.
chunks[-1] = (chunks[-1][0], self.n_vectors)
# Get the correct parameter mapping.
mapping = self._map_parameters_by_objective(
predictor,
default_value=default_value,
)
# Setup the tasks with the prediction method and chunked vectors.
method = partial(predictor,
sensi_orders=sensi_orders,
mode=mode,
include_sigmay=include_sigmay,
include_llh_weights=include_llh_weights)
tasks = [
EnsembleTask(
method=method,
vectors=self.x_vectors[mapping, chunk_start:chunk_end],
id=chunk_i,
)
for chunk_i, (chunk_start, chunk_end) in enumerate(chunks)
]
# Execute tasks and flatten chunked results.
prediction_results = [
prediction_result
for prediction_chunk in engine.execute(
tasks, progress_bar=progress_bar)
for prediction_result in prediction_chunk
]
return EnsemblePrediction(
predictor=predictor,
prediction_id=prediction_id,
prediction_results=prediction_results,
)
[docs] def compute_summary(self,
percentiles_list: Sequence[int] = (5, 20, 80, 95)):
"""
Compute summary for the parameters of the ensemble.
Summary includes the mean, the median, the standard deviation and
possibly percentiles. Those summary results are added as a data
member to the EnsemblePrediction object.
Parameters
----------
percentiles_list:
List or tuple of percent numbers for the percentiles
Returns
-------
summary:
Dict with mean, std, median, and percentiles of parameter vectors
"""
# compute summaries based on parameters
summary = {MEAN: np.mean(self.x_vectors, axis=1),
STANDARD_DEVIATION: np.std(self.x_vectors, axis=1),
MEDIAN: np.median(self.x_vectors, axis=1)}
for perc in percentiles_list:
summary[get_percentile_label(perc)] = \
np.percentile(self.x_vectors, perc, axis=1)
# store and return results
self.summary = summary
return summary
[docs] def check_identifiability(self) -> pd.DataFrame:
"""
Check identifiability of ensemble.
Use ensemble mean and standard deviation to assess (in a rudimentary
way) whether or not parameters are identifiable. Returns a dataframe
with tuples, which specify whether or not the lower and the upper
bounds are violated.
Returns
-------
parameter_identifiability:
DataFrame indicating parameter identifiability based on mean
plus/minus standard deviations and parameter bounds
"""
# Recompute the summary, maybe the ensemble objects has been changed.
self.compute_summary()
# check identifiability for each parameter
parameter_identifiability = []
for ix, x_name in enumerate(self.x_names):
# define some short hands
lb = self.lower_bound[ix]
ub = self.upper_bound[ix]
mean = self.summary[MEAN][ix]
std = self.summary[STANDARD_DEVIATION][ix]
median = self.summary[MEAN][ix]
perc_list = [int(i_key[11:]) for i_key in self.summary.keys()
if i_key[0:4] == 'perc']
perc_lower = [perc for perc in perc_list if perc < 50]
perc_upper = [perc for perc in perc_list if perc > 50]
# create dict of identifiability
tmp_identifiability = {
'parameterId': x_name,
'lowerBound': lb,
'upperBound': ub,
'ensemble_mean': mean,
'ensemble_std': std,
'ensemble_median': median,
'within lb: 1 std': lb < mean - std,
'within ub: 1 std': ub > mean + std,
'within lb: 2 std': lb < mean - 2 * std,
'within ub: 2 std': ub > mean + 2 * std,
'within lb: 3 std': lb < mean - 3 * std,
'within ub: 3 std': ub > mean + 3 * std,
}
# handle percentiles
for perc in perc_lower:
tmp_identifiability[f'within lb: perc {perc}'] = \
lb < self.summary[get_percentile_label(perc)][ix]
for perc in perc_upper:
tmp_identifiability[f'within ub: perc {perc}'] = \
ub > self.summary[get_percentile_label(perc)][ix]
parameter_identifiability.append(tmp_identifiability)
# create DataFrame
parameter_identifiability = pd.DataFrame(parameter_identifiability)
parameter_identifiability.index = \
parameter_identifiability['parameterId']
return parameter_identifiability
def entries_per_start(fval_traces: List['np.ndarray'],
cutoff: float,
max_size: int,
max_per_start: int, ):
"""
Create the indices of each start that will be included in the ensemble.
Parameters
----------
fval_traces:
the fval-trace of each start.
cutoff:
Exclude parameters from the optimization if the nllh
is higher than the `cutoff`.
max_size:
The maximum size the ensemble should be.
max_per_start:
The maximum number of vectors to be included from a
single optimization start.
Returns
-------
A list of number of candidates per start that are to
be included in the ensemble.
"""
# choose possible candidates
ens_ind = [np.flatnonzero(fval <= cutoff) for fval in fval_traces]
# count the number of candidates per start
n_per_start = np.array([len(start) for start in ens_ind])
# if all possible indices can be included, return
if (n_per_start < max_per_start).all() and sum(n_per_start) < max_size:
return n_per_start
# trimm down starts that exceed the limit:
n_per_start = [min(n, max_per_start) for n in n_per_start]
# trimm down more until it fits the max size
decr = 0
while(sum(n_per_start) > max_size):
n_per_start = [min(n, max_per_start-decr)
for n in n_per_start]
decr += 1
# TODO: Possibility. With this implementation we could
# in a scenario, where we have more candidates than
# max size end up with an ensemble of size
# `max_size - len(n_starts)` in the worst case. We could introduce
# a flag which would be `force_max`, that indicates
# whether those remaining free slots should be filled by
# entries from certain starts. This would brng up the
# discussion which starts to choose. One obvious choice
# would be the best starts based on their endpoint.
return n_per_start
def get_vector_indices(trace_start: np.ndarray,
cutoff: float,
n_vectors: int,
distribute: bool, ):
"""
Return the indices to be taken into an ensemble.
Parameters
----------
trace_start:
The fval_trace of a single start.
cutoff:
Exclude parameters from the optimization if the nllh
is higher than the `cutoff`.
n_vectors:
The number of indices to be included from one start.
distribute:
Boolean flag, whether the best (False) values from the
start should be taken or whether the indices should be
more evenly distributed.
Returns
-------
The indices to include in the ensemble.
"""
candidates = np.flatnonzero(trace_start <= cutoff)
if distribute:
indices = np.round(np.linspace(0, len(candidates) - 1, n_vectors))
return candidates[indices.astype(int)]
else:
return candidates[:n_vectors]
[docs]def get_percentile_label(percentile: Union[float, int, str]) -> str:
"""Convert a percentile to a label.
Labels for percentiles are used at different locations (e.g. ensemble
prediction code, and visualization code). This method ensures that the same
percentile is labeled identically everywhere.
The percentile is rounded to two decimal places in the label representation
if it is specified to more decimal places. This is for readability in
plotting routines, and to avoid float to string conversion issues related
to float precision.
Parameters
----------
percentile:
The percentile value that will be used to generate a label.
Returns
-------
The label of the (possibly rounded) percentile.
"""
if isinstance(percentile, str):
percentile = float(percentile)
if percentile == round(percentile):
percentile = round(percentile)
if isinstance(percentile, float):
percentile_str = f'{percentile:.2f}'
# Add `...` to the label if the percentile value changed after rounding
if float(percentile_str) != percentile:
percentile_str += '...'
percentile = percentile_str
return f'{PERCENTILE} {percentile}'