NLP Text Classification Study Notes 0: Data Preprocessing and Training Setup


This series of articles draws on the GitHub project Chinese-Text-Classification-Pytorch.

Dataset and Splits

We use the Chinese online_shopping_10_cats dataset from GitHub. It contains roughly 60,000 review entries across 10 categories, with about 30,000 positive and 30,000 negative reviews. The 10 categories are: books (書籍), tablets (平板), mobile phones (手機), fruit (水果), shampoo (洗發水), water heaters (熱水器), Mengniu dairy (蒙牛), clothes (衣服), computers (計算機), and hotels (酒店). The dataset is a CSV file with three columns:

cat: one of the 10 categories
label: sentiment polarity, 0 (negative) or 1 (positive)
review: the review text

The 10 categories are imbalanced:

cat       count
平板      10000
水果      10000
洗發水    10000
衣服      10000
酒店      10000
計算機    3992
書籍      3851
手機      2323
蒙牛      2033
熱水器    575
64% of the data is used for training, 20% for testing, and 16% for validation. This is done by running the split function twice: the first pass holds out 20% as the test set, and the second pass splits the remaining 80% with ratio 0.2, giving 16% validation and 64% training data.
# Split the csv data
# Adapted from https://blog.csdn.net/weixin_38008864/article/details/99915084
def split_csv(infile, trainfile, valtestfile, seed=999, ratio=0.2):
    df = pd.read_csv(infile)
    idxs = np.arange(df.shape[0])
    np.random.seed(seed)
    np.random.shuffle(idxs)
    val_size = int(len(idxs) * ratio)
    df.iloc[idxs[:val_size], :].to_csv(valtestfile, index=False)
    df.iloc[idxs[val_size:], :].to_csv(trainfile, index=False)

Word Embeddings

The pretrained word vectors are trained with the gensim library on the Microsoft Chinese corpus msr_training, using word2vec with a vector dimension of 200 and a training window size of 5.

# Train the word2vec embedding model, 200 dimensions
dataset=word2vec.Text8Corpus(corpus)
model = word2vec.Word2Vec(sentences=dataset, vector_size=200, window=5, min_count=1, workers=4)
model.save(embedding_model_path)
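
Once trained, the model can be loaded back to verify the vector dimension or inspect nearest neighbours. A minimal sanity check, assuming the query word (here 中国, chosen only as an example) appears in the msr_training corpus:

# Quick check of the trained embeddings (the query word is only an example)
model = word2vec.Word2Vec.load(embedding_model_path)
print(model.wv.vector_size)                   # should print 200
print(model.wv['中国'][:5])                    # first 5 components of one word vector
print(model.wv.most_similar('中国', topn=3))   # 3 nearest neighbours in the embedding space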

The gensim model is not convenient to use directly. After the vocabulary for the dataset has been built (see below), it is converted into the form {word id: pretrained embedding vector}.

# Build the pretrained embedding matrix
def build_embed_pretrained(vocab_path,embedding_model_path,embedding_path):
    # Load the word2vec model
    model = word2vec.Word2Vec.load(embedding_model_path)
    # Load the vocabulary
    vocab=pickle.load(open(vocab_path,'rb'))
    embeddings = [0] *len(vocab.dict)
    # Look up the embedding vector for every word in the vocabulary
    for word,idx in vocab.dict.items():
        if word in model.wv:
            embeddings[idx]=model.wv[word]
        # Words missing from the model get a zero vector
        else:
            embeddings[idx]=np.zeros((200,))
    # Save as an .npz file
    np.savez_compressed(embedding_path, embeddings=embeddings)
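
The models in the later articles can then initialize their embedding layer from this matrix. A minimal sketch of how it could be loaded into torch.nn.Embedding (the .npz suffix is added automatically by np.savez_compressed; the exact wiring depends on each model's Config, so treat this as an assumption rather than the project's actual code):

import numpy as np
import torch
import torch.nn as nn

# Load the matrix saved by build_embed_pretrained ('data/embedding' + '.npz')
pretrained = np.load('data/embedding.npz')['embeddings'].astype('float32')
# freeze=False allows the pretrained vectors to be fine-tuned during training
embedding_layer = nn.Embedding.from_pretrained(torch.tensor(pretrained), freeze=False)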

Text Processing

Label Processing

Read the cat column of the data, map each category to a numeric id as shown below, and save the mapping as a JSON file:
{"書籍": 0, "平板": 1, "手機": 2, "水果": 3, "洗發水": 4, "熱水器": 5, "蒙牛": 6, "衣服": 7, "計算機": 8, "酒店": 9}

df = pd.read_csv(path)
# Build the mapping from label to numeric id
with open(class_path, 'w', encoding='utf-8') as f:
    class_dict = {}
    for i in df['cat'].unique():
        class_dict[i] = len(class_dict)
    json.dump(class_dict, f, ensure_ascii=False)

Building the Vocabulary

For the review text, punctuation, digits, and Latin characters are stripped (the remove function below) and the text is segmented with jieba. A helper class (Word2Num, defined in mydataset.py) counts the words and, subject to limits on word frequency and vocabulary size, builds a dictionary of the form {word: numeric id}. The class instance is then saved with pickle.

vocab=mydataset.Word2Num()
for i in df['review']:
    if pd.notnull(i):
        i = remove(i)
        sentence=jieba.lcut(i)
        vocab.fit(sentence)
vocab.build_vocab(min=MIN_FREQ,max_features=MAX_VOCAB_SIZE)
pickle.dump(vocab, open(vocab_path, 'wb'))
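
As an illustration, cleaning and segmenting one made-up review looks roughly like this (jieba's exact segmentation may vary):

# Illustrative only: the review text is invented
raw = '这个手机很好用,性价比高!123 good'
cleaned = remove(raw)           # strips digits, Latin characters, and punctuation
tokens = jieba.lcut(cleaned)    # e.g. ['这个', '手机', '很', '好用', '性价比', '高']; this is what vocab.fit receives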

Data Serialization

Load the saved vocabulary class, the label mapping, and the data. For each row, convert the cat value to its numeric id, clean and segment the review, and turn it into a sequence of word ids via the vocabulary class. Finally, save the processed data as a DataFrame built from {'label': label, 'text': text}, where label holds the class ids and text holds the id sequences.

# Process the data and serialize it
def build_dataset(path,class_path,vocab_path,save_path):
    w2=pickle.load(open(vocab_path, 'rb'))
    label_dict=json.load(open(class_path,'r',encoding='utf-8'))
    df = pd.read_csv(path)
    label=[]
    text=[]
    for index,i in df.iterrows():
        if pd.notnull(i['review']):
            label.append(label_dict[i['cat']])
            sentence=jieba.lcut(remove(i['review']))
            text.append(w2.transform(sentence,max_len=32))
    # Collect into a DataFrame
    records = {'label':label,'text':text}
    df = pd.DataFrame(records)
    # Save the DataFrame as a pickle
    df.to_pickle(save_path)

Building the DataLoader

To make batching easier later, a GetLoader class is defined that inherits from torch.utils.data.Dataset and overrides the __getitem__() and __len__() methods (see mydataset.py below). A minimal sketch of how it is wired up is shown next.
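
A minimal sketch, assuming the pickled DataFrames produced by build_dataset and an illustrative batch size of 64 (run.py below shows the full version used in this project):

import pandas as pd
import torch
from torch.utils.data import DataLoader
import mydataset

df = pd.read_pickle('data/train.df')                                 # produced by build_dataset
train_ds = mydataset.GetLoader(list(df['text']), list(df['label']))

def collate_fn(batch):
    # every sample is (id sequence of length 32, label); stack them into LongTensors
    text, label = zip(*batch)
    return torch.LongTensor(text), torch.LongTensor(label)

train_dl = DataLoader(train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn)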

Project Layout

All dataset preprocessing is handled in preTreatment.py;
run.py then loads the corresponding model and the processed data for training.

Training

Once the data has been processed, it is enough to import the corresponding model architecture and configuration in run.py. The models themselves are introduced in the other articles of this series.

Complete Code

mydataset.py

import torch

# GetLoader inherits from torch.utils.data.Dataset and overrides __getitem__() and __len__()
class GetLoader(torch.utils.data.Dataset):
    # Constructor: store the data and labels
    def __init__(self, data_root, data_label):
        self.data = data_root
        self.label = data_label
    # index comes from the DataLoader's batching; return the sample together with its label
    def __getitem__(self, index):
        data = self.data[index]
        labels = self.label[index]
        return data, labels
    # Return the dataset size so the DataLoader knows how to split it into batches
    def __len__(self):
        return len(self.data)

# Vocabulary class: converts text to sequences of word ids
class Word2Num():
    UNK_TAG="UNK"
    PAD_TAG="PAD"
    UNK=0
    PAD=1
    def __init__(self):
        self.dict={
            self.UNK_TAG:self.UNK,
            self.PAD_TAG:self.PAD
        }
        self.count={}

    # Add the words of one sentence to the word counts
    def fit(self,sentence):
        for word in sentence:
            self.count[word]=self.count.get(word,0)+1

    def build_vocab(self,min=5,max=None,max_features=None):
        # Filter by word frequency: keep words whose count lies within [min, max]
        if min is not None:
            self.count={word:value for word,value in self.count.items() if value>=min}
        if max is not None:
            self.count={word:value for word,value in self.count.items() if value<=max}

        # Limit the number of words kept (most frequent first)
        if max_features is not None:
            temp=sorted(self.count.items(),key=lambda x:x[-1],reverse=True)[:max_features]
            self.count=dict(temp)
        # Assign an id to each remaining word
        for word in self.count:
            self.dict[word]=len(self.dict)
        # Build the inverse (id -> word) dictionary
        self.inverse_dict=dict(zip(self.dict.values(),self.dict.keys()))

    # Convert words to their ids
    def transform(self,sentence,max_len=None):
        # Pad or truncate the sentence to max_len
        if max_len is not None:
            if max_len>len(sentence):
                sentence=sentence+[self.PAD_TAG]*(max_len-len(sentence))
            if max_len<len(sentence):
                sentence=sentence[:max_len]
        return [self.dict.get(word,self.UNK) for word in sentence]

    # Convert ids back to words
    def inverse_transform(self,indices):
        return [self.inverse_dict.get(idx) for idx in indices]
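
# A minimal usage sketch of Word2Num on two made-up, already segmented sentences
# (illustrative only; the exact ids depend on word frequencies and insertion order)
if __name__ == '__main__':
    vocab = Word2Num()
    vocab.fit(['我', '喜欢', '这本', '书'])
    vocab.fit(['我', '喜欢', '水果'])
    vocab.build_vocab(min=1, max_features=10)
    print(vocab.dict)                                         # {'UNK': 0, 'PAD': 1, '我': 2, '喜欢': 3, ...}
    print(vocab.transform(['我', '喜欢', '苹果'], max_len=5))    # e.g. [2, 3, 0, 1, 1]: UNK for unseen words, PAD to length 5
    print(vocab.inverse_transform([2, 3, 0, 1, 1]))           # ['我', '喜欢', 'UNK', 'PAD', 'PAD']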

preTreatment.py

import json
import pickle
import jieba
import pandas as pd
import numpy as np
from gensim.models import word2vec
import re
import mydataset

# Split the csv data
# Adapted from https://blog.csdn.net/weixin_38008864/article/details/99915084
def split_csv(infile, trainfile, valtestfile, seed=999, ratio=0.2):
    df = pd.read_csv(infile)
    idxs = np.arange(df.shape[0])
    np.random.seed(seed)
    np.random.shuffle(idxs)
    val_size = int(len(idxs) * ratio)
    df.iloc[idxs[:val_size], :].to_csv(valtestfile, index=False)
    df.iloc[idxs[val_size:], :].to_csv(trainfile, index=False)

# Strip digits, Latin characters, and punctuation from the text
def remove(text):
    remove_chars = '[0-9a-zA-Z’!"#$%&\'()*+,-./:;<=>?@,,。。 ?★、…【】《》?“”,‘’。’![\\]^_`{|}~]+'
    return re.sub(remove_chars, '', text).strip()

# Build the vocabulary and the label-to-id mapping from the raw data
def build_vocab_label(MAX_VOCAB_SIZE,MIN_FREQ,path,class_path,vocab_path):
    df = pd.read_csv(path)
    # Build the mapping from label to numeric id
    with open(class_path, 'w', encoding='utf-8') as f:
        class_dict = {}
        for i in df['cat'].unique():
            class_dict[i] = len(class_dict)
        json.dump(class_dict, f, ensure_ascii=False)
    # Build the vocabulary
    vocab=mydataset.Word2Num()
    for i in df['review']:
        if pd.notnull(i):
            i = remove(i)
            sentence=jieba.lcut(i)
            vocab.fit(sentence)
    vocab.build_vocab(min=MIN_FREQ,max_features=MAX_VOCAB_SIZE)
    pickle.dump(vocab, open(vocab_path, 'wb'))

# Process the data and serialize it
def build_dataset(path,class_path,vocab_path,save_path):
    w2=pickle.load(open(vocab_path, 'rb'))
    label_dict=json.load(open(class_path,'r',encoding='utf-8'))
    df = pd.read_csv(path)
    label=[]
    text=[]
    for index,i in df.iterrows():
        if pd.notnull(i['review']):
            label.append(label_dict[i['cat']])
            sentence=jieba.lcut(remove(i['review']))
            text.append(w2.transform(sentence,max_len=32))
    # Collect into a DataFrame
    records = {'label':label,'text':text}
    df = pd.DataFrame(records)
    # Save the DataFrame as a pickle
    df.to_pickle(save_path)


# Build the pretrained embedding matrix
def build_embed_pretrained(vocab_path,embedding_model_path,embedding_path):
    # Load the word2vec model
    model = word2vec.Word2Vec.load(embedding_model_path)
    # Load the vocabulary
    vocab=pickle.load(open(vocab_path,'rb'))
    embeddings = [0] *len(vocab.dict)
    # Look up the embedding vector for every word in the vocabulary
    for word,idx in vocab.dict.items():
        if word in model.wv:
            embeddings[idx]=model.wv[word]
        # Words missing from the model get a zero vector
        else:
            embeddings[idx]=np.zeros((200,))
    # Save as an .npz file
    np.savez_compressed(embedding_path, embeddings=embeddings)

if __name__ == '__main__':
    # Maximum vocabulary size
    MAX_VOCAB_SIZE = 100000
    # Minimum word frequency
    MIN_FREQ = 1
    corpus='C:/Users/DELL/Downloads/icwb2-data/icwb2-data/training/msr_training.utf8'
    embedding_path = 'data/embedding'
    embedding_model_path = "mymodel/word2vec.model"

    dataset_path = r'C:\Users\DELL\Desktop\mydata\online_shopping_10_cats.csv'
    class_path='data/class.json'
    vocab_path='data/vocab.pkl'
    trainfiles='data/train.csv'
    testfile='data/test.csv'
    trainfile='data/dataset_train.csv'
    validfile='data/dataset_valid.csv'
    train_file='data/train.df'
    test_file='data/test.df'
    valid_file='data/valid.df'

    # Train the word2vec embedding model (200 dimensions); can be skipped once the model file exists
    dataset=word2vec.Text8Corpus(corpus)
    model = word2vec.Word2Vec(sentences=dataset, vector_size=200, window=5, min_count=1, workers=4)
    model.save(embedding_model_path)

    build_vocab_label(MAX_VOCAB_SIZE,MIN_FREQ,dataset_path,class_path,vocab_path)

    # First split online_shopping_10_cats.csv into train.csv and test.csv; can be skipped once the dataset has already been split
    split_csv(infile=dataset_path,trainfile=trainfiles,valtestfile=testfile,seed=999,ratio=0.2)
    # Then split train.csv into dataset_train.csv and dataset_valid.csv
    split_csv(infile=trainfiles,trainfile=trainfile,valtestfile=validfile,seed=999,ratio=0.2)

    build_dataset(trainfile,class_path,vocab_path,train_file)
    build_dataset(validfile, class_path, vocab_path, valid_file)
    build_dataset(testfile, class_path, vocab_path, test_file)

    build_embed_pretrained(vocab_path,embedding_model_path,embedding_path)

run.py

from mymodel import myMLP,myCNN,myRNN
import mydataset
import numpy as np
import torch
from torch import nn,optim
from torch.utils.data import DataLoader

# Load the corresponding configuration; embedding_pre=True means the pretrained word vectors are used
#config=myMLP.Config(embedding_pre=True)
config=myCNN.Config(embedding_pre=True)
# config=myRNN.Config(embedding_pre=True)

# Collate function: stack a batch of (id sequence, label) pairs into LongTensors
def collate_fn(batch):
    text,label=list(zip(*batch))
    text=torch.LongTensor(text)
    label = torch.LongTensor(label)
    return text,label

# Load the training, validation, and test sets (pickled DataFrames saved by preTreatment.py; np.load falls back to pickle when allow_pickle=True)
vectorized_data=np.load(config.train_path,allow_pickle=True)
train_ds=mydataset.GetLoader(vectorized_data['text'],vectorized_data['label'])
train_dl=DataLoader(train_ds,batch_size=config.batch_size,shuffle=True,collate_fn=collate_fn)
vectorized_data=np.load(config.dev_path,allow_pickle=True)
valid_ds=mydataset.GetLoader(vectorized_data['text'],vectorized_data['label'])
valid_dl=DataLoader(valid_ds,batch_size=config.batch_size,shuffle=True,collate_fn=collate_fn)
vectorized_data=np.load(config.test_path,allow_pickle=True)
test_ds=mydataset.GetLoader(vectorized_data['text'],vectorized_data['label'])
test_dl=DataLoader(test_ds,batch_size=config.batch_size,shuffle=True,collate_fn=collate_fn)

# Count the correct predictions in a batch and return them with the batch size
def accuracys(pre,label):
    pre=torch.max(pre.data,1)[1]
    accuracy=pre.eq(label.data.view_as(pre)).sum()
    return accuracy,len(label)

# Build the network
#model=myMLP.MLP(config).to(config.device)
model=myCNN.Model(config).to(config.device)
# model=myRNN.Model(config).to(config.device)

# Training
criterion=nn.CrossEntropyLoss()
optimizer=optim.Adam(model.parameters(),lr=config.learning_rate)
best_loss=float('inf')
for epoch in range(config.epochs):
    train_acc = []
    for batch_idx,(data,target)in enumerate(train_dl):
        model.train()
        out=model(data)
        loss=criterion(out,target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_acc.append(accuracys(out,target))
        train_r = (sum(tup[0] for tup in train_acc), sum(tup[1] for tup in train_acc))
        print('epoch: {}\t[{}/{}] {:.0f}%\tloss: {:.6f}\ttrain accuracy: {:.2f}%'.format(
            epoch, batch_idx, len(train_dl), 100. * batch_idx / len(train_dl), loss.data,
                   100. * train_r[0].numpy() / train_r[1]
        ))
        # Run validation every 100 batches
        if batch_idx%100==0 and batch_idx!=0:
            model.eval()
            val_acc=[]
            loss_total=0
            with torch.no_grad():
                for (data,target) in valid_dl:
                    out=model(data)
                    loss_total = criterion(out, target).data+loss_total
                    val_acc.append(accuracys(out,target))
            val_r = (sum(tup[0] for tup in val_acc), sum(tup[1] for tup in val_acc))
            print('loss: {:.6f}\tvalidation accuracy: {:.2f}%'.format(loss_total/len(valid_dl),100. * val_r[0].numpy() / val_r[1]))
            # Save the model if the validation loss improves on the best so far
            if loss_total < best_loss:
                best_loss = loss_total
                torch.save(model.state_dict(), config.save_path)


# Testing
model.load_state_dict(torch.load(config.save_path))
model.eval()
test_acc=[]
with torch.no_grad():
    for (data, target) in test_dl:
        out = model(data)
        test_acc.append(accuracys(out, target))
test_r = (sum(tup[0] for tup in test_acc), sum(tup[1] for tup in test_acc))
print('test accuracy: {:.2f}%'.format(100. * test_r[0].numpy() / test_r[1]))

