The Transformer model (text classification only uses the Encoder part):


1.數據預處理
The data and preprocessing are essentially the same as in the previous post: https://www.cnblogs.com/cxq1126/p/13504437.html.
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F
from torchtext import data
from torch.autograd import Variable

import math
import time
import copy
import random

SEED = 126
BATCH_SIZE = 128
EMBEDDING_DIM = 100
LEARNING_RATE = 1e-3

#fix the random seeds so the experiment can be reproduced
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

TEXT = data.Field(tokenize=lambda x: x.split(), batch_first=True, lower=True)
LABEL = data.LabelField(dtype=torch.float)

#get_dataset builds and returns the examples and fields needed by Dataset
def get_dataset(corpur_path, text_field, label_field):
    fields = [('text', text_field), ('label', label_field)]   #pairs field names in the file with the torchtext Fields
    examples = []

    with open(corpur_path) as f:
        #parse the simple HTML-like format
        li = []
        while True:
            content = f.readline().replace('\n', '')
            if not content:        #an empty line means one record has been read (stored in li)
                if not li:         #if the list is also empty, the file is finished, so stop
                    break
                label = li[0][10]  #the character between <Polarity> and </Polarity>
                text = li[1][6:-7] #the text between <text> and </text>
                examples.append(data.Example.fromlist([text, label], fields))
                li = []
            else:
                li.append(content) #["<Polarity>label</Polarity>", "<text>sentence</text>"]

    return examples, fields

#build the examples and fields needed by Dataset
train_examples, train_fields = get_dataset("corpus//trains.txt", TEXT, LABEL)
dev_examples, dev_fields = get_dataset("corpus//dev.txt", TEXT, LABEL)
test_examples, test_fields = get_dataset("corpus//tests.txt", TEXT, LABEL)


#build the Datasets
train_data = data.Dataset(train_examples, train_fields)
dev_data = data.Dataset(dev_examples, dev_fields)
test_data = data.Dataset(test_examples, test_fields)


print('len of train data:', len(train_data))   #1000
print('len of dev data:', len(dev_data))       #200
print('len of test data:', len(test_data))     #300

print(train_data.examples[15].text)
print(train_data.examples[15].label)


#build the vocabulary
TEXT.build_vocab(train_data, max_size=5000, vectors='glove.6B.100d')
LABEL.build_vocab(train_data)
print(len(TEXT.vocab))          #3287
print(TEXT.vocab.itos[:12])     #['<unk>', '<pad>', 'the', 'and', 'a', 'to', 'is', 'was', 'i', 'of', 'for', 'in']
print(TEXT.vocab.stoi['like'])  #43
print(LABEL.vocab.stoi)         #defaultdict(None, {'0': 0, '1': 1})


#build the iterators; each iteration returns one batch of examples
train_iterator, dev_iterator, test_iterator = data.BucketIterator.splits(
        (train_data, dev_data, test_data),
        batch_size=BATCH_SIZE,
        sort=False)
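For reference, here is a tiny made-up record (not from the actual corpus) showing what the string slicing in get_dataset extracts from one two-line entry:

#hypothetical record, only to illustrate the slicing in get_dataset
li = ["<Polarity>1</Polarity>", "<text>great movie</text>"]

label = li[0][10]   #index 10 is the first character after the 10-char "<Polarity>" tag -> '1'
text = li[1][6:-7]  #strip the 6-char "<text>" prefix and the 7-char "</text>" suffix -> 'great movie'

print(label, text)  #1 great movie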
2. Defining the model
2.1 Embedding
class InputEmbeddings(nn.Module):

    def __init__(self, vocab_size, embedding_dim):
        super(InputEmbeddings, self).__init__()
        self.embedding_dim = embedding_dim
        self.embed = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, x):
        return self.embed(x) * math.sqrt(self.embedding_dim)   #scale by sqrt(d_model), as in the original Transformer
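A quick sanity check of the layer, using tiny made-up sizes just to confirm the output shape and the sqrt scaling:

#tiny made-up sizes, only to check shape and scaling
emb = InputEmbeddings(vocab_size=10, embedding_dim=4)
x = torch.tensor([[1, 2, 3]])                    #[batch=1, seq_len=3]
out = emb(x)
print(out.shape)                                 #torch.Size([1, 3, 4])
print(torch.allclose(out, emb.embed(x) * 2.0))   #True, since sqrt(4) = 2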
2.2 PositionalEncoding

class PositionalEncoding(nn.Module):

    def __init__(self, embedding_dim, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, embedding_dim)

        position = torch.arange(0., max_len).unsqueeze(1)   #[max_len, 1]
        div_term = torch.exp(torch.arange(0., embedding_dim, 2) * -(math.log(10000.0) / embedding_dim))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)   #a constant tensor: saved and loaded with the model, but never trained

    def forward(self, x):
        x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)   #Embedding + PositionalEncoding
        return self.dropout(x)
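The code above implements the fixed sinusoidal encoding from "Attention Is All You Need". Written out, for position pos and dimension pair index i:

\[
PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \qquad
PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
\]

div_term is exactly the factor 10000^(-2i/d_model), computed as exp(-2i * ln(10000) / d_model) for numerical stability.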
2.3 MultiHeadedAttention
def clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


def attention(query, key, value, mask=None, dropout=None):   #q, k, v: [batch, h, seq_len, d_k]

    d_k = query.size(-1)   #dimension of each head
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)   #scaled dot-product scores [batch, h, seq_len, seq_len]
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)   #positions where mask == 0 are filled with -1e9 so their softmax probability is ~0
    p_attn = F.softmax(scores, dim=-1)                 #normalize the scores over the last dimension [batch, h, seq_len, seq_len]

    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn         #[batch, h, seq_len, d_k]


class MultiHeadedAttention(nn.Module):

    def __init__(self, h, embedding_dim, dropout=0.1):

        super(MultiHeadedAttention, self).__init__()
        assert embedding_dim % h == 0

        self.d_k = embedding_dim // h   #dimension of each head after splitting embedding_dim into h parts
        self.h = h                      #h is the number of heads
        self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):   #q, k, v: [batch, seq_len, embedding_dim]

        if mask is not None:
            mask = mask.unsqueeze(1)   #[batch, seq_len, 1] -> [batch, 1, seq_len, 1], so it broadcasts over the head dimension
        nbatches = query.size(0)

        # 1) Do all the linear projections in batch from embedding_dim => h x d_k
        # [batch, seq_len, h, d_k] -> [batch, h, seq_len, d_k]
        query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
                             for l, x in zip(self.linears, (query, key, value))]

        # 2) Apply attention on all the projected vectors in batch.
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)   #x: [batch, h, seq_len, d_k], attn: [batch, h, seq_len, seq_len]

        # 3) "Concat" using a view and apply a final linear.
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)   #[batch, seq_len, embedding_dim]
        return self.linears[-1](x)
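The attention function above is standard scaled dot-product attention, softmax(QK^T / sqrt(d_k)) V. A quick shape check with random tensors (sizes are arbitrary, just for illustration):

#arbitrary sizes, only to check the shapes returned by attention()
q = k = v = torch.randn(2, 4, 8, 16)   #[batch=2, h=4, seq_len=8, d_k=16]
out, p_attn = attention(q, k, v)
print(out.shape)                       #torch.Size([2, 4, 8, 16])
print(p_attn.shape)                    #torch.Size([2, 4, 8, 8])
print(torch.allclose(p_attn.sum(-1), torch.ones(2, 4, 8)))   #True: each row of attention weights sums to 1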
2.4 MyTransformerModel
class MyTransformerModel(nn.Module):

    def __init__(self, vocab_size, embedding_dim, p_drop, h, output_size):
        super(MyTransformerModel, self).__init__()
        self.drop = nn.Dropout(p_drop)

        self.embeddings = InputEmbeddings(vocab_size, embedding_dim)
        self.position = PositionalEncoding(embedding_dim, p_drop)
        self.attn = MultiHeadedAttention(h, embedding_dim)
        self.norm = nn.LayerNorm(embedding_dim)
        self.linear = nn.Linear(embedding_dim, output_size)
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, inputs, mask):   #both [batch, seq_len]

        embeded = self.embeddings(inputs)   #1. InputEmbeddings     [batch, seq_len, embedding_dim]

        embeded = self.position(embeded)    #2. PositionalEncoding  [batch, seq_len, embedding_dim]

        mask = mask.unsqueeze(2)            #[batch, seq_len, 1]

        inp_attn = self.attn(embeded, embeded, embeded, mask)   #3.1 MultiHeadedAttention [batch, seq_len, embedding_dim]
        inp_attn = self.norm(inp_attn + embeded)                #3.2 residual connection + LayerNorm

        inp_attn = inp_attn * mask          #4. zero out the padded positions               [batch, seq_len, embedding_dim]

        h_avg = inp_attn.sum(1) / (mask.sum(1) + 1e-5)   #5. mean pooling over real tokens  [batch, embedding_dim]
        return self.linear(h_avg).squeeze()              #6. linear head [batch, 1] -> [batch]
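A forward-pass shape trace with random inputs (the sizes are arbitrary, chosen only to illustrate the flow):

#arbitrary batch of random token ids, only to trace the shapes through forward()
m = MyTransformerModel(vocab_size=3287, embedding_dim=100, p_drop=0.5, h=2, output_size=1)
x = torch.randint(0, 3287, (4, 12))   #[batch=4, seq_len=12]
msk = torch.ones(4, 12)               #pretend no position is padding
print(m(x, msk).shape)                #torch.Size([4]) -- one logit per example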
Instantiate the model, replace the randomly initialized nn.Embedding weights with the pretrained embeddings, and define the optimizer and loss function.
model = MyTransformerModel(len(TEXT.vocab), EMBEDDING_DIM, p_drop=0.5, h=2, output_size=1)

pretrained_embedding = TEXT.vocab.vectors
print('pretrained_embedding:', pretrained_embedding.shape)       #torch.Size([3287, 100])
model.embeddings.embed.weight.data.copy_(pretrained_embedding)   #embeddings is a member of MyTransformerModel; embed is a member of InputEmbeddings
print('embedding layer inited.')

optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.001)
criteon = nn.BCEWithLogitsLoss()
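Since output_size=1, the model returns raw logits, and nn.BCEWithLogitsLoss applies the sigmoid internally (it is numerically safer than applying sigmoid yourself and then using nn.BCELoss). A small equivalence check with made-up logits and labels:

#made-up logits and labels, only to show that BCEWithLogitsLoss == sigmoid + BCELoss
logits = torch.tensor([0.3, -1.2, 2.0])
labels = torch.tensor([1.0, 0.0, 1.0])
loss_a = nn.BCEWithLogitsLoss()(logits, labels)
loss_b = nn.BCELoss()(torch.sigmoid(logits), labels)
print(torch.allclose(loss_a, loss_b))   #True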
3. Training and evaluation functions
The usual routine: compute accuracy, define the training and evaluation functions, print the model's performance, and predict on the test data with the saved model parameters.
#compute accuracy
def binary_acc(preds, y):

    preds = torch.round(torch.sigmoid(preds))
    correct = torch.eq(preds, y).float()
    acc = correct.sum() / len(correct)
    return acc


#training function
def train(model, iterator, optimizer, criteon):

    avg_loss = []
    avg_acc = []
    model.train()   #switch to training mode

    for i, batch in enumerate(iterator):

        mask = 1 - (batch.text == TEXT.vocab.stoi['<pad>']).float()   #[batch, seq_len]; this line is new, everything else is the same as before
        pred = model(batch.text, mask)

        loss = criteon(pred, batch.label)
        acc = binary_acc(pred, batch.label).item()   #accuracy of this batch
        avg_loss.append(loss.item())
        avg_acc.append(acc)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_acc = np.array(avg_acc).mean()
    avg_loss = np.array(avg_loss).mean()
    return avg_loss, avg_acc


#evaluation function
def evaluate(model, iterator, criteon):

    avg_loss = []
    avg_acc = []
    model.eval()   #switch to evaluation mode

    with torch.no_grad():
        for batch in iterator:
            mask = 1 - (batch.text == TEXT.vocab.stoi['<pad>']).float()
            pred = model(batch.text, mask)

            loss = criteon(pred, batch.label)
            acc = binary_acc(pred, batch.label).item()
            avg_loss.append(loss.item())
            avg_acc.append(acc)

    avg_loss = np.array(avg_loss).mean()
    avg_acc = np.array(avg_acc).mean()
    return avg_loss, avg_acc


#train the model and print its performance
best_valid_acc = float('-inf')

for epoch in range(30):

    start_time = time.time()

    train_loss, train_acc = train(model, train_iterator, optimizer, criteon)
    dev_loss, dev_acc = evaluate(model, dev_iterator, criteon)

    end_time = time.time()

    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)

    if dev_acc > best_valid_acc:   #save whenever the validation accuracy improves
        best_valid_acc = dev_acc
        torch.save(model.state_dict(), 'wordavg-model.pt')

    print(f'Epoch: {epoch+1:02} | Epoch Time: {int(epoch_mins)}m {epoch_secs:.2f}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {dev_loss:.3f} | Val. Acc: {dev_acc*100:.2f}%')


#predict on the test data with the saved model parameters
model.load_state_dict(torch.load("wordavg-model.pt"))
test_loss, test_acc = evaluate(model, test_iterator, criteon)
print(f'Test. Loss: {test_loss:.3f} | Test. Acc: {test_acc*100:.2f}%')
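The only change compared with the previous post is the mask, which marks real tokens with 1 and <pad> tokens with 0. A tiny illustration with made-up token ids (index 1 is '<pad>' according to TEXT.vocab.itos printed earlier; in the training code the index is looked up via TEXT.vocab.stoi['<pad>']):

#made-up token ids; the last two positions are padding
text = torch.tensor([[5, 43, 8, 1, 1]])
mask = 1 - (text == 1).float()
print(mask)   #tensor([[1., 1., 1., 0., 0.]])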
The results do not improve by much, probably because the dataset is small and the sentences are short.

