Source code for pypesto.sample.metropolis

from typing import Dict, Sequence, Union

import numpy as np
from tqdm import tqdm

from ..history import NoHistory
from ..objective import NegLogPriors, ObjectiveBase
from ..problem import Problem
from ..result import McmcPtResult
from .sampler import InternalSample, InternalSampler

[docs]class MetropolisSampler(InternalSampler): """Simple Metropolis-Hastings sampler with fixed proposal variance."""
[docs] def __init__(self, options: Dict = None): super().__init__(options) self.problem: Union[Problem, None] = None self.neglogpost: Union[ObjectiveBase, None] = None self.neglogprior: Union[NegLogPriors, None] = None self.trace_x: Union[Sequence[np.ndarray], None] = None self.trace_neglogpost: Union[Sequence[float], None] = None self.trace_neglogprior: Union[Sequence[float], None] = None self.temper_lpost: bool = False
[docs] @classmethod def default_options(cls): """Return the default options for the sampler.""" return { 'std': 1.0, # the proposal standard deviation 'show_progress': True, # whether to show the progress }
[docs] def initialize(self, problem: Problem, x0: np.ndarray): """Initialize the sampler.""" self.problem = problem self.neglogpost = problem.objective self.neglogpost.history = NoHistory() if problem.x_priors is None: self.neglogprior = lambda x: -0.0 else: self.neglogprior = problem.x_priors self.trace_x = [x0] self.trace_neglogpost = [self.neglogpost(x0)] self.trace_neglogprior = [self.neglogprior(x0)]
[docs] def sample(self, n_samples: int, beta: float = 1.0): """Load last recorded particle.""" x = self.trace_x[-1] lpost = -self.trace_neglogpost[-1] lprior = -self.trace_neglogprior[-1] show_progress = self.options['show_progress'] # loop over iterations for _ in tqdm(range(int(n_samples)), disable=not show_progress): # perform step x, lpost, lprior = self._perform_step( x=x, lpost=lpost, lprior=lprior, beta=beta ) # record step self.trace_x.append(x) self.trace_neglogpost.append(-lpost) self.trace_neglogprior.append(-lprior)
[docs] def make_internal(self, temper_lpost: bool): """ Allow the inner samplers to be used as inner samplers. Can be called by parallel tempering samplers during initialization. Default: Do nothing. Parameters ---------- temper_lpost: Whether to temperate the posterior or only the likelihood. """ self.options['show_progress'] = False self.temper_lpost = temper_lpost
def _perform_step( self, x: np.ndarray, lpost: np.ndarray, lprior: np.ndarray, beta: float ): """ Perform a step. Propose new parameter, evaluate and check whether to accept. """ # propose step x_new: np.ndarray = self._propose_parameter(x) # check if step lies within bounds if any(x_new < or any(x_new > self.problem.ub): # will not be accepted lpost_new = -np.inf else: # compute log posterior lpost_new = -self.neglogpost(x_new) # check posterior evaluation is successful if np.isnan(lpost_new): # will not be accepted lpost_new = -np.inf # compute log prior lprior_new = -self.neglogprior(x_new) if not self.temper_lpost: # extract current log likelihood value llh = lpost - lprior # extract proposed log likelihood value llh_new = lpost_new - lprior_new # log acceptance probability (temper log likelihood) log_p_acc = min(beta * (llh_new - llh) + (lprior_new - lprior), 0) # catch the nan case which occurs if both lpost_new and lprior_new are -inf if np.isnan(log_p_acc): log_p_acc = -np.inf else: # log acceptance probability (temper log posterior) log_p_acc = min(beta * (lpost_new - lpost), 0) # flip a coin u = np.random.uniform(0, 1) # check acceptance if np.log(u) < log_p_acc: # update particle x = x_new lpost = lpost_new lprior = lprior_new # update proposal self._update_proposal( x, lpost, log_p_acc, len(self.trace_neglogpost) + 1 ) return x, lpost, lprior def _propose_parameter(self, x: np.ndarray): """Propose a step.""" x_new: np.ndarray = x + self.options['std'] * np.random.randn(len(x)) return x_new def _update_proposal( self, x: np.ndarray, lpost: float, log_p_acc: float, n_sample_cur: int ): """Update the proposal density. Default: Do nothing."""
[docs] def get_last_sample(self) -> InternalSample: """Get the last sample in the chain. Returns ------- internal_sample: The last sample in the chain in the exchange format. """ return InternalSample( x=self.trace_x[-1], lpost=-self.trace_neglogpost[-1], lprior=-self.trace_neglogprior[-1], )
[docs] def set_last_sample(self, sample: InternalSample): """ Set the last sample in the chain to the passed value. Parameters ---------- sample: The sample that will replace the last sample in the chain. """ self.trace_x[-1] = sample.x self.trace_neglogpost[-1] = -sample.lpost self.trace_neglogprior[-1] = -sample.lprior
[docs] def get_samples(self) -> McmcPtResult: """Get the samples into the fitting pypesto format.""" result = McmcPtResult( trace_x=np.array([self.trace_x]), trace_neglogpost=np.array([self.trace_neglogpost]), trace_neglogprior=np.array([self.trace_neglogprior]), betas=np.array([1.0]), ) return result