Introduction
The data for this assignment consists of tweets from Twitter. The training data is labeled as positive or negative, and in the end we have to classify the unlabeled sentences.
Labeled training data: the +++$+++ in the middle of each line is just a separator; 200,000 examples in total.
Unlabeled training data: 1,178,614 examples in total.
Testing data: 200,000 examples in total.
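To make the labeled format concrete, each line is roughly label +++$+++ tokenized sentence. The snippet below uses a made-up example line (not taken from the dataset) to show why the loader in the next section takes line[0] as the label and line[2:] as the words:

# A hypothetical labeled line, for illustration only
sample = "1 +++$+++ today is a good day"
parts = sample.strip().split(' ')
label, words = parts[0], parts[2:]   # parts[1] is the '+++$+++' separator
print(label)   # '1'
print(words)   # ['today', 'is', 'a', 'good', 'day']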
Data Processing
Reading the Data
import torch
import pandas as pd
import torch.nn as nn


def load_training_data(path='./data/training_label.txt'):
    """
    Read the training data.
    :param path: path to the data file
    :return: (x, y) for the labeled training data, x only for the unlabeled training data
    """
    if 'training_label' in path:
        with open(path, 'r', encoding='UTF-8') as f:
            lines = f.readlines()
            lines = [line.strip().split(' ') for line in lines]
        train_x = [line[2:] for line in lines]
        train_y = [line[0] for line in lines]
        return train_x, train_y
    else:
        with open(path, 'r', encoding='UTF-8') as f:
            lines = f.readlines()
            train_x = [line.strip().split(' ') for line in lines]
        return train_x


def load_testing_data(path='./data/testing_data.txt'):
    """
    Read the testing data.
    :param path: path to the data file
    :return: x
    """
    with open(path, 'r', encoding='UTF-8') as f:
        lines = f.readlines()
        # split each line at the first ',' and keep the part after it (the sentence)
        test_data = [''.join(line.strip('\n').split(',', 1)[1:]).strip() for line in lines[1:]]
        test_data = [sen.split(' ') for sen in test_data]
    return test_data
Word Vectors
The word vectors are computed with the third-party library gensim.
Meaning of the Word2Vec parameters:
size: dimensionality of the word vectors.
alpha: initial learning rate.
window: maximum distance within a sentence between the current word and the predicted word.
min_count: words with a frequency lower than min_count are dropped; the default is 5.
max_vocab_size: RAM limit while building the vocabulary. If the number of unique words exceeds this limit, the lowest-frequency words are pruned. Roughly 1 GB of RAM is needed per ten million words. Set it to None for no limit.
sample: threshold for randomly downsampling high-frequency words; the default is 1e-3, the useful range is (0, 1e-5).
seed: seed for the random number generator, which affects the initialization of the word vectors.
workers: number of worker threads used for training.
min_alpha: the learning rate drops linearly from alpha to min_alpha as training progresses.
sg: training algorithm. With sg=0, CBOW is used; with sg=1, skip-gram is used.
hs: if set to 1, hierarchical softmax is used; if 0 (the default), negative sampling is used.
negative: if greater than 0, negative sampling is used, and this value is the number of "noise words", usually 5-20 (default 5). If set to 0, negative sampling is not used.
cbow_mean: if set to 0, the sum of the context word vectors is used; if 1 (the default), the mean is used. Only relevant when CBOW is used.
hashfxn: hash function used to initialize the weights; Python's built-in hash function is used by default.
iter: number of iterations over the corpus; the default is 5.
trim_rule: vocabulary-trimming rule specifying which words to discard and which to keep. By default, a word is discarded if word count < min_count. It can also be set to None, in which case min_count is used.
sorted_vocab: if set to 1 (the default), words are sorted by descending frequency before word indices are assigned.
batch_words: number of words passed to the worker threads per batch; the default is 10000.
from gensim.models import Word2Vec
from utils import load_training_data
from utils import load_testing_data


def train_word2vec(x):
    model = Word2Vec(x, size=250, window=5, min_count=5, workers=12, iter=10, sg=1)
    return model


print('loading training data ...')
train_x, train_y = load_training_data()
train_x_no_label = load_training_data('./data/training_nolabel.txt')

print('load testing data ...')
test_x = load_testing_data()

word2evc_model = train_word2vec(train_x + test_x)

print('saving model ...')
word2evc_model.save('w2v.model')
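To sanity-check the trained embeddings, a quick inspection like the following can be used (a sketch against the gensim 3.x API used above; the word 'happy' is just an arbitrary example that may or may not have survived the min_count filter):

from gensim.models import Word2Vec

w2v = Word2Vec.load('w2v.model')
print(w2v.vector_size)                # 250, matching the size used above
if 'happy' in w2v.wv.vocab:           # look up a word that survived min_count filtering
    print(w2v.wv['happy'].shape)      # (250,)
    print(w2v.wv.most_similar('happy', topn=5))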
PreProcess
Define a data-preprocessing class. It mainly handles padding/truncating sentences to a fixed length and building the mapping between words and their word vectors.
import torch
from gensim.models import Word2Vec


class PreProcess():
    def __init__(self, sentences, sen_len, w2v_path):
        self.w2v_path = w2v_path    # path of the saved word2vec model
        self.sentences = sentences  # sentences
        self.sen_len = sen_len      # fixed sentence length
        self.idx2word = []
        self.word2idx = {}
        self.embedding_matrix = []  # embedding matrix

    def get_w2v_model(self):
        # load the previously trained word2vec model
        self.embedding = Word2Vec.load(self.w2v_path)
        self.embedding_dim = self.embedding.vector_size

    def add_embedding(self, word):
        # word here is only ever "<PAD>" or "<UNK>"
        # use a randomly initialized vector as the embedding of "<PAD>" / "<UNK>"
        vector = torch.empty(1, self.embedding_dim)
        torch.nn.init.uniform_(vector)
        self.idx2word.append(word)
        self.word2idx[word] = len(self.word2idx)
        self.embedding_matrix = torch.cat([self.embedding_matrix, vector], 0)

    def make_embedding(self, load=True):
        # build the embedding matrix
        print("Get embedding ...")
        if load:
            print("loading word to vec model ...")
            self.get_w2v_model()  # load the trained Word2vec word embedding
        else:
            raise NotImplementedError
        for i, word in enumerate(self.embedding.wv.vocab):  # iterate over the vocabulary
            print('\rbuilding embedding matrix: {:.2f}%'.format(i/len(self.embedding.wv.vocab)*100), end='')
            self.idx2word.append(word)  # idx2word is a list whose index corresponds to the word
            self.word2idx[word] = len(self.word2idx)
            # self.word2idx[word] = self.idx2word.index(word)  # equivalent, but slower
            self.embedding_matrix.append(self.embedding[word])  # the word's index is the row of its vector in embedding_matrix
        print('')
        self.embedding_matrix = torch.tensor(self.embedding_matrix)  # convert to a tensor
        # add <PAD> and <UNK> to the embedding
        self.add_embedding("<PAD>")  # short sentences are padded with <PAD> so that every sentence has the same length
        self.add_embedding("<UNK>")  # low-frequency words were dropped by word2vec, so words without a vector share a random <UNK> vector
        print("total words: {}".format(len(self.embedding_matrix)))
        return self.embedding_matrix

    def pad_sequence(self, sentence):
        # adjust every sentence to the same length sen_len
        if len(sentence) > self.sen_len:
            sentence = sentence[:self.sen_len]  # truncate
        else:
            pad_len = self.sen_len - len(sentence)  # pad with <PAD>
            for _ in range(pad_len):
                sentence.append(self.word2idx['<PAD>'])
        assert len(sentence) == self.sen_len
        return sentence

    def sentence_word2idx(self):
        # represent each word of a sentence by its embedding index
        sentence_list = []
        for i, sen in enumerate(self.sentences):
            sentence_idx = []
            for word in sen:
                if word in self.word2idx.keys():
                    sentence_idx.append(self.word2idx[word])
                else:
                    sentence_idx.append(self.word2idx["<UNK>"])  # out-of-vocabulary words map to <UNK>
            sentence_idx = self.pad_sequence(sentence_idx)  # adjust the length
            sentence_list.append(sentence_idx)
        return torch.LongTensor(sentence_list)  # torch.size(number of sentences, sen_len)

    def labels_to_tensor(self, y):
        # convert the labels to a tensor
        y = [int(label) for label in y]
        return torch.LongTensor(y)
DataSet
from torch.utils.data import Dataset


class TwitterDataset(Dataset):
    def __init__(self, X, y):
        self.data = X
        self.label = y

    def __getitem__(self, idx):
        if self.label is None:
            return self.data[idx]
        return self.data[idx], self.label[idx]

    def __len__(self):
        return len(self.data)
Model Definition
Define a simple single-layer LSTM whose embedding layer is initialized from the word vectors trained with gensim. If you are unfamiliar with the LSTM parameters, see the linked reference.
import torch
from gensim.models import Word2Vec
import torch.nn as nn


class LSTM_Net(nn.Module):
    def __init__(self, embedding, embedding_dim, hidden_dim, num_layers, dropout=0.5, fix_embedding=True):
        super(LSTM_Net, self).__init__()
        self.embedding = torch.nn.Embedding(embedding.size(0), embedding.size(1))
        self.embedding.weight = torch.nn.Parameter(embedding)
        # whether to freeze the embedding; if not frozen, the embedding is updated during training
        self.embedding.weight.requires_grad = False if fix_embedding else True
        self.embedding_dim = embedding.size(1)  # dimension of the word vectors, i.e. the LSTM input_size
        self.hidden_dim = hidden_dim            # hidden-state dimension
        self.num_layers = num_layers            # number of LSTM layers
        self.dropout = dropout
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, inputs):
        inputs = self.embedding(inputs)  # map the indices to word vectors
        x, _ = self.lstm(inputs, None)
        # x has dimension (batch, seq_len, hidden_size)
        # take the hidden state of the last word and feed it to the classifier
        x = x[:, -1, :]
        x = self.classifier(x)
        return x
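As a quick shape check of the model (a minimal sketch: the embedding matrix below is randomly made up rather than the one produced by PreProcess, and LSTM_Net is the class defined above):

import torch

# a made-up embedding matrix standing in for the one from PreProcess.make_embedding()
fake_embedding = torch.randn(1000, 250)            # 1000 "words", 250-dim vectors
net = LSTM_Net(fake_embedding, embedding_dim=250, hidden_dim=150, num_layers=1)
dummy = torch.randint(0, 1000, (4, 20))            # a batch of 4 sentences, each 20 word indices long
print(net(dummy).shape)                            # torch.Size([4, 1]) — one probability per sentence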
Model Training
from sklearn.model_selection import train_test_split
from utils import load_training_data, load_testing_data
from _class import PreProcess, LSTM_Net, TwitterDataset
from torch.utils.data import DataLoader
import torch
import torch.nn as nn


def evaluation(outputs, labels):
    # outputs => predicted probabilities (float)
    # labels  => ground-truth labels (0 or 1)
    outputs[outputs >= 0.5] = 1  # >= 0.5 counts as positive
    outputs[outputs < 0.5] = 0   # < 0.5 counts as negative
    accuracy = torch.sum(torch.eq(outputs, labels)).item()
    return accuracy


def training(batch_size, n_epoch, lr, train, valid, model, device):
    # print the total number of parameters and the number of trainable parameters
    total = sum(p.numel() for p in model.parameters())
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('\nstart training, parameter total:{}, trainable:{}\n'.format(total, trainable))
    loss = nn.BCELoss()   # binary cross entropy loss
    t_batch = len(train)  # number of training batches
    v_batch = len(valid)  # number of validation batches
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # Adam optimizer with a suitable learning rate lr
    total_loss, total_acc, best_acc = 0, 0, 0
    for epoch in range(n_epoch):
        total_loss, total_acc = 0, 0
        # training
        model.train()
        for i, (inputs, labels) in enumerate(train):
            inputs = inputs.to(device, dtype=torch.long)   # device is "cuda", so convert inputs to torch.cuda.LongTensor
            labels = labels.to(device, dtype=torch.float)  # convert labels to torch.cuda.FloatTensor, since loss() needs float
            optimizer.zero_grad()   # gradients from loss.backward() accumulate, so zero them every batch
            outputs = model(inputs)
            outputs = outputs.squeeze()  # drop the outer dimension so outputs can go into loss()
            batch_loss = loss(outputs, labels)  # training loss for this batch
            batch_loss.backward()   # compute the gradients
            optimizer.step()        # update the model parameters
            accuracy = evaluation(outputs, labels)  # training accuracy for this batch
            total_acc += (accuracy / batch_size)
            total_loss += batch_loss.item()
        print('Epoch | {}/{}'.format(epoch + 1, n_epoch))
        print('Train | Loss:{:.5f} Acc: {:.3f}'.format(total_loss / t_batch, total_acc / t_batch * 100))

        # validation
        model.eval()
        with torch.no_grad():
            total_loss, total_acc = 0, 0
            for i, (inputs, labels) in enumerate(valid):
                inputs = inputs.to(device, dtype=torch.long)
                labels = labels.to(device, dtype=torch.float)
                outputs = model(inputs)
                outputs = outputs.squeeze()
                batch_loss = loss(outputs, labels)
                accuracy = evaluation(outputs, labels)
                total_acc += (accuracy / batch_size)
                total_loss += batch_loss.item()
            print("Valid | Loss:{:.5f} Acc: {:.3f} ".format(total_loss / v_batch, total_acc / v_batch * 100))
            if total_acc > best_acc:
                # if this validation result beats all previous ones, save the model for testing later
                best_acc = total_acc
                torch.save(model, "ckpt.model")
        print('-----------------------------------------------')


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
sen_len = 20
fix_embedding = True
batch_size = 128
epoch = 10
lr = 0.001
w2v_path = 'w2v.model'

print('loading data...')
train_x, train_y = load_training_data()
train_x_no_label = load_training_data('./data/training_nolabel.txt')

pre_process = PreProcess(train_x, sen_len, w2v_path)
embedding = pre_process.make_embedding(load=True)
train_x = pre_process.sentence_word2idx()
train_y = pre_process.labels_to_tensor(train_y)

model = LSTM_Net(embedding, embedding_dim=250, hidden_dim=150, num_layers=1, dropout=0.5, fix_embedding=fix_embedding)
model = model.to(device)

# stratify=y keeps the label proportions of the split roughly equal to those of the full data
X_train, X_val, y_train, y_val = train_test_split(train_x, train_y, test_size=0.1, random_state=1, stratify=train_y)
train_dataset = TwitterDataset(X_train, y_train)
val_dataset = TwitterDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

# start training
training(batch_size, epoch, lr, train_loader, val_loader, model, device)
Prediction
import torch
from utils import load_testing_data
from _class import PreProcess, TwitterDataset, LSTM_Net
from torch.utils.data import DataLoader
import pandas as pd


def testing(batch_size, test_loader, model, device):
    model.eval()
    ret_output = []
    with torch.no_grad():
        for i, inputs in enumerate(test_loader):
            inputs = inputs.to(device, dtype=torch.long)
            outputs = model(inputs)
            outputs = outputs.squeeze()
            outputs[outputs >= 0.5] = 1
            outputs[outputs < 0.5] = 0
            ret_output += outputs.int().tolist()  # outputs is a float tensor, so convert it
    return ret_output


sen_len = 20
batch_size = 128
w2v_path = 'w2v.model'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

test = load_testing_data()
pre_process = PreProcess(test, sen_len, w2v_path)
embedding = pre_process.make_embedding(load=True)
test = pre_process.sentence_word2idx()
test_dataset = TwitterDataset(test, None)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

model = torch.load('ckpt.model')
outputs = testing(batch_size, test_loader, model, device)

tmp = pd.DataFrame({'id': [str(i) for i in range(len(test))], 'label': outputs})
print("save csv ...")
tmp.to_csv('predict.csv', index=False)
print("Finish Predicting")
Optimization
Enlarging the Corpus
Previously only the labeled training data was used to build the word vectors. Now the unlabeled training data is included as well; simply add train_x_no_label when training word2vec, i.e.:
# word2evc_model = train_word2vec(train_x + test_x)
word2evc_model = train_word2vec(train_x + train_x_no_label + test_x)

print('saving model ...')
# word2evc_model.save('w2v.model')
word2evc_model.save('new_w2v.model')
The commented-out lines are the original code; also remember to change every later use of w2v.model in the code to new_w2v.model.
As can be seen, enlarging the corpus does help.
Self-training
Next we use the technique covered in 李宏毅's lecture videos: the unlabeled data can be classified with the model we have trained. Of course, a confidence threshold has to be set; examples that reach the threshold get a pseudo-label and become part of the training set, while the rest keep being predicted by the newer models.
First, slightly modify the PreProcess class. Previously the data was fixed when the object was created, but this time the unlabeled data also needs to be embedded, so the sentences are now passed in when the class methods are called.
class PreProcess():
    def __init__(self, sen_len, w2v_path):
        self.w2v_path = w2v_path  # path of the saved word2vec model
        self.sen_len = sen_len    # fixed sentence length
        self.idx2word = []
        self.word2idx = {}
        self.embedding_matrix = []  # embedding matrix

    def get_w2v_model(self):
        # load the previously trained word2vec model
        self.embedding = Word2Vec.load(self.w2v_path)
        self.embedding_dim = self.embedding.vector_size

    def add_embedding(self, word):
        # word here is only ever "<PAD>" or "<UNK>"
        # use a randomly initialized vector as the embedding of "<PAD>" / "<UNK>"
        vector = torch.empty(1, self.embedding_dim)
        torch.nn.init.uniform_(vector)
        self.idx2word.append(word)
        self.word2idx[word] = len(self.word2idx)
        self.embedding_matrix = torch.cat([self.embedding_matrix, vector], 0)

    def make_embedding(self, load=True):
        # build the embedding matrix
        print("Get embedding ...")
        if load:
            print("loading word to vec model ...")
            self.get_w2v_model()  # load the trained Word2vec word embedding
        else:
            raise NotImplementedError
        for i, word in enumerate(self.embedding.wv.vocab):  # iterate over the vocabulary
            print('\rbuilding embedding matrix: {:.2f}%'.format(i/len(self.embedding.wv.vocab)*100), end='')
            self.idx2word.append(word)  # idx2word is a list whose index corresponds to the word
            self.word2idx[word] = len(self.word2idx)
            # self.word2idx[word] = self.idx2word.index(word)  # equivalent, but slower
            self.embedding_matrix.append(self.embedding[word])  # the word's index is the row of its vector in embedding_matrix
        print('')
        self.embedding_matrix = torch.tensor(self.embedding_matrix)  # convert to a tensor
        # add <PAD> and <UNK> to the embedding
        self.add_embedding("<PAD>")  # short sentences are padded with <PAD> so that every sentence has the same length
        self.add_embedding("<UNK>")  # low-frequency words were dropped by word2vec, so words without a vector share a random <UNK> vector
        print("total words: {}".format(len(self.embedding_matrix)))
        return self.embedding_matrix

    def pad_sequence(self, sentence):
        # adjust every sentence to the same length sen_len
        if len(sentence) > self.sen_len:
            sentence = sentence[:self.sen_len]  # truncate
        else:
            pad_len = self.sen_len - len(sentence)  # pad with <PAD>
            for _ in range(pad_len):
                sentence.append(self.word2idx['<PAD>'])
        assert len(sentence) == self.sen_len
        return sentence

    def sentence_word2idx(self, sentences):
        # represent each word of a sentence by its embedding index
        sentence_list = []
        for i, sen in enumerate(sentences):
            sentence_idx = []
            for word in sen:
                if word in self.word2idx.keys():
                    sentence_idx.append(self.word2idx[word])
                else:
                    sentence_idx.append(self.word2idx["<UNK>"])  # out-of-vocabulary words map to <UNK>
            sentence_idx = self.pad_sequence(sentence_idx)  # adjust the length
            sentence_list.append(sentence_idx)
        return torch.LongTensor(sentence_list)  # torch.size(number of sentences, sen_len)

    def labels_to_tensor(self, y):
        # convert the labels to a tensor
        y = [int(label) for label in y]
        return torch.LongTensor(y)
Add a helper add_train(); threshold is the chosen cutoff. When a probability in outputs reaches threshold, we take its label to be 1; conversely, when it is below 1 - threshold, we take its label to be 0. The remaining data is still ambiguous and is left for the newer models to keep classifying.
def add_train(outputs, threshold=0.9):
    # keep only the confident predictions: probability >= threshold (label 1) or < 1 - threshold (label 0)
    idx = (outputs >= threshold) | (outputs < 1 - threshold)
    outputs[outputs >= threshold] = 1
    outputs[outputs < 1 - threshold] = 0
    return outputs, idx
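For intuition, here is what add_train does to a made-up batch of output probabilities (the values are arbitrary):

import torch

probs = torch.tensor([0.97, 0.55, 0.02, 0.88])     # made-up model outputs for four unlabeled tweets
labels, idx = add_train(probs.clone(), threshold=0.9)
print(idx)          # tensor([ True, False,  True, False]) — only the confident predictions
print(labels[idx])  # tensor([1., 0.]) — their pseudo-labels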
The training code needs quite a few changes. After each epoch, the unlabeled data is predicted; the qualifying examples are added to the training set, while the rest keep being predicted by the newer model. Because the training data can grow with every iteration, it has to be re-wrapped into a DataLoader at the start of every epoch.
The complete training code is therefore:
from sklearn.model_selection import train_test_split
from utils import load_training_data, load_testing_data
from _class import PreProcess, LSTM_Net, TwitterDataset
from torch.utils.data import DataLoader
import torch
import torch.nn as nn


def evaluation(outputs, labels):
    # outputs => predicted probabilities (float)
    # labels  => ground-truth labels (0 or 1)
    outputs[outputs >= 0.5] = 1  # >= 0.5 counts as positive
    outputs[outputs < 0.5] = 0   # < 0.5 counts as negative
    accuracy = torch.sum(torch.eq(outputs, labels)).item()
    return accuracy


def add_train(outputs, threshold=0.9):
    idx = (outputs >= threshold) | (outputs < 1 - threshold)
    outputs[outputs >= threshold] = 1
    outputs[outputs < 1 - threshold] = 0
    return outputs, idx


def training(batch_size, n_epoch, lr, X_train, y_train, valid, train_x_no_label, model, device):
    # print the total number of parameters and the number of trainable parameters
    total = sum(p.numel() for p in model.parameters())
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('\nstart training, parameter total:{}, trainable:{}\n'.format(total, trainable))
    loss = nn.BCELoss()   # binary cross entropy loss
    v_batch = len(valid)  # number of validation batches
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # Adam optimizer with a suitable learning rate lr
    total_loss, total_acc, best_acc = 0, 0, 0
    for epoch in range(n_epoch):
        total_loss, total_acc = 0, 0
        # training: the training set can grow every epoch, so re-wrap it each time
        train_dataset = TwitterDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
        t_batch = len(train_loader)
        print('epoch: %d | batch_num: %d' % (epoch + 1, t_batch))
        model.train()
        for i, (inputs, labels) in enumerate(train_loader):
            inputs = inputs.to(device, dtype=torch.long)   # device is "cuda", so convert inputs to torch.cuda.LongTensor
            labels = labels.to(device, dtype=torch.float)  # convert labels to torch.cuda.FloatTensor, since loss() needs float
            optimizer.zero_grad()   # gradients from loss.backward() accumulate, so zero them every batch
            outputs = model(inputs)      # torch.size([batch_size, 1])
            outputs = outputs.squeeze()  # torch.size([batch_size]), same shape as labels
            batch_loss = loss(outputs, labels)  # training loss for this batch
            batch_loss.backward()   # compute the gradients
            optimizer.step()        # update the model parameters
            accuracy = evaluation(outputs, labels)  # training accuracy for this batch
            total_acc += (accuracy / batch_size)
            total_loss += batch_loss.item()
        print('Epoch | {}/{}'.format(epoch + 1, n_epoch))
        print('Train | Loss:{:.5f} Acc: {:.3f}'.format(total_loss / t_batch, total_acc / t_batch * 100))

        if epoch > 3:
            # train for a few epochs first, then start adding the no_label data
            model.eval()
            train_x_no_label_dataset = TwitterDataset(train_x_no_label, None)
            train_x_no_label_loader = DataLoader(train_x_no_label_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
            tmp = torch.Tensor()
            with torch.no_grad():
                for i, (inputs) in enumerate(train_x_no_label_loader):
                    inputs = inputs.to(device, dtype=torch.long)
                    outputs = model(inputs)
                    outputs = outputs.squeeze()  # torch.size([batch_size])
                    labels, idx = add_train(outputs)  # keep only the confident predictions
                    X_train = torch.cat((X_train.to(device), inputs[idx].to(device)), dim=0)  # qualifying data joins the training set
                    y_train = torch.cat((y_train.to(device), labels[idx].to(device)), dim=0)
                    tmp = torch.cat((tmp.to(device), inputs[~idx].to(device)), dim=0)  # the rest is kept for the newer model to predict
            train_x_no_label = tmp

        # validation
        model.eval()
        with torch.no_grad():
            total_loss, total_acc = 0, 0
            for i, (inputs, labels) in enumerate(valid):
                inputs = inputs.to(device, dtype=torch.long)
                labels = labels.to(device, dtype=torch.float)
                outputs = model(inputs)
                outputs = outputs.squeeze()
                batch_loss = loss(outputs, labels)
                accuracy = evaluation(outputs, labels)
                total_acc += (accuracy / batch_size)
                total_loss += batch_loss.item()
            print("Valid | Loss:{:.5f} Acc: {:.3f} ".format(total_loss / v_batch, total_acc / v_batch * 100))
            if total_acc > best_acc:
                # if this validation result beats all previous ones, save the model for testing later
                best_acc = total_acc
                torch.save(model, "ckpt.model")
        print('-----------------------------------------------')


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
sen_len = 20
fix_embedding = True
batch_size = 128
epoch = 10
lr = 0.001
# w2v_path = 'w2v.model'
w2v_path = 'new_w2v.model'

print('loading data...')
train_x, train_y = load_training_data()
train_x_no_label = load_training_data('./data/training_nolabel.txt')

pre_process = PreProcess(sen_len, w2v_path)
embedding = pre_process.make_embedding(load=True)
train_x = pre_process.sentence_word2idx(train_x)
train_y = pre_process.labels_to_tensor(train_y)
train_x_no_label = pre_process.sentence_word2idx(train_x_no_label)  # new

model = LSTM_Net(embedding, embedding_dim=250, hidden_dim=150, num_layers=1, dropout=0.5, fix_embedding=fix_embedding)
model = model.to(device)

# stratify=y keeps the label proportions of the split roughly equal to those of the full data
X_train, X_val, y_train, y_val = train_test_split(train_x, train_y, test_size=0.1, random_state=1, stratify=train_y)
# train_dataset = TwitterDataset(X_train, y_train)
val_dataset = TwitterDataset(X_val, y_val)
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

# start training
training(batch_size, epoch, lr, X_train, y_train, val_loader, train_x_no_label, model, device)
As the logs show, the number of batches keeps increasing, which means the total amount of training data (batch_num * batch_size) keeps growing.
Submitting again, the score improves slightly once more.
BiLSTM + self-attention
I haven't really got the hang of attention yet, so I just hacked something together myself; the results are only so-so...
import math

import torch
import torch.nn as nn


class AttenBiLSTM(nn.Module):
    def __init__(self, embedding, embedding_dim, hidden_dim, num_layers, dropout=0.5, fix_embedding=True):
        super(AttenBiLSTM, self).__init__()
        self.embedding = torch.nn.Embedding(embedding.size(0), embedding.size(1))
        self.embedding.weight = torch.nn.Parameter(embedding)
        # whether to freeze the embedding; if not frozen, the embedding is updated during training
        self.embedding.weight.requires_grad = False if fix_embedding else True
        self.embedding_dim = embedding.size(1)  # dimension of the word vectors, i.e. the LSTM input_size
        self.hidden_dim = hidden_dim            # hidden-state dimension
        self.num_layers = num_layers            # number of LSTM layers
        self.dropout = dropout
        self.bi_lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 64),
            nn.Dropout(dropout),
            nn.Linear(64, 32),
            nn.Dropout(dropout),
            nn.Linear(32, 16),
            nn.Dropout(dropout),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )
        self.Q_layer = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU()
        )
        self.K_layer = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU()
        )
        self.V_layer = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU()
        )

    def attention(self, q, k, v):
        # q, k, v: (batch_size, seq_len, hidden_size * num_directions)
        d_k = q.size(-1)
        scores = torch.bmm(q, k.transpose(1, 2)) / math.sqrt(d_k)
        attn = nn.functional.softmax(scores, dim=-1)
        context = torch.bmm(attn, v).sum(1)
        return context

    def forward(self, inputs):
        inputs = self.embedding(inputs)
        # outputs: torch.size([batch_size, seq_len, num_directions * hidden_size])
        # hidden:  torch.size([num_layers * num_directions, batch_size, hidden_size])
        # cn:      torch.size([num_layers * num_directions, batch_size, hidden_size])
        outputs, (hidden, cn) = self.bi_lstm(inputs, None)
        # if dropout is applied functionally like this, remember to pass training=self.training (the default False never changes state)
        # query = nn.functional.dropout(outputs, p=0.5, training=self.training)
        # hidden = hidden.permute(1, 0, 2)
        q = self.Q_layer(outputs)
        k = self.K_layer(outputs)
        v = self.V_layer(outputs)
        atten_out = self.attention(q, k, v)
        return self.classifier(atten_out)
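As a rough shape check of the attention step above (a sketch with made-up tensors; 300 corresponds to hidden_dim * 2 for hidden_dim = 150):

import math
import torch

q = k = v = torch.randn(4, 20, 300)                                # (batch, seq_len, 2*hidden_dim)
scores = torch.bmm(q, k.transpose(1, 2)) / math.sqrt(q.size(-1))   # (4, 20, 20) attention scores
context = torch.bmm(torch.softmax(scores, dim=-1), v).sum(1)       # (4, 300) after summing over seq_len
print(context.shape)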
References
[1] 李宏毅2020機器學習作業4-RNN:句子情感分類