Demo: the forward-backward algorithm

import random

import numpy

import matplotlib.pyplot as plt

###########################

## generating the data

# generate 2/3 n from hot, then 1/3 n from cold.

# returns: observations, #hot, #cold

def generate_observations(n):

# probabilities of ice cream amounts given hot / cold:

# shown here as amounts of 1's, 2's, 3's out of 10 days

hot = [ 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]

cold = [ 1, 1, 1, 1, 1, 2, 2, 2, 2, 3]

observations = [ ]

# choose 2n observations from "hot"

numhot = int(2.0/3 * n)

for i in range(numhot): observations.append(random.choice(hot))

# choose n observations from "cold"

numcold = n - numhot

for i in range(numcold): observations.append(random.choice(cold))

return (observations, numhot, numcold)

###########################

# data structures:

#

# numstates: number of states. we omit start and end state here, and assume equal probability of starting in either state.

#

# emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)

# where entry (j, o) has emis_j(o).

#

# transition probablities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).

#

# forward and backward probabilities: numpy matrix of shape (N, T) where entry (s, t) has forward probability

# for state s and observation t

################

# computing forward and backward probabilities

# forward:

# alpha_t(j) = P(o1, ..., ot, qt = j | HMM)

# forward function: returns numpy matrix of size (N, T)

def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):

forwardmatrix = numpy.zeros((numstates, len(observations)))

# initialization

obs_index = observation_indices[ observations[0]]

for s in range(numstates):

forwardmatrix[ s, 0 ] = initialprob[s] * emis[ s, obs_index]

# recursion step

for t in range(1, len(observations)):

obs_index = observation_indices[ observations[t]]

for s in range(numstates):

forwardmatrix[s, t] = emis[s, obs_index] * sum([forwardmatrix[s2, t-1] * trans[s2, s] \

for s2 in range(numstates)])

return forwardmatrix

# beta_t(j) = P(o_{t+1}, ..., o_T | qt = j, HMM)

# backward function: returns numpy matrix of size (N, T)

def backwardprobs(observations, trans, emis, numstates, observation_indices):

backwardmatrix = numpy.zeros((numstates, len(observations)))

# initialization

for s in range(numstates):

backwardmatrix[ s, len(observations) - 1 ] = 1.0

# recursion

for t in range(len(observations) - 2, -1, -1):

obs_index = observation_indices[ observations[t+1]]

for s in range(numstates):

backwardmatrix[s, t] = sum([ trans[s, s2] * emis[s2, obs_index] * backwardmatrix[s2, t+1] \

for s2 in range(numstates) ])

return backwardmatrix

def test_alphabeta():

observations = [3,1,3]

trans = numpy.matrix("0.7 0.3; 0.4 0.6")

emis = numpy.matrix("0.2 0.4 0.4; 0.5 0.4 0.1")

initialprob = numpy.array([0.8, 0.2])

numstates = 2

obs_indices = { 1 : 0, 2 : 1, 3: 2}

print("FORWARD")

print(forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices))

print("\n")

print('BACKWARD')

print(backwardprobs(observations, trans, emis, numstates, obs_indices))

print("\n")

####

# expectation step:

# re-estimate xi_t(i, j) and gamma_t(j)

# returns two things:

# - gamma is a (N, T) numpy matrix

# - xi is a list of T numpy matrices of size (N, N)

def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):

# denominator: P(O | HMM)

p_o_given_hmm = sum([forward[s_i, len(observations) -1] for s_i in range(numstates) ])

# computing xi

xi = [ ]

for t in range(len(observations) - 1):

obs_index = observation_indices[observations[t+1]]

xi_t = numpy.zeros((numstates, numstates))

for s_i in range(numstates):

for s_j in range(numstates):

xi_t[ s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j] * emis[s_j, obs_index] * backward[s_j, t+1]) / p_o_given_hmm

xi.append(xi_t)

# computing gamma

gamma = numpy.zeros((numstates + 2, len(observations)))

