import numpy as np
from typing import Dict, Sequence, Union
from tqdm import tqdm
from ..objective import History, ObjectiveBase, NegLogPriors
from ..problem import Problem
from .sampler import InternalSample, InternalSampler
from .result import McmcPtResult
[docs]class MetropolisSampler(InternalSampler):
"""
Simple Metropolis-Hastings sampler with fixed proposal variance.
"""
[docs] def __init__(self, options: Dict = None):
super().__init__(options)
self.problem: Union[Problem, None] = None
self.neglogpost: Union[ObjectiveBase, None] = None
self.neglogprior: Union[NegLogPriors, None] = None
self.trace_x: Union[Sequence[np.ndarray], None] = None
self.trace_neglogpost: Union[Sequence[float], None] = None
self.trace_neglogprior: Union[Sequence[float], None] = None
self.temper_lpost: bool = False
[docs] @classmethod
def default_options(cls):
return {
'std': 1., # the proposal standard deviation
'show_progress': True, # whether to show the progress
}
[docs] def initialize(self, problem: Problem, x0: np.ndarray):
self.problem = problem
self.neglogpost = problem.objective
self.neglogpost.history = History()
if problem.x_priors is None:
self.neglogprior = lambda x: -0.
else:
self.neglogprior = problem.x_priors
self.trace_x = [x0]
self.trace_neglogpost = [self.neglogpost(x0)]
self.trace_neglogprior = [self.neglogprior(x0)]
[docs] def sample(self, n_samples: int, beta: float = 1.):
# load last recorded particle
x = self.trace_x[-1]
lpost = - self.trace_neglogpost[-1]
lprior = - self.trace_neglogprior[-1]
show_progress = self.options['show_progress']
# loop over iterations
for _ in tqdm(range(int(n_samples)), disable=not show_progress):
# perform step
x, lpost, lprior = self._perform_step(
x=x, lpost=lpost, lprior=lprior, beta=beta)
# record step
self.trace_x.append(x)
self.trace_neglogpost.append(-lpost)
self.trace_neglogprior.append(-lprior)
[docs] def make_internal(self, temper_lpost: bool):
self.options['show_progress'] = False
self.temper_lpost = temper_lpost
def _perform_step(self, x: np.ndarray, lpost: np.ndarray,
lprior: np.ndarray, beta: float):
"""
Perform a step: Propose new parameter, evaluate and check whether to
accept.
"""
# propose step
x_new: np.ndarray = self._propose_parameter(x)
# check if step lies within bounds
if any(x_new < self.problem.lb) or any(x_new > self.problem.ub):
# will not be accepted
lpost_new = - np.inf
else:
# compute log posterior
lpost_new = - self.neglogpost(x_new)
# check posterior evaluation is successful
if np.isnan(lpost_new):
# will not be accepted
lpost_new = - np.inf
# compute log prior
lprior_new = - self.neglogprior(x_new)
if not self.temper_lpost:
# extract current log likelihood value
llh = lpost - lprior
# extract proposed log likelihood value
llh_new = lpost_new - lprior_new
# log acceptance probability (temper log likelihood)
log_p_acc = min(beta * (llh_new - llh) + (lprior_new - lprior), 0)
else:
# log acceptance probability (temper log posterior)
log_p_acc = min(beta * (lpost_new - lpost), 0)
# flip a coin
u = np.random.uniform(0, 1)
# check acceptance
if np.log(u) < log_p_acc:
# update particle
x = x_new
lpost = lpost_new
lprior = lprior_new
# update proposal
self._update_proposal(x, lpost,
log_p_acc, len(self.trace_neglogpost)+1)
return x, lpost, lprior
def _propose_parameter(self, x: np.ndarray):
"""Propose a step."""
x_new: np.ndarray = x + self.options['std'] * np.random.randn(len(x))
return x_new
def _update_proposal(self, x: np.ndarray, lpost: float,
log_p_acc: float, n_sample_cur: int):
"""Update the proposal density. Default: Do nothing."""
[docs] def get_last_sample(self) -> InternalSample:
return InternalSample(
x=self.trace_x[-1],
lpost=- self.trace_neglogpost[-1],
lprior=- self.trace_neglogprior[-1]
)
[docs] def set_last_sample(self, sample: InternalSample):
self.trace_x[-1] = sample.x
self.trace_neglogpost[-1] = - sample.lpost
self.trace_neglogprior[-1] = - sample.lprior
[docs] def get_samples(self) -> McmcPtResult:
result = McmcPtResult(
trace_x=np.array([self.trace_x]),
trace_neglogpost=np.array([self.trace_neglogpost]),
trace_neglogprior=np.array([self.trace_neglogprior]),
betas=np.array([1.]),
)
return result