Source code for pypesto.sample.emcee

"""EmceeSampler class."""

from __future__ import annotations

import logging

import numpy as np

from ..problem import Problem
from ..result import McmcPtResult
from ..startpoint import UniformStartpoints, uniform
from .sampler import Sampler, SamplerImportError

logger = logging.getLogger(__name__)


[docs] class EmceeSampler(Sampler): """Use emcee for sampling. Wrapper around https://emcee.readthedocs.io/en/stable/, see there for details. """
[docs] def __init__( self, nwalkers: int = 1, sampler_args: dict = None, run_args: dict = None, ): """ Initialize sampler. Parameters ---------- nwalkers: The number of walkers in the ensemble. sampler_args: Further keyword arguments that are passed on to ``emcee.EnsembleSampler.__init__``. run_args: Further keyword arguments that are passed on to ``emcee.EnsembleSampler.run_mcmc``. """ # check dependencies try: import emcee except ImportError: raise SamplerImportError("emcee") from None super().__init__() self.nwalkers: int = nwalkers if sampler_args is None: sampler_args = {} self.sampler_args: dict = sampler_args if run_args is None: run_args = {} self.run_args: dict = run_args # set in initialize self.problem: Problem | None = None self.sampler: emcee.EnsembleSampler | None = None self.state: emcee.State | None = None
[docs] def get_epsilon_ball_initial_state( self, center: np.ndarray, problem: Problem, epsilon: float = 1e-3, ): """Get walker initial positions as samples from an epsilon ball. The ball is scaled in each direction according to the magnitude of the center in that direction. It is assumed that, because vectors are generated near a good point, all generated vectors are evaluable, so evaluability is not checked. Points that are generated outside the problem bounds will get shifted to lie on the edge of the problem bounds. Parameters ---------- center: The center of the epsilon ball. The dimension should match the full dimension of the pyPESTO problem. This will be returned as the first position. problem: The pyPESTO problem. epsilon: The relative radius of the ball. e.g., if `epsilon=0.5` and the center of the first dimension is at 100, then the upper and lower bounds of the epsilon ball in the first dimension will be 150 and 50, respectively. """ # Epsilon ball lb = center * (1 - epsilon) ub = center * (1 + epsilon) # Adjust bounds to satisfy problem bounds lb[lb < problem.lb] = problem.lb[lb < problem.lb] ub[ub > problem.ub] = problem.ub[ub > problem.ub] # Sample initial positions initial_state_after_first = uniform( n_starts=self.nwalkers - 1, lb=lb, ub=ub, ) # Include `center` in initial positions initial_state = np.row_stack( ( center, initial_state_after_first, ) ) return initial_state
[docs] def initialize( self, problem: Problem, x0: np.ndarray | list[np.ndarray], ) -> None: """Initialize the sampler. It is recommended to initialize walkers Parameters ---------- x0: The "a priori preferred position". e.g., an optimized parameter vector. https://emcee.readthedocs.io/en/stable/user/faq/ The position of the first walker will be this, the remaining walkers will be assigned positions uniformly in a smaller ball around this vector. Alternatively, a set of vectors can be provided, which will be used to initialize walkers. In this case, any remaining walkers will be initialized at points sampled uniformly within the problem bounds. """ import emcee self.problem = problem # extract for pickling efficiency objective = self.problem.objective lb = self.problem.lb ub = self.problem.ub # parameter dimenstion ndim = len(self.problem.x_free_indices) def log_prob(x): """Log-probability density function.""" # check if parameter lies within bounds if any(x < lb) or any(x > ub): return -np.inf # invert sign return -1.0 * objective(x) # initialize sampler self.sampler = emcee.EnsembleSampler( nwalkers=self.nwalkers, ndim=ndim, log_prob_fn=log_prob, **self.sampler_args, ) # assign startpoints if self.state is None: if x0.ndim > 1 and len(x0.shape[0]) > 1: logger.warning( "More than a single vector was provided to initialize the " "walker positions. If these vectors do not exist in a " "small ball around a high-probability position (e.g. " "optimized vector) then sampling may be inefficient (see " "emcee FAQ: " "https://emcee.readthedocs.io/en/stable/user/faq/ )." ) # extract x0 x0 = np.asarray(x0) if x0.ndim == 1: x0 = [x0] x0 = np.array([problem.get_full_vector(x) for x in x0]) x_guesses_full0 = problem.x_guesses_full # add x0 to guesses problem.set_x_guesses( np.row_stack( ( x0, problem.x_guesses_full, ) ) ) # sample start points initial_state = UniformStartpoints( use_guesses=True, check_fval=True, check_grad=False, )( n_starts=self.nwalkers, problem=problem, ) # restore original guesses problem.set_x_guesses(x_guesses_full0) else: initial_state = self.get_epsilon_ball_initial_state( center=x0, problem=problem, ) self.state = initial_state
[docs] def sample(self, n_samples: int, beta: float = 1.0) -> None: """Return the most recent sample state.""" self.state = self.sampler.run_mcmc( initial_state=self.state, nsteps=n_samples, **self.run_args, )
[docs] def get_samples(self) -> McmcPtResult: """Get the samples into the fitting pypesto format.""" # all walkers are concatenated, yielding a flat array trace_x = np.array([self.sampler.get_chain(flat=True)]) trace_neglogpost = np.array([-self.sampler.get_log_prob(flat=True)]) # the sampler does not know priors trace_neglogprior = np.full(trace_neglogpost.shape, np.nan) # the walkers all run on temperature 1 betas = np.array([1.0]) result = McmcPtResult( trace_x=trace_x, trace_neglogpost=trace_neglogpost, trace_neglogprior=trace_neglogprior, betas=betas, ) return result