Courses > Python worksheets >

### Demo: the forward-backward algorithm

 `import randomimport numpyimport matplotlib.pyplot as plt############################# generating the data# generate 2/3 n from hot, then 1/3 n from cold.# returns: observations, #hot, #colddef generate_observations(n):    # probabilities of ice cream amounts given hot / cold:    # shown here as amounts of 1's, 2's, 3's out of 10 days    hot = [ 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]    cold = [ 1, 1, 1, 1, 1, 2, 2, 2, 2, 3]    observations  = [ ]    # choose 2n observations from "hot"    numhot = int(2.0/3 * n)    for i in range(numhot): observations.append(random.choice(hot))    # choose n observations from "cold"    numcold = n - numhot    for i in range(numcold): observations.append(random.choice(cold))    return (observations, numhot, numcold)############################ data structures:## numstates: number of states. we omit start and end state here, and assume equal probability of starting in either state.# # emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)# where entry (j, o) has emis_j(o). 
## transition probablities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).## forward and backward probabilities: numpy matrix of shape (N, T) where entry (s, t) has forward probability# for state s and observation t################# computing forward and backward probabilities# forward:# alpha_t(j) = P(o1, ..., ot, qt = j | HMM)# forward function: returns numpy matrix of size (N, T)def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):    forwardmatrix = numpy.zeros((numstates, len(observations)))    # initialization    obs_index = observation_indices[ observations]    for s in range(numstates):        forwardmatrix[ s, 0 ] = initialprob[s] * emis[ s, obs_index]    # recursion step    for t in range(1, len(observations)):        obs_index = observation_indices[ observations[t]]        for s in range(numstates):            forwardmatrix[s, t] = emis[s, obs_index] * sum([forwardmatrix[s2, t-1] * trans[s2, s] \                                       for s2 in range(numstates)])    return forwardmatrix# beta_t(j) = P(o_{t+1}, ..., o_T | qt = j, HMM)# backward function: returns numpy matrix of size (N, T)def backwardprobs(observations, trans, emis, numstates, observation_indices):    backwardmatrix = numpy.zeros((numstates, len(observations)))    # initialization    for s in range(numstates):        backwardmatrix[ s, len(observations) - 1 ] = 1.0    # recursion    for t in range(len(observations) - 2, -1, -1):        obs_index = observation_indices[ observations[t+1]]        for s in range(numstates):            backwardmatrix[s, t] = sum([ trans[s, s2] * emis[s2, obs_index] * backwardmatrix[s2, t+1] \                                         for s2 in range(numstates) ])    return backwardmatrix                                 def test_alphabeta():    observations = [3,1,3]    trans = numpy.matrix("0.7 0.3; 0.4 0.6")    emis = numpy.matrix("0.2 0.4 0.4; 0.5 0.4 0.1")    initialprob = numpy.array([0.8, 0.2])    
numstates = 2    obs_indices = { 1 : 0, 2 : 1, 3: 2}    print("FORWARD")    print(forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices))    print("\n")        print('BACKWARD')    print(backwardprobs(observations, trans, emis, numstates, obs_indices))    print("\n")    ##### expectation step:# re-estimate xi_t(i, j) and gamma_t(j)# returns two things:# - gamma is a (N, T) numpy matrix# - xi is a list of T numpy matrices of size (N, N)def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):    # denominator: P(O | HMM)    p_o_given_hmm = sum([forward[s_i, len(observations) -1] for s_i in range(numstates) ])        # computing xi    xi = [ ]    for t in range(len(observations) - 1):        obs_index = observation_indices[observations[t+1]]                xi_t = numpy.zeros((numstates, numstates))                for s_i in range(numstates):            for s_j in range(numstates):                xi_t[ s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j] * emis[s_j, obs_index] * backward[s_j, t+1]) / p_o_given_hmm        xi.append(xi_t)    # computing gamma    gamma = numpy.zeros((numstates + 2, len(observations)))    for t in range(len(observations) - 1):        for s_i in range(numstates):            gamma[s_i, t] = sum([ xi[t][s_i, s_j] for s_j in range(numstates) ])    for s_j in range(numstates):        gamma[s_j, len(observations) - 1] = sum( [ xi[t][s_i, s_j] for s_i in range(numstates) ] )                return (gamma, xi)#### maximization step:# re-estimate trans, emis based on gamma, xi# returns:# - initialprob# - trans# - emisdef maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):    # re-estimate initial probabilities    initialprob = numpy.array([gamma[s_i, 0] for s_i in range(numstates)])        # re-estimate emission probabilities    emis = numpy.zeros((numstates, vocabsize))    for s in range(numstates):        denominator = sum( [gamma[s, t] for t in 
range(len(observations))])        for vocab_item, obs_index in observation_indices.items():            emis[s, obs_index] = sum( [gamma[s, t] for t in range(len(observations)) if observations[t] == vocab_item] )/denominator    # re-estimate transition probabilities    trans = numpy.zeros((numstates, numstates))    for s_i in range(numstates):        denominator = sum( [gamma[s_i, t] for t in range(len(observations) - 1) ])                for s_j in range(numstates):            trans[s_i, s_j] = sum( [ xi[t][s_i, s_j] for t in range(len(observations) - 1) ] )/denominator    return (initialprob, trans, emis)########### testing forward/backwarddef test_forwardbackward(numobservations, numiter):    ########    # generate observation    observations, truenumhot, truenumcold = generate_observations(numobservations)    obs_indices = { 1 : 0, 2 : 1, 3: 2}    numstates = 2    vocabsize = 3    #####    # HMM initialization        # initialize initial probs    unnormalized = numpy.random.rand(numstates)    initialprob = unnormalized / sum(unnormalized)        # initialize emission probs    emis = numpy.zeros((numstates, vocabsize))    for s in range(numstates):        unnormalized = numpy.random.rand(vocabsize)        emis[s] = unnormalized / sum(unnormalized)        # initialize transition probs    trans = numpy.zeros((numstates, numstates))    for s in range(numstates):        unnormalized = numpy.random.rand(numstates)        trans[s] = unnormalized / sum(unnormalized)    print("OBSERVATIONS:")    print(observations)    print("\n")        print("Random initialization:")    print("INITIALPROB")    print(initialprob)    print("\n")    print("EMIS")    print(emis)    print("\n")    print("TRANS")    print(trans)    print("\n")    input()        for iteration in range(numiter):            forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)        backward = backwardprobs(observations, trans, emis, numstates, obs_indices)        gamma, xi = 
expectation(observations, trans, emis, numstates, obs_indices, forward, backward)        initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)        print("Re-computed:")        print("INITIALPROB")        print(initialprob)        print("\n")        print("EMIS")        print(emis)        print("\n")        print("TRANS")        print(trans)        print("\n")            print("GAMMA(1)")        print(gamma)        print("\n")                print("GAMMA(2)")        print(gamma)        print("\n")                # the first truenumhot observations were generated from the "hot" state.        # what is the probability of being in state 1 for the first        # truenumhot observations as opposed to the rest        avgprob_state1_for_truehot = sum(gamma[:truenumhot]) / truenumhot        avgprob_state1_for_truecold = sum(gamma[truenumhot:]) / truenumcold        print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)        print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)        # plot observations and probabilities of being in certain states        from matplotlib import interactive        interactive(True)        xpoints = numpy.arange(len(observations))        fig, ax1 = plt.subplots()        ax1.plot(xpoints, observations, "b-")        plt.ylim([0, 4])        ax1.set_xlabel("timepoints")        ax1.set_ylabel("observations", color = "b")            ax2 = ax1.twinx()        ax2.plot(xpoints, gamma, "r-")        plt.ylim([0.0, 1.0])        ax2.set_ylabel("prob", color = "r")        plt.show()        input()        plt.close()    `