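Preface
It was not as hard as I expected, since I was standing on the shoulders of those who came before, but I still hit plenty of small obstacles and at one point wanted to give up.
Time spent: two full days (daytime only)
Goal: train a transformer model that, given the input [1,2,3,4], predicts [5,6,7,8]
Final result: the layers and dimensions of the transformer model match expectations and training runs; prediction still has a few problems
Main references: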
https://github.com/aladdinpersson/Machine-Learning-Collection/blob/master/ML/Pytorch/more_advanced/transformer_from_scratch/transformer_from_scratch.py
https://github.com/aladdinpersson/Machine-Learning-Collection/blob/master/ML/Pytorch/more_advanced/seq2seq_transformer/seq2seq_transformer.py
https://zhuanlan.zhihu.com/p/415318478
http://nlp.seas.harvard.edu/2018/04/03/attention.html
https://arxiv.org/pdf/1706.03762.pdf
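The Transformer
Everything here is based on this figure from the paper:
The key parts first:
1. Attention mechanism
Assume batch_size=2, seq_len=100, d_model=256, heads=8
Q, K and V all have the same shape here. Because of the multi-head split, d_model is divided into `heads` parts, so the shape is [2, 8, 100, 32]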
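import math

import torch
import torch.nn as nn
import torch.nn.functional as F


def attention(query, key, value, mask=None, dropout=None):
    # Size of the last dimension of query, i.e. the per-head embedding size
    d_k = query.size(-1)
    # Following the attention formula, multiply query by the transpose of key
    # (only the last two dimensions of key are transposed), then divide by the
    # scaling factor to get the attention scores.
    # If query is [len, embed], then scores is [len, len]
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # mask (broadcastable to [len, len]) is compared with scores position by
        # position: wherever mask[i][j] is 0, scores[i][j] is set to -1e9.
        # A very large negative number contributes essentially nothing after softmax
        scores = scores.masked_fill(mask == 0, -1e9)
    # Softmax over the last dimension
    scores = F.softmax(scores, dim=-1)
    if dropout is not None:
        scores = dropout(scores)
    # Finally, multiply the attention weights by value to get the attended
    # representation, and return the weights as well
    return torch.matmul(scores, value), scores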
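To double-check the shapes assumed above (batch_size=2, heads=8, seq_len=100, d_k=32), here is a minimal sketch. It repeats a stripped-down copy of the attention function without dropout so that it runs on its own; the causal mask and the random inputs are just illustrative assumptions.

```python
import math

import torch
import torch.nn.functional as F


def attention(query, key, value, mask=None):
    # Scaled dot-product attention, same formula as above but without dropout
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, value), weights


torch.manual_seed(0)
batch_size, heads, seq_len, d_k = 2, 8, 100, 32
q = torch.randn(batch_size, heads, seq_len, d_k)
k = torch.randn(batch_size, heads, seq_len, d_k)
v = torch.randn(batch_size, heads, seq_len, d_k)

# Lower-triangular mask: position i may only look at positions <= i
causal = torch.tril(torch.ones(seq_len, seq_len))

out, weights = attention(q, k, v, mask=causal)
print(out.shape)      # same shape as the value tensor
print(weights.shape)  # one [seq_len, seq_len] weight matrix per head
```

Each row of the weight matrix sums to 1, and with the causal mask the first position can only attend to itself.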
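2. MultiHead Attention
This only splits d_model into 8 parts. There is no need to loop 8 times: reshape the tensors to [batch_size, heads, len, d_k] and the attention function above computes all heads in one call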
class MultihHeadAttention(nn.Module):
    def __init__(self, d_model, h, dropout=0.1):
        super(MultihHeadAttention, self).__init__()
        # d_model must be divisible by h, because every head gets an
        # equal share of the embedding features
        assert d_model % h == 0
        # Size of the slice of the embedding that each head receives
        self.d_k = d_model // h
        self.h = h
        self.w_key = nn.Linear(d_model, d_model)
        self.w_query = nn.Linear(d_model, d_model)
        self.w_value = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        # Attention weights from the last forward pass; empty for now,
        # stored for visualization
        self.atten = None

    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # The head split adds one dimension to query etc., so add one to mask too
            mask = mask.unsqueeze(1)
        batch_size = query.size(0)
        # [batch_size, len, d_model] -> [batch_size, h, len, d_k]
        query = self.w_query(query).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        key = self.w_key(key).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        value = self.w_value(value).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        x, self.atten = attention(query, key, value, mask, self.dropout)
        # [batch_size, h, len, d_k] -> [batch_size, len, d_model]
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)
        return self.fc_out(x)
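The view/transpose trick is the part that replaces the loop over heads, so it is worth checking in isolation. A small sketch, using the same example sizes as before (the tensor contents are random and only for illustration):

```python
import torch

torch.manual_seed(0)
batch_size, seq_len, d_model, h = 2, 100, 256, 8
d_k = d_model // h

x = torch.randn(batch_size, seq_len, d_model)

# Split: [batch, len, d_model] -> [batch, len, h, d_k] -> [batch, h, len, d_k]
split = x.view(batch_size, -1, h, d_k).transpose(1, 2)

# Merge: undo the transpose, make memory contiguous, flatten the heads again
merged = split.transpose(1, 2).contiguous().view(batch_size, -1, h * d_k)

print(split.shape)
print(torch.equal(merged, x))             # the round trip loses nothing
print(torch.equal(split[:, 0], x[:, :, :d_k]))  # head 0 is the first d_k features
```

So each head simply sees a contiguous slice of d_k features, and merging restores the original layout exactly.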
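There are two more layers that are relatively simple.
3. LayerNorm
ref https://pytorch.org/docs/stable/generated/torch.nn.LayerNorm.html
You can use the LayerNorm layer that ships with PyTorch directly; here I implement it myself.
It is essentially the standardization from probability theory, (x - mean) / standard deviation, with a learnable scale and shift added.
Should the scale and shift have the same shape as X, or just the size of X's last dimension? Both ran when I tried, which confused me at first. Both run because of broadcasting; the usual choice is the last dimension only, which is what nn.LayerNorm(d_model) does.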
class LayerNorm(nn.Module):
    def __init__(self, embedding_dim, eps=1e-6):
        # embedding_dim: may be a full size such as [batch_size, len, embedding_dim],
        # or just the integer embedding_dim
        super(LayerNorm, self).__init__()
        # Wrapped in nn.Parameter so they are model parameters: the learnable scale and shift
        self.a = nn.Parameter(torch.ones(embedding_dim))
        self.b = nn.Parameter(torch.zeros(embedding_dim))
        self.eps = eps

    def forward(self, x):
        # This simply standardizes over the last dimension
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a * (x - mean) / (std + self.eps) + self.b
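To settle the question about parameter shapes, here is a small comparison against the built-in layer. Note that this sketch is a variant of my class, not a copy: it uses the biased variance and puts eps inside the square root, which is how the PyTorch documentation defines nn.LayerNorm. The function name layer_norm_last_dim is made up for this example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
d_model = 256
x = torch.randn(2, 100, d_model)


def layer_norm_last_dim(x, gamma, beta, eps=1e-5):
    # Normalize over the last dimension using the biased variance,
    # with eps inside the square root
    mean = x.mean(-1, keepdim=True)
    var = x.var(-1, keepdim=True, unbiased=False)
    return gamma * (x - mean) / torch.sqrt(var + eps) + beta


# Parameters shaped like the last dimension only: [d_model]
gamma = torch.ones(d_model)
beta = torch.zeros(d_model)
manual = layer_norm_last_dim(x, gamma, beta)
builtin = nn.LayerNorm(d_model)(x)
print(torch.allclose(manual, builtin, atol=1e-5))

# Parameters shaped like the whole input also run, thanks to broadcasting,
# but they tie the layer to one fixed batch size and sequence length
gamma_full = torch.ones(2, 100, d_model)
beta_full = torch.zeros(2, 100, d_model)
manual_full = layer_norm_last_dim(x, gamma_full, beta_full)
print(torch.allclose(manual_full, manual))
```

With a [d_model] parameter the same layer works for any batch size and sequence length, which is probably why it is the conventional choice.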
4. FeedForwardLayer

class FeedForwardLayer(nn.Module):
    def __init__(self, d_model, forward_expansion):
        super(FeedForwardLayer, self).__init__()
        # Expand to d_model * forward_expansion, then project back to d_model
        self.w1 = nn.Linear(d_model, d_model * forward_expansion)
        self.w2 = nn.Linear(d_model * forward_expansion, d_model)

    def forward(self, x):
        return self.w2(F.relu(self.w1(x)))
5. Embedding layers
Then there are the two embedding layers (word and position); the position one is shown here,

class PositionEmbedding(nn.Module):
    def __init__(self, d_model, max_len=1000):
        # max_len is the maximum length of any sentence
        super(PositionEmbedding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        # Even feature indices get sin, odd feature indices get cos
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # pe: [1, max_len, d_model]
        # A buffer moves with the module (.to(device)) but is not trained
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add the encodings for the first x.size(1) positions
        x = x + self.pe[:, :x.size(1)]
        return x
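A quick sanity check of the sinusoidal table, as a standalone sketch (sinusoidal_table is a helper name invented for this example; the math is the same as in the class above). Position 0 should be all sin(0)=0 on even indices and cos(0)=1 on odd indices.

```python
import math

import torch


def sinusoidal_table(max_len, d_model):
    # Build the [max_len, d_model] table of fixed position encodings
    pe = torch.zeros(max_len, d_model)
    position = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, d_model, 2, dtype=torch.float32) * -(math.log(10000.0) / d_model)
    )
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    return pe


pe = sinusoidal_table(max_len=1000, d_model=256)
print(pe.shape)
print(pe[0, :4])  # position 0 alternates sin(0), cos(0)

# Adding to a batch: [1, max_len, d_model] sliced to the sequence length
x = torch.zeros(2, 100, 256)
out = x + pe.unsqueeze(0)[:, : x.size(1)]
print(out.shape)
```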
6. Encoder
First define a TransformerBlock module; the Encoder just repeats it num_encoder_layers times.
Note the residual connections.
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, head, forward_expansion, dropout):
        super(TransformerBlock, self).__init__()
        self.attn = MultihHeadAttention(embed_size, head)
        self.norm1 = LayerNorm(embed_size)
        self.norm2 = LayerNorm(embed_size)
        self.feed_forward = FeedForwardLayer(embed_size, forward_expansion)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask):
        attention = self.attn(query, key, value, mask)
        # Residual connection around attention, then norm and dropout
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        # Residual connection around the feed-forward layer
        out = self.dropout(self.norm2(forward + x))
        return out
The Encoder really is just a few repetitions. Note that I handle the input processing (embeddings) outside this module.
class Encoder(nn.Module):
    def __init__(
        self,
        embed_size,
        num_layers,
        heads,
        forward_expansion,
        dropout=0.1,
    ):
        super(Encoder, self).__init__()
        self.layers = nn.ModuleList(
            [
                TransformerBlock(embed_size, heads, forward_expansion, dropout)
                for _ in range(num_layers)
            ]
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention: query, key and value are all x
        for layer in self.layers:
            x = layer(x, x, x, mask)
        return x
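7. Decoder
The basic module is DecoderBlock; the Decoder likewise just repeats it several times.
One thing to note: here query=x, i.e. the output of the previous decoder layer, while value and key both come from encoder_out, i.e. the output of the last encoder layer, as shown in the figure: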
class DecoderBlock(nn.Module):
    def __init__(self, embed_size, heads, forward_expansion, dropout=0.1):
        super(DecoderBlock, self).__init__()
        self.norm = LayerNorm(embed_size)
        self.attn = MultihHeadAttention(embed_size, heads, dropout)
        self.transformer = TransformerBlock(embed_size, heads, forward_expansion, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, value, key, src_mask, trg_mask):
        # Masked self-attention over the decoder input
        attn = self.attn(x, x, x, trg_mask)
        query = self.dropout(self.norm(attn + x))
        # Cross-attention plus feed-forward: query comes from the decoder,
        # key and value come from the encoder output
        out = self.transformer(query, key, value, src_mask)
        return out
class Decoder(nn.Module):
    def __init__(
        self,
        embed_size,
        num_layers,
        heads,
        forward_expansion,
        dropout=0.1,
    ):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(
            [
                DecoderBlock(embed_size, heads, forward_expansion, dropout)
                for _ in range(num_layers)
            ]
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_out, src_mask, trg_mask):
        # encoder_out is passed as both value and key to every layer
        for layer in self.layers:
            x = layer(x, encoder_out, encoder_out, src_mask, trg_mask)
        return x
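8. Transformer module
This joins the Encoder and Decoder together and handles the input of both in one place.
Note that there are two masks here: one keeps pad=0 positions out of the computation, the other stops the attention weighted sum from looking at later positions.
A note on dimensions:
if src and trg are [batch_size, len],
then the final result is [batch_size, len, trg_vocab_size]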
class Transformer(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        trg_vocab_size,
        src_pad_idx,
        trg_pad_idx,
        embed_size=512,
        num_encoder_layers=6,
        num_decoder_layers=6,
        forward_expansion=4,
        heads=8,
        dropout=0,
        max_length=100,
        device="cpu",
    ):
        super(Transformer, self).__init__()
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device
        self.encoder = Encoder(
            embed_size,
            num_encoder_layers,
            heads,
            forward_expansion,
            dropout,
        )
        self.decoder = Decoder(
            embed_size,
            num_decoder_layers,
            heads,
            forward_expansion,
            dropout,
        )
        # self.word_embedding = WordEmbeddings(embed_size, src_vocab_size)
        # self.position_embedding = PositionEmbedding(embed_size, max_length)
        # self.word_embedding_2 = WordEmbeddings(embed_size, trg_vocab_size)
        # self.position_embedding_2 = PositionEmbedding(embed_size, max_length)
        self.src_word_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.src_position_embedding = nn.Embedding(max_length, embed_size)
        self.trg_word_embedding = nn.Embedding(trg_vocab_size, embed_size)
        self.trg_position_embedding = nn.Embedding(max_length, embed_size)
        self.fc_out = nn.Linear(embed_size, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def make_src_mask(self, src):
        # True where the token is not padding
        src_mask = (src != self.src_pad_idx).unsqueeze(1)
        # (N, 1, src_len)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.shape
        # Lower-triangular mask: position i only sees positions <= i
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            N, trg_len, trg_len
        )
        # (N, trg_len, trg_len)
        return trg_mask.to(self.device)

    def forward(self, src, trg):
        N, src_seq_length = src.shape
        N, trg_seq_length = trg.shape
        src_positions = (
            torch.arange(0, src_seq_length)
            .unsqueeze(0)
            .expand(N, src_seq_length)
            .to(self.device)
        )
        trg_positions = (
            torch.arange(0, trg_seq_length)
            .unsqueeze(0)
            .expand(N, trg_seq_length)
            .to(self.device)
        )
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        # Encoder part
        x = self.dropout(
            self.src_word_embedding(src) + self.src_position_embedding(src_positions)
        )
        encoder_out = self.encoder(x, src_mask)
        # Decoder part
        x = self.dropout(
            self.trg_word_embedding(trg) + self.trg_position_embedding(trg_positions)
        )
        decoder_out = self.decoder(x, encoder_out, src_mask, trg_mask)
        out = self.fc_out(decoder_out)
        return out
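The two masks are easier to follow on a tiny example. This sketch builds a padding mask and a causal mask the same way as make_src_mask and make_trg_mask, then applies the extra unsqueeze(1) that the multi-head attention layer does. The token values and pad_idx=0 are assumptions for illustration.

```python
import torch

pad_idx = 0
src = torch.tensor([[1, 2, 3, 0, 0],
                    [4, 5, 6, 7, 8]])
N, src_len = src.shape

# Padding mask: True for real tokens, False for pad -> (N, 1, src_len)
src_mask = (src != pad_idx).unsqueeze(1)
print(src_mask.shape)

# Causal mask: lower triangle of ones -> (N, trg_len, trg_len)
trg_len = 4
trg_mask = torch.tril(torch.ones(trg_len, trg_len)).expand(N, trg_len, trg_len)
print(trg_mask[0])

# Inside multi-head attention the mask gets one more dimension for the heads,
# so (N, 1, 1, src_len) broadcasts against scores of (N, heads, q_len, src_len)
heads = 8
scores = torch.zeros(N, heads, src_len, src_len)
masked = scores.masked_fill(src_mask.unsqueeze(1) == 0, -1e9)
print(masked[0, 0, 0])  # the two pad columns are pushed to a large negative value
```

In the first sentence only the last two columns are masked, for every query row and every head; the second sentence has no padding and is left untouched.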
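Training
Compared with the model, the training part is much harder to write. The model structure is fixed and there are plenty of references online, whereas training is tightly coupled to your own data.
1. Generating the dataset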
ref:
https://towardsdatascience.com/how-to-use-datasets-and-dataloader-in-pytorch-for-custom-text-data-270eed7f7c00
https://pytorch.org/tutorials/beginner/data_loading_tutorial.html
https://sparrow.dev/pytorch-dataloader/
I also wrote a separate summary: https://www.cnblogs.com/lfri/p/15479166.html
import csv
import random

import config

header = ['sentence_a', 'sentence_b']
data = [[1, 2, 3, 4], [5, 6, 7, 8]]
max_length = config.max_length
entry_num = config.entry_num

# newline='' stops the csv module from writing blank rows on Windows
with open(config.file_root, 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    # write the header
    writer.writerow(header)
    # write the data
    # writer.writerow(data)
    for _ in range(entry_num):
        # random.randint needs integers, so use floor division
        s = random.randint(1, max_length // 2)
        seq_len = random.randint(1, max_length // 4)
        # sentence_b is the run of integers that immediately follows sentence_a
        data[0] = [i for i in range(s, s + seq_len)]
        data[1] = [i for i in range(s + seq_len, s + 2 * seq_len)]
        writer.writerow(data)
2. Training
Create the Dataset and, on top of it, the iterator train_iterator
dataset = SeqDataset(config.file_root, max_length=config.max_length)
train_iterator = DataLoader(dataset, batch_size=config.batch_size,
                            shuffle=False, num_workers=0, collate_fn=None)
--snip--
for batch_idx, batch in enumerate(train_iterator):
    # Get input and targets and move them to the device (e.g. cuda)
    src, trg = batch
    src = src.to(config.device)
    trg = trg.to(config.device)
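SeqDataset itself is not shown in this post, so the following is only a guess at what such a class could look like: a hypothetical reimplementation that parses the two list columns of the CSV written above and right-pads each sequence with 0 up to max_length. The padding scheme, the pad_idx parameter and the temporary demo file are all assumptions.

```python
import ast
import csv
import os
import tempfile

import torch
from torch.utils.data import DataLoader, Dataset


class SeqDataset(Dataset):
    """Hypothetical sketch: reads (sentence_a, sentence_b) rows and pads them."""

    def __init__(self, path, max_length, pad_idx=0):
        self.max_length = max_length
        self.pad_idx = pad_idx
        with open(path, newline="", encoding="UTF8") as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            # Each cell is the text form of a Python list, e.g. "[1, 2, 3]"
            self.rows = [
                (ast.literal_eval(row[0]), ast.literal_eval(row[1]))
                for row in reader
                if row
            ]

    def __len__(self):
        return len(self.rows)

    def _pad(self, seq):
        # Truncate, then right-pad with pad_idx so every item has the same length
        seq = seq[: self.max_length]
        padded = seq + [self.pad_idx] * (self.max_length - len(seq))
        return torch.tensor(padded, dtype=torch.long)

    def __getitem__(self, idx):
        a, b = self.rows[idx]
        return self._pad(a), self._pad(b)


# Demo on a small temporary file
path = os.path.join(tempfile.mkdtemp(), "seq.csv")
with open(path, "w", newline="", encoding="UTF8") as f:
    writer = csv.writer(f)
    writer.writerow(["sentence_a", "sentence_b"])
    writer.writerow([[1, 2, 3], [4, 5, 6]])
    writer.writerow([[7, 8], [9, 10]])

dataset = SeqDataset(path, max_length=8)
loader = DataLoader(dataset, batch_size=2, shuffle=False)
src, trg = next(iter(loader))
print(src.shape, trg.shape)
print(src[0])
```

Because every item is padded to the same length, the default collate function can stack them into [batch_size, max_length] tensors without a custom collate_fn.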
This gives us src and trg, which can then be fed to the model to get the output
output = model(src, trg)
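The cross entropy between output and trg is the loss.
If output is [batch_size, len, trg_vocab_size] and trg is [batch_size, len], the loss cannot be computed directly; they need to be reshaped to 2-D and 1-D respectively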
ref https://www.cnblogs.com/lfri/p/15480326.html
output = output.reshape(-1, config.trg_vocab_size)
trg = trg.reshape(-1)
loss = criterion(output, trg)
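Here is the reshape on dummy tensors, as a small sketch. The criterion is not shown in this post; I assume nn.CrossEntropyLoss with ignore_index set to the pad index (0 here), so that padded positions do not count toward the loss. The sizes are made up.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
batch_size, seq_len, trg_vocab_size, pad_idx = 2, 5, 11, 0

# Stand-in for the model output: one score per vocabulary entry per position
output = torch.randn(batch_size, seq_len, trg_vocab_size)
trg = torch.tensor([[5, 6, 7, 0, 0],
                    [3, 4, 5, 6, 7]])

# Assumption: padding positions are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

flat_output = output.reshape(-1, trg_vocab_size)  # [batch_size * len, vocab]
flat_trg = trg.reshape(-1)                        # [batch_size * len]
loss = criterion(flat_output, flat_trg)
print(flat_output.shape, flat_trg.shape)
print(loss.item())

# The same value computed by hand: mean negative log-probability over non-pad tokens
log_probs = F.log_softmax(flat_output, dim=-1)
keep = flat_trg != pad_idx
manual = -log_probs[keep, flat_trg[keep]].mean()
print(torch.allclose(loss, manual))
```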
Then comes backpropagation and the gradient descent step
# Clear gradients accumulated by the previous step
optimizer.zero_grad()
# Back prop
loss.backward()
# Gradient descent step
optimizer.step()
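One thing that may be worth trying, and this is only my guess at a cause of the prediction problems: in the loop above the decoder receives the full trg and is scored against that same trg, so position i is asked to predict a token it was just given. A common arrangement is to shift by one, feeding trg[:, :-1] and scoring against trg[:, 1:]. The sketch below shows only that slicing; TinySeq2Seq is a made-up stand-in so the snippet runs by itself, not the Transformer from this post.

```python
import torch
import torch.nn as nn


class TinySeq2Seq(nn.Module):
    """Hypothetical stand-in model with the same call signature: model(src, trg)."""

    def __init__(self, vocab_size, embed_size=16):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, embed_size)
        self.out = nn.Linear(embed_size, vocab_size)

    def forward(self, src, trg):
        # Mix the mean source embedding into every target position
        context = self.emb(src).mean(dim=1, keepdim=True)
        return self.out(self.emb(trg) + context)


torch.manual_seed(0)
vocab_size = 20
model = TinySeq2Seq(vocab_size)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

src = torch.tensor([[1, 2, 3, 4]])
trg = torch.tensor([[5, 6, 7, 8]])

# Shift by one: the decoder sees tokens 0..n-2 and must predict tokens 1..n-1
trg_input = trg[:, :-1]   # [[5, 6, 7]]
trg_target = trg[:, 1:]   # [[6, 7, 8]]

optimizer.zero_grad()
output = model(src, trg_input)  # [batch_size, len - 1, vocab_size]
loss = criterion(output.reshape(-1, vocab_size), trg_target.reshape(-1))
loss.backward()
optimizer.step()
print(output.shape, loss.item())
```

This matches how my_predict works: it seeds the decoder with the first target token and asks the model for the next one at each step.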
To visualize the loss, I used tensorboard
ref
https://towardsdatascience.com/a-complete-guide-to-using-tensorboard-with-pytorch-53cb2301e8c3
https://towardsdatascience.com/pytorch-performance-analysis-with-tensorboard-7c61f91071aa
writer.add_scalar("Training loss", loss, global_step=step)
# writer.add_graph(model, [src, target])
# writer.add_histogram("weight", model.decoder.layers[2].attn.atten, step)
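Besides the loss, you can also visualize the model graph, and even a particular weight of a particular layer of the model.
3. Prediction
Finally, prediction.
I did not use a separate test set; I just take one fixed sequence and check the model on it as training goes.
This uses the argmax function, which returns the index of the largest value along a dimension, effectively converting one-hot style scores to an index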
ref https://www.cnblogs.com/lfri/p/15480326.html
# Evaluation
model.eval()
translated_sentence = my_predict(
    model, config.device, config.max_length
)
--snip--
def my_predict(model, device, max_length):
    # Fixed source sequence used as a running check
    indexes = [3, 4, 5, 6, 7]
    sentence_tensor = torch.LongTensor(indexes).unsqueeze(0).to(device)
    # Seed the decoder with the first expected target token
    outputs = [8]
    for i in range(max_length):
        trg_tensor = torch.LongTensor(outputs).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(sentence_tensor, trg_tensor)
        # Most likely token at the last position
        best_guess = output.argmax(2)[:, -1].item()
        outputs.append(best_guess)
        # print("best_guess: ", best_guess)
        # Stop when the model emits the pad token
        if best_guess == 0:
            break
    return outputs
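The line output.argmax(2)[:, -1].item() packs three steps together, so here it is on a tiny hand-made tensor (the numbers are invented; batch size 1, two positions, vocabulary of three):

```python
import torch

# [batch_size=1, len=2, vocab_size=3]: scores for each position
output = torch.tensor([[[0.1, 0.7, 0.2],
                        [0.9, 0.05, 0.05]]])

# Step 1: index of the best score along the vocabulary dimension -> [1, 2]
best = output.argmax(2)
print(best)  # tensor([[1, 0]])

# Step 2: keep only the last position -> [1]
last = best[:, -1]

# Step 3: convert the single-element tensor to a Python int
best_guess = last.item()
print(best_guess)  # 0
```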
Training results
Number of data entries: 100
num_epochs = 100
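Test results
No real testing happened; getting it to run counts as success.
my_predict is effectively the test, and it did not generate the immediately following sequence of equal length as I had hoped.
Not enough training? Not enough data? A problem in the model? Or is it ALL FAKE?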
Other details
1. .to(device)
Which things need to be moved to the GPU?
So far I know of: model, src, trg, and any intermediate tensors created inside forward, such as src_positions and trg_positions in this project
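For tensors created inside forward, one option is to create them on the same device as the input instead of calling .to(self.device) afterwards. A minimal sketch (it falls back to the CPU when no GPU is available; the input values are arbitrary):

```python
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

src = torch.tensor([[1, 2, 3, 4]], device=device)
N, src_len = src.shape

# Created directly on src's device, so no separate .to(device) call is needed
src_positions = (
    torch.arange(src_len, device=src.device)
    .unsqueeze(0)
    .expand(N, src_len)
)
print(src_positions.device == src.device)
print(src_positions.tolist())  # [[0, 1, 2, 3]]
```

This also means the module no longer has to carry its own device attribute, since everything follows the input.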
2. dropout
To prevent overfitting, people usually add some Dropout layers. When should they be added, and is there any rule about where they go?
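As far as I can tell from the paper (section 5.4), dropout is applied to the output of each sub-layer before it is added to the residual and normalized, and also to the sum of the word embeddings and positional encodings. A sketch of that placement follows; the SublayerConnection wrapper and the sizes are my own choices for illustration, and this ordering differs slightly from the TransformerBlock in this post, which applies dropout after the norm.

```python
import torch
import torch.nn as nn


class SublayerConnection(nn.Module):
    """Sketch: residual + dropout + LayerNorm around any sub-layer (post-norm)."""

    def __init__(self, d_model, dropout):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        # Dropout hits the sub-layer output, before the residual add and the norm
        return self.norm(x + self.dropout(sublayer(x)))


torch.manual_seed(0)
d_model = 16
block = SublayerConnection(d_model, dropout=0.5)
feed_forward = nn.Linear(d_model, d_model)
x = torch.randn(2, 5, d_model)

# Training mode: dropout is active, so repeated calls generally differ
block.train()
train_out = block(x, feed_forward)

# Eval mode: dropout is a no-op, so repeated calls are identical
block.eval()
eval_a = block(x, feed_forward)
eval_b = block(x, feed_forward)
print(train_out.shape)
print(torch.equal(eval_a, eval_b))
```

Either way, remember model.eval() before predicting, otherwise dropout keeps randomly zeroing activations at inference time.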
3. bug
I later found some more obvious mistakes in the code, and yet it still ran..
To do
- Train with more data, more epochs (num_epochs), and more network layers
- Attention visualization