Pytorch使用torchtext构建数据集进行文本分类


torchtext包含以下组件:

  • Field :主要包含以下数据预处理的配置信息,比如指定分词方法,是否转成小写,起始字符,结束字符,补全字符以及词典等等
  • Dataset :继承自pytorch的Dataset,用于加载数据,提供了TabularDataset可以指点路径,格式,Field信息就可以方便的完成数据加载。同时torchtext还提供预先构建的常用数据集的Dataset对象,可以直接加载使用,splits方法可以同时加载训练集,验证集和测试集。
  • Iterator : 主要是数据输出的模型的迭代器,可以支持batch定制

1. 定义Field

规定如何处理数据

  1. 主要的参数:
    • sequential: 是否把数据表示成序列,如果是False, 不能使用分词 默认值: True
    • use_vocab: 是否使用词典对象. 如果是False 数据的类型必须已经是数值类型. 默认值: True
    • init_token: 每一条数据的起始字符 默认值: None
    • eos_token: 每条数据的结尾字符 默认值: None
    • fix_length: 修改每条数据的长度为该值,不够的用pad_token补全. 默认值: None
    • tensor_type: 把数据转换成的tensor类型 默认值: torch.LongTensor
    • lower: 是否把数据转化为小写 默认值: False.
    • tokenize: 分词函数. 默认值: str.split.
    • pad_token: 用于补全的字符. 默认值: " "
    • unk_token: 不存在词典里的字符. 默认值: " "
  2. 主要的方法
    • build_vocab(): 建立词典
    • numericalize(): 把文本数据数值化,返回tensor
  3. 简单的例子
TEXT = data.Field(sequential=True, tokenize=tokenizer, fix_length=100)
LABEL = data.Field(sequential=False, use_vocab=False)

tokenizer是自定义的分词方法,也可以加载已有的分词方法,在此使用spcay。在此假设spcay已经安装完成。
三种nlp分词模型spacy en_core_web_sm/md/lg代表(small,medium,large)。
spacy en_core_web_lg(780M),spacy en_core_web_sm(10M),md介于两者之间,开发中一般使用sm即可。
使用spcay定义的tokenizer如下:

