BiLSTM-CRF 被提出用於NER或者詞性標注,效果比單純的CRF或者lstm或者bilstm效果都要好。
根據pytorch官方指南(https://pytorch.org/tutorials/beginner/nlp/advanced_tutorial.html#bi-lstm-conditional-random-field-discussion),實現了BiLSTM-CRF一個toy級別的源碼。下面是我個人的學習理解過程。
1. LSTM
LSTM的原理前人已經解釋的非常清楚了:https://zhuanlan.zhihu.com/p/32085405
BiLSTM-CRF中,BiLSTM部分主要用於,根據一個單詞的上下文,給出當前單詞對應標簽的概率分布,可以把BiLSTM看成一個編碼層。
比如,對於標簽集{N, V, O}和單詞China,BiLSTM可能輸出形如(0.88,-1.23,0.03)的非歸一化概率分布。
這個分布我們看作是crf的特征分布輸入,那么在CRF中我們需要學習的就是特征轉移概率。
2. CRF
主要講一下代碼中要用到的CRF的預測(維特比解碼)
維特比算法流程:
1.求出位置1的各個標記的非規范化概率δ1(j)δ1(j)
2.由遞推公式(前后向概率計算)
每一步都保留當前所有可能的狀態ll 對應的最大的非規范化概率,
並將最大非規范化概率狀態對應的路徑(當前狀態得到最大概率時上一步的狀態yiyi)記錄
Ψi(l)=argmax(1≤j≤m){δi−1(j)+w∗Fi(yi−1=j,yi=l,x)}=argmaxδi(l),l=1,2,…,mΨi(l)=argmax(1≤j≤m){δi−1(j)+w∗Fi(yi−1=j,yi=l,x)}=argmaxδi(l),l=1,2,…,m
就是PijPij的取值有m*m個,對每一個yjyj,都確定一個(而不是可能的m個)能最大化概率的yiyi狀態
3.遞推到i=ni=n時終止
這時候求得非規范化概率的最大值為
最優路徑終點
4.遞歸路徑
由最優路徑終點遞歸得到的最優路徑(由當前最大概率狀態狀態對應的上一步狀態,然后遞歸)
求得最優路徑:
3. 損失函數
最后由CRF輸出,損失函數的形式主要由CRF給出
在BiLSTM-CRF中,給定輸入序列X,網絡輸出對應的標注序列y,得分為
(轉移概率和狀態概率之和)
利用softmax函數,我們為每一個正確的tag序列y定義一個概率值
在訓練中,我們的目標就是最大化概率p(y│X) ,怎么最大化呢,用對數似然(因為p(y│X)中存在指數和除法,對數似然可以化簡這些運算)
對數似然形式如下:
最大化這個對數似然,就是最小化他的相反數:
¥−log(p(y│X))=log(∑y′∈YXes(X,y′))−S(X,y)−log(p(y│X))=log(∑y′∈YXes(X,y′))−S(X,y)$
(loss function/object function)
最小化可以借助梯度下降實現
在對損失函數進行計算的時候,前一項S(X,y)S(X,y)很容易計算,
后一項log(∑y′∈YXes(X,y′))log(∑y′∈YXes(X,y′))比較復雜,計算過程中由於指數較大常常會出現上溢或者下溢,
由公式 log∑e(xi)=a+log∑e(xi−a)log∑e(xi)=a+log∑e(xi−a),可以借助a對指數進行放縮,通常a取xixi的最大值(即a=max[Xi]a=max[Xi]),這可以保證指數最大不會超過0,於是你就不會上溢出。即便剩余的部分下溢出了,你也能得到一個合理的值。
又因為log(∑yelog(∑xex)+y)log(∑yelog(∑xex)+y),在loglog取ee作為底數的情況下,可以化簡為
log(∑yey∗elog(∑xex))=log(∑yey∗∑xex)=log(∑y∑xex+y)log(∑yey∗elog(∑xex))=log(∑yey∗∑xex)=log(∑y∑xex+y)。
log_sum_exp因為需要計算所有路徑,那么在計算過程中,計算每一步路徑得分之和和直接計算全局得分是等價的,就可以大大減少計算時間。
當前的分數可以由上一步的總得分+轉移得分+狀態得分得到,這也是pytorch范例中
next_tag_var = forward_var + trans_score + emit_score
的由來
注意,由於程序中比較好選一整行而不是一整列,所以調換i,j的含義,t[i][j]表示從j狀態轉移到i狀態的轉移概率
直接分析源碼的前向傳播部分,其中_get_lstm_features函數調用了pytorch的BiLSTM
def forward(self, sentence): """ 重寫前向傳播 :param sentence: 輸入的句子序列 :return:返回分數和標記序列 """ lstm_feats = self._get_lstm_features(sentence) score, tag_seq = self._viterbi_decode(lstm_feats) return score, tag_seq
源碼的維特比算法實現,在訓練結束,還要使用該算法進行預測
def _viterbi_decode(self, feats): """ 使用維特比算法預測 :param feats:lstm的所有輸出 :return:返回最大概率和最優路徑 """ backpointers = [] # step1. 初始化 init_vvars = torch.full((1, self.tagset_size), -1000.) # 初始化第一步的轉移概率 init_vvars[0][self.tag_to_idx[START_TAG]] = 0 # 初始化每一步的非規范化概率 forward_var = init_vvars # step2. 遞推 # 遍歷每一個單詞通過bilstm輸出的概率分布 for feat in feats: # 每次循環重新統計 bptrs_t = [] viterbivars_t = [] for next_tag in range(self.tagset_size): # 根據維特比算法 # 下一個tag_i+1的非歸一化概率是上一步概率加轉移概率(勢函數和勢函數的權重都統一看成轉移概率的一部分) next_tag_var = forward_var + self.transitions[next_tag] # next_tag_var = tensor([[-3.8879e-01, 1.5657e+00, 1.7734e+00, -9.9964e+03, -9.9990e+03]]) # 計算所有前向概率(?) # CRF是單步線性鏈馬爾可夫,所以每個狀態只和他上1個狀態有關,可以用二維的概率轉移矩陣表示 # 保存當前最大狀態 best_tag_id = argmax(next_tag_var) # best_tag_id = torch.argmax(next_tag_var).item() bptrs_t.append(best_tag_id) # 從一個1*N向量中取出一個值(標量),將這個標量再轉換成一維向量 viterbivars_t.append(next_tag_var[0][best_tag_id].view(1)) # viterbivars 長度為self.tagset_size,對應feat的維度 forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1) # 記錄每一個時間i,每個狀態取值l取最大非規范化概率對應的上一步狀態 backpointers.append(bptrs_t) # step3. 終止 terminal_var = forward_var + self.transitions[self.tag_to_idx[STOP_TAG]] best_tag_id = argmax(terminal_var) path_score = terminal_var[0][best_tag_id] # step4. 返回路徑 best_path = [best_tag_id] for bptrs_t in reversed(backpointers): best_tag_id = bptrs_t[best_tag_id] best_path.append(best_tag_id) # Pop off the start tag (we dont want to return that to the caller) start = best_path.pop() assert start == self.tag_to_idx[START_TAG] # Sanity check best_path.reverse() return path_score, best_path
源碼的損失函數計算
def neg_log_likelihood(self, sentence, tags): """ 實現負對數似然函數 :param sentence: :param tags: :return: """ # 返回句子中每個單詞對應的標簽概率分布 feats = self._get_lstm_features(sentence) forward_score = self._forward_alg(feats) gold_score = self._score_sentence(feats, tags) # 輸出路徑的得分(S(X,y)) # 返回負對數似然函數的結果 return forward_score - gold_score def _forward_alg(self, feats): """ 使用前向算法計算損失函數的第一項log(\sum(exp(S(X,y’)))) :param feats: 從BiLSTM輸出的特征 :return: 返回 """ init_alphas = torch.full((1, self.tagset_size), -10000.) init_alphas[0][self.tag_to_idx[START_TAG]] = 0. forward_var = init_alphas for feat in feats: # 存放t時刻的 概率狀態 alphas_t = [] for current_tag in range(self.tagset_size): # lstm輸出的是非歸一化分布概率 emit_score = feat[current_tag].view(1, -1).expand(1, self.tagset_size) # self.transitions[current_tag] 就是從上一時刻所有狀態轉移到當前某狀態的非歸一化轉移概率 # 取出的轉移矩陣的行是一維的,這里調用view函數轉換成二維矩陣 trans_score = self.transitions[current_tag].view(1, -1) # trans_score + emit_score 等於所有特征函數之和 # forward 是截至上一步的得分 current_tag_var = forward_var + trans_score + emit_score alphas_t.append(log_sum_exp(current_tag_var).view(1)) forward_var = torch.cat(alphas_t).view(1, -1) # 調用view函數轉換成1*N向量 terminal_var = forward_var + self.transitions[self.tag_to_idx[STOP_TAG]] alpha = log_sum_exp(terminal_var) return alpha def _score_sentence(self, feats, tags): """ 返回S(X,y) :param feats: 從BiLSTM輸出的特征 :param tags: CRF輸出的標記路徑 :return: """ score = torch.zeros(1) tags = torch.cat([torch.tensor([self.tag_to_idx[START_TAG]], dtype=torch.long),tags]) for i, feat in enumerate(feats): score = score + self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]] score = score + self.transitions[self.tag_to_idx[STOP_TAG],tags[-1]] return score