ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • The beginners guide to Hamiltonian Monte Carlo
    Research/Generative Model 2024. 3. 29. 20:58

    https://bayesianbrad.github.io/posts/2019_hmc.html


    In this post I will go through a powerful Markov Chain Monte Carlo (MCMC) algorithm called Hamiltonian Monte Carlo (HMC) (MC's be in da house) and demonstrate how to implement the algorithm within the pytorch framework.

     

    Let us start with a super nice gif demonstrating the conservation of momentum in action:


    Why is momentum important?

    Well, it makes things move. It gives things the spice of life and so when we have a particle, a sample and want it to explore the space we need to make it move.

     

    But, we want it to move efficiently around the space that we wish to explore. It turns out that this Irish guy named Hamilton, the guy behind the Hamiltonian, found a really cool way of representing such systems so that paths that explore the space efficiently naturally pop out from the representation. Hamilton showed that we can describe any mechanical system with just 2 variables, or 2 degrees of freedom, represented by position q and momentum p.


    The Phase Space

    Is just a fancy coordinate system that is defined in terms of position and momentum, so when we explore trajectories in the phase space, we explore contours of energy, that is the phase (angle with respect to position and momentum) changes in phase space! Here is an example phase space:

    The red inner regions represent where the motion is stable, i.e, we have energy preserving trajectories. Take note of this, it is important. In fact, it is integral for why HMC works so well in high-dimensions. It relates to something called symplectic geometries, which essentially have lots of nice mathematically properties that just so happen to be the exact features we require for certain dynamical systems.

     

    #ConspiracyTheory. Because they preserve energy, we can define an MCMC strategy in which the chance of accepting a particle, sample is high, by defining the acceptance criteria in terms of the total energy on these paths!

     

    The regions outside of this red region are non-energy preserving - this is not gouda (I know my cheeses) for our MCMC algorithm, because this would lead to low-acceptance rates, thus our algorithm would be computationally inefficient at exploring the space we are exploring! Hence, we want to construct a system that is energy preserving. #ConservationOfEnergy

     

    Thankfully, Hamilton provided us with a way to easily find these trajectories!


    The Hamiltonian

    The Hamiltonian of a physical system is defined completely with respect to the position q and p momentum variables and defines the total energy of the system on a particular trajectory for a given q and p, the circles on the phase space diagram above.

     

    The  Hamiltonian is the Legendre transform of the Lagrangian and gives us the total energy in the system. All you need to know about the Lagrangian is that the Lagrangian was just another representation for describing the energy in dynamical systems, which itself was an extension of Newtonian formalism, it is basically a function of kinetic energy minus potential energy. It is defined as follows:

     

    I hope that you are beginning to feel all that potential at your finger tips - well almost - bad joke I know...


    Hamiltonian Monte Carlo

    Within the HMC MCMC framework the positionsq, are the variables of interest and for each position variable we have to create a fictitious momentump. For compactness let z=(q,p). The potential energy U(q) will be the minus of the log of the probability density for the distribution of the position variables we wish to sample, plus any constant that is convenient. The kinetic energy will represents the dynamics of our variables, for which a popular form of

    , where M is symmetric, positive definite and typically diagonal. This form of K(p) corresponds to a minus the log probability of the zero mean Gaussian distribution with covariance matrix M, more on the choice of kinetic energy later. For this choice of kinetic energy we can write the Hamiltonian equations as follows:

     

    It should be noted that M can take on different properties, which can effect the way that we explore the space in numerous ways. More on this in another blog post, as it gets a little complicated.


    The Canonical Distribution

    To view the Hamiltonian in terms of probabilities, we use the concept of the canonical distribution from Statistical mechanics to construct our probability density function - it is a density because we define our space to be continuous. Thus, the distribution that we wish to sample from can be related to the potential energy via the canonical distribution as:

     

    As the Hamiltonian is just an energy function we may can insert (4) into our canonical distribution (7), which gives us the joint density:

     

    where T = 1 is fixed. And so we can now very easily get to our target distribution p(q), which is dependent on our choice of potential U(q), as this expression factorizes into two independent probability distributions. We characterise the posterior distribution for the model parameters using the potential energy function:

     

    where p(q) is the prior distribution, and p(q|D) is the likelihood of the diven data D.


    Solving Hamilton's Equations

    In order to solve the set of differential equations generated from Hamilton’s formalism we need to integrate. If the potential function (joint density in the MCMC (Bayesian) framework) is well understood, then it maybe possible to solve Hamilton’s equations analytically. However, in many instances in both the physical and statistical setting you have functions that generate integrals that are impossible to solve analytically using known techniques. A resolution to this is to employ numerical methods, which produce an approximation that is close to the true value. However, to implement a numerical solver requires us to discretize Hamilton’s equations which induces errors #Boo!


    Errors of Numerical Solvers

    One could employ Euler’s method for solving a series of differential equations, however, it has less than desirable properties in regards to the local and global errors incurred in the approximation #OhNo. Where local error, is the error after one step and global error, is the error after simulating for some fixed time interval. A more optimal integrator is the Leapfrog integrator, that has a local error of O(ϵ^2) and a global error of O(ϵ^3) #Yay!.

     

    Here is a nice figure, directly lifted from [1] Radford Neal's paper, showing how trajectories diverge between the Euler and Leapfrog integrators.


    The Leapfrog Integrator

    Both the Leapfrog integrator and the Euler integrator are part of a class of symplectic integrators, i.e they are volume preserving. Both could be used for HMC, but as the Leapfrog integrator has significantly lower local and global errors it has become the integrator of choice. The integrator first starts with a particle at t=0 and then evaluates the updated dynamics, subject to the form of the Hamiltonian, equation (4), at subsequent times t+ϵ,,t+nϵ, where ϵ is the stepsize in which we increase our dynamical moves and n is the number of time steps to simulate the dynamics for.


    Some initial points to notice


    The Hamilton Monte Carlo Algorithm

     

    The Hamiltonian Monte Carlo algorithm starts at a specified initial set of parameters q, see code below. Then, for a given number of iterations, a new momentum vector is sampled and the current value of the parameter q is updated using the leapfrog integrator with discretization time ϵ and number of steps L accroding to the Hamiltonian dynamics. Then a Metropolis acceptance step is applied, and a decision is made whether to update to the new state (q,p) or keep the existing state, see code below.


    How to Implement HMC in a System

    I'm going slightly exaggerate some aspects of the code, so that it is clear how one can implement HMC more generally and if needed, for research purposes, add your own HMC flavor. Disclaimer: This code is not optimised for speed, it is here for readability and clarity. I should also note, that once Pytorch 0.4 has been released, all Variable statements can be replaced with torch.tensor(data=...) and if you want the gradients of those objects, then you do torch.tensor(data=... , dtype=..., requires_grad=True) We are going to use a helper function for now called VariableCast(), for Pytorch v0.3 and lower, which handles this automatically:

        import torch
        from torch.autograd.variable import Variable
        def VariableCast(value, grad = False):
            '''
            Casts an input to torch Variable object, with gradients either False or True. 
            input
            -----
            value - Type: scalar, Variable object, torch.Tensor, numpy ndarray
            grad  - Type: bool . If true then we require the gradient of that object
            output
            ------
            torch.autograd.variable.Variable object
            '''
            if value is None:
                return None
            elif isinstance(value, Variable):
                return value
            elif torch.is_tensor(value):
                return Variable(value, requires_grad = grad)
            elif isinstance(value, np.ndarray):
                tensor = torch.from_numpy(value).float()
                return Variable(tensor, requires_grad = grad)
            elif isinstance(value,list):
                return Variable(torch.FloatTensor(value), requires_grad=grad)
            else:
                return Variable(torch.FloatTensor([value]), requires_grad = grad)

    The Integrator

        #!/usr/bin/env python3
        # -*- coding: utf-8 -*-
        import torch
        import numpy as np
        from kinetic import Kinetic
        from torch.autograd import Variable
    
        class Integrator():
    
            def __init__(self, potential, min_step, max_step, max_traj, min_traj):
                self.potential = potential
                if min_step is None:
                    self.min_step = np.random.uniform(0.01, 0.07)
                else:
                    self.min_step = min_step
                if max_step is None:
                    self.max_step = np.random.uniform(0.07, 0.18)
                else:
                    self.max_step = max_step
                if max_traj is None:
                    self.max_traj = np.random.uniform(18, 25)
                else:
                    self.max_traj = max_traj
                if min_traj is None:
                    self.min_traj = np.random.uniform(1, 18)
                else:
                    self.min_traj = min_traj
    
            def generate_new_step_traj(self):
                ''' Generates a new step adn trajectory size  '''
                step_size = np.random.uniform(self.min_step, self.max_step)
                traj_size = int(np.random.uniform(self.min_traj, self.max_traj))
                return step_size, traj_size
    
    
            def leapfrog(self, p_init, q, grad_init):
                '''Performs the leapfrog steps of the HMC for the specified trajectory
                length, given by num_steps
                Parameters
                ----------
                    values_init
                    p_init
                    grad_init     - Description: contains the initial gradients of the joint w.r.t parameters.
    
                Outputs
                -------
                    q -    Description: proposed new q
                    p      -    Description: proposed new auxillary momentum
                '''
                step_size, traj_size = self.generate_new_step_traj()
                values_init = q
                self.kinetic = Kinetic(p_init)
                # Start by updating the momentum a half-step and q by a full step
                p = p_init + 0.5 * step_size * grad_init
                q = values_init + step_size * self.kinetic.gauss_ke(p, grad=True)
                for i in range(traj_size - 1):
                    # range equiv to [2:nsteps] as we have already performed the first step
                    # update momentum
                    p = p + step_size * self.potential.eval(q, grad=True)
                    # update q
                    q = q + step_size * self.kinetic.gauss_ke(p, grad=True)
    
                # Do a final update of the momentum for a half step
                p = p + 0.5 * step_size * self.potential.eval(q, grad=True)
    
                # return new proposal state
                return q, p

    Kinetic Energy

        from core import VariableCast
        import torch
        from torch.autograd import Variable
        class Kinetic():
            ''' A basic class that implements kinetic energies and computes gradients
            Methods
            -------
            gauss_ke          : Returns KE gauss
            laplace_ke        : Returns KE laplace
    
            Attributes
            ----------
            p    - Type       : torch.Tensor, torch.autograd.Variable,nparray
                Size       : [1, ... , N]
                Description: Vector of current momentum
    
            M    - Type       : torch.Tensor, torch.autograd.Variable, nparray
                Size       : \mathbb{R}^{N \times N}
                Description: The mass matrix, defaults to identity.
    
            '''
            def __init__(self, p, M = None):
    
                if M is not None:
                    if isinstance(M, Variable):
                        self.M  = VariableCast(torch.inverse(M.data))
                    else:
                        self.M  = VariableCast(torch.inverse(M))
                else:
                    self.M  = VariableCast(torch.eye(p.size()[0])) # inverse of identity is identity
    
    
            def gauss_ke(self,p, grad = False):
                '''' (p dot p) / 2 and Mass matrix M = \mathbb{I}_{dim,dim}'''
                self.p = VariableCast(p)
                P = Variable(self.p.data, requires_grad=True)
                K = 0.5 * P.t().mm(self.M).mm(P)
    
                if grad:
                    return self.ke_gradients(P, K)
                else:
                    return K
            def laplace_ke(self, p, grad = False):
                self.p = VariableCast(p)
                P = Variable(self.p.data, requires_grad=True)
                K = torch.sign(P).mm(self.M)
                if grad:
                    return self.ke_gradients(P, K)
                else:
                    return K
            def ke_gradients(self, P, K):
                return torch.autograd.grad([K], [P])[0]

    Potential Energy

    Step 1 Automatically calculating the gradients for an arbitrary function might sound difficult, but with any auto differentiation framework this is actually quite trivial. In Pytorch we can do the following:

        def _grad_logp(self, logp, values):
            """
            Returns the gradient of the log pdf, with respect for
            each parameter
    
            Parameters
            ----------
            logjoint       - The density function 
            values         - All parameters for which you need gradients.
        
            :return: torch.autograd.Variable
            """
    
            gradient_of_param = torch.autograd.grad(outputs=logp, inputs=values, retain_graph=True)[0]
            return gradient_of_param

    Metropolis Step

        import torch
        from torch.autograd import Variable
        from Utils.kinetic import Kinetic
    
        class Metropolis():
    
            def __init__(self, potential, integrator, M):
                self.potential  = potential
                self.integrator = integrator
                self.M          = M
                self.count      = 0
    
            def sample_momentum(self,values):
                assert(isinstance(values, Variable))
                return Variable(torch.randn(values.data.size()))
    
            def hamiltonian(self, logjoint, p):
                """Computes the Hamiltonian  given the current postion and momentum
                H = U(q) + K(p)
                U is the potential energy and is = -log_posterior(q)
                Parameters
                ----------
                logjoint    - Type:torch.autograd.Variable
                            Size: \mathbb{R}^{1 \times D}
                p           - Type: torch.Tensor.Variable
                            Size: \mathbb{R}^{1 \times D}.
                            Description: Auxiliary momentum
                log_potential :Function from state to position to 'energy'= -log_posterior
    
                Returns
                -------
                hamitonian : float
                """
                T = self.kinetic.gauss_ke(p, grad=False)
                return -logjoint + T
            def acceptance(self, values_init, logjoint_init, grad_init):
                '''Returns the new accepted state
    
                Parameters
                ----------
    
                Output
                ------
                returns accepted or rejected proposal
                '''
    
                # generate initial momentum
    
                #### FLAG
                p_init = self.sample_momentum(values_init)
                # generate kinetic energy object.
                self.kinetic = Kinetic(p_init, self.M)
                # calc hamiltonian  on initial state
                orig = self.hamiltonian(logjoint_init, p_init)
    
                # generate proposals
                values, p = self.integrator.leapfrog(p_init, values_init, grad_init)
    
                # calculate new hamiltonian given current
                logjoint_prop, _ = self.potential.eval(values, grad=False)
    
                current = self.hamiltonian(logjoint_prop, p)
                alpha = torch.min(torch.exp(orig - current))
                # calculate acceptance probability
                if isinstance(alpha, Variable):
                    p_accept = torch.min(torch.ones(1, 1), alpha.data)
                else:
                    p_accept = torch.min(torch.ones(1, 1), alpha)
                # print(p_accept)
                if p_accept[0][0] > torch.Tensor(1, 1).uniform_()[0][0]:  # [0][0] dirty code to get integersr
                    # Updates count globally for target acceptance rate
                    # print('Debug : Accept')
                    # print('Debug : Count ', self.count)
                    self.count = self.count + 1
                    return values, self.count
                else:
                    # print('Debug : reject')
                    return values_init, self.count

    Tying Everything Together

        import torch
        import numpy as np
        import time
        import math
        from torch.autograd import Variable
        from Utils.integrator import Integrator
        from Utils.metropolis_step import Metropolis
        # np.random.seed(12345)
        # torch.manual_seed(12345)
        class HMCsampler():
            '''
            Notes:  the params from FOPPL graph will have to all be passed to - maybe
            Methods
            -------
            leapfrog_step - preforms the integrator step in HMC
            hamiltonian   - calculates the value of hamiltonian
            acceptance    - calculates the acceptance probability
            run_sampler
    
            Attributes
            ----------
    
            '''
            def __init__(self, program, burn_in= 100, n_samples= 20000, M= None,  min_step= None, max_step= None,\
                        min_traj= None, max_traj= None):
                self.burn_in    = burn_in
                self.n_samples  = n_samples
                self.M          = M
                self.potential  = program()
                self.integrator = Integrator(self.potential, min_step, max_step, \
                                            min_traj, max_traj)
                # self.dim        = dim
    
                # TO DO : Implement a adaptive step size tuning from HMC
                # TO DO : Have a desired target acceptance ratio
                # TO DO : Implement a adaptive trajectory size from HMC
    
    
            def run_sampler(self):
                ''' Runs the hmc internally for a number of samples and updates
                our parameters of interest internally
                Parameters
                ----------
                n_samples
                burn_in
    
                Output
                ----------
                A tensor of the number of required samples
                Acceptance rate
                '''
                print(' The sampler is now running')
                # In the future dim = # of variables will not be needed as Yuan will provide
                # that value in the program and it shall return the required dim.
                logjoint_init, values_init, grad_init, dim = self.potential.generate()
                metropolis   = Metropolis(self.potential, self.integrator, self.M)
                temp,count   = metropolis.acceptance(values_init, logjoint_init, grad_init)
                samples      = Variable(torch.zeros(self.n_samples,dim))
                samples[0]   = temp.data.t()
    
    
                # Then run for loop from 2:n_samples
                for i in range(self.n_samples-1):
                    logjoint_init, grad_init = self.potential.eval(temp, grad_loop= True)
                    temp, count = metropolis.acceptance(temp, logjoint_init, grad_init)
                    samples[i + 1, :] = temp.data.t()
                    # try:
                    #     samples[i+1,:] = temp.data.t()
                    # except RuntimeError:
                    #     print(i)
                    #     break
                    # update parameters and draw new momentum
                    if i == np.floor(self.n_samples/4) or i == np.floor(self.n_samples/2) or i == np.floor(3*self.n_samples/4):
                        print(' At interation {}'.format(i))
    
                # Basic summary statistics
                target_acceptance =  count / (self.n_samples)
                samples_reduced   = samples[self.burn_in:, :]
                mean = torch.mean(samples_reduced,dim=0, keepdim= True)
                print()
                print('****** EMPIRICAL MEAN/COV USING HMC ******')
                print('empirical mean : ', mean)
                print('Average acceptance rate is: ', target_acceptance)
    
                return samples,samples_reduced, mean

     

    'Research > Generative Model' 카테고리의 다른 글

    Understanding Conditional VAEs  (0) 2024.04.02
    Hamiltonian Monte Carlo  (0) 2024.03.31
    Metropolis and Gibbs Sampling  (0) 2024.03.28
    Metropolis-Hastings algorithm  (0) 2024.03.28
    Importance Sampling Explained End-to-End  (0) 2024.03.28
Designed by Tistory.