"""
Utilities
=========
Package-wide utilities.
"""
from collections.abc import Sequence
from numbers import Number
from operator import itemgetter
from typing import Any, Callable, Optional, Union
import numpy as np
from scipy import cluster
from tqdm import tqdm as _tqdm
def _check_none(fun: Callable[..., Any]) -> Callable[..., Union[Any, None]]:
"""Return None if any input argument is None; Wrapper function."""
def checked_fun(*args, **kwargs):
if any(x is None for x in [*args, *(kwargs.values())]):
return None
return fun(*args, **kwargs)
return checked_fun
@_check_none
def res_to_chi2(res: np.ndarray) -> float:
"""Translate residuals to chi2 values, `chi2 = sum(res**2) + C`."""
return float(np.dot(res, res))
@_check_none
def chi2_to_fval(chi2: float) -> float:
"""Translate chi2 to function value, `fval = 0.5*chi2 = 0.5*sum(res**2) + C`.
Note that for the function value we thus employ a probabilistic
interpretation, as the log-likelihood of a standard normal noise model.
This is in line with e.g. AMICI's and SciPy's objective definition.
"""
return 0.5 * chi2
@_check_none
def fval_to_chi2(fval: float) -> float:
"""Translate function value to chi2, `chi2 = 2 * fval`.
Note that for the function value we thus employ a probabilistic
interpretation, as the log-likelihood of a standard normal noise model.
This is in line with e.g. AMICI's and SciPy's objective definition.
"""
return 2.0 * fval
@_check_none
def res_to_fval(res: np.ndarray) -> float:
"""Translate residuals to function value, `fval = 0.5*sum(res**2) + C`."""
return chi2_to_fval(res_to_chi2(res))
@_check_none
def sres_to_schi2(res: np.ndarray, sres: np.ndarray) -> np.ndarray:
"""Translate residual sensitivities to chi2 gradient."""
return 2 * res.dot(sres)
@_check_none
def schi2_to_grad(schi2: np.ndarray) -> np.ndarray:
"""Translate chi2 gradient to function value gradient.
See also :func:`chi2_to_fval`.
"""
return 0.5 * schi2
@_check_none
def grad_to_schi2(grad: np.ndarray) -> np.ndarray:
"""Translate function value gradient to chi2 gradient."""
return 2.0 * grad
@_check_none
def sres_to_grad(res: np.ndarray, sres: np.ndarray) -> np.ndarray:
"""Translate residual sensitivities to function value gradient.
Assumes `fval = 0.5*sum(res**2)`.
See also :func:`chi2_to_fval`.
"""
return schi2_to_grad(sres_to_schi2(res, sres))
@_check_none
def sres_to_fim(sres: np.ndarray) -> np.ndarray:
"""Translate residual sensitivities to FIM.
The FIM is based on the function values, not chi2, i.e. has a normalization
of 0.5 as in :func:`res_to_fval`.
"""
return sres.transpose().dot(sres)
def is_none_or_nan(x: Union[Number, None]) -> bool:
"""
Check if x is None or NaN.
Parameters
----------
x:
object to be checked
Returns
-------
True if x is None or NaN, False otherwise.
"""
return x is None or np.isnan(x)
def is_none_or_nan_array(x: Union[Number, np.ndarray, None]) -> bool:
"""
Check if x is None or NaN array.
Parameters
----------
x:
object to be checked
Returns
-------
True if x is None or NaN array, False otherwise.
"""
return x is None or np.isnan(x).all()
def allclose(
x: Union[Number, np.ndarray], y: Union[Number, np.ndarray]
) -> bool:
"""
Check if two arrays are close.
Parameters
----------
x: first array
y: second array
Returns
-------
True if all elements of x and y are close, False otherwise.
"""
# Note: We use this wrapper around np.allclose in order to more easily
# adjust hyper parameters for the tolerance.
return np.allclose(x, y)
def isclose(
x: Union[Number, np.ndarray],
y: Union[Number, np.ndarray],
) -> Union[bool, np.ndarray]:
"""
Check if two values or arrays are close, element-wise.
Parameters
----------
x: first array
y: second array
Returns
-------
Element-wise boolean comparison of x and y.
"""
# Note: We use this wrapper around np.isclose in order to more easily
# adjust hyper parameters for the tolerance.
return np.isclose(x, y)
def get_condition_label(condition_id: str) -> str:
"""Convert a condition ID to a label.
Labels for conditions are used at different locations (e.g. ensemble
prediction code, and visualization code). This method ensures that the same
condition is labeled identically everywhere.
Parameters
----------
condition_id:
The condition ID that will be used to generate a label.
Returns
-------
The condition label.
"""
return f"condition_{condition_id}"
[docs]
def assign_clusters(vals):
"""
Find clustering.
Parameters
----------
vals: numeric list or array
List to be clustered.
Returns
-------
clust: numeric list
Indicating the corresponding cluster of each element from
'vals'.
clustsize: numeric list
Size of clusters, length equals number of clusters.
"""
# sanity checks
if vals is None or len(vals) == 0:
return [], []
elif len(vals) == 1:
return np.array([0]), np.array([1.0])
# linkage requires (n, 1) data array
vals = np.reshape(vals, (-1, 1))
# however: clusters are sorted by size, not by value... Resort.
# Create preallocated object first
cluster_indices = np.zeros(vals.size, dtype=int)
# get clustering based on distance
clust = cluster.hierarchy.fcluster(
cluster.hierarchy.linkage(vals), t=0.1, criterion="distance"
)
# get unique clusters
_, ind_clust = np.unique(clust, return_index=True)
unique_clust = clust[np.sort(ind_clust)]
cluster_size = np.zeros(unique_clust.size, dtype=int)
# loop over clusters: resort and count number of entries
for index, i_clust in enumerate(unique_clust):
cluster_indices[np.where(clust == i_clust)] = index
cluster_size[index] = sum(clust == i_clust)
return cluster_indices, cluster_size
[docs]
def delete_nan_inf(
fvals: np.ndarray,
x: Optional[Sequence[Union[np.ndarray, list[float]]]] = None,
xdim: Optional[int] = 1,
magnitude_bound: Optional[float] = np.inf,
) -> tuple[np.ndarray, np.ndarray]:
"""
Delete nan and inf values in fvals.
If parameters 'x' are passed, also the corresponding entries are deleted.
Parameters
----------
x:
array of parameters
fvals:
array of fval
xdim:
dimension of x, in case x dimension cannot be inferred
magnitude_bound:
any values with a magnitude (absolute value) larger than the
`magnitude_bound` are also deleted
Returns
-------
x:
array of parameters without nan or inf
fvals:
array of fval without nan or inf
"""
fvals = np.asarray(fvals)
finite_fvals = np.isfinite(fvals) & (np.absolute(fvals) < magnitude_bound)
if x is not None:
if np.isfinite(fvals).any():
finite_indices = np.where(finite_fvals)[0]
x = np.array(itemgetter(*finite_indices)(x))
# itemgetter does not return list for single element
if len(x.shape) == 1:
x = x.reshape((1, x.shape[0]))
else:
# If none of the fvals are finite we try to construct an
# empty np.ndarray with the correct dimension (other functions rely
# on x.shape[1] to be of correct dimension)
x = np.array(x)
x = np.empty(
(
0,
(
x.shape[1]
if x.ndim == 2
else x[0].shape[0]
if x[0] is not None
else xdim
),
)
)
return x, fvals[finite_fvals]
def tqdm(*args, enable: bool = None, **kwargs):
"""
Create a progress bar using tqdm.
Parameters
----------
args:
Arguments passed to tqdm.
enable:
Whether to enable the progress bar.
If None, use tqdm defaults.
Mutually exclusive with `disable`.
kwargs:
Keyword arguments passed to tqdm.
Returns
-------
progress_bar:
A progress bar.
"""
# Drop the `disable` argument unless it is not-None.
# This way, we don't interfere with TQDM_DISABLE or other global
# tqdm settings.
disable = kwargs.pop("disable", None)
if enable is not None:
if disable is not None and enable != disable:
raise ValueError(
"Contradicting values for `enable` and `disable` passed."
)
disable = not enable
if disable is not None:
kwargs["disable"] = disable
return _tqdm(*args, **kwargs)