自然語言處理之HMM模型分詞


漢語中句子以字為單位的,但語義理解仍是以詞為單位,所以也就存在中文分詞問題。主要的技術可以分為:規則分詞、統計分詞以及混合分詞(規則+統計)。

基於規則的分詞是一種機械分詞,主要依賴於維護詞典,在切分時將與劇中的字符串與詞典中的詞進行匹配。主要包括正向最大匹配法、逆向最大匹配法以及雙向最大匹配法。

統計分詞主要思想是將每個詞視作由字組成,如果相連的字在不同文本中出現次數越多,就越可能是一個詞。(隱馬爾可夫【HMM】、條件隨機場【CRF】等)

l  長度為m的字符串確定其概率分布:

P(x1,x2……,xm)=P(x1)P(x1|x1,x2)……P(xm|x1,x2,……,xm-1)

l  用n-gram模型將上式進行簡化,認為其概率僅與其前n-1個詞相關:

P~=P(xi|xi-(n-1),xi-(n-2)……,xi-1)=count(xi-(n-1),xi-(n-2)……,xi-1,xi)/count(xi-(n-1),xi-(n-2)……,xi-1)

 

隱馬爾可夫模型HMM將分詞作為字在字串中的序列標注任務來實現。每個字在構造一個特定詞語時都占據一個確定的構詞結構:B(詞首)、M(詞中)、E(詞尾)和S(單獨成詞)。

l  抽象表示:

W表示輸入的句子,長度n,O表示輸出標簽,則理想是maxP(o1,o2…|w1,w2……)

貝葉斯公式P(o|w)=P(w|o)P(o)/P(w) 因為P(w)是給定輸出為常數,因此要最大化P(w|o)P(o)

P(w|o)= P(w1|o1) P(w2|o3) ……P(w3|o3) 作馬爾可夫假設每個輸出僅與上一個輸出有關

P(o)=P(o1)P(o2|o1)……P(on|o(n-1))

再通過Veterbi算法求最優路徑(如果最終的最優路徑經過某個oi那么從初始結點到oi-1必然也是最優路徑)。