for t in range(len(observations) - 1):

for s_i in range(numstates):

gamma[s_i, t] = sum([ xi[t][s_i, s_j] for s_j in range(numstates) ])

for s_j in range(numstates):

gamma[s_j, len(observations) - 1] = sum( [ xi[t][s_i, s_j] for s_i in range(numstates) ] )

return (gamma, xi)

###

# maximization step:

# re-estimate trans, emis based on gamma, xi

# returns:

# - initialprob

# - trans

# - emis

def maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):

# re-estimate initial probabilities

initialprob = numpy.array([gamma[s_i, 0] for s_i in range(numstates)])

# re-estimate emission probabilities

emis = numpy.zeros((numstates, vocabsize))

for s in range(numstates):

denominator = sum( [gamma[s, t] for t in range(len(observations))])

for vocab_item, obs_index in observation_indices.items():

emis[s, obs_index] = sum( [gamma[s, t] for t in range(len(observations)) if observations[t] == vocab_item] )/denominator

# re-estimate transition probabilities

trans = numpy.zeros((numstates, numstates))

for s_i in range(numstates):

denominator = sum( [gamma[s_i, t] for t in range(len(observations) - 1) ])

for s_j in range(numstates):

trans[s_i, s_j] = sum( [ xi[t][s_i, s_j] for t in range(len(observations) - 1) ] )/denominator

return (initialprob, trans, emis)

##########

# testing forward/backward

def test_forwardbackward(numobservations, numiter):

########

# generate observation

observations, truenumhot, truenumcold = generate_observations(numobservations)

obs_indices = { 1 : 0, 2 : 1, 3: 2}

numstates = 2

vocabsize = 3

#####

# HMM initialization

# initialize initial probs

unnormalized = numpy.random.rand(numstates)

initialprob = unnormalized / sum(unnormalized)

# initialize emission probs

emis = numpy.zeros((numstates, vocabsize))

for s in range(numstates):

unnormalized = numpy.random.rand(vocabsize)

emis[s] = unnormalized / sum(unnormalized)

# initialize transition probs

trans = numpy.zeros((numstates, numstates))

for s in range(numstates):

unnormalized = numpy.random.rand(numstates)

trans[s] = unnormalized / sum(unnormalized)

print("OBSERVATIONS:")

print(observations)

print("\n")

print("Random initialization:")

print("INITIALPROB")

print(initialprob)

print("\n")

print("EMIS")

print(emis)

print("\n")

print("TRANS")

print(trans)

print("\n")

input()

for iteration in range(numiter):

forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)

backward = backwardprobs(observations, trans, emis, numstates, obs_indices)

gamma, xi = expectation(observations, trans, emis, numstates, obs_indices, forward, backward)

initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)

print("Re-computed:")

print("INITIALPROB")

print(initialprob)

print("\n")

print("EMIS")

print(emis)

print("\n")

print("TRANS")

print(trans)

print("\n")

print("GAMMA(1)")

print(gamma[0])

print("\n")

print("GAMMA(2)")

print(gamma[1])

print("\n")

# the first truenumhot observations were generated from the "hot" state.

# what is the probability of being in state 1 for the first

# truenumhot observations as opposed to the rest

avgprob_state1_for_truehot = sum(gamma[0][:truenumhot]) / truenumhot

avgprob_state1_for_truecold = sum(gamma[0][truenumhot:]) / truenumcold

print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)

print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)

# plot observations and probabilities of being in certain states

from matplotlib import interactive

interactive(True)

xpoints = numpy.arange(len(observations))

fig, ax1 = plt.subplots()

ax1.plot(xpoints, observations, "b-")

plt.ylim([0, 4])

ax1.set_xlabel("timepoints")

ax1.set_ylabel("observations", color = "b")

ax2 = ax1.twinx()

ax2.plot(xpoints, gamma[0], "r-")

plt.ylim([0.0, 1.0])

ax2.set_ylabel("prob", color = "r")

plt.show()

input()

plt.close()