Abstract:
This article focuses on a Python implementation of the Hidden Markov Model (HMM). The main references are:
[1] Lawrence R. Rabiner, "A Tutorial on Hidden Markov Models and Selected Applications in Speech Recognition," Proceedings of the IEEE, Vol. 77, No. 2, pp. 257-286, February 1989.
[2] Dawei Shen, "Some Mathematics for HMM," October 13th, 2008.
[3] 統計學習方法 (Statistical Learning Methods), Li Hang.
[4] 數學之美 (The Beauty of Mathematics), Wu Jun.
Reference [1] is a classic introduction to Hidden Markov Models: the exposition is clear, thorough, and reasonably deep. Much of the material in references [2] and [3] is drawn from it.
The main strength of reference [2] is that several derivations are carried through to simpler closed forms, which makes the code easier to write and, from another angle, makes the concepts clearer.
Chapter 10 of 統計學習方法 is devoted to Hidden Markov Models. It derives the Baum-Welch parameter re-estimation formulas by writing out the Lagrangian with Lagrange multipliers and taking partial derivatives; interestingly, reference [1] writes the same estimation formulas down directly from probabilistic reasoning, and of course the two results are identical. The book also gives a "boxes and balls" example to illustrate the concepts behind HMMs, which makes it very easy to grasp quickly what a Hidden Markov Model actually is. Readers of the book will know that although Li Hang's book runs to only a little over two hundred pages, every chapter is dense and to the point. Admittedly, some parts are a bit too terse.
Chapter 26 of 數學之美 explains the principle behind the Viterbi algorithm quite clearly; if you do not fully understand the algorithm, that chapter is worth reading. The book is suitable not only for beginners or outsiders who want to understand the ideas behind machine learning algorithms; it is also genuinely useful for MLers with some background.
This article does not cover much conceptual material; instead it points the reader to the relevant parts of the references and quotes their formulas (or equations) directly. Readers who want to follow along should therefore have the references at hand (the first two can be found by searching online; both are downloadable PDF documents).
Outline:
- Notation
- Python implementation of the Viterbi algorithm
- Forward algorithm, backward algorithm, gamma (γ), xi (ξ), the Baum-Welch algorithm, and their Python implementation
- Scaled Baum-Welch algorithm; scaled Baum-Welch algorithm for multiple observation sequences
- Extensions
1. Notation
S = {s1, s2, ..., sN} is the set of all possible states (統計學習方法 uses Q for this),
N is the number of possible states,
V = {v1, v2, ..., vM} is the set of possible observation symbols,
M is the number of possible observation symbols,
I is a state sequence of length T, and O is the corresponding observation sequence,
A is the state transition probability matrix,
B is the observation (emission) probability matrix,
Pi is the initial state probability vector,
Lambda = (A, B, Pi) denotes the parameters of the Hidden Markov Model.
Readers who have not encountered Hidden Markov Models (HMMs), or who are not familiar with them, can consult 統計學習方法, which gives a "boxes and balls" example to illustrate the relevant concepts and makes it very easy to grasp quickly what an HMM actually is.
2. Python implementation of the Viterbi algorithm
An HMM (hereafter simply HMM) typically has three basic problems: the evaluation (probability computation) problem, the learning problem, and the prediction problem (also called the decoding problem). The first is handled by the forward and backward algorithms; the second and third are usually solved with the Baum-Welch algorithm and the Viterbi algorithm, respectively.
Most of the time the forward and backward algorithms exist to serve the Baum-Welch algorithm, whereas the Viterbi algorithm stands on its own, so we discuss Viterbi first.
The Viterbi algorithm uses dynamic programming to reduce the complexity of the search. "Reduce" here is relative to brute-force enumeration (listing every possible state sequence I and taking the one with the highest probability); a small sketch of the brute-force approach follows below.
If you do not understand the Viterbi algorithm, see Chapter 26 of 數學之美.
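To make the contrast with brute-force enumeration concrete, here is a minimal standalone sketch (not part of the HMM class defined below; the function name and arguments are purely illustrative). It scores every possible state sequence directly and is only practical for tiny N and T:

import itertools
import numpy as np

def brute_force_decode(A, B, Pi, O):
    # Enumerate all N**T state sequences and keep the most probable one.
    A, B, Pi = np.asarray(A), np.asarray(B), np.asarray(Pi)
    N = A.shape[0]
    best_prob, best_path = -1.0, None
    for path in itertools.product(range(N), repeat=len(O)):
        prob = Pi[path[0]] * B[path[0], O[0]]
        for t in range(1, len(O)):
            prob *= A[path[t-1], path[t]] * B[path[t], O[t]]
        if prob > best_prob:
            best_prob, best_path = prob, path
    return best_path, best_prob

The dynamic-programming recursion of Viterbi avoids this exponential enumeration by keeping, at each time step, only the best partial path ending in each state.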
The implementation is then straightforward: just follow the steps of Algorithm 10.5 on page 185 of 統計學習方法.
First define a class:
import numpy as np

class HMM:
    def __init__(self, Ann, Bnm, Pi, O):
        self.A = np.array(Ann, float)
        self.B = np.array(Bnm, float)
        self.Pi = np.array(Pi, float)
        self.O = np.array(O, int)    # observation symbol indices; must be integers, since they index into B
        self.N = self.A.shape[0]
        self.M = self.B.shape[1]
The Viterbi algorithm is implemented as a method of the HMM class; the code is as follows:
def viterbi(self):
    # given O and lambda, find the optimal state sequence I
    T = len(self.O)
    I = np.zeros(T, int)                  # state indices, so an integer array

    delta = np.zeros((T, self.N), float)
    psi = np.zeros((T, self.N), int)      # argmax indices, so an integer array

    for i in range(self.N):
        delta[0, i] = self.Pi[i] * self.B[i, self.O[0]]
        psi[0, i] = 0

    for t in range(1, T):
        for i in range(self.N):
            delta[t, i] = self.B[i, self.O[t]] * np.array(
                [delta[t-1, j] * self.A[j, i] for j in range(self.N)]).max()
            psi[t, i] = np.array(
                [delta[t-1, j] * self.A[j, i] for j in range(self.N)]).argmax()

    P_T = delta[T-1, :].max()             # probability of the best path
    I[T-1] = delta[T-1, :].argmax()

    for t in range(T-2, -1, -1):
        I[t] = psi[t+1, I[t+1]]

    return I
Here delta and psi correspond to δ and ψ.
np refers to numpy (import numpy as np); numpy is a very handy package, and its argmax() method is particularly useful here.
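As a quick usage sketch: the numbers below are illustrative only (chosen in the style of the boxes-and-balls example; any row-stochastic A and B and valid observation indices will do).

A = [[0.5, 0.2, 0.3],
     [0.3, 0.5, 0.2],
     [0.2, 0.3, 0.5]]
B = [[0.5, 0.5],
     [0.4, 0.6],
     [0.7, 0.3]]
Pi = [0.2, 0.4, 0.4]
O = [0, 1, 0]            # observation indices into the columns of B

hmm = HMM(A, B, Pi, O)
print(hmm.viterbi())     # most probable state sequence, as an array of state indices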
3. Forward algorithm, backward algorithm, gamma (γ), xi (ξ), the Baum-Welch algorithm, and their Python implementation
The HMM derivations involve many probability expressions; if your grasp of probability is not solid, the derivations can be hard to follow. In that case Bishop's Pattern Recognition and Machine Learning is worth reading; it has long been a classic in machine learning.
The forward probability matrix is alpha (α; when written by hand it looks a lot like the letter a, so be careful to distinguish them),
and the backward probability matrix is beta (β).
For the definitions and steps of the algorithms, see page 175 of 統計學習方法 or reference [2].
The code is as follows:
def forward(self):
    T = len(self.O)
    alpha = np.zeros((T, self.N), float)

    for i in range(self.N):
        alpha[0, i] = self.Pi[i] * self.B[i, self.O[0]]

    for t in range(T-1):
        for i in range(self.N):
            summation = 0.0   # for every i, 'summation' must be reset to 0
            for j in range(self.N):
                summation += alpha[t, j] * self.A[j, i]
            alpha[t+1, i] = summation * self.B[i, self.O[t+1]]

    summation = 0.0
    for i in range(self.N):
        summation += alpha[T-1, i]
    Polambda = summation
    return Polambda, alpha
def backward(self):
    T = len(self.O)
    beta = np.zeros((T, self.N), float)
    for i in range(self.N):
        beta[T-1, i] = 1.0

    for t in range(T-2, -1, -1):
        for i in range(self.N):
            summation = 0.0   # for every i, 'summation' must be reset to 0
            for j in range(self.N):
                summation += self.A[i, j] * self.B[j, self.O[t+1]] * beta[t+1, j]
            beta[t, i] = summation

    Polambda = 0.0
    for i in range(self.N):
        Polambda += self.Pi[i] * self.B[i, self.O[0]] * beta[0, i]
    return Polambda, beta
Polambda stands for P(O|λ).
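A quick sanity check, continuing the illustrative parameters from the Viterbi example above: the forward and backward computations evaluate the same quantity, so the two Polambda values should agree up to floating-point error.

hmm = HMM(A, B, Pi, O)        # same illustrative A, B, Pi, O as before
P_fwd, alpha = hmm.forward()
P_bwd, beta = hmm.backward()
print(P_fwd, P_bwd)           # the two values of P(O|lambda) should match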
Next we compute gamma (γ) and xi (ξ). From the formulas in 統計學習方法 we get the following code:
def compute_gamma(self, alpha, beta):
    T = len(self.O)
    gamma = np.zeros((T, self.N), float)   # gamma[t, i]: probability of being in state i at time t, given O
    for t in range(T):
        for i in range(self.N):
            gamma[t, i] = alpha[t, i] * beta[t, i] / sum(
                alpha[t, j] * beta[t, j] for j in range(self.N))
    return gamma
def compute_xi(self, alpha, beta):
    T = len(self.O)
    xi = np.zeros((T-1, self.N, self.N), float)   # note: T-1, not T
    for t in range(T-1):                           # note: not T
        for i in range(self.N):
            for j in range(self.N):
                numerator = alpha[t, i] * self.A[i, j] * self.B[j, self.O[t+1]] * beta[t+1, j]
                # the product inside the sums below must not be replaced by 'numerator',
                # since the 'i, j' in 'numerator' are fixed.
                # Also, do not reuse 'i, j' as summation indices, to avoid errors and confusion.
                denominator = sum(
                    sum(alpha[t, i1] * self.A[i1, j1] * self.B[j1, self.O[t+1]] * beta[t+1, j1]
                        for j1 in range(self.N))   # inner sum over j1
                    for i1 in range(self.N))       # outer sum over i1
                xi[t, i, j] = numerator / denominator
    return xi
Note that alpha and beta must be passed in as arguments.
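As a small check, continuing the sketch above: each row of gamma and each time slice of xi is a probability distribution, so they should sum to 1.

_, alpha = hmm.forward()
_, beta = hmm.backward()
gamma = hmm.compute_gamma(alpha, beta)
xi = hmm.compute_xi(alpha, beta)
print(gamma.sum(axis=1))      # every entry should be ~1.0
print(xi.sum(axis=(1, 2)))    # every entry should be ~1.0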
Next we implement the Baum-Welch algorithm, following 統計學習方法 or reference [2].
First initialize the parameters. How you initialize matters, because the Baum-Welch algorithm (itself a special case of the EM algorithm) does not guarantee a global optimum; it can easily fall into a local optimum and stay there.
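One common workaround (a sketch of my own, not taken from the references) is to start from several random but valid initializations and keep the best run; np.random.dirichlet is one convenient way to draw rows that sum to 1:

import numpy as np

def random_init(N, M, seed=None):
    # Draw a random but valid parameter set: every row sums to 1.
    rng = np.random.default_rng(seed)
    A = rng.dirichlet(np.ones(N), size=N)    # N x N transition matrix
    B = rng.dirichlet(np.ones(M), size=N)    # N x M emission matrix
    Pi = rng.dirichlet(np.ones(N))           # initial state distribution
    return A, B, Pi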
The loop keeps running while delta_lambda is greater than some value x.
As for choosing x: if it is too small the program can get stuck in an endless loop, because lambda changes fairly substantially at each iteration, so as it approaches a local/global optimum it keeps oscillating around it and delta_lambda > x remains true.
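A more robust stopping rule (a sketch, assuming nothing beyond the loop structure used below) combines the threshold with a cap on the number of iterations, so the loop terminates even if delta_lambda never drops below x:

x = 1e-3
max_iter = 100
delta_lambda = x + 1
times = 0
while delta_lambda > x and times < max_iter:
    # ... one Baum-Welch re-estimation step, updating delta_lambda ...
    times += 1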
def Baum_Welch(self):
    # given the observation list O, estimate the model lambda = (A, B, Pi)
    # (T can be derived from O; N and M are already known)
    T = len(self.O)
    V = [k for k in range(self.M)]

    # initialization of lambda (these hard-coded values assume N = 4, M = 2)
    self.A = np.array([[0, 1, 0, 0], [0.4, 0, 0.6, 0],
                       [0, 0.4, 0, 0.6], [0, 0, 0.5, 0.5]], float)
    self.B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]], float)
    # a uniform ("mean value") start may not be a good choice
    self.Pi = np.array([1.0 / self.N] * self.N, float)  # use 1.0: in Python 2, 1/3 would be 0
    # self.A = np.array([[1.0 / self.N] * self.N] * self.N)
    # self.B = np.array([[1.0 / self.M] * self.M] * self.N)

    x = 1
    delta_lambda = x + 1
    times = 0
    # iterate on lambda
    while delta_lambda > x:
        Polambda1, alpha = self.forward()         # get alpha
        Polambda2, beta = self.backward()         # get beta
        gamma = self.compute_gamma(alpha, beta)   # uses alpha, beta
        xi = self.compute_xi(alpha, beta)

        # copy the current parameters, otherwise the in-place updates below would overwrite them
        lambda_n = [self.A.copy(), self.B.copy(), self.Pi.copy()]

        for i in range(self.N):
            for j in range(self.N):
                numerator = sum(xi[t, i, j] for t in range(T-1))
                denominator = sum(gamma[t, i] for t in range(T-1))
                self.A[i, j] = numerator / denominator

        for j in range(self.N):
            for k in range(self.M):
                numerator = sum(gamma[t, j] for t in range(T) if self.O[t] == V[k])
                denominator = sum(gamma[t, j] for t in range(T))
                self.B[j, k] = numerator / denominator   # was self.B[i, k]: a bug

        for i in range(self.N):
            self.Pi[i] = gamma[0, i]

        # take absolute values first, otherwise positive and negative changes cancel out
        delta_A = np.abs(lambda_n[0] - self.A)
        delta_B = np.abs(lambda_n[1] - self.B)
        delta_Pi = np.abs(lambda_n[2] - self.Pi)
        delta_lambda = delta_A.sum() + delta_B.sum() + delta_Pi.sum()
        times += 1
        print(times)
    return self.A, self.B, self.Pi
4. Scaled Baum-Welch algorithm; scaled Baum-Welch algorithm for multiple observation sequences
In theory the code above is already a complete HMM implementation, but things are never quite that simple: several practical issues remain. This article covers only two of them: using scaling to avoid the floating-point underflow that easily occurs during the computation, and a modified Baum-Welch algorithm that accepts several observation sequences at once.
Reference [2]: the scaling problem.
Following the formulas in reference [2], we add scaling and rewrite the three methods forward(), backward(), and Baum_Welch().
def forward_with_scale(self):
    T = len(self.O)
    alpha_raw = np.zeros((T, self.N), float)
    alpha = np.zeros((T, self.N), float)
    c = [i for i in range(T)]   # scaling factors; the initial contents do not matter

    for i in range(self.N):
        alpha_raw[0, i] = self.Pi[i] * self.B[i, self.O[0]]

    c[0] = 1.0 / sum(alpha_raw[0, i] for i in range(self.N))
    for i in range(self.N):
        alpha[0, i] = c[0] * alpha_raw[0, i]

    for t in range(T-1):
        for i in range(self.N):
            summation = 0.0
            for j in range(self.N):
                summation += alpha[t, j] * self.A[j, i]
            alpha_raw[t+1, i] = summation * self.B[i, self.O[t+1]]

        c[t+1] = 1.0 / sum(alpha_raw[t+1, i1] for i1 in range(self.N))
        for i in range(self.N):
            alpha[t+1, i] = c[t+1] * alpha_raw[t+1, i]
    return alpha, c

def backward_with_scale(self, c):
    T = len(self.O)
    beta_raw = np.zeros((T, self.N), float)
    beta = np.zeros((T, self.N), float)
    for i in range(self.N):
        beta_raw[T-1, i] = 1.0
        beta[T-1, i] = c[T-1] * beta_raw[T-1, i]

    for t in range(T-2, -1, -1):
        for i in range(self.N):
            summation = 0.0
            for j in range(self.N):
                summation += self.A[i, j] * self.B[j, self.O[t+1]] * beta[t+1, j]
            beta[t, i] = c[t] * summation   # summation is beta_raw[t, i]
    return beta
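With scaling, P(O|λ) can no longer be read off as the sum of alpha's last row (it would underflow anyway), but its logarithm is recoverable from the scaling factors alone; this is the standard identity log P(O|λ) = -Σ_t log c_t from reference [1]. A minimal sketch, assuming hmm is an HMM instance as above:

import numpy as np

alpha, c = hmm.forward_with_scale()
log_Polambda = -np.sum(np.log(c))   # log P(O | lambda), safe from underflow
print(log_Polambda)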
def Baum_Welch_with_scale(self):
    T = len(self.O)
    V = [k for k in range(self.M)]

    # initialization of lambda; the entries should be floats (hence the .0)
    self.A = np.array([[0.2, 0.2, 0.3, 0.3], [0.2, 0.1, 0.6, 0.1],
                       [0.3, 0.4, 0.1, 0.2], [0.3, 0.2, 0.2, 0.3]])
    self.B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]])

    x = 5
    delta_lambda = x + 1
    times = 0
    # iterate on lambda
    while delta_lambda > x:
        alpha, c = self.forward_with_scale()
        beta = self.backward_with_scale(c)
        # copy the current parameters, otherwise the in-place updates below would overwrite them
        lambda_n = [self.A.copy(), self.B.copy(), self.Pi.copy()]

        for i in range(self.N):
            for j in range(self.N):
                numerator_A = sum(alpha[t, i] * self.A[i, j] * self.B[j, self.O[t+1]]
                                  * beta[t+1, j] for t in range(T-1))
                denominator_A = sum(alpha[t, i] * beta[t, i] / c[t] for t in range(T-1))
                self.A[i, j] = numerator_A / denominator_A

        for j in range(self.N):
            for k in range(self.M):
                numerator_B = sum(alpha[t, j] * beta[t, j] / c[t]
                                  for t in range(T) if self.O[t] == V[k])
                denominator_B = sum(alpha[t, j] * beta[t, j] / c[t] for t in range(T))
                self.B[j, k] = numerator_B / denominator_B

        # Pi has nothing to do with c
        denominator_Pi = sum(alpha[0, j] * beta[0, j] for j in range(self.N))
        for i in range(self.N):
            self.Pi[i] = alpha[0, i] * beta[0, i] / denominator_Pi
            # self.Pi[i] = gamma[0, i]

        # take absolute values first, otherwise positive and negative changes cancel out
        delta_A = np.abs(lambda_n[0] - self.A)
        delta_B = np.abs(lambda_n[1] - self.B)
        delta_Pi = np.abs(lambda_n[2] - self.Pi)
        delta_lambda = delta_A.sum() + delta_B.sum() + delta_Pi.sum()
        times += 1
        print(times)
    return self.A, self.B, self.Pi
For the second problem, we follow reference [2] and implement the scaled, modified Baum-Welch algorithm directly. For convenience this function is pulled out of the HMM class and written as a standalone function:
# for multiple sequences of observation symbols (with scaled alpha & beta)
# defined outside the HMM class, as an independent function
def modified_Baum_Welch_with_scale(O_set):
    # initialization of lambda; B's shape must match the number of observation symbols M
    A = np.array([[0.2, 0.2, 0.3, 0.3], [0.2, 0.1, 0.6, 0.1],
                  [0.3, 0.4, 0.1, 0.2], [0.3, 0.2, 0.2, 0.3]])
    B = np.array([[0.2, 0.2, 0.3, 0.3], [0.2, 0.1, 0.6, 0.1],
                  [0.3, 0.4, 0.1, 0.2], [0.3, 0.2, 0.2, 0.3]])
    # B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]])
    Pi = [0.25, 0.25, 0.25, 0.25]

    # containers for alpha, beta and the scaling factors of every sequence;
    # use separate lists (aliasing one list under several names would make
    # assignments to beta_set silently overwrite alpha_set)
    O_length = len(O_set)
    alpha_set = [None] * O_length
    beta_set = [None] * O_length
    c_set = [None] * O_length

    N = A.shape[0]
    M = B.shape[1]
    T = [len(O_set[i]) for i in range(O_length)]
    V = [k for k in range(M)]

    x = 1
    delta_lambda = x + 1
    times = 0
    while delta_lambda > x:   # iterate on lambda
        # copy the current parameters, otherwise the in-place updates below would overwrite them
        lambda_n = [A.copy(), B.copy()]

        for i in range(O_length):
            alpha_set[i], c_set[i] = HMM(A, B, Pi, O_set[i]).forward_with_scale()
            beta_set[i] = HMM(A, B, Pi, O_set[i]).backward_with_scale(c_set[i])

        for i in range(N):
            for j in range(N):
                numerator_A = 0.0
                denominator_A = 0.0
                for l in range(O_length):
                    numerator_A += sum(alpha_set[l][t, i] * A[i, j] * B[j, O_set[l][t+1]]
                                       * beta_set[l][t+1, j] for t in range(T[l]-1))
                    denominator_A += sum(alpha_set[l][t, i] * beta_set[l][t, i] / c_set[l][t]
                                         for t in range(T[l]-1))
                A[i, j] = numerator_A / denominator_A

        for j in range(N):
            for k in range(M):
                numerator_B = 0.0
                denominator_B = 0.0
                for l in range(O_length):
                    numerator_B += sum(alpha_set[l][t, j] * beta_set[l][t, j] / c_set[l][t]
                                       for t in range(T[l]) if O_set[l][t] == V[k])
                    denominator_B += sum(alpha_set[l][t, j] * beta_set[l][t, j] / c_set[l][t]
                                         for t in range(T[l]))
                B[j, k] = numerator_B / denominator_B

        # Pi is not re-estimated here; in other settings a corresponding Pi update would be needed

        # take absolute values first, otherwise positive and negative changes cancel out
        delta_A = np.abs(lambda_n[0] - A)
        delta_B = np.abs(lambda_n[1] - B)   # was lambda_n[1] - A: a bug
        delta_lambda = delta_A.sum() + delta_B.sum()
        times += 1
        print(times)
    return A, B
We do not re-estimate pi here; in practice pi depends on the application, and sometimes it is simply not needed.
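A minimal usage sketch: the observation sequences below are made-up indices, assumed only for illustration; each symbol must be a valid column index of B.

O_set = [
    [0, 1, 2, 3, 1, 0],
    [2, 2, 0, 1, 3],
    [1, 0, 3, 2, 2, 1, 0],
]
A, B = modified_Baum_Welch_with_scale(O_set)
print(A)
print(B)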
As the author of reference [2] puts it: "By using scaling, we happily find that all Pl's terms are cancelled out! The resulting format looks much cleaner!"
That really is a happy finding.
This completes a basic HMM implementation. It is fairly elementary, but thanks to the power of the model itself, these two hundred or so lines of code are already quite capable; readers are encouraged to experiment with it, and with some care it should be usable for simple prediction tasks.
5. Extensions
Quoting reference [1]:
V. Implementation issues for HMMs
The discussion in the previous two sections has primarily dealt with the theory of HMMs and several variations on the form of the model. In this section we deal with several practical implementation issues including scaling, multiple observation sequences, initial parameter estimates, missing data, and choice of model size and type. For some of these implementation issues we can prescribe exact analytical solutions; for other issues we can only provide some seat-of-the-pants experience gained from working with HMMs over the last several years.
Readers can study the later sections of reference [1] to go further, or look up more recent literature for a deeper treatment; after all, that paper dates from 1989.
This article was written in some haste; if you have questions, feel free to leave a comment.
Thanks for reading!