# Source code for pypesto.ensemble.util

"""Ensemble utilities."""

import os
from pathlib import Path
from typing import Callable, Literal, Sequence, Union

import h5py
import numpy as np
import pandas as pd

from ..C import (
    LOWER_BOUND,
    OPTIMIZE,
    OUTPUT,
    OUTPUT_IDS,
    OUTPUT_SIGMAY,
    OUTPUT_WEIGHT,
    PREDICTION_ID,
    PREDICTION_RESULTS,
    SAMPLE,
    SUMMARY,
    TIMEPOINTS,
    UPPER_BOUND,
    X_NAMES,
    EnsembleType,
)
from ..result import PredictionConditionResult, PredictionResult
from ..store import read_result, write_array
from .ensemble import Ensemble, EnsemblePrediction


def read_from_csv(
    path: str,
    sep: str = '\t',
    index_col: int = 0,
    headline_parser: Callable = None,
    ensemble_type: EnsembleType = None,
    lower_bound: np.ndarray = None,
    upper_bound: np.ndarray = None,
):
    """
    Create an ensemble from a csv file.

    Parameters
    ----------
    path:
        path to csv file to read in parameter ensemble
    sep:
        separator in csv file
    index_col:
        index column in csv file
    headline_parser:
        A function which reads in the headline of the csv file and converts
        it into vector_tags (see constructor of Ensemble for more details)
    ensemble_type:
        Ensemble type: representative sample or random ensemble
    lower_bound:
        array of potential lower bounds for the parameters
    upper_bound:
        array of potential upper bounds for the parameters

    Returns
    -------
    result:
        Ensemble object of parameter vectors
    """
    # load the parameter table from disk
    parameter_df = pd.read_csv(path, sep=sep, index_col=index_col)

    # default to a plain ensemble when no type was specified
    if ensemble_type is None:
        ensemble_type = EnsembleType.ensemble

    # delegate construction to the DataFrame-based reader
    return read_from_df(
        dataframe=parameter_df,
        headline_parser=headline_parser,
        ensemble_type=ensemble_type,
        lower_bound=lower_bound,
        upper_bound=upper_bound,
    )
def read_ensemble_from_hdf5(
    filename: str,
    input_type: Literal['optimize', 'sample'] = OPTIMIZE,
    remove_burn_in: bool = True,
    chain_slice: slice = None,
    cutoff: float = np.inf,
    max_size: int = np.inf,
):
    """
    Create an ensemble from an HDF5 storage file.

    Parameters
    ----------
    filename:
        Name or path of the HDF5 file.
    input_type:
        Which type of ensemble to create: from an optimization result
        (``'optimize'``) or from a sampling result (``'sample'``).
    remove_burn_in:
        Only used for ``'sample'``: whether to exclude burn-in samples.
    chain_slice:
        Only used for ``'sample'``: optional slice to subset the chain.
    cutoff:
        Only used for ``'optimize'``: relative objective-value cutoff for
        including optimizer endpoints.
    max_size:
        Only used for ``'optimize'``: maximum number of vectors to include.

    Returns
    -------
    ensemble:
        Ensemble object of parameter vectors
    """
    # TODO: add option HISTORY. Need to fix
    # reading history from hdf5.
    if input_type == SAMPLE:
        result = read_result(filename=filename, sample=True)
        return Ensemble.from_sample(
            result=result,
            remove_burn_in=remove_burn_in,
            chain_slice=chain_slice,
        )

    if input_type == OPTIMIZE:
        result = read_result(filename=filename, optimize=True)
        return Ensemble.from_optimization_endpoints(
            result=result, rel_cutoff=cutoff, max_size=max_size
        )

    raise ValueError(
        'The type you provided was neither '
        f'"{SAMPLE}" nor "{OPTIMIZE}". Those are '
        'currently the only supported types. '
        'Please choose one of them.'
    )
def read_from_df(
    dataframe: pd.DataFrame,
    headline_parser: Callable = None,
    ensemble_type: EnsembleType = None,
    lower_bound: np.ndarray = None,
    upper_bound: np.ndarray = None,
):
    """
    Create an ensemble from a csv file.

    Parameters
    ----------
    dataframe:
        pandas.DataFrame to read in parameter ensemble
    headline_parser:
        A function which reads in the headline of the csv file and converts
        it into vector_tags (see constructor of Ensemble for more details)
    ensemble_type:
        Ensemble type: representative sample or random ensemble
    lower_bound:
        array of potential lower bounds for the parameters
    upper_bound:
        array of potential upper bounds for the parameters

    Returns
    -------
    result:
        Ensemble object of parameter vectors
    """
    # columns are parameter vectors; the optional parser turns the column
    # headers into per-vector tags
    vector_tags = (
        headline_parser(list(dataframe.columns))
        if headline_parser is not None
        else None
    )

    # default to a plain ensemble when no type was specified
    if ensemble_type is None:
        ensemble_type = EnsembleType.ensemble

    # rows are parameters, hence the index supplies the parameter names
    return Ensemble(
        x_vectors=dataframe.values,
        x_names=list(dataframe.index),
        vector_tags=vector_tags,
        ensemble_type=ensemble_type,
        lower_bound=lower_bound,
        upper_bound=upper_bound,
    )