代碼實現如下(訓練語料庫沒有給出):

 

  1 class HMM(object):
  2     def __init__(self):
  3         import os
  4 
  5         # 主要是用於存取算法中間結果,不用每次都訓練模型
  6         self.model_file = 'hmm_model.pkl'
  7 
  8         # 狀態值集合
  9         self.state_list = ['B', 'M', 'E', 'S']
 10         # 參數加載,用於判斷是否需要重新加載model_file
 11         self.load_para = False
 12 
 13     # 用於加載已計算的中間結果,當需要重新訓練時,需初始化清空結果
 14     def try_load_model(self, trained):
 15         if trained:
 16             import pickle
 17             with open(self.model_file, 'rb') as f:
 18                 self.A_dic = pickle.load(f)
 19                 self.B_dic = pickle.load(f)
 20                 self.Pi_dic = pickle.load(f)
 21                 self.load_para = True
 22 
 23         else:
 24             # 狀態轉移概率(狀態->狀態的條件概率)
 25             self.A_dic = {}
 26             # 發射概率(狀態->詞語的條件概率)
 27             self.B_dic = {}
 28             # 狀態的初始概率
 29             self.Pi_dic = {}
 30             self.load_para = False
 31 
 32     # 計算轉移概率、發射概率以及初始概率
 33     def train(self, path):
 34 
 35         # 重置幾個概率矩陣
 36         self.try_load_model(False)
 37 
 38         # 統計狀態出現次數,求p(o)
 39         Count_dic = {}
 40 
 41         # 初始化參數
 42         def init_parameters():
 43             for state in self.state_list:
 44                 self.A_dic[state] = {s: 0.0 for s in self.state_list}
 45                 self.Pi_dic[state] = 0.0
 46                 self.B_dic[state] = {}
 47 
 48                 Count_dic[state] = 0
 49 
 50         def makeLabel(text):
 51             out_text = []
 52             if len(text) == 1:
 53                 out_text.append('S')
 54             else:
 55                 out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']
 56 
 57             return out_text
 58 
 59         init_parameters()
 60         line_num = -1
 61         # 觀察者集合,主要是字以及標點等
 62         words = set()
 63         with open(path, encoding='utf8') as f:
 64             for line in f:
 65                 line_num += 1
 66 
 67                 line = line.strip()
 68                 if not line:
 69                     continue
 70 
 71                 word_list = [i for i in line if i != ' ']
 72                 words |= set(word_list)  # 更新字的集合
 73 
 74                 linelist = line.split()
 75 
 76                 line_state = []
 77                 for w in linelist:
 78                     line_state.extend(makeLabel(w))
 79                 
 80                 assert len(word_list) == len(line_state)
 81 
 82                 for k, v in enumerate(line_state):
 83                     Count_dic[v] += 1
 84                     if k == 0:
 85                         self.Pi_dic[v] += 1  # 每個句子的第一個字的狀態,用於計算初始狀態概率
 86                     else:
 87                         self.A_dic[line_state[k - 1]][v] += 1  # 計算轉移概率
 88                         self.B_dic[line_state[k]][word_list[k]] = \
 89                             self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0  # 計算發射概率
 90         
 91         self.Pi_dic = {k: v * 1.0 / line_num for k, v in self.Pi_dic.items()}
 92         self.A_dic = {k: {k1: v1 / Count_dic[k] for k1, v1 in v.items()}
 93                       for k, v in self.A_dic.items()}
 94         #加1平滑
 95         self.B_dic = {k: {k1: (v1 + 1) / Count_dic[k] for k1, v1 in v.items()}
 96                       for k, v in self.B_dic.items()}
 97         #序列化
 98         import pickle
 99         with open(self.model_file, 'wb') as f:
100             pickle.dump(self.A_dic, f)
101             pickle.dump(self.B_dic, f)
102             pickle.dump(self.Pi_dic, f)
103 
104         return self
105 
106     def viterbi(self, text, states, start_p, trans_p, emit_p):
107         V = [{}]
108         path = {}
109         for y in states:
110             V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
111             path[y] = [y]
112         for t in range(1, len(text)):
113             V.append({})
114             newpath = {}
115             
116             #檢驗訓練的發射概率矩陣中是否有該字
117             neverSeen = text[t] not in emit_p['S'].keys() and \
118                 text[t] not in emit_p['M'].keys() and \
119                 text[t] not in emit_p['E'].keys() and \
120                 text[t] not in emit_p['B'].keys()
121             for y in states:
122                 emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #設置未知字單獨成詞
123                 (prob, state) = max(
124                     [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
125                       emitP, y0)
126                      for y0 in states if V[t - 1][y0] > 0])
127                 V[t][y] = prob
128                 newpath[y] = path[state] + [y]
129             path = newpath
130             
131         if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0):
132             (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')])
133         else:
134             (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
135         
136         return (prob, path[state])
137 
138     def cut(self, text):
139         import os
140         if not self.load_para:
141             self.try_load_model(os.path.exists(self.model_file))
142         prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)      
143         begin, next = 0, 0    
144         for i, char in enumerate(text):
145             pos = pos_list[i]
146             if pos == 'B':
147                 begin = i
148             elif pos == 'E':
149                 yield text[begin: i+1]
150                 next = i+1
151             elif pos == 'S':
152                 yield char
153                 next = i+1
154         if next < len(text):
155             yield text[next:]
156 #####測試
157 hmm=HMM()
158 hmm.train('cutwords\\trainCorpus.txt_utf8.txt')
159 
160 text='這是一個很棒的方案!'
161 res=hmm.cut(text)
162 print(text)
163 print(str(list(res)))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM