Pytorch使用torchtext構建數據集進行文本分類


torchtext包含以下組件:

  • Field :主要包含以下數據預處理的配置信息,比如指定分詞方法,是否轉成小寫,起始字符,結束字符,補全字符以及詞典等等
  • Dataset :繼承自pytorch的Dataset,用於加載數據,提供了TabularDataset可以指點路徑,格式,Field信息就可以方便的完成數據加載。同時torchtext還提供預先構建的常用數據集的Dataset對象,可以直接加載使用,splits方法可以同時加載訓練集,驗證集和測試集。
  • Iterator : 主要是數據輸出的模型的迭代器,可以支持batch定制

1. 定義Field

規定如何處理數據

  1. 主要的參數:
    • sequential: 是否把數據表示成序列,如果是False, 不能使用分詞 默認值: True
    • use_vocab: 是否使用詞典對象. 如果是False 數據的類型必須已經是數值類型. 默認值: True
    • init_token: 每一條數據的起始字符 默認值: None
    • eos_token: 每條數據的結尾字符 默認值: None
    • fix_length: 修改每條數據的長度為該值,不夠的用pad_token補全. 默認值: None
    • tensor_type: 把數據轉換成的tensor類型 默認值: torch.LongTensor
    • lower: 是否把數據轉化為小寫 默認值: False.
    • tokenize: 分詞函數. 默認值: str.split.
    • pad_token: 用於補全的字符. 默認值: " "
    • unk_token: 不存在詞典里的字符. 默認值: " "
  2. 主要的方法
    • build_vocab(): 建立詞典
    • numericalize(): 把文本數據數值化,返回tensor
  3. 簡單的例子
TEXT = data.Field(sequential=True, tokenize=tokenizer, fix_length=100)
LABEL = data.Field(sequential=False, use_vocab=False)

tokenizer是自定義的分詞方法,也可以加載已有的分詞方法,在此使用spcay。在此假設spcay已經安裝完成。
三種nlp分詞模型spacy en_core_web_sm/md/lg代表(small,medium,large)。
spacy en_core_web_lg(780M),spacy en_core_web_sm(10M),md介於兩者之間,開發中一般使用sm即可。
使用spcay定義的tokenizer如下:

