"""Adaptive Metropolis sampler (module ``pypesto.sample.adaptive_metropolis``)."""

import numbers

import numpy as np

from ..problem import Problem
from .metropolis import MetropolisSampler


class AdaptiveMetropolisSampler(MetropolisSampler):
    """Metropolis-Hastings sampler with adaptive proposal covariance.

    A core problem of the standard Metropolis-Hastings sampler is its
    fixed proposal distribution, which must be manually tuned. This
    sampler instead adapts the proposal distribution during sampling,
    adjusting both its correlation structure and its scaling factor
    based on previous samples.

    The covariance matrix is approximated via a weighted average of
    current and earlier samples, with a decay factor determining the
    relative contribution of the current sample versus earlier ones.
    The scaling factor is tuned towards a fixed target acceptance rate
    of 0.234, as suggested by theoretical results.

    For reviews of the variety of adaptation methods see:

    * Ballnus et al. 2017.
      Comprehensive benchmarking of Markov chain Monte Carlo methods
      for dynamical systems
      (https://doi.org/10.1186/s12918-017-0433-1)
    * Andrieu et al. 2008.
      A tutorial on adaptive MCMC
      (https://doi.org/10.1007/s11222-008-9110-y)

    The implementation is based on:

    * Lacki et al. 2015.
      State-dependent swap strategies and automatic reduction of number
      of temperatures in adaptive parallel tempering algorithm
      (https://doi.org/10.1007/s11222-015-9579-0)
    * Miasojedow et al. 2013.
      An adaptive parallel tempering algorithm
      (https://doi.org/10.1080/10618600.2013.778779)

    In turn, these are based on adaptive MCMC as discussed in:

    * Haario et al. 2001.
      An adaptive Metropolis algorithm
      (https://doi.org/10.2307/3318737)

    For reference matlab implementations see:

    * https://github.com/ICB-DCM/PESTO/blob/master/private/performPT.m
    * https://github.com/ICB-DCM/PESTO/blob/master/private/updateStatistics.m
    """

    def __init__(self, options: dict = None):
        super().__init__(options)
        # current proposal covariance, used by _propose_parameter
        self._cov = None
        # decaying average of the sample mean
        self._mean_hist = None
        # decaying average of the sample covariance
        self._cov_hist = None
        # scalar scaling of the historical covariance, tuned towards
        # the target acceptance rate
        self._cov_scale = None

    @classmethod
    def default_options(cls):
        """Return the default options for the sampler."""
        return {
            # controls adaptation degeneration velocity of the proposals
            # in [0, 1], with 0 -> no adaptation, i.e. classical
            # Metropolis-Hastings
            "decay_constant": 0.51,
            # number of samples before adaptation decreases significantly.
            # a higher value reduces the impact of early adaptation
            "threshold_sample": 1,
            # regularization factor for ill-conditioned cov matrices of
            # the adapted proposal density. regularization might happen if the
            # eigenvalues of the cov matrix strongly differ in order
            # of magnitude. in this case, the algorithm adds a small
            # diag matrix to the cov matrix with elements of this factor
            "reg_factor": 1e-6,
            # initial covariance matrix. defaults to a unit matrix
            "cov0": None,
            # target acceptance rate
            "target_acceptance_rate": 0.234,
            # show progress
            "show_progress": None,
        }

    def initialize(self, problem: Problem, x0: np.ndarray):
        """Initialize the sampler."""
        super().initialize(problem, x0)

        cov0 = self.options["cov0"]
        if cov0 is None:
            # default: unit matrix
            cov0 = np.eye(len(x0))
        elif isinstance(cov0, numbers.Real):
            # scalar shorthand for a scaled unit matrix
            cov0 = float(cov0) * np.eye(len(x0))
        self._cov = regularize_covariance(cov0, self.options["reg_factor"])
        # seed the history statistics from the initial chain state
        self._mean_hist = self.trace_x[-1]
        self._cov_hist = self._cov
        self._cov_scale = 1.0

    def _propose_parameter(self, x: np.ndarray):
        """Draw a proposal from a Gaussian centered at the current point."""
        return np.random.multivariate_normal(x, self._cov)

    def _update_proposal(
        self, x: np.ndarray, lpost: float, log_p_acc: float, n_sample_cur: int
    ):
        """Adapt the proposal distribution to the sampling history."""
        opts = self.options
        decay_constant = opts["decay_constant"]
        threshold_sample = opts["threshold_sample"]
        reg_factor = opts["reg_factor"]
        target_acceptance_rate = opts["target_acceptance_rate"]

        # update the decaying estimates of mean and covariance
        self._mean_hist, self._cov_hist = update_history_statistics(
            mean=self._mean_hist,
            cov=self._cov_hist,
            x_new=x,
            n_cur_sample=max(n_sample_cur + 1, threshold_sample),
            decay_constant=decay_constant,
        )

        # tune the covariance scaling factor towards the target
        # acceptance rate
        self._cov_scale *= np.exp(
            (np.exp(log_p_acc) - target_acceptance_rate)
            / np.power(n_sample_cur + 1, decay_constant)
        )

        # set proposal covariance
        # TODO check publication
        self._cov = self._cov_scale * self._cov_hist

        # regularize proposal covariance
        self._cov = regularize_covariance(cov=self._cov, reg_factor=reg_factor)
def update_history_statistics(
    mean: np.ndarray,
    cov: np.ndarray,
    x_new: np.ndarray,
    n_cur_sample: int,
    decay_constant: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Update sampling mean and covariance matrix via weighted average.

    Update sampling mean and covariance matrix based on the previous
    estimate and the most recent sample via a weighted average, with
    gradually decaying update rate.

    Parameters
    ----------
    mean:
        The estimated mean of the samples, that was calculated in the
        previous iteration.
    cov:
        The estimated covariance matrix of the sample, that was
        calculated in the previous iteration.
    x_new:
        Most recent sample.
    n_cur_sample:
        Current number of samples.
    decay_constant:
        Adaption decay, in [0, 1]. Higher values result in faster decays,
        such that later iterations influence the adaption more weakly.

    Returns
    -------
    mean, cov:
        The updated values for the estimated mean and the estimated
        covariance matrix of the sample.
    """
    # update rate decays polynomially with the number of samples
    update_rate = n_cur_sample ** (-decay_constant)

    mean = (1 - update_rate) * mean + update_rate * x_new

    # note: dx uses the *updated* mean, matching the reference
    # implementation (Lacki et al. 2015)
    dx = x_new - mean
    cov = (1 - update_rate) * cov + update_rate * np.outer(dx, dx)

    return mean, cov


def regularize_covariance(cov: np.ndarray, reg_factor: float) -> np.ndarray:
    """
    Regularize the estimated covariance matrix of the sample.

    Useful if the estimated covariance matrix is ill-conditioned.
    Increments the diagonal a little to ensure positivity.

    The input array is not modified; a shifted copy is returned if
    regularization is needed.

    Parameters
    ----------
    cov:
        Estimate of the covariance matrix of the sample.
    reg_factor:
        Regularization factor. Larger values result in stronger
        regularization.

    Returns
    -------
    cov:
        Regularized estimate of the covariance matrix of the sample.
    """
    eig = np.linalg.eigvals(cov)
    eig_min = min(eig)
    if eig_min <= 0:
        # shift the diagonal just enough to make all eigenvalues positive.
        # use out-of-place addition: `cov += ...` would mutate the caller's
        # array in place (e.g. a user-supplied `cov0` option).
        cov = cov + (abs(eig_min) + reg_factor) * np.eye(cov.shape[0])
    return cov