def write_ensemble_prediction_to_h5(
    ensemble_prediction: EnsemblePrediction,
    output_file: str,
    base_path: str = None,
):
    """
    Write an `EnsemblePrediction` to hdf5.

    Parameters
    ----------
    ensemble_prediction:
        The prediction to be saved.
    output_file:
        The filename of the hdf5 file.
    base_path:
        An optional filepath where the file should be saved to.
    """
    # parse base path; defaults to the file root when no base_path is given
    base = Path('')
    if base_path is not None:
        base = Path(base_path)

    # open file in append mode so existing content of output_file is kept
    with h5py.File(output_file, 'a') as f:
        # write prediction ID if available
        if ensemble_prediction.prediction_id is not None:
            f.create_dataset(
                os.path.join(base, PREDICTION_ID),
                data=ensemble_prediction.prediction_id,
            )

        # write lower bounds per condition, if available
        if ensemble_prediction.lower_bound is not None:
            # an ndarray first element indicates per-condition bound arrays
            if isinstance(ensemble_prediction.lower_bound[0], np.ndarray):
                lb_grp = f.require_group(LOWER_BOUND)
                for i_cond, lower_bounds in enumerate(
                    ensemble_prediction.lower_bound
                ):
                    # condition ids are taken from the first prediction
                    # result; assumes all results share the same conditions
                    # — TODO confirm
                    condition_id = ensemble_prediction.prediction_results[
                        0
                    ].condition_ids[i_cond]
                    write_array(lb_grp, condition_id, lower_bounds)
            # a float first element indicates one flat vector of bounds
            elif isinstance(ensemble_prediction.lower_bound[0], float):
                # NOTE(review): bounds are written at the file root, not
                # under `base` — verify this asymmetry is intended
                f.create_dataset(
                    LOWER_BOUND, data=ensemble_prediction.lower_bound
                )

        # write upper bounds per condition, if available
        # (mirrors the lower-bound handling above)
        if ensemble_prediction.upper_bound is not None:
            if isinstance(ensemble_prediction.upper_bound[0], np.ndarray):
                ub_grp = f.require_group(UPPER_BOUND)
                for i_cond, upper_bounds in enumerate(
                    ensemble_prediction.upper_bound
                ):
                    condition_id = ensemble_prediction.prediction_results[
                        0
                    ].condition_ids[i_cond]
                    write_array(ub_grp, condition_id, upper_bounds)
            elif isinstance(ensemble_prediction.upper_bound[0], float):
                f.create_dataset(
                    UPPER_BOUND, data=ensemble_prediction.upper_bound
                )

        # write summary statistics to h5 file, one group per summary metric
        for (
            summary_id,
            summary,
        ) in ensemble_prediction.prediction_summary.items():
            if summary is None:
                continue
            tmp_base_path = os.path.join(base, f'{SUMMARY}_{summary_id}')
            # create the group here; the summary object writes its own
            # datasets into it via its write_to_h5 method
            f.create_group(tmp_base_path)
            summary.write_to_h5(output_file, base_path=tmp_base_path)

        # write the single prediction results, one group per ensemble member
        for i_result, result in enumerate(
            ensemble_prediction.prediction_results
        ):
            tmp_base_path = os.path.join(
                base, f'{PREDICTION_RESULTS}_{i_result}'
            )
            result.write_to_h5(output_file, base_path=tmp_base_path)
