"""Ensemble utilities."""
import os
from pathlib import Path
from typing import Callable, Literal, Sequence, Union
import h5py
import numpy as np
import pandas as pd
from ..C import (
LOWER_BOUND,
OPTIMIZE,
OUTPUT,
OUTPUT_IDS,
OUTPUT_SIGMAY,
OUTPUT_WEIGHT,
PREDICTION_ID,
PREDICTION_RESULTS,
SAMPLE,
SUMMARY,
TIMEPOINTS,
UPPER_BOUND,
X_NAMES,
EnsembleType,
)
from ..result import PredictionConditionResult, PredictionResult
from ..store import read_result, write_array
from .ensemble import Ensemble, EnsemblePrediction
[docs]
def read_from_csv(
    path: str,
    sep: str = '\t',
    index_col: int = 0,
    headline_parser: Callable = None,
    ensemble_type: EnsembleType = None,
    lower_bound: np.ndarray = None,
    upper_bound: np.ndarray = None,
):
    """
    Build an ensemble from the contents of a csv file.

    The file is loaded with ``pandas.read_csv`` and the resulting data
    frame is handed over to ``read_from_df``, which creates the ensemble.

    Parameters
    ----------
    path:
        Path of the csv file holding the parameter ensemble.
    sep:
        Column separator used in the csv file.
    index_col:
        Position of the column which is used as index of the table.
    headline_parser:
        Optional function which receives the headline of the csv file and
        converts it into vector_tags (see constructor of Ensemble for more
        details).
    ensemble_type:
        Ensemble type: representative sample or random ensemble. If not
        given, ``EnsembleType.ensemble`` is used.
    lower_bound:
        Array of potential lower bounds for the parameters.
    upper_bound:
        Array of potential upper bounds for the parameters.

    Returns
    -------
    result:
        Ensemble object of parameter vectors.
    """
    # fall back to the generic ensemble type if the caller gave none
    resolved_type = (
        EnsembleType.ensemble if ensemble_type is None else ensemble_type
    )

    # load the table of parameter vectors from disk
    table = pd.read_csv(path, sep=sep, index_col=index_col)

    return read_from_df(
        dataframe=table,
        headline_parser=headline_parser,
        ensemble_type=resolved_type,
        lower_bound=lower_bound,
        upper_bound=upper_bound,
    )
def read_ensemble_from_hdf5(
    filename: str,
    input_type: Literal['optimize', 'sample'] = OPTIMIZE,
    remove_burn_in: bool = True,
    chain_slice: slice = None,
    cutoff: float = np.inf,
    max_size: int = np.inf,
):
    """
    Build an ensemble from a result stored in an HDF5 file.

    Parameters
    ----------
    filename:
        Name or path of the HDF5 file.
    input_type:
        Which part of the stored result the ensemble is created from.
        Currently only the optimization result and the sampling result are
        supported.
    remove_burn_in:
        Only used for sampling input; forwarded unchanged to
        ``Ensemble.from_sample``.
    chain_slice:
        Only used for sampling input; forwarded unchanged to
        ``Ensemble.from_sample``.
    cutoff:
        Only used for optimization input; forwarded as ``rel_cutoff`` to
        ``Ensemble.from_optimization_endpoints``.
    max_size:
        Only used for optimization input; forwarded unchanged to
        ``Ensemble.from_optimization_endpoints``.

    Returns
    -------
    ensemble:
        Ensemble object of parameter vectors.

    Raises
    ------
    ValueError
        If `input_type` is neither the optimize nor the sample type.
    """
    # TODO: add option HISTORY. Need to fix
    #  reading history from hdf5.
    if input_type == OPTIMIZE:
        # ensemble from the endpoints of the stored optimization runs
        optimize_result = read_result(filename=filename, optimize=True)
        return Ensemble.from_optimization_endpoints(
            result=optimize_result, rel_cutoff=cutoff, max_size=max_size
        )

    if input_type == SAMPLE:
        # ensemble from the stored sampling chain
        sample_result = read_result(filename=filename, sample=True)
        return Ensemble.from_sample(
            result=sample_result,
            remove_burn_in=remove_burn_in,
            chain_slice=chain_slice,
        )

    # anything else is not supported
    raise ValueError(
        'The type you provided was neither '
        f'"{SAMPLE}" nor "{OPTIMIZE}". Those are '
        'currently the only supported types. '
        'Please choose one of them.'
    )
[docs]
def read_from_df(
    dataframe: pd.DataFrame,
    headline_parser: Callable = None,
    ensemble_type: EnsembleType = None,
    lower_bound: np.ndarray = None,
    upper_bound: np.ndarray = None,
):
    """
    Build an ensemble from a pandas data frame.

    The values of the frame are used as parameter vectors and its index as
    parameter names. The column labels are only looked at if a
    `headline_parser` is supplied.

    Parameters
    ----------
    dataframe:
        pandas.DataFrame holding the parameter ensemble.
    headline_parser:
        Optional function which receives the list of column labels and
        converts it into vector_tags (see constructor of Ensemble for more
        details).
    ensemble_type:
        Ensemble type: representative sample or random ensemble. If not
        given, ``EnsembleType.ensemble`` is used.
    lower_bound:
        Array of potential lower bounds for the parameters.
    upper_bound:
        Array of potential upper bounds for the parameters.

    Returns
    -------
    result:
        Ensemble object of parameter vectors.
    """
    # derive vector tags from the column labels, but only if we were told how
    tags = (
        headline_parser(list(dataframe.columns))
        if headline_parser is not None
        else None
    )

    # fall back to the generic ensemble type if the caller gave none
    if ensemble_type is None:
        ensemble_type = EnsembleType.ensemble

    parameter_names = list(dataframe.index)
    return Ensemble(
        x_vectors=dataframe.values,
        x_names=parameter_names,
        vector_tags=tags,
        ensemble_type=ensemble_type,
        lower_bound=lower_bound,
        upper_bound=upper_bound,
    )
[docs]
def _write_prediction_bounds(f, bound_key, bounds, ensemble_prediction):
    """Store one set of bounds (lower or upper) under `bound_key` in `f`."""
    if bounds is None:
        return

    if isinstance(bounds[0], np.ndarray):
        # one array per condition: collect them in a group, keyed by the
        # condition IDs of the first prediction result
        bound_grp = f.require_group(bound_key)
        for i_cond, cond_bounds in enumerate(bounds):
            first_result = ensemble_prediction.prediction_results[0]
            write_array(
                bound_grp, first_result.condition_ids[i_cond], cond_bounds
            )
    elif isinstance(bounds[0], float):
        # plain floats: store the whole sequence as a single dataset
        f.create_dataset(bound_key, data=bounds)


def write_ensemble_prediction_to_h5(
    ensemble_prediction: EnsemblePrediction,
    output_file: str,
    base_path: str = None,
):
    """
    Write an `EnsemblePrediction` to hdf5.

    The prediction ID, the lower and upper bounds, the prediction summaries
    and the individual prediction results are written, each only if
    available. The file is opened in append mode.

    Parameters
    ----------
    ensemble_prediction:
        The prediction to be saved.
    output_file:
        The filename of the hdf5 file.
    base_path:
        An optional path inside the file below which the prediction ID,
        the summaries and the prediction results are stored.
    """
    # NOTE(review): the bounds are written at the root of the file, they do
    #  not take `base_path` into account.
    # NOTE(review): paths inside the file are built with `os.path.join`,
    #  which uses the separator of the operating system -- confirm this is
    #  intended on platforms where that separator is not '/'.
    base = Path('') if base_path is None else Path(base_path)

    with h5py.File(output_file, 'a') as f:
        # prediction ID, if there is one
        pred_id = ensemble_prediction.prediction_id
        if pred_id is not None:
            f.create_dataset(os.path.join(base, PREDICTION_ID), data=pred_id)

        # lower and upper bounds, if there are any
        _write_prediction_bounds(
            f, LOWER_BOUND, ensemble_prediction.lower_bound, ensemble_prediction
        )
        _write_prediction_bounds(
            f, UPPER_BOUND, ensemble_prediction.upper_bound, ensemble_prediction
        )

        # summary statistics; each summary writes itself into its own group
        # and is handed the file name, not the open handle
        summaries = ensemble_prediction.prediction_summary
        for summary_id, summary in summaries.items():
            if summary is not None:
                summary_path = os.path.join(base, f'{SUMMARY}_{summary_id}')
                f.create_group(summary_path)
                summary.write_to_h5(output_file, base_path=summary_path)

        # the single prediction results, numbered in list order
        for i_result, result in enumerate(
            ensemble_prediction.prediction_results
        ):
            result_path = os.path.join(
                base, f'{PREDICTION_RESULTS}_{i_result}'
            )
            result.write_to_h5(output_file, base_path=result_path)
def get_prediction_dataset(
    ens: Union[Ensemble, EnsemblePrediction], prediction_index: int = 0
) -> np.ndarray:
    """
    Extract the prediction data from an ensemble object.

    Works on either an Ensemble object which contains a list of
    predictions or on an EnsemblePrediction object.

    Parameters
    ----------
    ens:
        Ensemble object containing a set of parameter vectors and a set of
        predictions, or EnsemblePrediction object containing only
        predictions.
    prediction_index:
        Index telling which prediction from the list of an Ensemble should
        be returned. Not used for an EnsemblePrediction.

    Returns
    -------
    dataset:
        For an Ensemble, the selected entry of its predictions. For an
        EnsemblePrediction, the transposed output array obtained after
        condensing the prediction to arrays.

    Raises
    ------
    Exception
        If `ens` is neither an Ensemble nor an EnsemblePrediction.
    """
    if isinstance(ens, Ensemble):
        # pick the requested prediction from the list
        return ens.predictions[prediction_index]

    if isinstance(ens, EnsemblePrediction):
        # the arrays have to be assembled before they can be accessed
        ens.condense_to_arrays()
        output_array = ens.prediction_arrays[OUTPUT]
        return output_array.transpose()

    raise Exception(
        'Need either an Ensemble object with predictions or '
        'an EnsemblePrediction object as input. Stopping.'
    )
[docs]
def read_ensemble_prediction_from_h5(
    predictor: Union[Callable[[Sequence], PredictionResult], None],
    input_file: str,
):
    """
    Read an ensemble prediction from an HDF5 File.

    Every top-level entry of the file is inspected: summary groups are
    skipped, the prediction ID is decoded, the bounds are read, and every
    remaining entry is interpreted as one stored prediction result.

    Parameters
    ----------
    predictor:
        Prediction function which is attached to the returned object, or
        None.
    input_file:
        Name or path of the HDF5 file.

    Returns
    -------
    ensemble_prediction:
        EnsemblePrediction holding the prediction results found in the
        file. Its prediction ID is None if the file stores none.
    """
    # The ID is optional in the file, so it needs a default. Without one the
    # return statement fails on files that were written without an ID.
    prediction_id = None
    pred_res_list = []
    # NOTE(review): the bounds are read but not handed to the returned
    #  object -- confirm whether EnsemblePrediction should receive them.
    bounds = {}

    with h5py.File(input_file, 'r') as f:
        for key in f.keys():
            # summaries are not read back
            if key.startswith(SUMMARY):
                continue

            if key == PREDICTION_ID:
                prediction_id = f[key][()].decode()
                continue

            if key in {LOWER_BOUND, UPPER_BOUND}:
                if isinstance(f[key], h5py.Dataset):
                    # bounds stored as one flat dataset
                    bounds[key] = f[key][:]
                else:
                    # bounds stored as a group with one entry per condition
                    bounds[key] = np.array(
                        [f[f'{key}/{cond}'][()] for cond in f[key].keys()]
                    )
                continue

            # everything else is treated as a stored prediction result
            x_names = list(decode_array(f[f'{key}/{X_NAMES}'][()]))
            condition_ids = list(
                decode_array(f[f'{key}/condition_ids'][()])
            )

            pred_cond_res_list = []
            # conditions are stored under their position, not their ID
            for i_cond in range(len(condition_ids)):
                cond_path = f'{key}/{i_cond}'
                output = f[f'{cond_path}/{OUTPUT}'][:]
                output_ids = tuple(
                    decode_array(f[f'{cond_path}/{OUTPUT_IDS}'][:])
                )
                timepoints = f[f'{cond_path}/{TIMEPOINTS}'][:]

                # weight and sigmas are optional in the file
                try:
                    output_weight = f[f'{cond_path}/{OUTPUT_WEIGHT}'][()]
                except KeyError:
                    output_weight = None
                try:
                    output_sigmay = f[f'{cond_path}/{OUTPUT_SIGMAY}'][:]
                except KeyError:
                    output_sigmay = None

                pred_cond_res_list.append(
                    PredictionConditionResult(
                        timepoints=timepoints,
                        output_ids=output_ids,
                        output=output,
                        x_names=x_names,
                        output_weight=output_weight,
                        output_sigmay=output_sigmay,
                    )
                )

            pred_res_list.append(
                PredictionResult(
                    conditions=pred_cond_res_list,
                    condition_ids=condition_ids,
                )
            )

    return EnsemblePrediction(
        predictor=predictor,
        prediction_id=prediction_id,
        prediction_results=pred_res_list,
    )
def decode_array(array: np.ndarray) -> np.ndarray:
    """
    Decode array of bytes to string.

    Parameters
    ----------
    array:
        Array whose entries are bytes objects.

    Returns
    -------
    decoded:
        For numpy arrays with a fixed-width byte-string dtype, a new array
        with the decoded strings. For any other input, the input itself,
        whose entries have been replaced in place by the decoded strings.
    """
    # A fixed-width byte-string array converts every string assigned to it
    # back to bytes, so replacing the entries in place would silently leave
    # the data undecoded. Build a new unicode array in that case.
    if isinstance(array, np.ndarray) and array.dtype.kind == 'S':
        return np.char.decode(array, encoding='utf-8')

    for i, entry in enumerate(array):
        array[i] = entry.decode()
    return array