本文基於Pytorch實現,省略細節專注於seq2seq模型的大體框架
並參考 https://github.com/bentrevett/pytorch-seq2seq (本文的大多圖片都來源於此)
介紹
大部分的NLP中的Seq2seq模型都是使用的encoder-decoder框架,即以一個Encoder來編碼輸入的Sequence,再以一個Decoder來輸出Sequence。其中具體的細節會在后面對應的 Encoder 與 Decoder 中展開介紹, 這里只需要知道Seq2seq模型的大致框架是一個序列經過decoder得到一個隱狀態,再通過這個隱狀態使用decoder得到最終需要的序列。如下圖所示為一個德語翻譯為英語的文本翻譯任務,這個圖就很好的展示了seq2seq模型的構造。Embedding
在說具體模型實現之前,我們需要知道模型需要的輸入和輸出並非直接是一整個句子,它無法處理這些句子。所以我們就需要幫模型處理好這些句子,模型需要的是sequence,即一個接一個的token。輸入每一個黃色方塊的就是一個token,不難理解token指得就是一個單詞,由於我們的Encoder與Decoder采用的都是RNN,在每一個時間步中只需要一個token,所以這就是為什么我們把句子拆分為多個token。可能你已經注意到了"guten morgen"是一句德語句子,那么開頭和結尾的這兩個token是我們人為規定的,有大用途。由於我們的句子有時候是成批進行輸入的,很多個句子都頭尾相接的一股腦輸入,所以需要用這兩個token來區分從哪到哪是一條句子,這是原因之一。除此之外,由於在Decoder中預測句子的時候是一個詞一個詞預測的,在預測其中一個詞的時候只知道前面的詞是什么,並不知道后面的詞,那預測第一個詞的時候怎么辦?這時候就以
說到這,其實還沒說到關鍵的Embedding部分,只是在上面那個圖里的黃色方框之前的輸入部分而已,而Embedding的工作就是將token變成那個黃色方塊。黃色方塊代表的就是一個向量,這個向量的大小應該是[1, emb_size],其中emb_size是嵌入詞向量的大小。簡單介紹下為什么要這樣,因為模型無法接受token的輸入,模型的運算只接受向量或者矩陣,所以只能將token轉換為向量。而如果直接用one-hot形式的向量(也就是每一個token對應一個類別,如果是這個token這一列的值為1,否則全為0)會導致向量過大,且會出現很多無用的空間(一個向量大部分都是0)。所以進行一次矩陣相乘來壓縮向量大小,將這個one-hot向量與一個系數矩陣相乘從而得到更小的向量,這就是Embedding的過程,它的關鍵就是找到一個好的系數矩陣,來使得到的這個更小的向量能更准確的表示這個token。所以說白了Embedding就是一種表示方式,把token表示為向量。
現在用的Embedding大多都是采用預訓練好的Embedding模型來做,例如word2vec和GloVe等(因為自己做太麻煩了,而且這兩個模型都是很經典很好用的模型,且他們用的數據量比我們能接觸的大得多)。
於是我們終於得到了輸入Encoder之前的黃色方塊,接下來就要開始真正的第一步———編碼。
Encoder
現在我們有輸入$X = \{x_1,x_2,...,x_T\}$,就是上一節中講的token的序列,通過Embedding得到很多個向量,定義$x_t$得到的向量為$e(x_t)$。然后再人為給定一個初始隱狀態$h_0$,通常它被初始化為全0,或是一些可訓練的參數。大部分基礎的Seq2Seq模型中的Encoder與Decoder都是用的RNN,我們這里也不例外。當然,現在很少人直接用RNN了,一般都采用RNN的變種LSTM或是GRU,這里以RNN籠統的代表諸如這一類的神經網絡。
所以每一個時間步的隱狀態就是
那么以上這個公式就是Encoder的核心公式,在我們的模型里,最重要的是得到這個網絡訓練出來的最后一層隱狀態\(h_T\),也就是圖中的紅色方塊\(z\),以供Decoder使用。可以看作是這個模塊干的事就是將所有輸入的序列信息全部集成到這一個小方塊中,來作為后面模塊的輸出。
本文的實現基於LSTM來進行編碼,拋開LSTM的細節不談,這里需要知道的就是LSTM中有兩個傳輸狀態,除了隱狀態\(h_t\),還有一個cell state \(c_t\),具體的計算方法這里就不細說了,可以去了解一下LSTM。
所以Encoder的核心公式就變成了
抽象地說,就是原本第\(t\)個時間步中的綠色方塊本來只有一個狀態\(h_t\),現在增加了一個狀態\(c_t\)。所以相應的初始狀態也會增加一個\(c_0\),於是Encoder就變成了這樣:
其實兩層的計算相似,只是第二層需要的不是輸入的embedding,而是第一層的隱狀態輸出,圖上就畫的十分直觀。
理解了這些原理后,就可以使用torch.nn.Module
來實現我們的Encoder了,具體的解釋都寫在代碼中。
class Encoder(nn.Module):
def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
super().__init__()
self.hid_dim = hid_dim # hid_dim是hidden和cell狀態的維度(即向量的大小)
self.n_layers = n_layers # 指有幾層,剛剛說的就是2層的模型
self.embedding = nn.Embedding(input_dim, emb_dim) # input_dim就是輸入的維度,也就是將輸入的單詞轉成one-hot向量的向量大小。emb_dim就是進行embedding后的向量大小
self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout = dropout)
self.dropout = nn.Dropout(dropout) # 是一個正則化參數,用以防止過擬合,在embedding層使用
def forward(self, src):
# src就是就是輸入語句,因為實際訓練的過程中不是一句一句訓練而是一個batch一個batch訓練,所以這里batch size就是句子的條數。src的大小就是句子長度*句子條數
#src = [src len, batch size]
embedded = self.dropout(self.embedding(src)) # 源語句embedding后再經過一層drop得到RNN的輸入
# 這里多出一維是因為每個單詞都變成了一個向量,emb_dim就是向量的大小
#embedded = [src len, batch size, emb dim]
outputs, (hidden, cell) = self.rnn(embedded) # outputs是每一個時間步最頂層的輸出結果,如上圖中綠色方塊最上面輸出的全部h_t與c_t,而(hidden, cell)是每一層的最后一個時間步的輸出結果z
# 下面維度中的n directions指的是RNN是單向還是雙向的,我們這里用的是單向,即默認值1。雙向時值為2
#outputs = [src len, batch size, hid dim * n directions]
#hidden = [n layers * n directions, batch size, hid dim]
#cell = [n layers * n directions, batch size, hid dim]
#outputs are always from the top hidden layer
return hidden, cell # 返回這個是因為decoder中需要的就是最后一個時間步的輸出結果z,而不是所有時間步的頂層輸出
Decoder
接下來講講Decoder。總體上看,它只接受Encoder的一個狀態輸出,然后輸出句子$\hat{Y} = \{\hat{y_1},\hat{y_2},...,\hat{y_t}\}$,這里$y$頭上戴個帽子是因為它是預測值,為了與真實值$Y = \{y_1,y_2,...,y_t\}$區分開來。 有了Encoder的經驗,不難看懂Decoder的核心公式: $$s_t = DecoderRNN(d(y_t),s_{t-1})$$ 為了區分不同模塊的隱狀態,這里用$s_t$來代表Decoder中的隱狀態。在每一個時間步,Decoder接受當前單詞的embedding表示與上一個時間步的隱狀態,來計算這個時間步的隱狀態,而當前單詞的embedding是由上一個時間步生成的。同Encoder一樣是RNN模型,所以Decoder也需要一個初始的隱狀態,這里不像Encoder全0或隨機初始化,它以Encoder最后輸出的隱狀態來做為它的初始隱狀態,這樣就可以集成所有輸入的信息了。相似的,我們采用2層LSTM來建造Decoder。其第1層與第2層的計算公式如下:
除此之外與Encoder不同的是,由於RNN頂層的輸出還是一個隱狀態,為了得到預測出的這個位置的詞,我們將頂層的隱狀態\(s_t^L\)傳入一個線性層\(f\),這樣就會得到一個所有詞在這個位置出現的概率分布,從中選出概率最高的那個詞\(\hat{y_{t+1}}\)作為我們預測出的詞,即
直觀地理解,就是將Encoder中的最終隱狀態拿來做這里的初始隱狀態,然后以
根據這些來實現這個Decoder:
class Decoder(nn.Module):
def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
super().__init__()
self.output_dim = output_dim # 輸出的one-hot向量大小,來表示是哪個詞
self.hid_dim = hid_dim # hidden和cell狀態的維度(即向量的大小)
self.n_layers = n_layers # 指有幾層,剛剛說的就是2層的模型
self.embedding = nn.Embedding(output_dim, emb_dim) # 和encoder一樣
self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout = dropout)
self.fc_out = nn.Linear(hid_dim, output_dim) # 線性層
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, cell):
#input = [batch size]
#hidden = [n layers * n directions, batch size, hid dim]
#cell = [n layers * n directions, batch size, hid dim]
#n directions in the decoder will both always be 1, therefore:
#hidden = [n layers, batch size, hid dim]
#context = [n layers, batch size, hid dim]
# context就是Encoder輸出的hidden,在這里作為初始隱狀態
input = input.unsqueeze(0)
#input = [1, batch size]
embedded = self.dropout(self.embedding(input))
#embedded = [1, batch size, emb dim]
output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
#output = [seq len, batch size, hid dim * n directions]
#hidden = [n layers * n directions, batch size, hid dim]
#cell = [n layers * n directions, batch size, hid dim]
#seq len and n directions will always be 1 in the decoder, therefore:
#output = [1, batch size, hid dim]
#hidden = [n layers, batch size, hid dim]
#cell = [n layers, batch size, hid dim]
prediction = self.fc_out(output.squeeze(0)) # 預測的單詞
#prediction = [batch size, output dim]
return prediction, hidden, cell
Seq2Seq
最后我們將Encoder與Decoder結合起來,完成我們最終的模型。這個模型接受句子作為輸入,使用Encoder來生成context向量,再用Decoder來生成目標語句。將上面兩個模塊結合起來就得到了整體的模型在這里還用到了teacher forcing技術,主要是為了更好的訓練我們的模型。在Decoder逐詞生成句子的時候,我們會設置一個比重,若生成的詞概率分布中最大的概率都小於這個比重,說明生成的詞嚴重不正確,我們就強制將其替換成正確的值。就像學生在學習時學到某個地方嚴重偏離了學習軌跡,教師將其糾正過來以保證后面的學習沒有問題,這個技術大概就是這個思想。
但是在我們的模型中用到的teacher forcing技術不同,為了節省操作,我們設置用以預測下一個時間步的詞中50%使用目標語句中絕對正確的詞,而50%的詞使用上一個時間步預測到的詞。所以在下面實現代碼的teacher_forcing_ratio
這個變量就設置為使用目標語句中絕對正確的詞占比。
接下來看實現
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device # 如果有GPU,device用以將張量放入GPU中計算
# 由於我們這里的題目設計要求我們的encoder和decoder的hidden層的隱狀態大小相同,且層數相同,所以寫了這兩個斷言。實際可以依照自己需求略微改變模型
assert encoder.hid_dim == decoder.hid_dim, \
"Hidden dimensions of encoder and decoder must be equal!"
assert encoder.n_layers == decoder.n_layers, \
"Encoder and decoder must have equal number of layers!"
def forward(self, src, trg, teacher_forcing_ratio = 0.5):
#src = [src len, batch size]
#trg = [trg len, batch size]
batch_size = trg.shape[1]
trg_len = trg.shape[0]
trg_vocab_size = self.decoder.output_dim
outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device) # 存放最終生成的結果
hidden, cell = self.encoder(src) # 使用encoder的最后一個隱狀態得到需要的decoder的初始隱狀態
input = trg[0,:] # decoder的第一個輸入即正確答案的第一個token——<sos>
# 一個時間步一個時間步的循環生成詞
for t in range(1, trg_len):
#insert input token embedding, previous hidden and previous cell states
#receive output tensor (predictions) and new hidden and cell states
output, hidden, cell = self.decoder(input, hidden, cell)
outputs[t] = output # 將預測結果放入存放所有預測結果的tensor中
teacher_force = random.random() < teacher_forcing_ratio # 決定是否要使用teacher forcing
top1 = output.argmax(1) # 得到預測詞概率分布中概率最大的詞
input = trg[t] if teacher_force else top1 # 若不用teacher forcing則使用這次預測到的詞作為下一個時間步的input,反之則用正確答案的詞
return outputs