Demo: the forward-backward algorithm
import random
import numpy
import matplotlib.pyplot as plt
###########################
## generating the data
# generate 2/3 n from hot, then 1/3 n from cold.
# returns: observations, #hot, #cold
def generate_observations(n):
# probabilities of ice cream amounts given hot / cold:
# shown here as amounts of 1's, 2's, 3's out of 10 days
hot = [ 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
cold = [ 1, 1, 1, 1, 1, 2, 2, 2, 2, 3]
observations = [ ]
# choose 2n observations from "hot"
numhot = int(2.0/3 * n)
for i in range(numhot): observations.append(random.choice(hot))
# choose n observations from "cold"
numcold = n - numhot
for i in range(numcold): observations.append(random.choice(cold))
return (observations, numhot, numcold)
###########################
# data structures:
#
# numstates: number of states. we omit start and end state here, and assume equal probability of starting in either state.
#
# emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)
# where entry (j, o) has emis_j(o).
#
# transition probablities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).
#
# forward and backward probabilities: numpy matrix of shape (N, T) where entry (s, t) has forward probability
# for state s and observation t
################
# computing forward and backward probabilities
# forward:
# alpha_t(j) = P(o1, ..., ot, qt = j | HMM)
# forward function: returns numpy matrix of size (N, T)
def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):
forwardmatrix = numpy.zeros((numstates, len(observations)))
# initialization
obs_index = observation_indices[ observations[0]]
for s in range(numstates):
forwardmatrix[ s, 0 ] = initialprob[s] * emis[ s, obs_index]
# recursion step
for t in range(1, len(observations)):
obs_index = observation_indices[ observations[t]]
for s in range(numstates):
forwardmatrix[s, t] = emis[s, obs_index] * sum([forwardmatrix[s2, t-1] * trans[s2, s] \
for s2 in range(numstates)])
return forwardmatrix
# beta_t(j) = P(o_{t+1}, ..., o_T | qt = j, HMM)
# backward function: returns numpy matrix of size (N, T)
def backwardprobs(observations, trans, emis, numstates, observation_indices):
backwardmatrix = numpy.zeros((numstates, len(observations)))
# initialization
for s in range(numstates):
backwardmatrix[ s, len(observations) - 1 ] = 1.0
# recursion
for t in range(len(observations) - 2, -1, -1):
obs_index = observation_indices[ observations[t+1]]
for s in range(numstates):
backwardmatrix[s, t] = sum([ trans[s, s2] * emis[s2, obs_index] * backwardmatrix[s2, t+1] \
for s2 in range(numstates) ])
return backwardmatrix
def test_alphabeta():
observations = [3,1,3]
trans = numpy.matrix("0.7 0.3; 0.4 0.6")
emis = numpy.matrix("0.2 0.4 0.4; 0.5 0.4 0.1")
initialprob = numpy.array([0.8, 0.2])
numstates = 2
obs_indices = { 1 : 0, 2 : 1, 3: 2}
print("FORWARD")
print(forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices))
print("\n")
print('BACKWARD')
print(backwardprobs(observations, trans, emis, numstates, obs_indices))
print("\n")
####
# expectation step:
# re-estimate xi_t(i, j) and gamma_t(j)
# returns two things:
# - gamma is a (N, T) numpy matrix
# - xi is a list of T numpy matrices of size (N, N)
def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):
# denominator: P(O | HMM)
p_o_given_hmm = sum([forward[s_i, len(observations) -1] for s_i in range(numstates) ])
# computing xi
xi = [ ]
for t in range(len(observations) - 1):
obs_index = observation_indices[observations[t+1]]
xi_t = numpy.zeros((numstates, numstates))
for s_i in range(numstates):
for s_j in range(numstates):
xi_t[ s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j] * emis[s_j, obs_index] * backward[s_j, t+1]) / p_o_given_hmm
xi.append(xi_t)
# computing gamma
gamma = numpy.zeros((numstates + 2, len(observations)))
for t in range(len(observations) - 1):
for s_i in range(numstates):
gamma[s_i, t] = sum([ xi[t][s_i, s_j] for s_j in range(numstates) ])
for s_j in range(numstates):
gamma[s_j, len(observations) - 1] = sum( [ xi[t][s_i, s_j] for s_i in range(numstates) ] )
return (gamma, xi)
###
# maximization step:
# re-estimate trans, emis based on gamma, xi
# returns:
# - initialprob
# - trans
# - emis
def maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):
# re-estimate initial probabilities
initialprob = numpy.array([gamma[s_i, 0] for s_i in range(numstates)])
# re-estimate emission probabilities
emis = numpy.zeros((numstates, vocabsize))
for s in range(numstates):
denominator = sum( [gamma[s, t] for t in range(len(observations))])
for vocab_item, obs_index in observation_indices.items():
emis[s, obs_index] = sum( [gamma[s, t] for t in range(len(observations)) if observations[t] == vocab_item] )/denominator
# re-estimate transition probabilities
trans = numpy.zeros((numstates, numstates))
for s_i in range(numstates):
denominator = sum( [gamma[s_i, t] for t in range(len(observations) - 1) ])
for s_j in range(numstates):
trans[s_i, s_j] = sum( [ xi[t][s_i, s_j] for t in range(len(observations) - 1) ] )/denominator
return (initialprob, trans, emis)
##########
# testing forward/backward
def test_forwardbackward(numobservations, numiter):
########
# generate observation
observations, truenumhot, truenumcold = generate_observations(numobservations)
obs_indices = { 1 : 0, 2 : 1, 3: 2}
numstates = 2
vocabsize = 3
#####
# HMM initialization
# initialize initial probs
unnormalized = numpy.random.rand(numstates)
initialprob = unnormalized / sum(unnormalized)
# initialize emission probs
emis = numpy.zeros((numstates, vocabsize))
for s in range(numstates):
unnormalized = numpy.random.rand(vocabsize)
emis[s] = unnormalized / sum(unnormalized)
# initialize transition probs
trans = numpy.zeros((numstates, numstates))
for s in range(numstates):
unnormalized = numpy.random.rand(numstates)
trans[s] = unnormalized / sum(unnormalized)
print("OBSERVATIONS:")
print(observations)
print("\n")
print("Random initialization:")
print("INITIALPROB")
print(initialprob)
print("\n")
print("EMIS")
print(emis)
print("\n")
print("TRANS")
print(trans)
print("\n")
input()
for iteration in range(numiter):
forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)
backward = backwardprobs(observations, trans, emis, numstates, obs_indices)
gamma, xi = expectation(observations, trans, emis, numstates, obs_indices, forward, backward)
initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)
print("Re-computed:")
print("INITIALPROB")
print(initialprob)
print("\n")
print("EMIS")
print(emis)
print("\n")
print("TRANS")
print(trans)
print("\n")
print("GAMMA(1)")
print(gamma[0])
print("\n")
print("GAMMA(2)")
print(gamma[1])
print("\n")
# the first truenumhot observations were generated from the "hot" state.
# what is the probability of being in state 1 for the first
# truenumhot observations as opposed to the rest
avgprob_state1_for_truehot = sum(gamma[0][:truenumhot]) / truenumhot
avgprob_state1_for_truecold = sum(gamma[0][truenumhot:]) / truenumcold
print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)
print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)
# plot observations and probabilities of being in certain states
from matplotlib import interactive
interactive(True)
xpoints = numpy.arange(len(observations))
fig, ax1 = plt.subplots()
ax1.plot(xpoints, observations, "b-")
plt.ylim([0, 4])
ax1.set_xlabel("timepoints")
ax1.set_ylabel("observations", color = "b")
ax2 = ax1.twinx()
ax2.plot(xpoints, gamma[0], "r-")
plt.ylim([0.0, 1.0])
ax2.set_ylabel("prob", color = "r")
plt.show()
input()
plt.close()