漢語中句子以字為單位的,但語義理解仍是以詞為單位,所以也就存在中文分詞問題。主要的技術可以分為:規則分詞、統計分詞以及混合分詞(規則+統計)。
基於規則的分詞是一種機械分詞,主要依賴於維護詞典,在切分時將與劇中的字符串與詞典中的詞進行匹配。主要包括正向最大匹配法、逆向最大匹配法以及雙向最大匹配法。
統計分詞主要思想是將每個詞視作由字組成,如果相連的字在不同文本中出現次數越多,就越可能是一個詞。(隱馬爾可夫【HMM】、條件隨機場【CRF】等)
l 長度為m的字符串確定其概率分布:
P(x1,x2……,xm)=P(x1)P(x1|x1,x2)……P(xm|x1,x2,……,xm-1)
l 用n-gram模型將上式進行簡化,認為其概率僅與其前n-1個詞相關:
P~=P(xi|xi-(n-1),xi-(n-2)……,xi-1)=count(xi-(n-1),xi-(n-2)……,xi-1,xi)/count(xi-(n-1),xi-(n-2)……,xi-1)
隱馬爾可夫模型HMM將分詞作為字在字串中的序列標注任務來實現。每個字在構造一個特定詞語時都占據一個確定的構詞結構:B(詞首)、M(詞中)、E(詞尾)和S(單獨成詞)。
l 抽象表示:
W表示輸入的句子,長度n,O表示輸出標簽,則理想是maxP(o1,o2…|w1,w2……)
貝葉斯公式P(o|w)=P(w|o)P(o)/P(w) 因為P(w)是給定輸出為常數,因此要最大化P(w|o)P(o)
P(w|o)= P(w1|o1) P(w2|o3) ……P(w3|o3) 作馬爾可夫假設每個輸出僅與上一個輸出有關
P(o)=P(o1)P(o2|o1)……P(on|o(n-1))
再通過Veterbi算法求最優路徑(如果最終的最優路徑經過某個oi那么從初始結點到oi-1必然也是最優路徑)。
代碼實現如下(訓練語料庫沒有給出):
1 class HMM(object): 2 def __init__(self): 3 import os 4 5 # 主要是用於存取算法中間結果,不用每次都訓練模型 6 self.model_file = 'hmm_model.pkl' 7 8 # 狀態值集合 9 self.state_list = ['B', 'M', 'E', 'S'] 10 # 參數加載,用於判斷是否需要重新加載model_file 11 self.load_para = False 12 13 # 用於加載已計算的中間結果,當需要重新訓練時,需初始化清空結果 14 def try_load_model(self, trained): 15 if trained: 16 import pickle 17 with open(self.model_file, 'rb') as f: 18 self.A_dic = pickle.load(f) 19 self.B_dic = pickle.load(f) 20 self.Pi_dic = pickle.load(f) 21 self.load_para = True 22 23 else: 24 # 狀態轉移概率(狀態->狀態的條件概率) 25 self.A_dic = {} 26 # 發射概率(狀態->詞語的條件概率) 27 self.B_dic = {} 28 # 狀態的初始概率 29 self.Pi_dic = {} 30 self.load_para = False 31 32 # 計算轉移概率、發射概率以及初始概率 33 def train(self, path): 34 35 # 重置幾個概率矩陣 36 self.try_load_model(False) 37 38 # 統計狀態出現次數,求p(o) 39 Count_dic = {} 40 41 # 初始化參數 42 def init_parameters(): 43 for state in self.state_list: 44 self.A_dic[state] = {s: 0.0 for s in self.state_list} 45 self.Pi_dic[state] = 0.0 46 self.B_dic[state] = {} 47 48 Count_dic[state] = 0 49 50 def makeLabel(text): 51 out_text = [] 52 if len(text) == 1: 53 out_text.append('S') 54 else: 55 out_text += ['B'] + ['M'] * (len(text) - 2) + ['E'] 56 57 return out_text 58 59 init_parameters() 60 line_num = -1 61 # 觀察者集合,主要是字以及標點等 62 words = set() 63 with open(path, encoding='utf8') as f: 64 for line in f: 65 line_num += 1 66 67 line = line.strip() 68 if not line: 69 continue 70 71 word_list = [i for i in line if i != ' '] 72 words |= set(word_list) # 更新字的集合 73 74 linelist = line.split() 75 76 line_state = [] 77 for w in linelist: 78 line_state.extend(makeLabel(w)) 79 80 assert len(word_list) == len(line_state) 81 82 for k, v in enumerate(line_state): 83 Count_dic[v] += 1 84 if k == 0: 85 self.Pi_dic[v] += 1 # 每個句子的第一個字的狀態,用於計算初始狀態概率 86 else: 87 self.A_dic[line_state[k - 1]][v] += 1 # 計算轉移概率 88 self.B_dic[line_state[k]][word_list[k]] = \ 89 self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0 # 計算發射概率 90 91 self.Pi_dic = {k: v * 1.0 / line_num for k, v in self.Pi_dic.items()} 92 self.A_dic = {k: {k1: v1 / Count_dic[k] for k1, v1 in v.items()} 93 for k, v in self.A_dic.items()} 94 #加1平滑 95 self.B_dic = {k: {k1: (v1 + 1) / Count_dic[k] for k1, v1 in v.items()} 96 for k, v in self.B_dic.items()} 97 #序列化 98 import pickle 99 with open(self.model_file, 'wb') as f: 100 pickle.dump(self.A_dic, f) 101 pickle.dump(self.B_dic, f) 102 pickle.dump(self.Pi_dic, f) 103 104 return self 105 106 def viterbi(self, text, states, start_p, trans_p, emit_p): 107 V = [{}] 108 path = {} 109 for y in states: 110 V[0][y] = start_p[y] * emit_p[y].get(text[0], 0) 111 path[y] = [y] 112 for t in range(1, len(text)): 113 V.append({}) 114 newpath = {} 115 116 #檢驗訓練的發射概率矩陣中是否有該字 117 neverSeen = text[t] not in emit_p['S'].keys() and \ 118 text[t] not in emit_p['M'].keys() and \ 119 text[t] not in emit_p['E'].keys() and \ 120 text[t] not in emit_p['B'].keys() 121 for y in states: 122 emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #設置未知字單獨成詞 123 (prob, state) = max( 124 [(V[t - 1][y0] * trans_p[y0].get(y, 0) * 125 emitP, y0) 126 for y0 in states if V[t - 1][y0] > 0]) 127 V[t][y] = prob 128 newpath[y] = path[state] + [y] 129 path = newpath 130 131 if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0): 132 (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')]) 133 else: 134 (prob, state) = max([(V[len(text) - 1][y], y) for y in states]) 135 136 return (prob, path[state]) 137 138 def cut(self, text): 139 import os 140 if not self.load_para: 141 self.try_load_model(os.path.exists(self.model_file)) 142 prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic) 143 begin, next = 0, 0 144 for i, char in enumerate(text): 145 pos = pos_list[i] 146 if pos == 'B': 147 begin = i 148 elif pos == 'E': 149 yield text[begin: i+1] 150 next = i+1 151 elif pos == 'S': 152 yield char 153 next = i+1 154 if next < len(text): 155 yield text[next:] 156 #####測試 157 hmm=HMM() 158 hmm.train('cutwords\\trainCorpus.txt_utf8.txt') 159 160 text='這是一個很棒的方案!' 161 res=hmm.cut(text) 162 print(text) 163 print(str(list(res)))
