RNN(Recurrent Neural Networks,循環神經網絡)是一種具有短期記憶能力的神經網絡模型,可以處理任意長度的序列,在自然語言處理中的應用非常廣泛,比如機器翻譯、文本生成、問答系統、文本分類等。
但由於梯度爆炸或梯度消失,RNN存在長期依賴問題,難以建立長距離的依賴關系,於是引入了門控機制來控制信息的累積速度,包括有選擇地加入新信息,並有選擇地遺忘之前積累的信息。比較經典的基於門控的RNN有LSTM(長短期記憶網絡)和GRU(門控循環單元網絡)。
有關RNN,LSTM和GRU的相關理論知識可以看我以前的筆記:《深度學習之循環神經網絡(RNN)》 、《循環神經網絡之LSTM和GRU》
這篇博客整理用TensorFlow構建RNN的內容,主要包括兩方面,一是分別用RNN、LSTM和GRU作為記憶細胞,構建一個單向堆疊的循環神經網絡,也就是有多個循環網絡層(單向);二是構建雙向RNN模型,這在自然語言處理中比較常見,比如Bi-LSTM+CRF做命名實體識別。
一、堆疊RNN、LSTM和GRU
1、堆疊RNN的結構和特性
RNN的一般性內容就不介紹了,如果不熟悉請看以上列出的筆記。我們先來看看堆疊RNN的結構。
以上是按時間展開的堆疊循環神經網絡。一般的,我們定義 ht(l)為在時刻 t 時第 l 層的隱狀態,則它是由時刻t-1第l層的隱狀態與時刻t第l-1層的隱狀態共同決定:
其中U(l)、W(l)是權重矩陣,b(l)是偏置,ht(0) = xt 。
我們可以看到,如果一共有T步,那么會有T個輸出:y1,y2,...,yT。但一般只取最后一個輸出yT,相應的隱狀態也取最后時刻最后一個循環層的隱狀態,比如上面就是取hT(3),這是代碼中需要注意的地方。
2、RNN、LSTM和GRU的區別
RNN、LSTM與GRU這三中循環神經網絡結構,在構建時的區別有兩個:
一是在t時刻,RNN最后一個循環層只有一個隱狀態,就用這個隱狀態來計算輸出;LSTM在最后一個循環層有兩個隱狀態,一個是長期狀態Ct,一個是短期狀態ht,長期狀態由Tanh函數激活,然后通過輸出門過濾后得到短期狀態,而用來輸入到全連接層計算模型輸出的是短期狀態。於是在下圖中可以看到,ht一方面往上傳遞去計算當前時刻的模型輸出,另一方面順時間傳遞去計算下一個隱狀態ht-1
GRU在最后一個循環層也只有一個隱狀態,因為沒有引入額外的內部狀態C,而直接在當前狀態ht和歷史狀態ht-1之間引入線性依賴關系。從下圖可以看到,隱狀態ht由前一時刻的隱狀態ht-1和候選狀態決定。
二是在TensorFlow中,要指定RNN中記憶細胞的激活函數,一般為ReLU函數,而LSTM和GRU中的激活函數已經確定為Sigmoid函數和Tanh函數了。
3、LSTM的參數初始化
還有一點要說明,一般在深度網絡參數學習時,參數初始化的值一般都比較小,在訓練 LSTM 網絡時,過小的參數值會使得遺忘門的值比較小。這意味着前一時刻的信息大部分都丟失了,這樣網絡很難捕捉到長距離的依賴信息。因此遺忘門的參數初始值一般都設得比較大,其偏置向量 bf 設為 1 或 2。而TensorFlow將bf初始化為1的向量。
4、TensorFlow核心代碼
按照上一步所說的,RNN、LSTM和GRU的不同點在於隱狀態和激活函數,這也體現在了TensorFlow的代碼中。堆疊三個循環層,每層神經元的個數均為100,這三種記憶細胞的定義代碼如下,構建的模型除了這部分不同以外,其他都是一樣的。
def cell_selected(cell):
if cell == "RNN": # 指定激活函數為ReLU函數,然后構造三個RNN細胞狀態 # 構建堆疊的RNN模型 # 每個時刻都有一個輸出和一個隱狀態(或多個隱狀態),我們只取最后一個輸出和隱狀態 # 但是TensofFlow中不知道為啥取了最后時刻的三個隱狀態,用於計算最終輸出。 rnn_cells = [tf.nn.rnn_cell.BasicRNNCell(num_units=n_neurons,activation=tf.nn.relu) for layer in range(n_layers)] multi_layer_cell = tf.nn.rnn_cell.MultiRNNCell(rnn_cells) outputs, states = tf.nn.dynamic_rnn(multi_layer_cell, X, dtype=tf.float32) return tf.concat(axis=1, values=states) elif cell == "LSTM": # 構造三個LSTM記憶細胞,不用管激活函數 # states[-1]中包含了長期狀態和短期狀態,這里取最后一個循環層的短期狀態 lstm_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) for layer in range(n_layers)] multi_cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cells) outputs, states = tf.nn.dynamic_rnn(multi_cell, X, dtype=tf.float32) return states[-1][1] elif cell == "GRU": # GRU和LSTM大致相同,但是states[-1]中只包含了短期狀態。 gru_cells = [tf.nn.rnn_cell.GRUCell(num_units=n_neurons) for layer in range(n_layers)] multi_cell = tf.nn.rnn_cell.MultiRNNCell(gru_cells) outputs, states = tf.nn.dynamic_rnn(multi_cell, X, dtype=tf.float32) return states[-1]
取記憶細胞的隱狀態這一步需要好好理解。我們一一來看,這三個細胞狀態在最后一時刻的隱狀態和經過處理用來求模型輸出的隱狀態是什么樣的。
(1)在RNN中,用tf.concat()來處理RNN記憶細胞的隱狀態。未處理之前的states是三個tuple元素,是三個循環層最后一步的隱狀態,維度是[batch-size, n_neurons],100是神經元的個數。
(<tf.Tensor 'rnn/rnn/while/Exit_3:0' shape=(?, 100) dtype=float32>, <tf.Tensor 'rnn/rnn/while/Exit_4:0' shape=(?, 100) dtype=float32>, <tf.Tensor 'rnn/rnn/while/Exit_5:0' shape=(?, 100) dtype=float32>)
用tf.cncat()按第1個維度拼接后如下。可見是將最后時刻三個循環層的隱狀態的值拼接在了一起。這是TensorFlow在堆疊RNN中想要的格式,我不太明白,按道理只要取三個中的最后一個就可以了啊。
<tf.Tensor 'concat:0' shape=(?, 300) dtype=float32>
(2)在LSTM中,得到的states包含三個tuple元素,而每個tuple中又有兩個元素,第一個是長期狀態,第二個是短期狀態。顯然我們要取的是最后一層中的短期狀態,用此用states[-1][1]取到。
(LSTMStateTuple(c=<tf.Tensor 'rnn/while/Exit_3:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'rnn/while/Exit_4:0' shape=(?, 100) dtype=float32>), LSTMStateTuple(c=<tf.Tensor 'rnn/while/Exit_5:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'rnn/while/Exit_6:0' shape=(?, 100) dtype=float32>), LSTMStateTuple(c=<tf.Tensor 'rnn/while/Exit_7:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'rnn/while/Exit_8:0' shape=(?, 100) dtype=float32>))
(3)在GRU中,得到的states包含三個元素,就是最后一步隱狀態的值,因此只要用states[-1]取到最后一層的隱狀態即可。
(<tf.Tensor 'rnn/while/Exit_3:0' shape=(?, 100) dtype=float32>, <tf.Tensor 'rnn/while/Exit_4:0' shape=(?, 100) dtype=float32>, <tf.Tensor 'rnn/while/Exit_5:0' shape=(?, 100) dtype=float32>)
5、完整代碼
為了方便在三種記憶細胞之間進行切換,我定義了以上選擇記憶細胞的函數。基於MINIST數據集,構建了一個具有三個循環層的單向RNN網絡,每個循環層的神經元個數為100,記憶細胞分別選擇RNN、LSTM和GRU。
import tensorflow as tf import numpy as np import time from datetime import timedelta # 記錄訓練花費的時間 def get_time_dif(start_time): end_time = time.time() time_dif = end_time - start_time return timedelta(seconds=int(round(time_dif))) (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data() # 這里和卷積神經網絡那不同,RNN中的輸入維度是(batch-size,28,28),而不是(batch-size,784) X_train = X_train.astype(np.float32)/ 255.0 X_test = X_test.astype(np.float32)/ 255.0 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32) X_valid, X_train = X_train[:5000], X_train[5000:] y_valid, y_train = y_train[:5000], y_train[5000:] def shuffle_batch(X, y, batch_size): rnd_idx = np.random.permutation(len(X)) n_batches = len(X) // batch_size for batch_idx in np.array_split(rnd_idx, n_batches): X_batch, y_batch = X[batch_idx], y[batch_idx] yield X_batch, y_batch n_steps = 28 n_inputs = 28 n_neurons = 100 n_outputs = 10 n_layers = 3 learning_rate = 0.001 X = tf.placeholder(tf.float32, [None, n_steps, n_inputs]) y = tf.placeholder(tf.int32, [None]) # 選擇記憶細胞 def cell_selected(cell): if cell == "RNN": # 指定激活函數為ReLU函數,然后構造三個RNN細胞狀態 # 構建堆疊的RNN模型 # 每個時刻都有一個輸出和一個隱狀態(或多個隱狀態),我們只取最后一個輸出和隱狀態 # 但是TensofFlow中不知道為啥取了最后時刻的三個隱狀態,用於計算最終輸出。 rnn_cells = [tf.nn.rnn_cell.BasicRNNCell(num_units=n_neurons,activation=tf.nn.relu) for layer in range(n_layers)] multi_layer_cell = tf.nn.rnn_cell.MultiRNNCell(rnn_cells) outputs, states = tf.nn.dynamic_rnn(multi_layer_cell, X, dtype=tf.float32) return tf.concat(axis=1, values=states) elif cell == "LSTM": # 構造三個LSTM記憶細胞,不用管激活函數 # states[-1]中包含了長期狀態和短期狀態,這里取最后一個循環層的短期狀態 gru_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) for layer in range(n_layers)] multi_cell = tf.nn.rnn_cell.MultiRNNCell(gru_cells) outputs, states = tf.nn.dynamic_rnn(multi_cell, X, dtype=tf.float32) return states[-1][1] elif cell == "GRU": # GRU和LSTM大致相同,但是states[-1]中只包含了短期狀態。 gru_cells = [tf.nn.rnn_cell.GRUCell(num_units=n_neurons) for layer in range(n_layers)] multi_cell = tf.nn.rnn_cell.MultiRNNCell(gru_cells) outputs, states = tf.nn.dynamic_rnn(multi_cell, X, dtype=tf.float32) return states[-1] def build_and_train(): # 調用上面定義的選擇記憶細胞的函數,定義損失函數 logits = tf.layers.dense(cell_selected(cell), n_outputs, name="softmax") xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits) loss = tf.reduce_mean(xentropy, name="loss") optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) training_op = optimizer.minimize(loss) correct = tf.nn.in_top_k(logits, y, 1) accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) init = tf.global_variables_initializer() saver = tf.train.Saver() n_epochs = 50 batch_size = 100 with tf.Session() as sess: init.run() start_time = time.time() # 記錄總迭代步數,一個batch算一步 # 記錄最好的驗證精度 # 記錄上一次驗證結果提升時是第幾步。 # 如果迭代2000步后結果還沒有提升就中止訓練。 total_batch = 0 best_acc_val = 0.0 last_improved = 0 require_improvement = 2000 flag = False for epoch in range(n_epochs): for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size): sess.run(training_op, feed_dict={X: X_batch, y: y_batch}) # 每次迭代10步就驗證一次 # # 如果驗證精度提升了,就替換為最好的結果,並保存模型 if total_batch % 10 == 0: acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch}) acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid}) if acc_val > best_acc_val: best_acc_val = acc_val last_improved = total_batch save_path = saver.save(sess, "./my_model_Cell_Selected.ckpt") improved_str = 'improved!' else: improved_str = '' time_dif = get_time_dif(start_time) msg = 'Epoch:{0:>4}, Iter: {1:>6}, Acc_Train: {2:>7.2%}, Acc_Val: {3:>7.2%}, Time: {4} {5}' print(msg.format(epoch, total_batch, acc_batch, acc_val, time_dif, improved_str)) # 記錄總迭代步數 total_batch += 1 # 如果2000步以后還沒提升,就中止訓練。 if total_batch - last_improved > require_improvement: print("Early stopping in ",total_batch," step! And the best validation accuracy is ",best_acc_val, '.') flag = True break if flag: break with tf.Session() as sess: saver.restore(sess, "./my_model_Cell_Selected.ckpt") acc_test= accuracy.eval(feed_dict={X: X_test, y: y_test}) print("\nTest_accuracy:{0:>7.2%}".format(acc_test)) if __name__ == "__main__": cell = "LSTM" # RNN/LSTM/GRU,在這里選擇記憶細胞 build_and_train()
分別選擇記憶細胞為RNN、LSTM和GRU,得到的結果為:
RNN 耗時3分3秒 最好驗證精度98.72% 測試精度98.41% LSTM 耗時6分35秒 最好驗證精度99.22% 測試精度98.81% GRU 耗時6分9秒 最好驗證精度99.26% 測試精度98.97%
二、雙向LSTM模型
1、雙向LSTM的結構
雙向LSTM(Bidirectional Long-Short Term Memorry,Bi-LSTM)不僅能利用到過去的信息,還能捕捉到后續的信息,比如在詞性標注問題中,一個詞的詞性由上下文的詞所決定,那么用雙向LSTM就可以利用好上下文的信息。
雙向LSTM由兩個信息傳遞相反的LSTM循環層構成,其中第一層按時間順序傳遞信息,第二層按時間逆序傳遞信息。
沒有去找雙向LSTM的圖了,就看這個雙向RNN的結構來學習吧,理解了雙向RNN,那么把循環層的記憶細胞換成LSTM就行。
2、雙向LSTM隱狀態的計算
關鍵在於隱狀態如何計算。為了簡單,還是按照雙向RNN的公式進行理解,我們看隱狀態如何計算。可以看到t時刻第一層(順時間循環層)的隱狀態ht(1)取決於前一時刻的隱狀態ht-1(1)和輸入值xt,這一點非常容易理解。
而要注意的是第二層(逆時間循環層)的隱狀態則依然取決於前一時刻的隱狀態和輸入值x,這與堆疊的LSTM不同,堆疊的LSTM其l層的隱狀態不由輸入值x直接輸入得到,而是取決於該層前一時刻的隱狀態和當前時刻下一層的隱狀態。如下的公式就是堆疊的循環網絡層中隱狀態的計算過程。
雙向LSTM和堆疊的LSTM可以結合使用,在順時間循環層我們可以構造堆疊多層的LSTM,同樣,在逆時間循環層可以堆疊多個。
而雙向LSTM的一個循環層中有兩個隱狀態,長期狀態C用於內部傳遞信息,不拋頭露面,而短期狀態h則作為該循環層的輸出,用於其他循環層或全連接層的計算。因此在對得的雙向LSTM的最后一步,會有超過4個隱狀態存在。
這次構建的雙向LSTM模型,在順時間循環層和逆時間循環層都分別堆疊了兩層LSTM,每層神經元個數都為100,因此循環網絡層總共有4層,最后一步的隱狀態有8個。
def bi_lstm(): # 順時間循環層的記憶細胞,堆疊了兩層 lstm_fw1 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_fw2 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_forward = tf.nn.rnn_cell.MultiRNNCell(cells=[lstm_fw1,lstm_fw2]) # 逆時間循環層的記憶細胞,堆疊了兩層 lstm_bc1 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_bc2 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_backward = tf.nn.rnn_cell.MultiRNNCell(cells=[lstm_bc1,lstm_bc2]) # 計算輸出和隱狀態 outputs,states=tf.nn.bidirectional_dynamic_rnn(cell_fw=lstm_forward, cell_bw=lstm_backward,inputs=X,dtype=tf.float32) # 取到順時間循環層和擬時間循環層的最后一個隱狀態 state_forward = states[0][-1][-1] state_backward = states[1][-1][-1] # 把兩個隱狀態拼接起來。 return state_forward+state_backward
下面是隱狀態states的情況,第一個元素是順時間循環層的隱狀態,其中短期狀態有兩個,我們選擇最后一個堆疊層的短期狀態:states[0][-1][-1]。同理,第二個元素是逆時間循環層的隱狀態,我們用states[1][-1][-1]來取到最后一個堆疊層的短期狀態。
((LSTMStateTuple(c=<tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_3:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_4:0' shape=(?, 100) dtype=float32>), LSTMStateTuple(c=<tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_5:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_6:0' shape=(?, 100) dtype=float32>)), (LSTMStateTuple(c=<tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_3:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_4:0' shape=(?, 100) dtype=float32>), LSTMStateTuple(c=<tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_5:0' shape=(?, 100) dtype=float32>, h=<tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_6:0' shape=(?, 100) dtype=float32>)))
取到兩個循環層的兩個短期狀態之后,通過簡單的拼接,就可以輸入到全連接層了,即states[0][-1][-1]+states[1][-1][-1]。
3、完整代碼
對MINIST數據集構建雙向LSTM分類器,訓練耗時12分38秒,最佳驗證精度為99.16%,測試精度為98.83%
#-*- coding: utf-8 -*- from __future__ import division, print_function, unicode_literals import tensorflow as tf import numpy as np import time from datetime import timedelta # 記錄訓練花費的時間 def get_time_dif(start_time): end_time = time.time() time_dif = end_time - start_time return timedelta(seconds=int(round(time_dif))) (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data() # 這里和卷積神經網絡那不同,RNN中的輸入維度是(batch-size,28,28),而不是(batch-size,784) X_train = X_train.astype(np.float32)/ 255.0 X_test = X_test.astype(np.float32)/ 255.0 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32) X_valid, X_train = X_train[:5000], X_train[5000:] y_valid, y_train = y_train[:5000], y_train[5000:] def shuffle_batch(X, y, batch_size): rnd_idx = np.random.permutation(len(X)) n_batches = len(X) // batch_size for batch_idx in np.array_split(rnd_idx, n_batches): X_batch, y_batch = X[batch_idx], y[batch_idx] yield X_batch, y_batch n_steps = 28 n_inputs = 28 n_neurons = 100 n_outputs = 10 learning_rate = 0.001 X = tf.placeholder(tf.float32, [None, n_steps, n_inputs]) y = tf.placeholder(tf.int32, [None]) def bi_lstm(): # 順時間循環層的記憶細胞,堆疊了兩層 lstm_fw1 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_fw2 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_forward = tf.nn.rnn_cell.MultiRNNCell(cells=[lstm_fw1,lstm_fw2]) # 擬時間循環層的記憶細胞,堆疊了兩層 lstm_bc1 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_bc2 = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons) lstm_backward = tf.nn.rnn_cell.MultiRNNCell(cells=[lstm_bc1,lstm_bc2]) # 計算輸出和隱狀態 outputs,states=tf.nn.bidirectional_dynamic_rnn(cell_fw=lstm_forward, cell_bw=lstm_backward,inputs=X,dtype=tf.float32) # 取到順時間循環層和擬時間循環層的最后一個隱狀態 state_forward = states[0][-1][-1] state_backward = states[1][-1][-1] # 把兩個隱狀態拼接起來。 return state_forward+state_backward def build_and_train(): # 調用上面的定義雙向LSTM的函數,定義損失函數 logits = tf.layers.dense(bi_lstm(), n_outputs, name="softmax") xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits) loss = tf.reduce_mean(xentropy, name="loss") optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) training_op = optimizer.minimize(loss) correct = tf.nn.in_top_k(logits, y, 1) accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) init = tf.global_variables_initializer() saver = tf.train.Saver() n_epochs = 50 batch_size = 100 with tf.Session() as sess: init.run() start_time = time.time() # 記錄總迭代步數,一個batch算一步 # 記錄最好的驗證精度 # 記錄上一次驗證結果提升時是第幾步。 # 如果迭代2000步后結果還沒有提升就中止訓練。 total_batch = 0 best_acc_val = 0.0 last_improved = 0 require_improvement = 2000 flag = False for epoch in range(n_epochs): for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size): sess.run(training_op, feed_dict={X: X_batch, y: y_batch}) # 每次迭代10步就驗證一次 # # 如果驗證精度提升了,就替換為最好的結果,並保存模型 if total_batch % 10 == 0: acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch}) acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid}) if acc_val > best_acc_val: best_acc_val = acc_val last_improved = total_batch save_path = saver.save(sess, "./my_model_Bi_LSTM.ckpt") improved_str = 'improved!' else: improved_str = '' time_dif = get_time_dif(start_time) msg = 'Epoch:{0:>4}, Iter: {1:>6}, Acc_Train: {2:>7.2%}, Acc_Val: {3:>7.2%}, Time: {4} {5}' print(msg.format(epoch, total_batch, acc_batch, acc_val, time_dif, improved_str)) # 記錄總迭代步數 total_batch += 1 # 如果2000步以后還沒提升,就中止訓練。 if total_batch - last_improved > require_improvement: print("Early stopping in ",total_batch," step! And the best validation accuracy is ",best_acc_val, '.') flag = True break if flag: break with tf.Session() as sess: saver.restore(sess, "./my_model_Bi_LSTM.ckpt") acc_test= accuracy.eval(feed_dict={X: X_test, y: y_test}) print("\nTest_accuracy:{0:>7.2%}".format(acc_test)) if __name__ == "__main__": build_and_train()
參考資料:
1、《Hands on Machine Learning with Scikit-Learn and TensorFlow》
2、https://blog.csdn.net/luoganttcc/article/details/83384823