A Data Augmentation Method: Non-Core Word Replacement


When the words to be replaced are chosen at random, the intuitive worry is that if important words get swapped out, the quality of the augmented text suffers badly. The method introduced in this section is designed to avoid exactly that problem; we will tentatively call the resulting word-replacement technique "data augmentation via non-core word replacement".

We first encountered this technique in the Google paper that proposed the UDA algorithm [6]. Whether it appeared in earlier literature we have not investigated further; readers who know are welcome to leave a comment.

The core idea is simple: replace a certain proportion of the unimportant words in a text with unimportant words from the vocabulary, thereby producing new text.

In information retrieval, the TF-IDF value is generally used to measure how important a word is to a piece of text. Here is a brief review of its definition:

TF (term frequency) is simply the number of times a word occurs in the document. Obviously, a word that occurs many times in a document probably plays a significant role there; but if the same word also occurs frequently across other documents, like "的" ("of") or "我" ("I"), its importance must be discounted, and that discount is what IDF captures.

IDF (inverse document frequency) is an importance-adjustment coefficient that measures how common a word is. If a word is rare across the corpus yet occurs many times in this particular document, it very likely reflects what the document is about, which is exactly the kind of keyword we want.

TF-IDF = TF × IDF. This product effectively measures a word's importance to a given text. Once we know each word's importance, we sample words for replacement with probability negatively correlated with their TF-IDF value, which largely avoids mistakenly replacing or deleting the text's keywords.
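To make the definitions concrete, here is a minimal, dependency-free sketch of the computation; the toy corpus and the choice of a base-10 logarithm are ours, purely for illustration:

import math
from collections import Counter

def tf_idf(docs):
    """Compute TF-IDF for every (document, word) pair of a tokenized corpus."""
    n_docs = len(docs)
    # document frequency: the number of documents containing each word
    df = Counter(w for doc in docs for w in set(doc))
    scores = []
    for doc in docs:
        tf = Counter(doc)   # raw counts within this document
        scores.append({w: (tf[w] / len(doc)) * math.log10(n_docs / df[w])
                       for w in tf})
    return scores

docs = [['我', '喜歡', '數據', '增強'],
        ['我', '喜歡', '偶數'],
        ['數據', '決定', '上限']]
print(tf_idf(docs)[0])   # common words '我' and '喜歡' score low; '增強' scores highest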

The concrete implementation proposed in the UDA paper is as follows.
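For a sentence x = (x_1, ..., x_n), the replacement probability of each token is

    p_replace(x_i) = min(p * (C - TFIDF(x_i)) / Z, 1)

where C = max_j TFIDF(x_j), Z = (1/n) * sum_j (C - TFIDF(x_j)), and p is a hyperparameter controlling the overall replacement ratio (0.3 in the code below). Both implementations that follow compute exactly this quantity; here is a minimal standalone sketch of just this step (the function name and the toy scores are ours):

def replace_probs(tokens, tfidf, p=0.3):
    """UDA-style replacement probability: the lower a token's TF-IDF, the likelier it is replaced."""
    scores = [tfidf.get(t, 0.0) for t in tokens]
    c = max(scores)                                 # highest TF-IDF in the sentence
    z = sum(c - s for s in scores) / len(tokens)    # normalizer: mean gap to the max
    if z == 0:                                      # all tokens equally important
        return {t: 0.0 for t in tokens}
    return {t: min(p * (c - s) / z, 1.0) for t, s in zip(tokens, scores)}

print(replace_probs(['凡是', '能夠', '整除', '偶數'],
                    {'凡是': 0.1, '能夠': 0.2, '整除': 0.9, '偶數': 0.8}))
# ≈ {'凡是': 0.6, '能夠': 0.525, '整除': 0.0, '偶數': 0.075}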

 

Implementation 1: this version does not use the extraction probabilities over the dictionary vocabulary. Instead, once the word to be replaced has been selected from the input text, it looks up the top-1 most similar word in a pretrained word-embedding model (the sgns.merge.word vectors are of quite decent quality; thanks to classmate Xiaomeng) and substitutes it to obtain the augmented sample. The code is as follows:

"""
    Function: data enhance
    Author: dyx
    DateTime: 2020.7.16
"""
import os
import jieba
import jieba.analyse
from gensim import corpora
from gensim import models
import pickle
import json
from gensim.models import KeyedVectors

with open('../resources/stopwords.txt', 'r', encoding='utf8') as fr:
    STOPWORDS = {line.strip(): 1 for line in fr}
CURRENT_FOLDER = os.path.dirname(os.path.abspath(__file__))
WORD2VEC_FILE = r'F:\詞向量\sgns.merge.word\sgns.merge.word'
ENHANCE_FILE = os.path.join(CURRENT_FOLDER, 'data', 'enhance_train_v1.json')
WORD2VEC = KeyedVectors.load_word2vec_format(WORD2VEC_FILE, binary=False)
ANALYSIS_DATA = os.path.join(CURRENT_FOLDER, 'data', 'analysis_data.txt')
PRED_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/predict_dev')
TRUE_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/dev.json')
# WORD2VEC.init_sims(replace=True)   # optional: L2-normalize the vectors in place

class DataEnhance:

    def __init__(self, data_path):
        self.data_path = data_path
        # self.content is required by read_data() and unimportant_word_enhance()
        with open(self.data_path, 'r', encoding='utf8') as fr:
            self.content = fr.readlines()
        with open(PRED_DEV, 'r', encoding='utf8') as fr:
            self.pred_dev = fr.readlines()
        with open(TRUE_DEV, 'r', encoding='utf8') as fr:
            self.true_dev = fr.readlines()



    def read_data(self):
        label_count = {'Yes': 0, 'No': 0, 'Depends': 0}
        word_list = []
        analysis_data = []

        for each in self.content:
            sample = json.loads(each)        # one JSON object per line
            yesno_answer = sample.get('yesno_answer')
            answer = sample.get('answer')
            tokens = list(jieba.cut(answer))
            # the answer field is what gets augmented
            word_list.append(tokens)

            if yesno_answer in label_count:
                label_count[yesno_answer] += 1
            if yesno_answer == 'Depends':
                to_save = 'label:{}\tanswer:{}\n'.format(yesno_answer, answer)
                analysis_data.append(to_save)
                print(to_save)

        with open(ANALYSIS_DATA, 'w', encoding='utf8') as fw:
            fw.writelines(analysis_data)
        # build and persist the corpus dictionary and the TF-IDF model
        dictionary = corpora.Dictionary(word_list)
        new_corpus = [dictionary.doc2bow(w) for w in word_list]
        tfidf = models.TfidfModel(new_corpus)
        tfidf.save(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model'))
        str_id = dictionary.token2id
        with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'wb') as fw:
            pickle.dump(str_id, fw)
        print(label_count)
        print('tfidf:{}'.format(tfidf))


    def unimportant_word_enhance(self):

        with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'rb') as fr:
            str_id = pickle.load(fr)
        tfidf_temp = models.TfidfModel.load(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model'))

        # word -> corpus-level score (document frequency x IDF,
        # used here as a proxy for each word's overall importance)
        id_str = {v: k for k, v in str_id.items()}   # inverse lookup: id -> word
        tfidf = {}
        for token_id in tfidf_temp.dfs:
            word = id_str[token_id]
            tfidf[word] = tfidf_temp.dfs[token_id] * tfidf_temp.idfs[token_id]
        # report the word(s) with the maximum score
        max_tfidf = max(tfidf.values())
        for key, value in tfidf.items():
            if value == max_tfidf:
                print('{} has the maximum tfidf value: {}'.format(key, value))
        # probability of each word being drawn from the vocabulary:
        # the higher the score, the lower the probability
        # (computed for inspection only; implementation 1 does not use it)
        extract_prob = {}
        z_sigma = sum(max_tfidf - value for value in tfidf.values())
        for key, value in tfidf.items():
            extract_prob[key] = (max_tfidf - value) / z_sigma
        print('extraction probability of each word: {}'.format(extract_prob))


        final_data = []
        p = 0.3   # overall replacement ratio
        for sample in self.content:
            sample_obj = json.loads(sample)
            text = sample_obj.get('answer')
            tokens = jieba.lcut(text.strip())
            # per-token score; default 0.0 for tokens missing from the table
            sent_tfidf = {each: tfidf.get(each, 0.0) for each in tokens}

            # replacement probability per token (see the formula above)
            C = max(sent_tfidf.values())
            Z = sum((C - sent_tfidf[each]) / len(tokens) for each in tokens)
            if Z == 0.0:   # all tokens equally important: nothing to replace
                final_data.append(sample)
                continue
            replace_prob = {}
            for each in tokens:
                replace_prob[each] = min(p * (C - sent_tfidf[each]) / Z, 1.0)
            replace_prob_sorted = sorted(replace_prob.items(), key=lambda x: x[1], reverse=True)
            print('replacement probability of each word in this sentence: {}'.format(replace_prob_sorted))
            # take the two most replaceable (least important) words
            replace_words = [w for w, _ in replace_prob_sorted[:2]]

            # substitute each with its top-1 neighbour in the embedding space
            for i in replace_words:
                if i in WORD2VEC.vocab:   # gensim < 4.0 vocabulary API
                    target_word = WORD2VEC.similar_by_word(i, 1)[0][0]
                    print('original word: {}\treplacement word: {}'.format(i, target_word))
                    text = text.replace(i, target_word)
            sample_obj['answer'] = text
            final_data.append(json.dumps(sample_obj, ensure_ascii=False) + '\n')
        with open(ENHANCE_FILE, 'w', encoding='utf8') as fw:
            fw.writelines(final_data)

        print('ok')


    def analysis_data(self):
        label_count = {'Yes': 0, 'No': 0, 'Depends': 0}
        error_count = 0
        length = len(self.true_dev)

        for a, p in zip(self.true_dev, self.pred_dev):
            true_obj = json.loads(a)
            pred_obj = json.loads(p)
            a_yna = true_obj.get('yesno_answer')
            p_yna = pred_obj.get('yesno_answer')
            if a_yna != p_yna:
                error_count += 1
                if a_yna in label_count:
                    label_count[a_yna] += 1

                q = true_obj.get('question')
                ans = true_obj.get('answer')

                title = [each.get('title') for each in true_obj.get('documents')]
                paragraphs = [each.get('paragraphs') for each in true_obj.get('documents')]
                print(title)
                print(paragraphs)
                print('question:{}\nanswer:{}\ntrue label:{}\npredict label:{}\n\n'.format(q, ans, a_yna, p_yna))

        print('total samples: {}, misclassified: {}, Acc: {:.4f}'.format(length, error_count, (length - error_count) / length))
        print('label distribution of prediction errors: {}'.format(label_count))


if __name__ == '__main__':
    data_path = os.path.join(CURRENT_FOLDER, 'data', 'train.json')
    de = DataEnhance(data_path)
    # de.read_data()
    # de.unimportant_word_enhance()
    de.analysis_data()
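
Note that read_data() must be run once before unimportant_word_enhance(), since the latter loads the tfidf.model and str_id.pkl files that read_data() writes; the __main__ block above only runs the error-analysis step.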

 

Implementation 2: reproduces the vocabulary-sampling method of the original paper. Its advantage is that the substituted words still come from the original corpus, which avoids a certain amount of semantic drift. Preliminary code:

"""
    Function: data enhance
    Author: dyx
    DateTime: 2020.7.16
"""
import os
import jieba
import jieba.analyse
from gensim import corpora
from gensim import models
import pickle
import json
import numpy as np
from gensim.models import KeyedVectors

with open('../resources/stopwords.txt', 'r', encoding='utf8') as fr:
    STOPWORDS = {line.strip(): 1 for line in fr}
CURRENT_FOLDER = os.path.dirname(os.path.abspath(__file__))
# WORD2VEC_FILE = r'F:\詞向量\sgns.merge.word\sgns.merge.word'
ENHANCE_FILE = os.path.join(CURRENT_FOLDER, 'data', 'enhance_train_v1.json')
# WORD2VEC = models.KeyedVectors.load_word2vec_format(WORD2VEC_FILE, binary=False)
ANALYSIS_DATA = os.path.join(CURRENT_FOLDER, 'data', 'analysis_data.txt')
PRED_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/predict_dev')
TRUE_DEV = os.path.join(CURRENT_FOLDER, 'data/error_data/dev.json')

class DataEnhance:

    def __init__(self, data_path):
        self.data_path = data_path
        with open(self.data_path, 'r', encoding='utf8') as fr:
            self.content = fr.readlines()
        with open(PRED_DEV, 'r', encoding='utf8') as fr:
            self.pred_dev = fr.readlines()
        with open(TRUE_DEV, 'r', encoding='utf8') as fr:
            self.true_dev = fr.readlines()



    def read_data(self):
        label_count = {'Yes': 0, 'No': 0, 'Depends': 0}
        word_list = []
        analysis_data = []

        for each in self.content:
            sample = json.loads(each)        # one JSON object per line
            yesno_answer = sample.get('yesno_answer')
            answer = sample.get('answer')
            tokens = list(jieba.cut(answer))
            # the answer field is what gets augmented
            word_list.append(tokens)

            if yesno_answer in label_count:
                label_count[yesno_answer] += 1
            if yesno_answer == 'Depends':
                to_save = 'label:{}\tanswer:{}\n'.format(yesno_answer, answer)
                analysis_data.append(to_save)
                print(to_save)

        with open(ANALYSIS_DATA, 'w', encoding='utf8') as fw:
            fw.writelines(analysis_data)
        # build and persist the corpus dictionary and the TF-IDF model
        dictionary = corpora.Dictionary(word_list)
        new_corpus = [dictionary.doc2bow(w) for w in word_list]
        tfidf = models.TfidfModel(new_corpus)
        tfidf.save(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model'))
        str_id = dictionary.token2id
        with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'wb') as fw:
            pickle.dump(str_id, fw)
        print(label_count)
        print('tfidf:{}'.format(tfidf))


    def unimportant_word_enhance(self):

        with open(os.path.join(CURRENT_FOLDER, 'model', 'str_id.pkl'), 'rb') as fr:
            str_id = pickle.load(fr)
        tfidf_temp = models.TfidfModel.load(os.path.join(CURRENT_FOLDER, 'model', 'tfidf.model'))

        # word -> corpus-level score (document frequency x IDF,
        # used here as a proxy for each word's overall importance)
        id_str = {v: k for k, v in str_id.items()}   # inverse lookup: id -> word
        tfidf = {}
        for token_id in tfidf_temp.dfs:
            word = id_str[token_id]
            tfidf[word] = tfidf_temp.dfs[token_id] * tfidf_temp.idfs[token_id]
        # report the word(s) with the maximum score
        max_tfidf = max(tfidf.values())
        for key, value in tfidf.items():
            if value == max_tfidf:
                print('{} has the maximum tfidf value: {}'.format(key, value))
        # probability of each word being drawn from the vocabulary:
        # the higher the score, the lower the probability
        extract_prob = {}
        z_sigma = sum(max_tfidf - value for value in tfidf.values())

        original_word = []
        word_prob = []

        for key, value in tfidf.items():
            prob = (max_tfidf - value) / z_sigma
            extract_prob[key] = prob
            original_word.append(key)
            word_prob.append(prob)
        print('extraction probability of each word: {}'.format(extract_prob))


        final_data = []
        p = 0.3   # overall replacement ratio
        for sample in self.content:
            sample_obj = json.loads(sample)
            text = sample_obj.get('answer')
            # draw the substitute word from the vocabulary according to
            # the extraction distribution (word_prob sums to 1 by construction)
            extract_word = original_word[np.random.choice(len(original_word), 1, p=word_prob)[0]]

            tokens = jieba.lcut(text.strip())
            # per-token score; default 0.0 for tokens missing from the table
            sent_tfidf = {each: tfidf.get(each, 0.0) for each in tokens}

            # replacement probability per token (see the formula above)
            C = max(sent_tfidf.values())
            Z = sum((C - sent_tfidf[each]) / len(tokens) for each in tokens)
            if Z == 0.0:   # all tokens equally important: nothing to replace
                final_data.append(sample)
                continue
            replace_prob = {}
            for each in tokens:
                replace_prob[each] = min(p * (C - sent_tfidf[each]) / Z, 1.0)
            replace_prob_sorted = sorted(replace_prob.items(), key=lambda x: x[1], reverse=True)
            print('replacement probability of each word in this sentence: {}'.format(replace_prob_sorted))
            # replace the single most replaceable word with the sampled one
            replace_word_01 = replace_prob_sorted[0][0]
            text = text.replace(replace_word_01, extract_word)
            print('replaced word: {}; target word: {}'.format(replace_word_01, extract_word))
            sample_obj['answer'] = text
            final_data.append(json.dumps(sample_obj, ensure_ascii=False) + '\n')

        with open(ENHANCE_FILE, 'w', encoding='utf8') as fw:
            fw.writelines(final_data)

        print('ok')


    def analysis_data(self):
        label_count = {'Yes': 0, 'No': 0, 'Depends': 0}
        error_count = 0
        length = len(self.true_dev)

        for a, p in zip(self.true_dev, self.pred_dev):
            true_obj = json.loads(a)
            pred_obj = json.loads(p)
            a_yna = true_obj.get('yesno_answer')
            p_yna = pred_obj.get('yesno_answer')
            if a_yna != p_yna:
                error_count += 1
                if a_yna in label_count:
                    label_count[a_yna] += 1

                q = true_obj.get('question')
                ans = true_obj.get('answer')

                title = [each.get('title') for each in true_obj.get('documents')]
                paragraphs = [each.get('paragraphs') for each in true_obj.get('documents')]
                print(title)
                print(paragraphs)
                print('question:{}\nanswer:{}\ntrue label:{}\npredict label:{}\n\n'.format(q, ans, a_yna, p_yna))

        print('total samples: {}, misclassified: {}, Acc: {:.4f}'.format(length, error_count, (length - error_count) / length))
        print('label distribution of prediction errors: {}'.format(label_count))


if __name__ == '__main__':
    data_path = os.path.join(CURRENT_FOLDER, 'data', 'train.json')
    de = DataEnhance(data_path)
    de.read_data()
    de.unimportant_word_enhance()
    # de.analysis_data()
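
Compared with implementation 1, the only substantive difference is where the substitute word comes from: np.random.choice draws it from the corpus-level extraction distribution computed above, so low-importance vocabulary words are sampled most often, and every substitute is guaranteed to be a word from the original corpus.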

 