import spacy
spacy_en = spacy.load('en_core_web_sm')
def tokenizer(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

2. 读取数据集Dataset

torchtext作为一个第三方库,其Dataset也是继承自pytorch提供的Dataset。
TabularDataset 可以很方便地读取数据集,支持CSV、TSV、JSON。splits方法可以同时读取训练集、验证集、测试集。

  • 加载数据集,我的数据集第一列为label(数值类型),第二列尾content(str类型),csv类型,且表头为label和content,共分20个类别。因此在加载数据集时,要令skip_header=True,跳过表头。

传入的fields必须与列的顺序相同。对于不使用的列,在fields的位置传入一个None。

from torchtext import data

train_val_field = [('label', LABEL), ('content', TEXT)]
train_data, valid_data = data.TabularDataset.splits(
    path='./data/',
    traiin='train.csv',
    validation='valid.csv',
    format='csv',
    fields=train_val_field,
    skip_header=True
)

# 因为测试集不要label,所以在field中令label列传入None
test_field = [('label', None), ('content', TEXT)]
test_path = './data/test.csv'
test_data = data.TabularDataset(
    path=test_path,
    format='csv',
    fields=test_field,
    skip_header=True
)
TEXT.build_vocab(train_data, min_freq=3, vectors='glove.6B.50d')

3. 生成迭代器Iterator

在torchvision和PyTorch中,数据的处理和批处理由DataLoader处理。 torchtext相同的东西命名成了Iterator。基本功能是一样的,但Iterators具有一些NLP特有的便捷功能。

  • 关键参数如下:
    • dataset:加载的数据集
    • sort:是对全体数据按照升序顺序进行排序,而sort_within_batch仅仅对一个batch内部的数据进行排序。
    • shuffle:是否打乱数据
    • train:是否是一个训练集
    • batch_size:batch的大小
    • sort_within_batch:参数设置为True时,按照sort_key按降序对每个小批次内的数据进行降序排序。
    • repeat:是否在不同的epochs中重复迭代 ,默认是False
    • device:生成batch的设备

创建迭代器的例子

BucketIterator是torchtext最强大的功能之一。BucketIterator和Iterator的区别是,BucketIterator尽可能的把长度相似的句子放在一个batch里面。而且BucketIterator会自动将输入序列进行shuffle并做bucket,不过,需要告诉BucketIterator想在哪个数据属性上做bucket。

在本文中希望根据comment_text字段的长度进行bucket处理,因此我们将其作为关键字参数传入sort_key = lambda x: len(x.content)

  1. 验证集和训练集合使用BucketIterator.splits(),目的是自动进行shuffle和padding,并且为了训练效率期间,尽量把句子长度相似的shuffle在一起。
  2. 测试集用Iterator,因为不用sort。
from torchtext.data import Iterator, BucketIterator

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_batch_size = 64
val_batch_size = 64
test_batch_size = 64

# 同时对训练集和验证集进行迭代器构建
train_iter, val_iter = BucketIterator.splits(
    (train_data, valid_data),
    batch_sizes=(train_batch_size, val_batch_size),
    device=device,
    sort_key=lambda x: len(x.content),
    sort_within_batch=False,
    repeat=False
)

# 对测试集进行迭代器构建
test_iter = Iterator(
    test_data,
    batch_size=test_batch_size,
    device=device, 
    sort=False,
    sort_within_batch=False,
    repeat=False
)
"""
因为sort和shuffle默认只对train=True字段进行的,train字段默认是True。所以测试集合可以令train=False的方式构建
"""
# 测试集也可以这么构建
test_iter = Iterator(
    test_data,
    batch_size=test_batch_size,
    device=device,
    train=False
)

4. 搭建TextCNN

经过上述这三步,数据集就加载好了,下面搭建一个TextCNN测试。

import torch
import torch.nn as nn
import torch.nn.functional as F


class TextCNN(nn.Module):
    def __init__(self, emb_dim, kernel_sizes, kernel_num):
        super(TextCNN, self).__init__()

	self.embedding = nn.Embedding(len(TEXT.vocab), emb_dim)
	self.embedding_dropout = nn.Dropout(0.5)
	# 使用nn.ModuleList来装三个nn.Sequential构成的卷积块
	self.convs = nn.ModuleList([
	# 使用nn.Sequential构成卷积块,每个卷积块装有一层卷积和LeakyReLU激活函数
	    nn.Sequential(
		nn.Conv1d(in_channels=emb_dim, out_channels=kernel_num, kernel_size=size),
		nn.LeakyReLU(),
		)
		for size in kernel_sizes
        ])
	in_features = kernel_num * len(kernel_sizes)
	self.linear1 = nn.Linear(in_features=in_features, out_features=in_features//2)
	self.fc_dropout = nn.Dropout(0.5)
	self.linear2 = nn.Linear(in_features=in_features//2, out_features=20)

    def forward(self,x):
	# 初始输入格式为(length, batch_size)
	out = self.embedding(x)
	out = self.embedding_dropout(out)
	# (length, batch_size, emb) -> (batch_size, emb, length)
	out = torch.transpose(out, 1, 2)
	out = torch.transpose(out, 0, 2)
	out = [conv(out) for conv in self.convs]
	# stride为步幅
	out = [F.max_pool1d(one, kernel_size=one.size(2), stride=2) for one in out]
	# 拼接维度1,并去掉维度2
        out = torch.cat(out, dim=1).squeeze(2)
	out = self.linear1(F.leaky_relu(out))
	out = self.fc_dropout(out)
	out = self.linear2(F.leaky_relu(out))
	return out

# 4种卷积核,每种60个,拼接后就有240种特征
model = TextCNN(50, [3, 4, 5, 6], 60).to(device)

5. 开始训练

  • 定义超参数
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter


learning_rate = 1e-3
# 定义优化器和损失函数
# Adam是有自适应学习率的优化器,利用梯度的一阶矩估计和二阶矩估计动态调整学习率
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 因为是多分类问题,所以使用交叉熵损失函数,pytorch的交叉熵损失函数是会做softmax的,所以在模型中没有添加softmax层
criterion = F.cross_entropy
# 设置学习率下降策略
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

# 定义tensorboard日志的输出目录
writer = SummaryWriter("runs/cnn")
  • 训练
import numpy as np

# -----------------------------------模型训练--------------------------------------
epochs = 100
best_valid_acc = float('-inf')
model_save_path = './model_paras/func/cnn_model_paras.pkl'

for epoch in range(epochs):
    loss_one_epoch = 0.0
    correct_num = 0.0
    total_num = 0.0
    scheduler.step()

    for i, batch in enumerate(train_iter):
        model.train()
        label, content = batch.label, batch.content
        # 进行forward()、backward()、更新权重
        optimizer.zero_grad()
        pred = model(content)
        loss = criterion(pred, label)
        loss.backward()
        optimizer.step()

        # 统计预测信息
        total_num += label.size(0)
        # 预测有多少个标签是预测中的,并加总
        correct_num += (torch.argmax(pred, dim=1) == label).sum().float().item()
        loss_one_epoch += loss.item()

        # 每10个iteration 打印一次训练信息,loss为10个iteration的平均
        if i % 10 == 9:
            loss_avg = loss_one_epoch / 10
            loss_one_epoch = 0.0
            print("Training: Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(epoch + 1, epochs, i + 1, len(train_iter), loss_avg, correct_num / total_num))

            # 记录训练loss
            writer.add_scalars('Loss', {'train_loss': loss_avg}, epoch)
            # 记录learning rate
            writer.add_scalar('learning rate', scheduler.get_lr()[0], epoch)
            # 记录Accuracy
            writer.add_scalars('Accuracy', {'train_acc': correct_num / total_num}, epoch)

    # ---------------------------------------验证------------------------------
    if epoch % 2 == 0:
        loss_one_epoch = 0.0
        classes_num = 20
        conf_mat = np.zeros([classes_num, classes_num])
        model.eval()
        for i, batch in enumerate(val_iter):
            label, content = batch.label, batch.content
            pred = model(content)
            pred.detach()
            # 计算loss
            loss = criterion(pred, label)
            loss_one_epoch += loss.item()

            # 统计预测信息
            total_num += label.size(0)
            # 预测有多少个标签是预测中的,并加总
            correct_num += (torch.argmax(pred, dim=1) == label).sum().float().item()
            loss_one_epoch += loss.item()

	    # 统计混淆矩阵
	    for j in range(len(label)):
	        cate_i = label[j].item()
		pre_i = torch.argmax(pred, dim=1)[j].item()
		conf_mat[cate_i, pre_i] += 1.0

        # 打印验证集的准确率,numpy的trace()就是求对角线元素的和sum()是求所有元素的和
        print('{} set Accuracy:{:.2%}'.format('Valid', conf_mat.trace() / conf_mat.sum()))

        # 记录Loss, accuracy
        writer.add_scalars('Loss', {'valid_loss': loss_one_epoch / len(val_iter)}, epoch)
        writer.add_scalars('Accuracy', {'valid_acc': conf_mat.trace() / conf_mat.sum()}, epoch)

    # 每个epoch计算一下验证集准确率如果模型效果变好,保存模型
    if (conf_mat.trace() / conf_mat.sum()) > best_valid_acc:
        print("超过最好模型了")
	best_valid_acc = (conf_mat.trace() / conf_mat.sum())
	torch.save(model.state_dict(), model_save_path)

总结

到这,一个简单的TextCNN就算是完成了。
在训练和测试中,统计混淆矩阵本来是为了计算召回率、F1、精确率等的,埋了个坑。
不过因为我有点懒,就一直没填,等什么时候有心情了再做吧。

最后我必须得吐槽一下博客园的markdown编辑器,实在是太难用了,代码格式调了半天,最后全部调成顶格才没问题了。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM