Corpus link: https://pan.baidu.com/s/1rIv4eWPkornhZj92A8r6oQ
Extraction code: haor
The corpus consists of pos.txt and neg.txt. Each line is one complete sentence whose tokens are separated by spaces; the average sentence length is about 20 words (computed beforehand with a short script and used to set the corresponding hyperparameter). A sketch of that computation follows.
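The post does not show the length computation itself; a minimal sketch (assuming the same corpurs/pos.txt and corpurs/neg.txt files and mac_roman encoding used later in this post) could look like this:

def average_sentence_length(paths, encoding='mac_roman'):
    # count whitespace-separated tokens per non-empty line and average them
    lengths = []
    for path in paths:
        with open(path, encoding=encoding) as f:
            for line in f:
                if line.strip():
                    lengths.append(len(line.split()))
    return sum(lengths) / len(lengths)

print(average_sentence_length(["corpurs/pos.txt", "corpurs/neg.txt"]))  # roughly 20 on this corpus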
First, import the required packages:
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import tqdm
import random
import torch
from torch import nn, optim


random.seed(53113)
np.random.seed(53113)
torch.manual_seed(53113)

# hyperparameters
SENTENCE_LIMIT_SIZE = 20   # average sentence length
BATCH_SIZE = 128           # the batch size used for each iteration
LEARNING_RATE = 1e-3       # the initial learning rate
EMBEDDING_SIZE = 200       # word-vector dimension
1. Load the data
def read(filename):
    with open(filename, encoding='mac_roman') as f:
        text = f.read().lower()
    return text

pos_text, neg_text = read("corpurs/pos.txt"), read("corpurs/neg.txt")
total_text = pos_text + '\n' + neg_text   # concatenate the two corpora
2. Build the vocabulary and the mappings
Building a vocabulary usually means tokenizing the text and de-duplicating the tokens. Counting the words in this corpus shows that many of them occur only once; such words inflate the vocabulary and add noise to the text processing, so only words that appear more than once in the corpus are kept. <pad> and <unk> are two special tokens added up front: <pad> is used to pad sentences, and <unk> replaces words that do not appear in the vocabulary.
text = total_text.split()
vocab = [w for w, f in Counter(text).most_common() if f > 1]
vocab = ['<pad>', '<unk>'] + vocab

token_to_word = {i: word for i, word in enumerate(vocab)}        # token id -> word
word_to_token = {word: i for i, word in token_to_word.items()}   # word -> token id

VOCAB_SIZE = len(token_to_word)   # vocabulary size: 10382
3. Convert the text to token ids
Use the mapping to convert the raw text into token ids the model can consume. To give every sentence the same length, the sentence length also has to be normalized; here 20 is used as the standard length:
- sentences longer than 20 words are truncated;
- sentences shorter than 20 words are padded with <pad>.
def convert_text_to_token(sentence, word_to_token_map=word_to_token, limit_size=SENTENCE_LIMIT_SIZE):
    """
    Convert a single sentence into a list of token ids using the word-to-token mapping.

    @param sentence: the sentence, a str
    @param word_to_token_map: mapping from word to token id
    @param limit_size: maximum sentence length; longer sentences are truncated, shorter ones are padded
    return: the sentence as a list of token ids
    """
    # token ids for the unknown word and for padding
    unk_id = word_to_token_map["<unk>"]
    pad_id = word_to_token_map["<pad>"]

    # convert the sentence; words not in the vocabulary are mapped to unk
    tokens = [word_to_token_map.get(word, unk_id) for word in sentence.lower().split()]

    if len(tokens) < limit_size:   # pad
        tokens.extend([pad_id] * (limit_size - len(tokens)))
    else:                          # truncate
        tokens = tokens[:limit_size]

    return tokens
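A quick sanity check (the example sentence below is made up rather than taken from the corpus): a short sentence is padded with the <pad> id 0, and anything longer than 20 tokens would be cut off:

sample = convert_text_to_token("this movie is great")
print(len(sample))   # 20
print(sample[:6])    # four word ids (or the <unk> id), followed by 0s from padding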
Next, convert the positive and negative texts:
pos_tokens = [convert_text_to_token(sentence) for sentence in pos_text.split('\n')]
neg_tokens = [convert_text_to_token(sentence) for sentence in neg_text.split('\n')]

# convert to numpy arrays for easier handling
pos_tokens = np.array(pos_tokens)
neg_tokens = np.array(neg_tokens)
total_tokens = np.concatenate((pos_tokens, neg_tokens), axis=0)   # (10662, 20)

pos_targets = np.ones((pos_tokens.shape[0]))
neg_targets = np.zeros((neg_tokens.shape[0]))
total_targets = np.concatenate((pos_targets, neg_targets), axis=0).reshape(-1, 1)   # (10662, 1)
4. Load pretrained word vectors
4.1 Method 1: load GloVe vectors through gensim's downloader
def load_embedding_model():
    """ Load GloVe vectors
    Return:
        wv_from_bin: all 400000 embeddings, each of length 200
    """
    import gensim.downloader as api
    wv_from_bin = api.load("glove-wiki-gigaword-200")
    print("Loaded vocab size %i" % len(wv_from_bin.vocab.keys()))   # .vocab is the gensim 3.x API; gensim 4.x uses .key_to_index
    return wv_from_bin

model = load_embedding_model()
4.2 Method 2: convert a local GloVe file to word2vec format and load it with gensim
from gensim.test.utils import datapath, get_tmpfile
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec


# input file; note this file holds 100-dimensional vectors, so to match EMBEDDING_SIZE = 200 above use glove.6B.200d.txt instead
glove_file = datapath('F:/python_DemoCode/PytorchEx/.vector_cache/glove.6B.100d.txt')
word2vec_glove_file = get_tmpfile("F:/python_DemoCode/PytorchEx/.vector_cache/glove.6B.100d.word2vec.txt")   # output file
glove2word2vec(glove_file, word2vec_glove_file)   # convert GloVe format to word2vec format

model = KeyedVectors.load_word2vec_format(word2vec_glove_file)   # load the converted file
Use either of the pretrained models above to build our own embedding matrix:
static_embeddings = np.zeros([VOCAB_SIZE, EMBEDDING_SIZE])
for word, token in tqdm.tqdm(word_to_token.items()):
    # fill with the pretrained vector if the word is in the pretrained vocabulary
    if word in model.vocab.keys():        # model.vocab is the gensim 3.x API; gensim 4.x uses model.key_to_index
        static_embeddings[token, :] = model[word]
    elif word == '<pad>':                 # the padding token gets a zero vector
        static_embeddings[token, :] = np.zeros(EMBEDDING_SIZE)
    else:                                 # words without a pretrained vector are initialized randomly
        static_embeddings[token, :] = 0.2 * np.random.random(EMBEDDING_SIZE) - 0.1

print(static_embeddings.shape)   # (10382, 200), i.e. (vocab_size, embedding_dim)
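As an optional check that is not part of the original post, one can count how many vocabulary words actually received a pretrained vector (again assuming the gensim 3.x model.vocab attribute):

# how many vocabulary words were found in the pretrained embeddings
covered = sum(1 for word in word_to_token if word in model.vocab)
print("pretrained coverage: %d / %d words" % (covered, VOCAB_SIZE))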
5. Split the dataset
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(total_tokens, total_targets, test_size=0.2)   # shuffles the data by default
print(X_train.shape, y_train.shape)   # (8529, 20) (8529, 1)
6. Generate batches
def get_batch(x, y, batch_size=BATCH_SIZE, shuffle=True):
    assert x.shape[0] == y.shape[0], "error shape!"

    if shuffle:
        shuffled_index = np.random.permutation(range(x.shape[0]))
        x = x[shuffled_index]
        y = y[shuffled_index]

    n_batches = int(x.shape[0] / batch_size)   # number of complete batches

    for i in range(n_batches):   # the original looped over n_batches - 1, which silently dropped one full batch
        x_batch = x[i * batch_size: (i + 1) * batch_size]
        y_batch = y[i * batch_size: (i + 1) * batch_size]

        yield x_batch, y_batch
7. The CNN model
After the input sentence passes through the embedding layer, each word gets its word vector (200-dimensional in this example; the figure in the referenced post uses 6 for illustration), giving a seq_len * embedding_dim matrix (seq_len here is the standard sentence length 20). This matrix can be viewed as an image with channel = 1, height = seq_len and width = embedding_dim, over which filters perform convolutions.
Because several filter sizes are used (filter size = 3, 4, 5), convolution and pooling are handled per filter size. For each filter size, the convolution produces conv (the filter slides along the height dimension to capture local relationships between neighbouring words, yielding a set of column vectors); after a ReLU activation, max-pooling extracts the most important value from each column vector. With 100 filters per size, flattening the pooled outputs gives a 100 * 3 = 300-dimensional vector that feeds the fully connected layer. The shape trace below makes these dimensions concrete.
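As a sanity check (my addition, not part of the original post), the shapes of the kernel-size-3 branch can be traced on a dummy batch; the pooling window SENTENCE_LIMIT_SIZE - kernel + 1 is exactly what produces the kernel sizes 18/17/16 visible in the model printout further down:

import torch
from torch import nn

x = torch.zeros(128, 1, 20, 200)       # (batch, channel, seq_len, embedding_dim)
conv = nn.Conv2d(1, 100, (3, 200))     # 100 filters, each covering 3 words at a time
pool = nn.MaxPool2d((20 - 3 + 1, 1))   # max over the 18 remaining positions

h = conv(x)      # -> [128, 100, 18, 1]
h = pool(h)      # -> [128, 100, 1, 1]
print(h.shape)
# concatenating the three branches (kernel sizes 3, 4, 5) and flattening gives [128, 300]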
Tip: about nn.ModuleList
When the constructor __init__ holds sub-modules in a list, tuple, or dict, consider using nn.ModuleList, which is designed to store an arbitrary number of nn.Module objects (the small contrast example below shows why it matters).
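The contrast below is my own illustration rather than code from the post: modules kept in a plain Python list are invisible to the parent module's parameters(), while nn.ModuleList registers them properly so the optimizer can update them:

from torch import nn

class WithList(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = [nn.Linear(4, 4) for _ in range(3)]                  # plain list: parameters are NOT registered

class WithModuleList(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(4, 4) for _ in range(3)])   # parameters ARE registered

print(len(list(WithList().parameters())))        # 0
print(len(list(WithModuleList().parameters())))  # 6: weight and bias for each of the 3 layers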
class TextCNN(nn.Module):
    # output_size is the number of classes (2: labels 0 and 1); three kernel sizes (3, 4, 5), 100 filters each
    def __init__(self, vocab_size, embedding_dim, output_size, filter_num=100, kernel_lst=(3, 4, 5), dropout=0.5):
        super(TextCNN, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Sequential(nn.Conv2d(1, filter_num, (kernel, embedding_dim)),   # in_channels=1, out_channels=filter_num, kernel size (kernel, embedding_dim)
                          nn.ReLU(),
                          nn.MaxPool2d((SENTENCE_LIMIT_SIZE - kernel + 1, 1)))
            for kernel in kernel_lst])
        self.fc = nn.Linear(filter_num * len(kernel_lst), output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.embedding(x)   # [128, 20, 200], i.e. (batch, seq_len, embedding_dim)
        x = x.unsqueeze(1)      # [128, 1, 20, 200], i.e. (batch, channel_num, seq_len, embedding_dim)
        out = [conv(x) for conv in self.convs]

        out = torch.cat(out, dim=1)     # [128, 300, 1, 1]
        out = out.view(x.size(0), -1)   # [128, 300]
        out = self.dropout(out)
        logit = self.fc(out)            # [128, 2]
        return logit
Figure (shown in the original post): the overall embedding, convolution, max-pooling and fully-connected pipeline.
Instantiate the CNN model and replace the randomly initialized embedding with the pretrained one:
cnn = TextCNN(VOCAB_SIZE, 200, 2)
cnn.embedding.weight.data.copy_(torch.FloatTensor(static_embeddings))
Inspect the model:
print(cnn)
TextCNN(
  (embedding): Embedding(10382, 200)
  (convs): ModuleList(
    (0): Sequential(
      (0): Conv2d(1, 100, kernel_size=(3, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(18, 1), stride=(18, 1), padding=0, dilation=1, ceil_mode=False)
    )
    (1): Sequential(
      (0): Conv2d(1, 100, kernel_size=(4, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(17, 1), stride=(17, 1), padding=0, dilation=1, ceil_mode=False)
    )
    (2): Sequential(
      (0): Conv2d(1, 100, kernel_size=(5, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(16, 1), stride=(16, 1), padding=0, dilation=1, ceil_mode=False)
    )
  )
  (fc): Linear(in_features=300, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)
Define the optimizer and the cross-entropy loss:
optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE)
criteon = nn.CrossEntropyLoss()
8. The training function
First define a helper that computes the accuracy:
def binary_acc(preds, y):
    correct = torch.eq(preds, y).float()
    acc = correct.sum() / len(correct)
    return acc
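For example (toy numbers of my own), two out of three predictions matching the labels gives:

print(binary_acc(torch.tensor([1, 0, 1]), torch.tensor([1, 1, 1])))   # tensor(0.6667)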
The training function:
def train(cnn, optimizer, criteon):

    avg_acc = []
    cnn.train()   # switch to training mode

    # iterate over the batches: x_batch.shape = (128, 20), y_batch.shape = (128, 1)
    for x_batch, y_batch in get_batch(X_train, y_train):
        x_batch = torch.LongTensor(x_batch)   # convert to tensors first, otherwise the cross-entropy loss raises an error
        y_batch = torch.LongTensor(y_batch)

        y_batch = y_batch.squeeze()   # torch.Size([128])
        pred = cnn(x_batch)           # torch.Size([128, 2])

        # torch.max(pred, dim=1)[1] is the index of the larger logit in each row
        acc = binary_acc(torch.max(pred, dim=1)[1], y_batch)   # accuracy of this batch
        avg_acc.append(acc.item())

        loss = criteon(pred, y_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_acc = np.array(avg_acc).mean()
    return avg_acc
9. The evaluation function
Similar to the training function, but without backpropagation.
def evaluate(cnn, criteon):
    avg_acc = []
    cnn.eval()   # switch to evaluation mode

    with torch.no_grad():
        for x_batch, y_batch in get_batch(X_test, y_test):
            x_batch = torch.LongTensor(x_batch)
            y_batch = torch.LongTensor(y_batch)

            y_batch = y_batch.squeeze()   # torch.Size([128])
            pred = cnn(x_batch)           # torch.Size([128, 2])

            acc = binary_acc(torch.max(pred, dim=1)[1], y_batch)
            avg_acc.append(acc.item())

    avg_acc = np.array(avg_acc).mean()
    return avg_acc
Run the training loop:
cnn_train_acc, cnn_test_acc = [], []

for epoch in range(50):

    train_acc = train(cnn, optimizer, criteon)
    print('epoch={}, train acc={}'.format(epoch, train_acc))

    test_acc = evaluate(cnn, criteon)
    print("epoch={}, test acc={}".format(epoch, test_acc))

    cnn_train_acc.append(train_acc)
    cnn_test_acc.append(test_acc)

plt.plot(cnn_train_acc)
plt.plot(cnn_test_acc)
plt.ylim(ymin=0.5, ymax=1.01)
plt.title("The accuracy of CNN model")
plt.legend(["train", "test"])
Because there is no regularization, the model overfits.
To address this, add the weight_decay argument (L2 regularization) when constructing the optimizer.
optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
Reference (a TensorFlow implementation): https://zhuanlan.zhihu.com/p/37978321
10. A version using torchtext
torchtext basics: https://blog.csdn.net/qq_40334856/article/details/104208296
import numpy as np
import torch
from torch import nn, optim
from torchtext import data
import matplotlib.pyplot as plt

import random

SEED = 123
BATCH_SIZE = 128
LEARNING_RATE = 1e-3        # learning rate
EMBEDDING_SIZE = 200        # word-vector dimension
SENTENCE_LIMIT_SIZE = 20    # average sentence length

torch.manual_seed(SEED)

# tokenize with spaCy and lowercase; fix_length pads/truncates every text to 20 tokens; batch_first puts the batch dimension first
TEXT = data.Field(tokenize='spacy', lower=True, fix_length=20, batch_first=True)
LABEL = data.LabelField(dtype=torch.float)

# get_dataset builds and returns the examples and fields needed by Dataset
def get_dataset(corpur_path, text_field, label_field, datatype):
    fields = [('text', text_field), ('label', label_field)]   # torchtext field bindings
    examples = []

    with open(corpur_path, encoding='mac_roman') as f:

        content = f.readline().replace('\n', '')
        while content:
            if datatype == 'pos':
                label = 1
            else:
                label = 0
            examples.append(data.Example.fromlist([content[:-2], label], fields))
            content = f.readline().replace('\n', '')

    return examples, fields


# build the examples and fields needed for the Dataset
pos_examples, pos_fields = get_dataset("corpurs//pos.txt", TEXT, LABEL, 'pos')
neg_examples, neg_fields = get_dataset("corpurs//neg.txt", TEXT, LABEL, 'neg')
all_examples, all_fields = pos_examples + neg_examples, pos_fields + neg_fields


# build the Dataset
total_data = data.Dataset(all_examples, all_fields)

# split into training and test sets
train_data, test_data = total_data.split(random_state=random.seed(SEED), split_ratio=0.8)

print('len of train data:', len(train_data))   # 8530
print('len of test data:', len(test_data))     # 2132

print(train_data.examples[15].text)
print(train_data.examples[15].label)
# ['if', 'you', 'go', 'into', 'the', 'theater', 'expecting', 'a', 'scary', ',', 'action-packed', 'chiller', ',', 'you', 'might', 'soon', 'be', 'looking', 'for', 'a', 'sign', '.', 'an', 'exit', 'sign', ',', 'that', 'is']
# 0


# build the vocabulary, mapping each word to an integer id
TEXT.build_vocab(train_data, max_size=10000, vectors='glove.6B.200d')
LABEL.build_vocab(train_data)
print(len(TEXT.vocab))           # 10002
print(TEXT.vocab.itos[:12])      # ['<unk>', '<pad>', 'the', ',', 'a', 'and', 'of', 'to', '.', 'is', 'in', 'that']
print(TEXT.vocab.stoi['like'])   # 32
print(LABEL.vocab.stoi)          # defaultdict(None, {0: 0, 1: 1})

# create the iterators
train_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, test_data),
    batch_size=BATCH_SIZE,
    sort=False)

print(next(iter(train_iterator)).text.shape)    # torch.Size([128, 20]); without batch_first=True in the Field definition above this would be [20, 128]
print(next(iter(train_iterator)).label.shape)   # torch.Size([128])



class TextCNN(nn.Module):
    # output_size is the number of classes (2: labels 0 and 1); three kernel sizes (3, 4, 5), 100 filters each
    def __init__(self, vocab_size, embedding_dim, output_size, filter_num=100, kernel_lst=(3, 4, 5), dropout=0.5):
        super(TextCNN, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Sequential(nn.Conv2d(1, filter_num, (kernel, embedding_dim)),
                          nn.ReLU(),
                          nn.MaxPool2d((SENTENCE_LIMIT_SIZE - kernel + 1, 1)))
            for kernel in kernel_lst])
        self.fc = nn.Linear(filter_num * len(kernel_lst), output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.embedding(x)   # (batch, word_num, embedding_dim)
        x = x.unsqueeze(1)      # [128, 1, 20, 200], i.e. (batch, channel_num, word_num, embedding_dim)
        out = [conv(x) for conv in self.convs]

        out = torch.cat(out, dim=1)     # [128, 300, 1, 1]
        out = out.view(x.size(0), -1)   # [128, 300]
        out = self.dropout(out)
        logit = self.fc(out)            # [128, 2]

        return logit


PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

# replace the random initialization with the pretrained embeddings
cnn = TextCNN(len(TEXT.vocab), 200, 2)

pretrained_embedding = TEXT.vocab.vectors   # torch.Size([10002, 200])
cnn.embedding.weight.data.copy_(pretrained_embedding)
cnn.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_SIZE)
cnn.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_SIZE)


optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
criteon = nn.CrossEntropyLoss()


# compute the accuracy
def binary_acc(preds, y):

    correct = torch.eq(preds, y).float()
    acc = correct.sum() / len(correct)
    return acc


# training function
def train(cnn, iterator, optimizer, criteon):
    avg_acc = []
    cnn.train()   # switch to training mode

    for i, batch in enumerate(iterator):
        pred = cnn(batch.text)                     # torch.Size([128, 2])
        loss = criteon(pred, batch.label.long())   # without .long() the loss raises an error

        # torch.max(pred, dim=1)[1] is the index of the larger logit in each row
        acc = binary_acc(torch.max(pred, dim=1)[1], batch.label)   # accuracy of this batch
        avg_acc.append(acc.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_acc = np.array(avg_acc).mean()
    return avg_acc



# evaluation function
def evaluate(cnn, iterator, criteon):
    avg_acc = []
    cnn.eval()   # switch to evaluation mode

    with torch.no_grad():
        for i, batch in enumerate(iterator):
            pred = cnn(batch.text)   # torch.Size([128, 2])
            acc = binary_acc(torch.max(pred, dim=1)[1], batch.label)
            avg_acc.append(acc.item())

    avg_acc = np.array(avg_acc).mean()
    return avg_acc



cnn_train_acc, cnn_test_acc = [], []

for epoch in range(50):

    train_acc = train(cnn, train_iterator, optimizer, criteon)
    print('epoch={}, train acc={}'.format(epoch, train_acc))

    test_acc = evaluate(cnn, test_iterator, criteon)
    print("epoch={}, test acc={}".format(epoch, test_acc))

    cnn_train_acc.append(train_acc)
    cnn_test_acc.append(test_acc)

plt.plot(cnn_train_acc)
plt.plot(cnn_test_acc)
plt.ylim(ymin=0.5, ymax=1.01)
plt.title("The accuracy of CNN model")
plt.legend(["train", "test"])
When using LabelField, if the label field is defined as label_field = tdata.LabelField(dtype=torch.int, sequential=False, use_vocab=False),
then the label_field.build_vocab(train_data) call can be omitted.