# Demo: the forward-backward algorithm

import random

import numpy

import matplotlib.pyplot as plt

###########################

## generating the data

# generate 2/3 n from hot, then 1/3 n from cold.

# returns: observations, #hot, #cold

def generate_observations(n):

    # probabilities of ice cream amounts given hot / cold:

    # shown here as amounts of 1's, 2's, 3's out of 10 days

    hot = [ 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]

    cold = [ 1, 1, 1, 1, 1, 2, 2, 2, 2, 3]

    observations  = [ ]

    # choose the first 2n/3 observations from "hot"

    numhot = int(2.0/3 * n)

    for i in range(numhot): observations.append(random.choice(hot))

    # choose the remaining n/3 observations from "cold"

    numcold = n - numhot

    for i in range(numcold): observations.append(random.choice(cold))

    return (observations, numhot, numcold)
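
# quick sanity check of the generator, added here as an illustration (test_generate is not
# part of the original demo); the observation values themselves are random
def test_generate():
    observations, numhot, numcold = generate_observations(30)
    assert (numhot, numcold) == (20, 10)
    assert len(observations) == 30
    assert all(o in (1, 2, 3) for o in observations)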

###########################

# data structures:

#

# numstates: number of states. we omit explicit start and end states here; instead, a separate
# vector of initial state probabilities (initialprob) says how likely each state is at the first timestep.

#

# emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)

# where entry (j, o) has emis_j(o).

#

# transition probabilities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).

#

# forward and backward probabilities: numpy matrices of shape (N, T), where entry (s, t) has the
# forward (or backward) probability for state s at timestep t
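
# for instance, for the 2-state ice cream HMM with vocabulary {1, 2, 3} used below,
# these structures could look like this (values here are only illustrative):
#   emis  = numpy.array([[0.2, 0.4, 0.4],   # emission probs of 1, 2, 3 cones in state 0
#                        [0.5, 0.4, 0.1]])  # emission probs of 1, 2, 3 cones in state 1
#   trans = numpy.array([[0.7, 0.3],
#                        [0.4, 0.6]])
#   observation_indices = { 1: 0, 2: 1, 3: 2 }   # maps vocabulary items to columns of emis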

################

# computing forward and backward probabilities

# forward:

# alpha_t(j) = P(o1, ..., ot, qt = j | HMM)

# forward function: returns numpy matrix of size (N, T)

def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):

    forwardmatrix = numpy.zeros((numstates, len(observations)))

    # initialization

    obs_index = observation_indices[ observations[0]]

    for s in range(numstates):

        forwardmatrix[ s, 0 ] = initialprob[s] * emis[ s, obs_index]

    # recursion step

    for t in range(1, len(observations)):

        obs_index = observation_indices[ observations[t]]

        for s in range(numstates):

            forwardmatrix[s, t] = emis[s, obs_index] * sum([forwardmatrix[s2, t-1] * trans[s2, s] \

                                       for s2 in range(numstates)])

    return forwardmatrix

# beta_t(j) = P(o_{t+1}, ..., o_T | qt = j, HMM)

# backward function: returns numpy matrix of size (N, T)

def backwardprobs(observations, trans, emis, numstates, observation_indices):

    backwardmatrix = numpy.zeros((numstates, len(observations)))

    # initialization

    for s in range(numstates):

        backwardmatrix[ s, len(observations) - 1 ] = 1.0

    # recursion

    for t in range(len(observations) - 2, -1, -1):

        obs_index = observation_indices[ observations[t+1]]

        for s in range(numstates):

            backwardmatrix[s, t] = sum([ trans[s, s2] * emis[s2, obs_index] * backwardmatrix[s2, t+1] \

                                         for s2 in range(numstates) ])

    return backwardmatrix
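
# a useful sanity check, added here as an illustration (check_forwardbackward is not part of
# the original demo): for every timestep t, sum_s alpha_t(s) * beta_t(s) should equal
# P(O | HMM), i.e. the sum over states of the last column of the forward matrix
def check_forwardbackward(forwardmatrix, backwardmatrix):
    # P(O | HMM) from the forward matrix
    p_o_given_hmm = forwardmatrix[:, -1].sum()
    # combining forward and backward probabilities at any timestep should give the same value
    per_timestep = (forwardmatrix * backwardmatrix).sum(axis=0)
    assert numpy.allclose(per_timestep, p_o_given_hmm)
    return p_o_given_hmm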

                                

def test_alphabeta():

    observations = [3,1,3]

    # transition probabilities: rows = from-state, columns = to-state
    trans = numpy.array([[0.7, 0.3], [0.4, 0.6]])
    # emission probabilities: rows = states, columns = vocabulary items 1, 2, 3
    emis = numpy.array([[0.2, 0.4, 0.4], [0.5, 0.4, 0.1]])

    initialprob = numpy.array([0.8, 0.2])

    numstates = 2

    obs_indices = { 1 : 0, 2 : 1, 3: 2}

    print("FORWARD")

    print(forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices))

    print("\n")

   

    print('BACKWARD')

    print(backwardprobs(observations, trans, emis, numstates, obs_indices))

    print("\n")

   

####

# expectation step:
# re-estimate xi_t(i, j) and gamma_t(j):
#   xi_t(i, j) = alpha_t(i) * trans(i, j) * emis_j(o_{t+1}) * beta_{t+1}(j) / P(O | HMM)
#   gamma_t(j) = P(q_t = j | O, HMM), obtained here by summing xi_t(j, .) over the successor
#                state (and, for the final timestep, summing xi over the predecessor state)
# returns two things:
# - gamma is a (N, T) numpy matrix
# - xi is a list of T-1 numpy matrices of size (N, N), one per pair of adjacent timesteps

def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):

    # denominator: P(O | HMM)

    p_o_given_hmm = sum([forward[s_i, len(observations) -1] for s_i in range(numstates) ])

   

    # computing xi

    xi = [ ]

    for t in range(len(observations) - 1):

        obs_index = observation_indices[observations[t+1]]

       

        xi_t = numpy.zeros((numstates, numstates))

       

        for s_i in range(numstates):

            for s_j in range(numstates):

                xi_t[ s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j] * emis[s_j, obs_index] * backward[s_j, t+1]) / p_o_given_hmm

        xi.append(xi_t)

    # computing gamma

    gamma = numpy.zeros((numstates, len(observations)))

    for t in range(len(observations) - 1):

        for s_i in range(numstates):

            gamma[s_i, t] = sum([ xi[t][s_i, s_j] for s_j in range(numstates) ])

    # the final timestep has no outgoing transitions, so sum over the incoming
    # transitions of the last xi matrix instead
    for s_j in range(numstates):
        gamma[s_j, len(observations) - 1] = sum([ xi[-1][s_i, s_j] for s_i in range(numstates) ])

           

    return (gamma, xi)
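
# illustrative sanity check for the E-step (check_expectation is not part of the original
# demo): every column of gamma is a distribution over states, and every xi_t a distribution
# over state pairs, so both should sum to (approximately) 1
def check_expectation(gamma, xi):
    assert numpy.allclose(gamma.sum(axis=0), 1.0)
    for xi_t in xi:
        assert numpy.allclose(xi_t.sum(), 1.0)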

###

# maximization step:
# re-estimate initialprob, trans, emis based on gamma, xi:
#   initialprob(i) = gamma_1(i)
#   trans(i, j)    = sum_t xi_t(i, j) / sum_t gamma_t(i)   (t ranging over all but the last timestep)
#   emis_j(v)      = sum_{t : o_t = v} gamma_t(j) / sum_t gamma_t(j)
# returns:
# - initialprob
# - trans
# - emis

def maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):

    # re-estimate initial probabilities

    initialprob = numpy.array([gamma[s_i, 0] for s_i in range(numstates)])

   

    # re-estimate emission probabilities

    emis = numpy.zeros((numstates, vocabsize))

    for s in range(numstates):

        denominator = sum( [gamma[s, t] for t in range(len(observations))])

        for vocab_item, obs_index in observation_indices.items():

            emis[s, obs_index] = sum( [gamma[s, t] for t in range(len(observations)) if observations[t] == vocab_item] )/denominator

    # re-estimate transition probabilities

    trans = numpy.zeros((numstates, numstates))

    for s_i in range(numstates):

        denominator = sum( [gamma[s_i, t] for t in range(len(observations) - 1) ])

       

        for s_j in range(numstates):

            trans[s_i, s_j] = sum( [ xi[t][s_i, s_j] for t in range(len(observations) - 1) ] )/denominator

    return (initialprob, trans, emis)
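
# illustrative sanity check for the M-step (check_maximization is not part of the original
# demo): the re-estimated parameters should again be proper probability distributions
def check_maximization(initialprob, trans, emis):
    assert numpy.allclose(initialprob.sum(), 1.0)   # initial probabilities sum to 1
    assert numpy.allclose(trans.sum(axis=1), 1.0)   # each row of trans sums to 1
    assert numpy.allclose(emis.sum(axis=1), 1.0)    # each row of emis sums to 1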

##########

# testing forward/backward

def test_forwardbackward(numobservations, numiter):

    ########

    # generate observations

    observations, truenumhot, truenumcold = generate_observations(numobservations)

    obs_indices = { 1 : 0, 2 : 1, 3: 2}

    numstates = 2

    vocabsize = 3

    #####

    # HMM initialization

   

    # initialize initial probs

    unnormalized = numpy.random.rand(numstates)

    initialprob = unnormalized / sum(unnormalized)

   

    # initialize emission probs

    emis = numpy.zeros((numstates, vocabsize))

    for s in range(numstates):

        unnormalized = numpy.random.rand(vocabsize)

        emis[s] = unnormalized / sum(unnormalized)

   

    # initialize transition probs

    trans = numpy.zeros((numstates, numstates))

    for s in range(numstates):

        unnormalized = numpy.random.rand(numstates)

        trans[s] = unnormalized / sum(unnormalized)

    print("OBSERVATIONS:")

    print(observations)

    print("\n")

   

    print("Random initialization:")

    print("INITIALPROB")

    print(initialprob)

    print("\n")

    print("EMIS")

    print(emis)

    print("\n")

    print("TRANS")

    print(trans)

    print("\n")

    # pause so the randomly initialized HMM can be inspected before EM starts
    input()

   

    for iteration in range(numiter):

   

        forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)

        backward = backwardprobs(observations, trans, emis, numstates, obs_indices)

        gamma, xi = expectation(observations, trans, emis, numstates, obs_indices, forward, backward)

        initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)

        print("Re-computed:")

        print("INITIALPROB")

        print(initialprob)

        print("\n")

        print("EMIS")

        print(emis)

        print("\n")

        print("TRANS")

        print(trans)

        print("\n")

   

        print("GAMMA(1)")

        print(gamma[0])

        print("\n")

       

        print("GAMMA(2)")

        print(gamma[1])

        print("\n")

       

        # the first truenumhot observations were generated from the "hot" state.
        # compare the average probability of being in state 1 over those timesteps
        # with the average over the remaining ("cold") timesteps; after a few EM
        # iterations the two averages should clearly differ (which state ends up
        # modeling "hot" depends on the random initialization)

        avgprob_state1_for_truehot = sum(gamma[0][:truenumhot]) / truenumhot

        avgprob_state1_for_truecold = sum(gamma[0][truenumhot:]) / truenumcold

        print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)

        print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)

        # plot observations and probabilities of being in certain states

        # switch matplotlib to interactive mode so that plt.show() below does not block
        from matplotlib import interactive
        interactive(True)

        xpoints = numpy.arange(len(observations))

        fig, ax1 = plt.subplots()

        ax1.plot(xpoints, observations, "b-")

        ax1.set_ylim([0, 4])

        ax1.set_xlabel("timepoints")

        ax1.set_ylabel("observations", color = "b")

   

        ax2 = ax1.twinx()

        ax2.plot(xpoints, gamma[0], "r-")

        ax2.set_ylim([0.0, 1.0])

        ax2.set_ylabel("prob", color = "r")

        plt.show()

        # pause until Enter is pressed, then close this iteration's plot and continue
        input()

        plt.close()
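
# illustrative entry point (not part of the original demo); the argument values are arbitrary:
# 30 observations and 10 EM iterations, pressing Enter at each pause to continue
if __name__ == "__main__":
    test_forwardbackward(30, 10)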