def get_prediction_dataset(
    ens: Union[Ensemble, EnsemblePrediction], prediction_index: int = 0
) -> np.ndarray:
    """
    Extract an array of prediction.

    Can be done from either an Ensemble object which contains a list of
    predictions of from an EnsemblePrediction object.

    Parameters
    ----------
    ens:
        Ensemble objects containing a set of parameter vectors and
        a set of predictions or EnsemblePrediction object containing only
        predictions
    prediction_index:
        index telling which prediction from the list should be analyzed
        (only used for `Ensemble` inputs)

    Returns
    -------
    dataset:
        numpy array containing the ensemble predictions

    Raises
    ------
    TypeError:
        If `ens` is neither an `Ensemble` nor an `EnsemblePrediction`.
    """
    if isinstance(ens, Ensemble):
        dataset = ens.predictions[prediction_index]
    elif isinstance(ens, EnsemblePrediction):
        # condense_to_arrays populates ens.prediction_arrays as a side effect
        ens.condense_to_arrays()
        dataset = ens.prediction_arrays[OUTPUT].transpose()
    else:
        # fix: raise TypeError (a subclass of Exception, so existing
        # `except Exception` handlers still work) instead of bare Exception
        raise TypeError(
            'Need either an Ensemble object with predictions or '
            'an EnsemblePrediction object as input. Stopping.'
        )
    return dataset
def read_ensemble_prediction_from_h5(
    predictor: Union[Callable[[Sequence], PredictionResult], None],
    input_file: str,
):
    """Read an ensemble prediction from an HDF5 File.

    Parameters
    ----------
    predictor:
        Prediction function to attach to the returned `EnsemblePrediction`
        (it is not stored in the file); may be None.
    input_file:
        Name or path of the HDF5 file to read from.

    Returns
    -------
    The reconstructed `EnsemblePrediction`.
    """
    # open file
    with h5py.File(input_file, 'r') as f:
        pred_res_list = []
        bounds = {}
        # fix: prediction_id was only assigned when a PREDICTION_ID entry
        # exists in the file, causing a NameError for files without one;
        # default to None
        prediction_id = None
        for key in f.keys():
            # summary groups are written by write_ensemble_prediction_to_h5
            # but not read back here
            if key.startswith(SUMMARY):
                continue
            if key == PREDICTION_ID:
                prediction_id = f[key][()].decode()
                continue
            if key in {LOWER_BOUND, UPPER_BOUND}:
                # a dataset holds one flat bounds vector; a group holds one
                # dataset per condition
                if isinstance(f[key], h5py._hl.dataset.Dataset):
                    bounds[key] = f[key][:]
                    continue
                bounds[key] = [
                    f[f'{key}/{cond}'][()] for cond in f[key].keys()
                ]
                bounds[key] = np.array(bounds[key])
                # NOTE(review): `bounds` is collected but not passed on to
                # the EnsemblePrediction below — verify whether this is
                # intentional
                continue
            # any other key is a single prediction result group
            x_names = list(decode_array(f[f'{key}/{X_NAMES}'][()]))
            condition_ids = list(decode_array(f[f'{key}/condition_ids'][()]))
            pred_cond_res_list = []
            # condition groups are stored under their integer index
            # fix: loop variable renamed from `id`, which shadowed the builtin
            for i_cond, _ in enumerate(condition_ids):
                output = f[f'{key}/{i_cond}/{OUTPUT}'][:]
                output_ids = tuple(
                    decode_array(f[f'{key}/{i_cond}/{OUTPUT_IDS}'][:])
                )
                timepoints = f[f'{key}/{i_cond}/{TIMEPOINTS}'][:]
                # weight and sigmay are optional entries
                try:
                    output_weight = f[f'{key}/{i_cond}/{OUTPUT_WEIGHT}'][()]
                except KeyError:
                    output_weight = None
                try:
                    output_sigmay = f[f'{key}/{i_cond}/{OUTPUT_SIGMAY}'][:]
                except KeyError:
                    output_sigmay = None
                pred_cond_res_list.append(
                    PredictionConditionResult(
                        timepoints=timepoints,
                        output_ids=output_ids,
                        output=output,
                        x_names=x_names,
                        output_weight=output_weight,
                        output_sigmay=output_sigmay,
                    )
                )
            pred_res_list.append(
                PredictionResult(
                    conditions=pred_cond_res_list, condition_ids=condition_ids
                )
            )
    return EnsemblePrediction(
        predictor=predictor,
        prediction_id=prediction_id,
        prediction_results=pred_res_list,
    )
def decode_array(array: np.ndarray) -> np.ndarray:
    """Decode array of bytes to string, in place.

    Parameters
    ----------
    array:
        Array whose elements are `bytes` objects, e.g. as returned by h5py
        for string datasets.

    Returns
    -------
    The same array object, with every element decoded to `str`.
    """
    # idiom fix: enumerate instead of range(len(...)); assignment mutates
    # the caller's array in place, matching the original behavior
    for i, entry in enumerate(array):
        array[i] = entry.decode()
    return array