Source code for pypesto.sample.pymc3

import numpy as np
from typing import Union
import logging

from ..objective import History
from ..problem import Problem
from .sampler import Sampler
from .result import McmcPtResult

logger = logging.getLogger(__name__)

try:
    import pymc3 as pm
    import arviz as az
    import theano.tensor as tt
except ImportError:
    pm = az = tt = None

try:
    from .theano import TheanoLogProbability
except (AttributeError, ImportError):
    TheanoLogProbability = None


[docs]class Pymc3Sampler(Sampler): """Wrapper around Pymc3 samplers. Parameters ---------- step_function: A pymc3 step function, e.g. NUTS, Slice. If not specified, pymc3 determines one automatically (preferable). **kwargs: Options are directly passed on to `pymc3.sample`. """
[docs] def __init__(self, step_function=None, **kwargs): super().__init__(kwargs) self.step_function = step_function self.problem: Union[Problem, None] = None self.x0: Union[np.ndarray, None] = None self.trace: Union[pm.backends.Text, None] = None self.data: Union[az.InferenceData, None] = None
[docs] @classmethod def translate_options(cls, options): if not options: options = {'chains': 1} return options
[docs] def initialize(self, problem: Problem, x0: np.ndarray): self.problem = problem if x0 is not None: if len(x0) != problem.dim: x0 = problem.get_reduced_vector(x0) self.x0 = x0 self.trace = None self.data = None self.problem.objective.history = History()
[docs] def sample( self, n_samples: int, beta: float = 1.): problem = self.problem log_post_fun = TheanoLogProbability(problem, beta) trace = self.trace x0 = None if self.x0 is not None: x0 = {x_name: val for x_name, val in zip(self.problem.x_names, self.x0)} # create model context with pm.Model() as model: # uniform bounds k = [pm.Uniform(x_name, lower=lb, upper=ub) for x_name, lb, ub in zip(problem.get_reduced_vector(problem.x_names), problem.lb, problem.ub)] # convert to tensor vector theta = tt.as_tensor_variable(k) # use a DensityDist for the log-posterior pm.DensityDist('log_post', logp=lambda v: log_post_fun(v), observed={'v': theta}) # step, by default automatically determined by pymc3 step = None if self.step_function: step = self.step_function() # perform the actual sampling trace = pm.sample( draws=int(n_samples), trace=trace, start=x0, step=step, **self.options) # convert trace to inference data object data = az.from_pymc3(trace=trace, model=model) self.trace = trace self.data = data
[docs] def get_samples(self) -> McmcPtResult: # parameter values trace_x = np.asarray( self.data.posterior.to_array()).transpose((1, 2, 0)) # TODO this is only the negative objective values trace_neglogpost = np.asarray(self.data.log_likelihood.to_array()) # remove trailing dimensions trace_neglogpost = np.reshape(trace_neglogpost, trace_neglogpost.shape[1:-1]) # flip sign trace_neglogpost = - trace_neglogpost if trace_x.shape[0] != trace_neglogpost.shape[0] \ or trace_x.shape[1] != trace_neglogpost.shape[1] \ or trace_x.shape[2] != self.problem.dim: raise ValueError("Trace dimensions are inconsistent") return McmcPtResult( trace_x=np.array(trace_x), trace_neglogpost=np.array(trace_neglogpost), trace_neglogprior=np.full(trace_neglogpost.shape, np.nan), betas=np.array([1.] * trace_x.shape[0]), )