Because the previous methods pick the words to replace at random, an intuitive concern is that if important words happen to be replaced, the quality of the augmented text will drop sharply. The method introduced in this part is a word-replacement technique designed to avoid exactly this problem; we will refer to it as "data augmentation based on non-core word replacement".
We first came across this technique in the paper in which Google proposed the UDA algorithm [6]. Whether it appeared in earlier literature we have not dug into further; if you know of prior work, please leave a comment.
The core idea is quite simple: replace a certain proportion of the unimportant words in a text with unimportant words from the vocabulary, thereby producing new text.
In information retrieval, the TF-IDF score is commonly used to measure how important a word is to a piece of text. A brief review of its definition:
TF (term frequency) is the number of times a word occurs in the document. A word that appears many times in an article is probably playing a significant role; but if the same word also occurs frequently in many other documents, such as "的" or "我", its importance should be heavily discounted, and that discount is what IDF captures.
IDF (inverse document frequency) is an importance-adjustment factor that measures how common a word is across the corpus. If a word is relatively rare overall yet occurs many times in this particular article, it very likely reflects what the article is about and is exactly the kind of keyword we are after.
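As a quick worked example (using the common textbook definitions TF = term count / document length and IDF = log(N / document frequency), which individual libraries tweak slightly): a word occurring 5 times in a 100-word answer has TF = 0.05; if it appears in 10 of 1,000 documents, IDF = log10(1000/10) = 2, so TF-IDF = 0.1. A stopword such as "的" may have a similar TF but appears in almost every document, so its IDF, and therefore its TF-IDF, is close to 0.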
TF-IDF = TF × IDF, which effectively measures how important a word is to a given text. Once we know each word's importance, we sample the words of the text with a probability negatively correlated with their TF-IDF score to decide which ones to replace. This largely avoids wrongly replacing or deleting the text's key words.
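To make the sampling rule concrete, here is a minimal sketch of the replacement probabilities that the code below also computes; the toy sent_tfidf dictionary and the ratio p = 0.3 are illustrative assumptions. For each token w, the replacement probability is min(p * (C - tfidf(w)) / Z, 1), where C is the largest TF-IDF score in the sentence and Z is the average of C - tfidf(w) over the sentence, so low-scoring (non-core) words are replaced far more often than keywords.

# Minimal sketch of TF-IDF-weighted replacement sampling; values are toy assumptions.
p = 0.3  # expected fraction of tokens to replace
sent_tfidf = {'偶數': 2.4, '整除': 1.9, '例如': 0.3, '的': 0.1}  # token -> TF-IDF score

C = max(sent_tfidf.values())                                   # highest score in the sentence
Z = sum(C - v for v in sent_tfidf.values()) / len(sent_tfidf)  # average gap to C (normaliser)

replace_prob = {w: min(p * (C - v) / Z, 1.0) for w, v in sent_tfidf.items()}
print(replace_prob)  # '的' and '例如' get high probabilities, '偶數' gets 0.0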
Two concrete implementations of the scheme described in the UDA paper are given below:
Implementation 1: this version does not use the extraction probabilities of the dictionary words. Instead, it first selects the words to be replaced in the target text, then looks up the top-1 most similar word in a pretrained word-embedding table (the sgns.merge.word vectors are of fairly good quality; thanks to 小萌 for the pointer) and substitutes it to obtain the augmented sample. The code is as follows:
""" Function: data enhance Author: dyx DateTime: 2020.7.16 """ import os import jieba import jieba.analyse from gensim import corpora from gensim import models import pickle import json from gensim.models import KeyedVectors STOPWORDS = open('../resources/stopwords.txt', 'r', encoding='utf8').readlines() STOPWORDS = {each.strip():1 for each in STOPWORDS} CURRENT_FOLDER = os.path.dirname(os.path.abspath(__file__)) WORD2VEC_FILE = r'F:\詞向量\sgns.merge.word\sgns.merge.word' ENHANCE_FILE = os.path.join(CURRENT_FOLDER, 'data', 'enhance_train_v1.json') WORD2VEC = models.KeyedVectors.load_word2vec_format(WORD2VEC_FILE, binary=False) ANALYSIS_DATA = os.path.join(CURRENT_FOLDER, 'data', 'analysis_data.txt') PERD_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/predict_dev') TRUE_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/dev.json') # wv_from_text.init_sims(replace=True) class DataEnhance(): def __init__(self, data_path): self.data_path = data_path # with open(self.data_path, 'r', encoding='utf8') as fr: # self.content = fr.readlines() with open(PERD_DEV, 'r', encoding='utf8') as fr: self.pred_dev = fr.readlines() with open(TRUE_DEV, 'r', encoding='utf8') as fr: self.true_dev = fr.readlines() def read_data(self): label_count = {'Yes':0, 'No':0, 'Depends':0} word_list = [] analysis_data = [] for each in self.content: yesno_answer = eval(each).get('yesno_answer') answer = eval(each).get('answer') question = eval(each).get('question') tokens = list(jieba.cut(answer)) # 針對answer進行增強 word_list.append(tokens) # tokens = [each for each in tokens if each not in STOPWORDS] # keywords_tfidf = jieba.analyse.extract_tags(answer, topK=30, withWeight=True) # jieba.analyse.set_stop_words(STOPWORDS) # keywords_tfidf_sotpwords = jieba.analyse.extract_tags(answer, topK=30, withWeight=True) # keywords_textrank = jieba.analyse.textrank(answer, topK=30, withWeight=True) # print('question:{}; yesno_answer:{}\nanswer:{}\nsegment:{}\nkeywords_tfidf:{}\n'.format(question, yesno_answer, answer, tokens, keywords_tfidf)) if yesno_answer == "Yes": label_count[yesno_answer] += 1 elif yesno_answer == 'No': label_count[yesno_answer] += 1 elif yesno_answer == "Depends": label_count[yesno_answer] += 1 to_save = 'label:{}\tanswer:{}\n'.format(yesno_answer, answer) analysis_data.append(to_save) print(to_save) with open(ANALYSIS_DATA, 'w', encoding='utf8') as fw: fw.writelines(analysis_data) dictionary = corpora.Dictionary(word_list) new_corpus = [dictionary.doc2bow(w) for w in word_list] tfidf = models.TfidfModel(new_corpus) tfidf.save(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model')) str_id = dictionary.token2id with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'wb') as fw: pickle.dump(str_id, fw) print(label_count) print('tfidf:{}'.format(tfidf)) def unimportant_word_enhance(self): with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'rb') as fr: str_id = pickle.load(fr) tfidf_temp = models.TfidfModel.load(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model')) # 構造文本-tfidf值 tfidf = {} for id in tfidf_temp.dfs: word = list(str_id.keys())[list(str_id.values()).index(id)] tfidf[word] = tfidf_temp.dfs[id] * tfidf_temp.idfs[id] # 最大tfidf值 max_tfidf = max(tfidf.values()) for key, value in tfidf.items(): if (value == max_tfidf): print('{} 詞匯最大tfidf值:{}'.format(key, value)) # word was extract prob extract_prob = {} z_sigma = 0 for key in tfidf: z_sigma += max_tfidf-tfidf[key] for key, value in tfidf.items(): extract_prob[key] = (max_tfidf-value)/z_sigma print("每個詞被抽取的概率:{}".format(extract_prob)) 
finnal_data = [] p = 0.3 for sample in self.content: text = eval(sample).get('answer') # replace prob # text = r'是的。凡是能夠被2整除的整數都是偶數,例如2、4、-6等。偶數當中也有正偶數、0、負偶數。' tokens = jieba.lcut(text.strip()) sent_tfidf = {} for each in tokens: sent_tfidf[each] = tfidf.get(each) C = max(sent_tfidf.values()) Z = 0.0 replace_prob = {} for each in tokens: Z += (C-sent_tfidf[each])/len(tokens) for each in tokens: temp = p*(C-sent_tfidf[each])/Z if temp < 1.0: replace_prob[each] = temp else: replace_prob[each] = 1.0 replace_prob_sorted = sorted(replace_prob.items(), key=lambda x:x[1], reverse=True) print("當前句子詞被替換的概率:{}".format(replace_prob_sorted)) replace_word_01 = replace_prob_sorted[0][0] replace_word_02 = replace_prob_sorted[0][1] # similar_word for i in [replace_word_01, replace_word_02]: if i in WORD2VEC.vocab: target_word = WORD2VEC.similar_by_word(i, 1) target_word = target_word[0][0] print('原始詞匯:{}\t被替換詞匯:{}'.format(i, target_word)) text = text.replace(i, target_word) new_sample = json.loads(sample) new_sample['answer'] = text finnal_data.append(json.dumps(new_sample, ensure_ascii=False)+'\n') with open(ENHANCE_FILE, 'w', encoding='utf8') as fw: fw.writelines(finnal_data) print('ok') def analysis_data(self): label_count = {'Yes': 0, 'No': 0, 'Depends': 0} error_count = 0 length = len(self.true_dev) for a, p in zip(self.true_dev, self.pred_dev): x = json.loads(a) a_yna = eval(a).get('yesno_answer') p_yna = eval(p).get('yesno_answer') if a_yna != p_yna: error_count += 1 if a_yna == "Yes": label_count[a_yna] += 1 elif a_yna == 'No': label_count[a_yna] += 1 elif a_yna == "Depends": label_count[a_yna] += 1 q = eval(a).get('question') a = eval(a).get('answer') title = [each.get('title') for each in x.get('documents')] paragraphs = [each.get('paragraphs') for each in x.get('documents')] print(title) print(paragraphs) print('question:{}\nanswer:{}\ntrue label:{}\npredict label:{}\n\n'.format( q, a, a_yna, p_yna)) print('總預測樣本數:{},誤判樣本數:{},Acc:{:.4f}'.format(length, error_count, (length-error_count)/length)) print('預測錯誤標簽分布:{}'.format(label_count)) if __name__ == '__main__': data_path = os.path.join(CURRENT_FOLDER, 'data', 'train.json') de = DataEnhance(data_path) # de.read_data() # de.unimportant_word_enhance() de.analysis_data()
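Note that the __main__ block above only runs analysis_data(). To actually generate augmented data with this implementation, the commented-out lines that load self.content in __init__ must be enabled, and the two steps must run in order, since unimportant_word_enhance() loads the TF-IDF model and id mapping that read_data() saves. A rough usage sketch, assuming the same train.json layout:

de = DataEnhance(os.path.join(CURRENT_FOLDER, 'data', 'train.json'))
de.read_data()                 # builds and saves model/tfidf.model and model/str_id.pkl
de.unimportant_word_enhance()  # writes augmented samples to data/enhance_train_v1.json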
Implementation 2: reproduces the word-extraction method described in the original paper. Its advantage is that the substituted words still come from the original corpus vocabulary, which avoids a certain amount of semantic drift. A preliminary version of the code follows:
""" Function: data enhance Author: dyx DateTime: 2020.7.16 """ import os import jieba import jieba.analyse from gensim import corpora from gensim import models import pickle import json import numpy as np from gensim.models import KeyedVectors STOPWORDS = open('../resources/stopwords.txt', 'r', encoding='utf8').readlines() STOPWORDS = {each.strip():1 for each in STOPWORDS} CURRENT_FOLDER = os.path.dirname(os.path.abspath(__file__)) # WORD2VEC_FILE = r'F:\詞向量\sgns.merge.word\sgns.merge.word' ENHANCE_FILE = os.path.join(CURRENT_FOLDER, 'data', 'enhance_train_v1.json') # WORD2VEC = models.KeyedVectors.load_word2vec_format(WORD2VEC_FILE, binary=False) ANALYSIS_DATA = os.path.join(CURRENT_FOLDER, 'data', 'analysis_data.txt') PERD_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/predict_dev') TRUE_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/dev.json') # wv_from_text.init_sims(replace=True) class DataEnhance(): def __init__(self, data_path): self.data_path = data_path with open(self.data_path, 'r', encoding='utf8') as fr: self.content = fr.readlines() with open(PERD_DEV, 'r', encoding='utf8') as fr: self.pred_dev = fr.readlines() with open(TRUE_DEV, 'r', encoding='utf8') as fr: self.true_dev = fr.readlines() def read_data(self): label_count = {'Yes':0, 'No':0, 'Depends':0} word_list = [] analysis_data = [] for each in self.content: yesno_answer = eval(each).get('yesno_answer') answer = eval(each).get('answer') question = eval(each).get('question') tokens = list(jieba.cut(answer)) # 針對answer進行增強 word_list.append(tokens) # tokens = [each for each in tokens if each not in STOPWORDS] # keywords_tfidf = jieba.analyse.extract_tags(answer, topK=30, withWeight=True) # jieba.analyse.set_stop_words(STOPWORDS) # keywords_tfidf_sotpwords = jieba.analyse.extract_tags(answer, topK=30, withWeight=True) # keywords_textrank = jieba.analyse.textrank(answer, topK=30, withWeight=True) # print('question:{}; yesno_answer:{}\nanswer:{}\nsegment:{}\nkeywords_tfidf:{}\n'.format(question, yesno_answer, answer, tokens, keywords_tfidf)) if yesno_answer == "Yes": label_count[yesno_answer] += 1 elif yesno_answer == 'No': label_count[yesno_answer] += 1 elif yesno_answer == "Depends": label_count[yesno_answer] += 1 to_save = 'label:{}\tanswer:{}\n'.format(yesno_answer, answer) analysis_data.append(to_save) print(to_save) with open(ANALYSIS_DATA, 'w', encoding='utf8') as fw: fw.writelines(analysis_data) dictionary = corpora.Dictionary(word_list) new_corpus = [dictionary.doc2bow(w) for w in word_list] tfidf = models.TfidfModel(new_corpus) tfidf.save(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model')) str_id = dictionary.token2id with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'wb') as fw: pickle.dump(str_id, fw) print(label_count) print('tfidf:{}'.format(tfidf)) def unimportant_word_enhance(self): with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'rb') as fr: str_id = pickle.load(fr) tfidf_temp = models.TfidfModel.load(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model')) # 構造文本-tfidf值 tfidf = {} for id in tfidf_temp.dfs: word = list(str_id.keys())[list(str_id.values()).index(id)] tfidf[word] = tfidf_temp.dfs[id] * tfidf_temp.idfs[id] # 最大tfidf值 max_tfidf = max(tfidf.values()) for key, value in tfidf.items(): if (value == max_tfidf): print('{} 詞匯最大tfidf值:{}'.format(key, value)) # word was extract prob extract_prob = {} z_sigma = 0 for key in tfidf: z_sigma += max_tfidf-tfidf[key] original_word = [] word_prob = [] for key, value in tfidf.items(): prob = (max_tfidf-value)/z_sigma 
extract_prob[key] = prob original_word.append(key) word_prob.append(prob) print("每個詞被抽取的概率:{}".format(extract_prob)) finnal_data = [] p = 0.3 for sample in self.content: text = eval(sample).get('answer') extract_word = original_word[np.random.choice(len(original_word), 1, p=word_prob)[0]] # replace prob # text = r'是的。凡是能夠被2整除的整數都是偶數,例如2、4、-6等。偶數當中也有正偶數、0、負偶數。' tokens = jieba.lcut(text.strip()) sent_tfidf = {} for each in tokens: sent_tfidf[each] = tfidf.get(each) C = max(sent_tfidf.values()) Z = 0.0 replace_prob = {} for each in tokens: Z += (C-sent_tfidf[each])/len(tokens) for each in tokens: temp = p*(C-sent_tfidf[each])/Z if temp < 1.0: replace_prob[each] = temp else: replace_prob[each] = 1.0 replace_prob_sorted = sorted(replace_prob.items(), key=lambda x:x[1], reverse=True) print("當前句子詞被替換的概率:{}".format(replace_prob_sorted)) replace_word_01 = replace_prob_sorted[0][0] text = text.replace(replace_word_01, extract_word) print('被替換詞:{}; 目標詞匯:{}'.format(replace_word_01, extract_word)) new_sample = json.loads(sample) new_sample['answer'] = text finnal_data.append(json.dumps(new_sample, ensure_ascii=False)+'\n') with open(ENHANCE_FILE, 'w', encoding='utf8') as fw: fw.writelines(finnal_data) print('ok') def analysis_data(self): label_count = {'Yes': 0, 'No': 0, 'Depends': 0} error_count = 0 length = len(self.true_dev) for a, p in zip(self.true_dev, self.pred_dev): x = json.loads(a) a_yna = eval(a).get('yesno_answer') p_yna = eval(p).get('yesno_answer') if a_yna != p_yna: error_count += 1 if a_yna == "Yes": label_count[a_yna] += 1 elif a_yna == 'No': label_count[a_yna] += 1 elif a_yna == "Depends": label_count[a_yna] += 1 q = eval(a).get('question') a = eval(a).get('answer') title = [each.get('title') for each in x.get('documents')] paragraphs = [each.get('paragraphs') for each in x.get('documents')] print(title) print(paragraphs) print('question:{}\nanswer:{}\ntrue label:{}\npredict label:{}\n\n'.format( q, a, a_yna, p_yna)) print('總預測樣本數:{},誤判樣本數:{},Acc:{:.4f}'.format(length, error_count, (length-error_count)/length)) print('預測錯誤標簽分布:{}'.format(label_count)) if __name__ == '__main__': data_path = os.path.join(CURRENT_FOLDER, 'data', 'train.json') de = DataEnhance(data_path) de.read_data() de.unimportant_word_enhance() # de.analysis_data()
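The essential difference from implementation 1 is where the substitute word comes from: instead of a word2vec neighbour, a word is drawn from the whole corpus vocabulary with probability proportional to max_tfidf - tfidf(w), so high-frequency, low-importance words are drawn most often. A minimal sketch of just that sampling step, with toy scores assumed for illustration:

import numpy as np

tfidf = {'的': 0.1, '例如': 0.3, '整除': 1.9, '偶數': 2.4}  # word -> corpus-level TF-IDF (toy values)
max_tfidf = max(tfidf.values())

words = list(tfidf)
gaps = np.array([max_tfidf - tfidf[w] for w in words])
probs = gaps / gaps.sum()            # extraction probability, largest for the least important words

extract_word = np.random.choice(words, p=probs)
print(extract_word)                  # usually '的' or '例如'; never '偶數' (probability 0)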