import spacy
spacy_en = spacy.load('en_core_web_sm')
def tokenizer(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

2. 讀取數據集Dataset

torchtext作為一個第三方庫,其Dataset也是繼承自pytorch提供的Dataset。
TabularDataset 可以很方便地讀取數據集,支持CSV、TSV、JSON。splits方法可以同時讀取訓練集、驗證集、測試集。

  • 加載數據集,我的數據集第一列為label(數值類型),第二列尾content(str類型),csv類型,且表頭為label和content,共分20個類別。因此在加載數據集時,要令skip_header=True,跳過表頭。

傳入的fields必須與列的順序相同。對於不使用的列,在fields的位置傳入一個None。

from torchtext import data

train_val_field = [('label', LABEL), ('content', TEXT)]
train_data, valid_data = data.TabularDataset.splits(
    path='./data/',
    traiin='train.csv',
    validation='valid.csv',
    format='csv',
    fields=train_val_field,
    skip_header=True
)

# 因為測試集不要label,所以在field中令label列傳入None
test_field = [('label', None), ('content', TEXT)]
test_path = './data/test.csv'
test_data = data.TabularDataset(
    path=test_path,
    format='csv',
    fields=test_field,
    skip_header=True
)
TEXT.build_vocab(train_data, min_freq=3, vectors='glove.6B.50d')

3. 生成迭代器Iterator

在torchvision和PyTorch中,數據的處理和批處理由DataLoader處理。 torchtext相同的東西命名成了Iterator。基本功能是一樣的,但Iterators具有一些NLP特有的便捷功能。

  • 關鍵參數如下:
    • dataset:加載的數據集
    • sort:是對全體數據按照升序順序進行排序,而sort_within_batch僅僅對一個batch內部的數據進行排序。
    • shuffle:是否打亂數據
    • train:是否是一個訓練集
    • batch_size:batch的大小
    • sort_within_batch:參數設置為True時,按照sort_key按降序對每個小批次內的數據進行降序排序。
    • repeat:是否在不同的epochs中重復迭代 ,默認是False
    • device:生成batch的設備

創建迭代器的例子

BucketIterator是torchtext最強大的功能之一。BucketIterator和Iterator的區別是,BucketIterator盡可能的把長度相似的句子放在一個batch里面。而且BucketIterator會自動將輸入序列進行shuffle並做bucket,不過,需要告訴BucketIterator想在哪個數據屬性上做bucket。

在本文中希望根據comment_text字段的長度進行bucket處理,因此我們將其作為關鍵字參數傳入sort_key = lambda x: len(x.content)

  1. 驗證集和訓練集合使用BucketIterator.splits(),目的是自動進行shuffle和padding,並且為了訓練效率期間,盡量把句子長度相似的shuffle在一起。
  2. 測試集用Iterator,因為不用sort。
from torchtext.data import Iterator, BucketIterator

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_batch_size = 64
val_batch_size = 64
test_batch_size = 64

# 同時對訓練集和驗證集進行迭代器構建
train_iter, val_iter = BucketIterator.splits(
    (train_data, valid_data),
    batch_sizes=(train_batch_size, val_batch_size),
    device=device,
    sort_key=lambda x: len(x.content),
    sort_within_batch=False,
    repeat=False
)

# 對測試集進行迭代器構建
test_iter = Iterator(
    test_data,
    batch_size=test_batch_size,
    device=device, 
    sort=False,
    sort_within_batch=False,
    repeat=False
)
"""
因為sort和shuffle默認只對train=True字段進行的,train字段默認是True。所以測試集合可以令train=False的方式構建
"""
# 測試集也可以這么構建
test_iter = Iterator(
    test_data,
    batch_size=test_batch_size,
    device=device,
    train=False
)

4. 搭建TextCNN

經過上述這三步,數據集就加載好了,下面搭建一個TextCNN測試。

import torch
import torch.nn as nn
import torch.nn.functional as F


class TextCNN(nn.Module):
    def __init__(self, emb_dim, kernel_sizes, kernel_num):
        super(TextCNN, self).__init__()

	self.embedding = nn.Embedding(len(TEXT.vocab), emb_dim)
	self.embedding_dropout = nn.Dropout(0.5)
	# 使用nn.ModuleList來裝三個nn.Sequential構成的卷積塊
	self.convs = nn.ModuleList([
	# 使用nn.Sequential構成卷積塊,每個卷積塊裝有一層卷積和LeakyReLU激活函數
	    nn.Sequential(
		nn.Conv1d(in_channels=emb_dim, out_channels=kernel_num, kernel_size=size),
		nn.LeakyReLU(),
		)
		for size in kernel_sizes
        ])
	in_features = kernel_num * len(kernel_sizes)
	self.linear1 = nn.Linear(in_features=in_features, out_features=in_features//2)
	self.fc_dropout = nn.Dropout(0.5)
	self.linear2 = nn.Linear(in_features=in_features//2, out_features=20)

    def forward(self,x):
	# 初始輸入格式為(length, batch_size)
	out = self.embedding(x)
	out = self.embedding_dropout(out)
	# (length, batch_size, emb) -> (batch_size, emb, length)
	out = torch.transpose(out, 1, 2)
	out = torch.transpose(out, 0, 2)
	out = [conv(out) for conv in self.convs]
	# stride為步幅
	out = [F.max_pool1d(one, kernel_size=one.size(2), stride=2) for one in out]
	# 拼接維度1,並去掉維度2
        out = torch.cat(out, dim=1).squeeze(2)
	out = self.linear1(F.leaky_relu(out))
	out = self.fc_dropout(out)
	out = self.linear2(F.leaky_relu(out))
	return out

# 4種卷積核,每種60個,拼接后就有240種特征
model = TextCNN(50, [3, 4, 5, 6], 60).to(device)

5. 開始訓練

  • 定義超參數
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter


learning_rate = 1e-3
# 定義優化器和損失函數
# Adam是有自適應學習率的優化器,利用梯度的一階矩估計和二階矩估計動態調整學習率
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 因為是多分類問題,所以使用交叉熵損失函數,pytorch的交叉熵損失函數是會做softmax的,所以在模型中沒有添加softmax層
criterion = F.cross_entropy
# 設置學習率下降策略
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

# 定義tensorboard日志的輸出目錄
writer = SummaryWriter("runs/cnn")
  • 訓練
import numpy as np

# -----------------------------------模型訓練--------------------------------------
epochs = 100
best_valid_acc = float('-inf')
model_save_path = './model_paras/func/cnn_model_paras.pkl'

for epoch in range(epochs):
    loss_one_epoch = 0.0
    correct_num = 0.0
    total_num = 0.0
    scheduler.step()

    for i, batch in enumerate(train_iter):
        model.train()
        label, content = batch.label, batch.content
        # 進行forward()、backward()、更新權重
        optimizer.zero_grad()
        pred = model(content)
        loss = criterion(pred, label)
        loss.backward()
        optimizer.step()

        # 統計預測信息
        total_num += label.size(0)
        # 預測有多少個標簽是預測中的,並加總
        correct_num += (torch.argmax(pred, dim=1) == label).sum().float().item()
        loss_one_epoch += loss.item()

        # 每10個iteration 打印一次訓練信息,loss為10個iteration的平均
        if i % 10 == 9:
            loss_avg = loss_one_epoch / 10
            loss_one_epoch = 0.0
            print("Training: Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(epoch + 1, epochs, i + 1, len(train_iter), loss_avg, correct_num / total_num))

            # 記錄訓練loss
            writer.add_scalars('Loss', {'train_loss': loss_avg}, epoch)
            # 記錄learning rate
            writer.add_scalar('learning rate', scheduler.get_lr()[0], epoch)
            # 記錄Accuracy
            writer.add_scalars('Accuracy', {'train_acc': correct_num / total_num}, epoch)

    # ---------------------------------------驗證------------------------------
    if epoch % 2 == 0:
        loss_one_epoch = 0.0
        classes_num = 20
        conf_mat = np.zeros([classes_num, classes_num])
        model.eval()
        for i, batch in enumerate(val_iter):
            label, content = batch.label, batch.content
            pred = model(content)
            pred.detach()
            # 計算loss
            loss = criterion(pred, label)
            loss_one_epoch += loss.item()

            # 統計預測信息
            total_num += label.size(0)
            # 預測有多少個標簽是預測中的,並加總
            correct_num += (torch.argmax(pred, dim=1) == label).sum().float().item()
            loss_one_epoch += loss.item()

	    # 統計混淆矩陣
	    for j in range(len(label)):
	        cate_i = label[j].item()
		pre_i = torch.argmax(pred, dim=1)[j].item()
		conf_mat[cate_i, pre_i] += 1.0

        # 打印驗證集的准確率,numpy的trace()就是求對角線元素的和sum()是求所有元素的和
        print('{} set Accuracy:{:.2%}'.format('Valid', conf_mat.trace() / conf_mat.sum()))

        # 記錄Loss, accuracy
        writer.add_scalars('Loss', {'valid_loss': loss_one_epoch / len(val_iter)}, epoch)
        writer.add_scalars('Accuracy', {'valid_acc': conf_mat.trace() / conf_mat.sum()}, epoch)

    # 每個epoch計算一下驗證集准確率如果模型效果變好,保存模型
    if (conf_mat.trace() / conf_mat.sum()) > best_valid_acc:
        print("超過最好模型了")
	best_valid_acc = (conf_mat.trace() / conf_mat.sum())
	torch.save(model.state_dict(), model_save_path)

總結

到這,一個簡單的TextCNN就算是完成了。
在訓練和測試中,統計混淆矩陣本來是為了計算召回率、F1、精確率等的,埋了個坑。
不過因為我有點懶,就一直沒填,等什么時候有心情了再做吧。

最后我必須得吐槽一下博客園的markdown編輯器,實在是太難用了,代碼格式調了半天,最后全部調成頂格才沒問題了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM