There is plenty of HMM learning material online, but most of it is stitched together from excerpts: a paragraph copied from here, a figure from there, with variables in the formulas left unexplained, which makes it hard to learn from.
After working through several blog posts (some of which were themselves rather disorganized), writing out the formulas on seven large sheets of draft paper, and single-stepping through the code, I finally worked out how HMMs operate.
My HMM learning process:
1. Search for related blog posts:
An introduction to hidden Markov models [blog] (fairly detailed figures; the first half is decent, but the formulas in the second half are somewhat muddled): http://www.leexiang.com/hidden-markov-model
HMM basics plus the forward and Viterbi algorithms [blog] (includes source code; the algorithm descriptions are not very clear, but the code is there to read): http://www.cnblogs.com/zhangchaoyang/articles/2219571.html
The forward-backward algorithm [blog] (includes source code; the algorithm description is not very clear, but the code is there to read): http://www.cnblogs.com/zhangchaoyang/articles/2220398.html
Hidden Markov model slides by Liu Bingquan [Baidu Wenku] (algorithm flow, formulas, and parameters are all covered in detail; a very good summary once you have the theory, but there are no concrete examples, so it is not very approachable for beginners): http://wenku.baidu.com/view/2f0d944769eae009581bec04.html
---- Other code resources (without the theory, it is hard to understand HMM principles from code alone) ----
A C implementation, UMDHMM: http://www.kanungo.com/software/umdhmm-v1.02.zip
A Python port of UMDHMM on GitHub: https://github.com/dkyang/UMDHMM-python
2. Following Liu Bingquan's HMM slides [Baidu Wenku], I worked through the whole HMM pipeline on five large sheets of draft paper; the notes below are the cleaned-up result:
What the three HMM algorithms do (the three problems are written out in standard notation after this list):
Forward algorithm (evaluation): given an HMM, compute the probability that the model produces an observation sequence O1O2...OLEN.
Viterbi algorithm (decoding): given an HMM and an observation sequence O1O2...OLEN, find the most likely hidden sequence H1H2...HLEN together with that hidden sequence's probability.
Forward-backward algorithm (learning): given an observation sequence O1O2...OLEN, estimate the parameters of an HMM that fits the sequence.
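For reference, here are the same three problems in the standard notation of Rabiner's tutorial (my own summary, not taken from any single source above; λ denotes the whole model):

\[
\lambda = (A, B, \pi), \qquad
a_{ij} = P(q_{t+1} = S_j \mid q_t = S_i), \qquad
b_j(k) = P(O_t = v_k \mid q_t = S_j), \qquad
\pi_i = P(q_1 = S_i)
\]
\[
\begin{aligned}
\text{evaluation (forward):} &\quad P(O_1 O_2 \cdots O_{LEN} \mid \lambda) \\
\text{decoding (Viterbi):} &\quad \arg\max_{H_1 \cdots H_{LEN}} P(H_1 \cdots H_{LEN} \mid O_1 \cdots O_{LEN}, \lambda) \\
\text{learning (forward-backward):} &\quad \arg\max_{\lambda} P(O_1 O_2 \cdots O_{LEN} \mid \lambda)
\end{aligned}
\]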
How the three algorithms relate to each other:
The forward variable in the forward algorithm is the same forward variable used in the forward-backward algorithm, and the backward variable is defined analogously to the forward variable;
The forward-backward algorithm searches for locally optimal model parameters by a maximum-likelihood-style estimation, and the forward and backward variables carry most of the work during its iterations;
The Viterbi algorithm closely parallels the forward algorithm: where the forward recursion takes a sum over predecessors, Viterbi takes a max, and besides the probability Viterbi also backtracks to recover the hidden-state path (both recursions are spelled out right after this list);
Once the forward-backward algorithm has estimated the model parameters, the user supplies an observation sequence and the Viterbi algorithm returns the most likely hidden sequence and its probability.
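Written out (again in Rabiner's notation, as a supplement to the notes above), the forward and Viterbi recursions differ only in replacing the sum with a max:

\[
\alpha_1(j) = \pi_j\, b_j(O_1), \qquad
\alpha_t(j) = \Big[ \sum_{i=1}^{N} \alpha_{t-1}(i)\, a_{ij} \Big] b_j(O_t), \qquad
P(O \mid \lambda) = \sum_{j=1}^{N} \alpha_{LEN}(j)
\]
\[
\delta_1(j) = \pi_j\, b_j(O_1), \qquad
\delta_t(j) = \Big[ \max_{1 \le i \le N} \delta_{t-1}(i)\, a_{ij} \Big] b_j(O_t), \qquad
P^{*} = \max_{1 \le j \le N} \delta_{LEN}(j)
\]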
First, a Python implementation of the forward algorithm:
# -*- coding: utf-8 -*-
__author__ = 'ZhangHe'

def forward(N, M, A, B, P, observed):
    # (looks up symbol indices in the global `observation` list defined below)
    p = 0.0
    # Length of the observation sequence
    LEN = len(observed)
    # Intermediate probabilities, LEN*N
    Q = [([0] * N) for i in range(LEN)]
    # First observation: initial state probability times the emission
    # probability from the hidden state to the observed state.
    for j in range(N):
        Q[0][j] = P[j] * B[j][observation.index(observed[0])]
    # Subsequent observations: sum the probabilities of reaching the current
    # hidden state from every previous hidden state, then multiply by the
    # emission probability from the hidden state to the observed state.
    for i in range(1, LEN):
        for j in range(N):
            total = 0.0
            for k in range(N):
                total += Q[i-1][k] * A[k][j]
            Q[i][j] = total * B[j][observation.index(observed[i])]
    for i in range(N):
        p += Q[LEN-1][i]
    return p

# 3 hidden states: sun, cloud, rain
hidden = []
hidden.append('sun')
hidden.append('cloud')
hidden.append('rain')
N = len(hidden)

# 3 observation symbols: dry, damp, soggy
observation = []
observation.append('dry')
observation.append('damp')
observation.append('soggy')
M = len(observation)

# Initial state vector P (1*N: the probability that day 1 is sun, cloud, rain)
P = (0.3, 0.3, 0.4)
# Transition matrix A (N*N: probabilities of moving between hidden states)
A = ((0.2, 0.3, 0.5), (0.1, 0.5, 0.4), (0.6, 0.1, 0.3))
# Confusion (emission) matrix B (N*M: probability of each observation given a hidden state)
B = ((0.1, 0.5, 0.4), (0.2, 0.4, 0.4), (0.3, 0.6, 0.1))

# Given an observation sequence `observed`, print the probability that the
# HMM (N, M, A, B, P) generates it.
observed = ['dry']
print(forward(N, M, A, B, P, observed))
observed = ['damp']
print(forward(N, M, A, B, P, observed))
observed = ['dry', 'damp']
print(forward(N, M, A, B, P, observed))
observed = ['dry', 'damp', 'soggy']
print(forward(N, M, A, B, P, observed))
Output:
0.21
0.51
0.1074
0.030162
The first two results match my hand calculations;
I did not compute the last two by hand, but single-stepping through the code in the debugger showed that it follows the same procedure as the hand calculation.
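As a further check (my own addition, not part of the original notes), for sequences this short the forward probability can also be compared against a brute-force enumeration of all N^LEN hidden paths. The sketch below reuses the N, A, B, P, and observation globals defined above:

from itertools import product

def brute_force_prob(N, A, B, P, observed):
    # Enumerate every hidden-state path and accumulate
    # P(path) * P(observed | path); feasible only for tiny examples.
    LEN = len(observed)
    total = 0.0
    for path in product(range(N), repeat=LEN):
        p = P[path[0]] * B[path[0]][observation.index(observed[0])]
        for t in range(1, LEN):
            p *= A[path[t-1]][path[t]] * B[path[t]][observation.index(observed[t])]
        total += p
    return total

print(brute_force_prob(N, A, B, P, ['dry', 'damp', 'soggy']))  # expected 0.030162, matching forward()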
Next, a Python implementation of the Viterbi algorithm:
def viterbi(N, M, A, B, P, hidden, observed):
    sta = []
    LEN = len(observed)
    Q = [([0] * N) for i in range(LEN)]
    path = [([0] * N) for i in range(LEN)]
    # Day 1: initial state probability * emission probability
    # from the hidden state to the observed state
    for j in range(N):
        Q[0][j] = P[j] * B[j][observation.index(observed[0])]
        path[0][j] = -1
    # After day 1:
    #   (max over previous hidden states of previous probability * transition probability)
    #   * emission probability from the hidden state to the observed state
    for i in range(1, LEN):
        for j in range(N):
            max_p = 0.0
            index = 0
            for k in range(N):
                if Q[i-1][k] * A[k][j] > max_p:
                    max_p = Q[i-1][k] * A[k][j]
                    index = k
            Q[i][j] = max_p * B[j][observation.index(observed[i])]
            path[i][j] = index
    # Find the hidden state with the highest probability on the last day
    max_p = 0.0
    idx = 0
    for i in range(N):
        if Q[LEN-1][i] > max_p:
            max_p = Q[LEN-1][i]
            idx = i
    print("probability of the most likely hidden sequence: " + str(max_p))
    sta.append(hidden[idx])
    # Backtrack to find the most likely hidden state on each earlier day
    for i in range(LEN-1, 0, -1):
        idx = path[i][idx]
        sta.append(hidden[idx])
    sta.reverse()
    return sta

# 3 hidden states: sun, cloud, rain
hidden = []
hidden.append('sun')
hidden.append('cloud')
hidden.append('rain')
N = len(hidden)

# 3 observation symbols: dry, damp, soggy
observation = []
observation.append('dry')
observation.append('damp')
observation.append('soggy')
M = len(observation)

# Initial state vector P (1*N: the probability that day 1 is sun, cloud, rain)
P = (0.3, 0.3, 0.4)
# Transition matrix A (N*N: probabilities of moving between hidden states)
A = ((0.2, 0.3, 0.5), (0.1, 0.5, 0.4), (0.6, 0.1, 0.3))
# Confusion (emission) matrix B (N*M: probability of each observation given a hidden state)
B = ((0.1, 0.5, 0.4), (0.2, 0.4, 0.4), (0.3, 0.6, 0.1))

# Given the observation sequence `observed`, print the most likely hidden
# sequence under the HMM (N, M, A, B, P)
observed = ['dry', 'damp', 'soggy']
print(viterbi(N, M, A, B, P, hidden, observed))
Output:
probability of the most likely hidden sequence: 0.005184
['rain', 'rain', 'sun']
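The Viterbi result can be sanity-checked the same way (again my own addition): score every hidden path jointly with the observations and keep the best one. This reuses the N, A, B, P, hidden, and observation globals from the script above:

from itertools import product

def brute_force_best_path(N, A, B, P, hidden, observed):
    # Score every hidden-state path; the best joint probability
    # equals the maximum of the Viterbi Q values on the last day.
    LEN = len(observed)
    best_p, best_path = 0.0, None
    for path in product(range(N), repeat=LEN):
        p = P[path[0]] * B[path[0]][observation.index(observed[0])]
        for t in range(1, LEN):
            p *= A[path[t-1]][path[t]] * B[path[t]][observation.index(observed[t])]
        if p > best_p:
            best_p, best_path = p, path
    return best_p, [hidden[i] for i in best_path]

print(brute_force_best_path(N, A, B, P, hidden, ['dry', 'damp', 'soggy']))
# expected: (0.005184, ['rain', 'rain', 'sun'])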
Finally, the complete Python HMM implementation from GitHub:
import numpy as np

DELTA = 0.001

class HMM:
    def __init__(self, pi, A, B):
        self.pi = pi
        self.A = A
        self.B = B
        self.M = B.shape[1]
        self.N = A.shape[0]

    def forward(self, obs):
        T = len(obs)
        N = self.N
        alpha = np.zeros([N, T])
        alpha[:, 0] = self.pi[:] * self.B[:, obs[0]-1]
        for t in range(1, T):
            for n in range(0, N):
                alpha[n, t] = np.sum(alpha[:, t-1] * self.A[:, n]) * self.B[n, obs[t]-1]
        prob = np.sum(alpha[:, T-1])
        return prob, alpha

    def forward_with_scale(self, obs):
        """See the scaling chapter in "A tutorial on hidden Markov models
        and selected applications in speech recognition."
        """
        T = len(obs)
        N = self.N
        alpha = np.zeros([N, T])
        scale = np.zeros(T)
        alpha[:, 0] = self.pi[:] * self.B[:, obs[0]-1]
        scale[0] = np.sum(alpha[:, 0])
        alpha[:, 0] /= scale[0]
        for t in range(1, T):
            for n in range(0, N):
                alpha[n, t] = np.sum(alpha[:, t-1] * self.A[:, n]) * self.B[n, obs[t]-1]
            scale[t] = np.sum(alpha[:, t])
            alpha[:, t] /= scale[t]
        logprob = np.sum(np.log(scale[:]))
        return logprob, alpha, scale

    def backward(self, obs):
        T = len(obs)
        N = self.N
        beta = np.zeros([N, T])
        beta[:, T-1] = 1
        for t in reversed(range(0, T-1)):
            for n in range(0, N):
                beta[n, t] = np.sum(self.B[:, obs[t+1]-1] * self.A[n, :] * beta[:, t+1])
        prob = np.sum(beta[:, 0])
        return prob, beta

    def backward_with_scale(self, obs, scale):
        T = len(obs)
        N = self.N
        beta = np.zeros([N, T])
        beta[:, T-1] = 1 / scale[T-1]
        for t in reversed(range(0, T-1)):
            for n in range(0, N):
                beta[n, t] = np.sum(self.B[:, obs[t+1]-1] * self.A[n, :] * beta[:, t+1])
                beta[n, t] /= scale[t]
        return beta

    def viterbi(self, obs):
        T = len(obs)
        N = self.N
        psi = np.zeros([N, T], dtype=int)    # reverse pointers
        delta = np.zeros([N, T])
        q = np.zeros(T, dtype=int)
        delta[:, 0] = self.pi[:] * self.B[:, obs[0]-1]
        for t in range(1, T):
            for n in range(0, N):
                temp = delta[:, t-1] * self.A[:, n]
                max_ind = np.argmax(temp)
                psi[n, t] = max_ind
                delta[n, t] = self.B[n, obs[t]-1] * temp[max_ind]
        max_ind = np.argmax(delta[:, T-1])
        q[T-1] = max_ind
        prob = delta[:, T-1][max_ind]
        for t in reversed(range(0, T-1)):    # backtrack all the way down to t = 0
            q[t] = psi[q[t+1], t+1]
        return prob, q, delta

    def viterbi_log(self, obs):
        T = len(obs)
        N = self.N
        psi = np.zeros([N, T], dtype=int)    # reverse pointers
        delta = np.zeros([N, T])
        q = np.zeros(T, dtype=int)
        pi = np.log(self.pi)
        A = np.log(self.A)
        biot = np.log(self.B[:, obs[:]-1])
        delta[:, 0] = pi[:] + biot[:, 0]
        for t in range(1, T):
            for n in range(0, N):
                temp = delta[:, t-1] + A[:, n]
                max_ind = np.argmax(temp)
                psi[n, t] = max_ind
                delta[n, t] = temp[max_ind] + biot[n, t]
        max_ind = np.argmax(delta[:, T-1])
        q[T-1] = max_ind
        logprob = delta[max_ind, T-1]
        for t in reversed(range(0, T-1)):
            q[t] = psi[q[t+1], t+1]
        return logprob, q, delta

    def baum_welch(self, obs):
        T = len(obs)
        M = self.M
        N = self.N

        # calculate the initial statistics
        logprobprev, alpha, scale = self.forward_with_scale(obs)
        beta = self.backward_with_scale(obs, scale)
        gamma = self.compute_gamma(alpha, beta)
        xi = self.compute_xi(obs, alpha, beta)
        logprobinit = logprobprev

        # start iterating
        while True:
            # M-step: re-estimate pi, A, B from the current gamma and xi
            # (the 0.001 + 0.999*x floor keeps every probability away from zero)
            self.pi = 0.001 + 0.999 * gamma[:, 0]
            for i in range(N):
                denominator = np.sum(gamma[i, 0:T-1])
                for j in range(N):
                    numerator = np.sum(xi[i, j, 0:T-1])
                    self.A[i, j] = numerator / denominator
            self.A = 0.001 + 0.999 * self.A
            for j in range(0, N):
                denominator = np.sum(gamma[j, :])
                for k in range(0, M):
                    numerator = 0.0
                    for t in range(0, T):
                        if obs[t] - 1 == k:
                            numerator += gamma[j, t]
                    self.B[j, k] = numerator / denominator
            self.B = 0.001 + 0.999 * self.B

            # E-step: recompute the statistics under the new parameters
            logprobcur, alpha, scale = self.forward_with_scale(obs)
            beta = self.backward_with_scale(obs, scale)
            gamma = self.compute_gamma(alpha, beta)
            xi = self.compute_xi(obs, alpha, beta)

            delta = logprobcur - logprobprev
            logprobprev = logprobcur
            # print("delta is", delta)
            if delta <= DELTA:
                break
        logprobfinal = logprobcur
        return logprobinit, logprobfinal

    def compute_gamma(self, alpha, beta):
        gamma = alpha[:, :] * beta[:, :]
        gamma = gamma / np.sum(gamma, 0)
        return gamma

    def compute_xi(self, obs, alpha, beta):
        T = len(obs)
        N = self.N
        xi = np.zeros((N, N, T-1))
        for t in range(0, T-1):
            for i in range(0, N):
                for j in range(0, N):
                    xi[i, j, t] = alpha[i, t] * self.A[i, j] * \
                                  self.B[j, obs[t+1]-1] * beta[j, t+1]
            xi[:, :, t] /= np.sum(np.sum(xi[:, :, t], 1), 0)
        return xi

def read_hmm(hmmfile):
    fhmm = open(hmmfile, 'r')
    M = int(fhmm.readline().split(' ')[1])
    N = int(fhmm.readline().split(' ')[1])
    A = np.array([])
    fhmm.readline()
    for i in range(N):
        line = fhmm.readline()
        if i == 0:
            A = np.array([float(x) for x in line.split(',')])
        else:
            A = np.vstack((A, [float(x) for x in line.split(',')]))
    B = np.array([])
    fhmm.readline()
    for i in range(N):
        line = fhmm.readline()
        if i == 0:
            B = np.array([float(x) for x in line.split(',')])
        else:
            B = np.vstack((B, [float(x) for x in line.split(',')]))
    fhmm.readline()
    line = fhmm.readline()
    pi = np.array([float(x) for x in line.split(',')])
    fhmm.close()
    return M, N, pi, A, B

def read_sequence(seqfile):
    fseq = open(seqfile, 'r')
    T = int(fseq.readline().split(' ')[1])
    line = fseq.readline()
    obs = np.array([int(x) for x in line.split(',')])
    fseq.close()
    return T, obs
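To make the class above concrete, here is a minimal usage sketch (my own toy example, not from the repo; all the numbers are invented, and observation symbols are 1-based to match the obs[t]-1 indexing used throughout the class):

import numpy as np

# Toy 2-state, 2-symbol model; probabilities are made up for illustration
pi = np.array([0.5, 0.5])
A = np.array([[0.7, 0.3],
              [0.4, 0.6]])
B = np.array([[0.9, 0.1],
              [0.2, 0.8]])
hmm = HMM(pi, A, B)

obs = np.array([1, 1, 2, 2, 1])       # observation sequence, symbols numbered from 1
prob, alpha = hmm.forward(obs)        # evaluation: P(obs | model)
prob_v, q, delta = hmm.viterbi(obs)   # decoding: best hidden path q
logp0, logp1 = hmm.baum_welch(obs)    # learning: re-estimates pi, A, B in place
print(prob, q, logp0, logp1)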