任務概述
敏感詞檢測是各類平台對用戶發布內容(UGC)進行審核的必做任務。
對於文本內容做敏感詞檢測,最簡單直接的方法就是規則匹配。構建一個敏感詞詞表,然后與文本內容進行匹配,如發現有敏感詞,則提交報告給人工審核或者直接加以屏蔽。
當然也可以用機器學習的方法來做,不過需要收集及標注大量數據,有條件的話也可以加以實現。
任務難點及解決策略
1)對抗檢測的場景:比如同音替換、字形替換、隱喻暗指、詞中間插入特殊字符等。
解決策略:特殊字符可以使用特殊字符詞表過濾,其他幾種不好解決,主要通過擴大敏感詞表規模來。
2)斷章取義的問題:從語義上理解沒有問題,但按窗口大小切出幾個字來看,卻屬於敏感詞,造成誤報。
解決策略:這個問題主要是分詞錯誤導致的,應當考慮分詞規則,而不是無腦遍歷,或者用正則匹配。
3)檢測效率問題:隨着詞表的增大,循環查找詞表的速度會變得很慢。
解決策略:使用DFA算法構建字典樹。
4)詞的歧義問題:一個詞某個義項是違規的,但其他義項或許是正常的。
解決策略:這個要結合上下文考慮,機器學習類的方法比較容易解決這一問題。
5)詞表質量問題:從網絡上獲取得到的敏感詞詞表,要么包含詞匯較少,不能滿足檢測需求,要么包含詞匯過多,檢測出了很多當前業務場景下不需要屏蔽的詞。
解決策略:需要定期人工整理。
敏感詞字典樹的構建
構建字典樹使用的是確定有限自動機(DFA)。DFA算法的核心是建立了以敏感詞為基礎的許多敏感詞樹。
它的基本思想是基於狀態轉移來檢索敏感詞,只需要掃描一次待檢測文本(長度n),就能對所有敏感詞進行檢測。
且DFA算法的時間復雜度O(n)基本上是與敏感詞的個數無關的,只與文本長度有關。
如下圖所示,比如abcd,abd,bcd,efg,hij這幾個詞在樹中表示如下。
中文的常用字只有四五千個,但由這些字構成的詞卻難以計數,如果循環遍歷,時間消耗極大,而通過字典樹,沿着根節點向下,每走一步就可以極大地縮小搜索空間。
代碼實現
import jieba
min_match = 1 # 最小匹配原則
max_match = 2 # 最大匹配原則
class SensitiveWordDetect:
def __init__(self, sensitive_words_path, stopWords_path):
#============把敏感詞庫加載到列表中====================
temp_line_list = []
with open(sensitive_words_path, 'r', encoding='utf-8') as file:
temp_line_list = file.readlines()
self.sensitive_word_list = sorted([i.split('\n')[0] for i in temp_line_list])
# print(self.sensitive_word_list[-10:])
#============把停用詞加載到列表中======================
temp_line_list_2 = []
with open(stopWords_path, 'r', encoding='utf-8') as file:
temp_line_list_2 = file.readlines()
self.stop_word_list = [i.split('\n')[0] for i in temp_line_list_2]
#==============得到sensitive字典=======================
self.sensitive_word_map = self.init_sensitive_word_map(self.sensitive_word_list)
#print(self.sensitive_word_map)
#print(len(self.sensitive_word_map))
# 構建敏感詞庫
def init_sensitive_word_map(self, sensitive_word_list):
sensitive_word_map = {}
# 讀取每一行,每一個word都是一個敏感詞
for word in sensitive_word_list:
now_map = sensitive_word_map
# 遍歷該敏感詞的每一個特定字符
for i in range(len(word)):
keychar = word[i]
word_map = now_map.get(keychar)
if word_map != None:
# now_map更新為下一層
now_map = word_map
else:
# 不存在則構建一個map, isEnd設置為0,因為不是最后一個
new_next_map = {}
new_next_map["isEnd"] = 0
now_map[keychar] = new_next_map
now_map = new_next_map
# 到這個詞末尾字符
if i == len(word)-1:
now_map["isEnd"] = 1
# print(sensitive_word_map)
return sensitive_word_map
def check_sensitive_word(self, txt, begin_index=0, match_mode=min_match):
'''
:param txt: 輸入待檢測的文本
:param begin_index:輸入文本開始的下標
:return:返回敏感詞字符的長度
'''
now_map = self.sensitive_word_map
sensitive_word_len = 0 # 敏感詞的長度
contain_char_sensitive_word_len = 0 # 包括特殊字符敏感詞的長度
end_flag = False #結束標記位
for i in range(begin_index, len(txt)):
char = txt[i]
if char in self.stop_word_list:
contain_char_sensitive_word_len += 1
continue
now_map = now_map.get(char)
if now_map != None:
sensitive_word_len += 1
contain_char_sensitive_word_len += 1
# 結束位置為True
if now_map.get("isEnd") == 1:
end_flag = True
# 最短匹配原則
if match_mode == min_match:
break
else:
break
if end_flag == False:
contain_char_sensitive_word_len = 0
#print(sensitive_word_len)
return contain_char_sensitive_word_len
def get_sensitive_word_list(self, txt):
# 去除停止詞
new_txt = ''
for char in txt:
if char not in self.stop_word_list:
new_txt += char
# 然后分詞
seg_list = list(jieba.cut(new_txt, cut_all=False))
cur_txt_sensitive_list = []
# 注意,並不是一個個char查找的,找到敏感詞會增強敏感詞的長度
for i in range(len(txt)):
length = self.check_sensitive_word(txt, i, match_mode=max_match)
if length > 0:
word = txt[i:i+length]
cur_txt_sensitive_list.append(word)
i = i+length-1 # 出了循環還要+1 i+length是沒有檢測到的,下次直接從i+length開始
# 對得到的結果和分詞結果進行匹配,不匹配的不要
rst_list = []
for line in cur_txt_sensitive_list:
new_line = ''
for char in line:
if char not in self.stop_word_list:
new_line += char
if new_line in seg_list:
rst_list.append(line)
return rst_list
def replace_sensitive_word(self, txt, replace_char='*'):
lst = self.get_sensitive_word_list(txt)
#print(lst)
# 如果需要加入的關鍵詞,已經在關鍵詞列表存在了,就不需要繼續添加
def judge(lst, word):
if len(lst) == 0:
return True
for str in lst:
if str.count(word) != 0:
return False
return True
# 最嚴格的打碼,選取最長打碼長度
for word in lst:
replace_str = len(word) * replace_char
txt = txt.replace(word, replace_str)
new_lst = []
for word in lst:
new_word = ""
# newWord是除去停用詞、最精煉版本的敏感詞
for char in word:
if char in self.stop_word_list:
continue
new_word += char
length = self.check_sensitive_word(new_word, 0, match_mode=min_match)
if judge(new_lst, new_word[:length]):
new_lst.append(new_word[:length])
else:
continue
return txt, new_lst # 最終返回的結果是屏蔽敏感詞后的文本,以及檢測出的敏感詞