循環神經網絡可以更好的利用傳統神經網絡結構所不能建模的信息,但同時也會出現——長期依賴問題(long-term dependencies)
例如,當前時刻的預測值要依賴之間時刻的信息,當兩個時間間隔較短時,RNN可以比較容易地利用先前時刻信息。但當這兩個時間間隔不斷變長時,簡單的循環神經網絡有可能會喪失學習到距離很遠的時刻的信息的能力。在一些復雜語言場景中,有用信息的間隔有大有小、長短不一,簡單的RNN網絡的性能會收到限制。而LSTM網絡的設計就是為了解決該問題。LSTM網絡的結構如下圖所示:
詳細介紹參考:http://colah.github.io/posts/2015-08-Understanding-LSTMs/
以下代碼展示了在tensorflow中實現使用LSTM結構的循環神經網絡的前向傳播過程。
# 定義一個LSTM結構,LSTM中使用的變量也會在該函數中自動被聲明 lstm=rnn_cell.BasicLSTMCell(lstm_hidden_size) # 將LSTM中的狀態初始化為全0數組,BasicLSTMCell提供了zero_state函數來生成全零的初始狀態 state=lstm.zero_state(batch_size,tf.float32) # 定義損失函數 loss=0.0 # 理論上循環神經網絡可以處理任意長度的序列,但是在訓練時為了避免梯度消散的問題,會規定一個最大的序列長度 # 下面代碼使用參數num_steps來表示這個長度 for i in range(num_steps): # 在第一個時刻聲明LSTM結構中使用的變量,在之后的時刻都需要復用之前定義好的變量 if i>0: tf.get_variable_scope().reuse_variables() # 每一步處理時間序列中的一個時刻。將當前輸入(current_input)和前一時刻狀態(state)傳入定義的LSTM結構可以得到 # 當前LSTM結構的輸出lstm_output和更新后的狀態state lstm_output,state=lstm(current_input,state) # 將當前時刻LSTM結構的輸出傳入一個全連接層得到最后的輸出 final_output=fully_connected(lstm_output) # 計算當前時刻輸出的損失 loss+=calc_loss(final_output,expected_output)
LSTM網絡的變體:雙向循環神經網絡和深層循環神經網絡
雙向循環神經網絡的主體結構是由兩個單向循環神經網絡組成的。在每一個時刻t,輸入會同時提供給這兩個方向相反的循環神經網絡,而輸出則是由這兩個單向循環神經網絡共同決定。其結構如下圖所示:
六個權值分別對應:輸入到向前和向后隱含層(w1, w3),隱含層到隱含層自己(w2, w5),向前和向后隱含層到輸出層(w4, w6)。值得注意的是:向前和向后隱含層之間沒有信息流,這保證了展開圖是非循環的。
深層循環神經網絡:為了增強模型的表達能力,該網絡在每一個時刻上將循環體結構復制多次,每一層的循環體中參數是一致的,而不同層中的參數可以不同。其結構如下圖所示:
tensorflow中提供了MultiRNNCell類來實現深層循環神經網絡的前向傳播過程。代碼如下:
# 定義一個基本的LSTM結構作為循環體的基礎結構。深層循環神經網絡也支持使用其他的循環體結構 lstm=rnn_cell.BasicLSTMCell(lstm_size) # 通過MultiRNNCell類實現深層循環神經網絡中每一個時刻的前向傳播過程。其中number_of_layers表示有多少層。 stacked_lstm=rnn_cell.MultiRNNCell([lstm]*number_of_layers) # 通過zero_state函數來獲取初始狀態 state=stacked_lstm.zero_state(batch_size,tf.float32) # 定義損失函數 loss=0.0 for i in range(num_steps): if i>0: tf.get_variable_scope().reuse_variables() stacked_lstm_output,state=stacked_lstm(current_input,state) # 將當前時刻LSTM結構的輸出傳入一個全連接層得到最后的輸出 final_output=fully_connected(stacked_lstm_output) # 計算當前時刻輸出的損失 loss+=calc_loss(final_output,expected_output)
1、在PTB數據集上使用循環神經網絡實現語言模型
讀取數據集
import tensorflow as tf import reader # 存放原始數據的路徑。 DATA_PATH='data' train_data,valid_data,test_data,_=reader.ptb_raw_data(DATA_PATH) # 讀取原始數據 print(len(train_data)) print(train_data[:100])
輸出:929589
[9970, 9971, 9972, 9974, 9975, 9976, 9980, 9981, 9982, 9983, 9984, 9986, 9987, 9988, 9989, 9991, 9992, 9993, 9994, 9995, 9996, 9997, 9998, 9999, 2, 9256, 1, 3, 72, 393, 33, 2133, 0, 146, 19, 6, 9207, 276, 407, 3, 2, 23, 1, 13, 141, 4, 1, 5465, 0, 3081, 1596, 96, 2, 7682, 1, 3, 72, 393, 8, 337, 141, 4, 2477, 657, 2170, 955, 24, 521, 6, 9207, 276, 4, 39, 303, 438, 3684, 2, 6, 942, 4, 3150, 496, 263, 5, 138, 6092, 4241, 6036, 30, 988, 6, 241, 760, 4, 1015, 2786, 211, 6, 96, 4]
該訓練數據集中總共包含了929589單詞,其中句子結束的標識符ID為2。
雖然循環神經網絡可以接受接受任意長度的序列,但在訓練時需要將序列按照某個固定的長度來截斷。如下代碼將實現句子截斷並組織成batch。
# 將訓練數據組織成batch大小為4、截斷長度為5的數組。 # 讀取每個batch中的數據,其中包括每個時刻的輸入和對應的正確輸出。 x, y = reader.ptb_producer(train_data, 4, 5) with tf.Session() as sess: coord = tf.train.Coordinator() tf.train.start_queue_runners(sess, coord=coord) try: x,y=sess.run([x,y]) print(x) print(y) finally: coord.request_stop() coord.join()
完整的模型代碼實現:
import tensorflow as tf import reader import numpy as np DATA_PATH='data' # 存放原始數據的路徑。 HIDDEN_SIZE=200 # 隱藏層大小 NUM_LAYERS=2 # 深層循環神經網絡中LSTM結構的層數 VOCAB_SIZE=10000 # 詞典規模,加上語句結束標識符和稀有單詞標識符總共一萬個單詞 LEARNING_RATE=1.0 # 學習速率 TRAIN_BATCH_SIZE=20 # 訓練數據batch的大小 TRAIN_NUM_STEP=35 # 訓練數據截斷長度 # 在測試時不需要使用截斷,所以可以將測試數據看成一個超長的序列 EVAL_BATCH_SIZE=1 # 測試數據batch的大小 EVAL_NUM_STEP=1 # 測試數據截斷長度 NUM_EPOCH=2 # 使用訓練數據的輪數 KEEP_PROB=0.5 # 節點不被dropout的概率 MAX_GRAD_NORM=5 # 用於控制梯度膨脹的參數 # 通過一個PTBModel類來描述模型。 class PTBModel(object): def __init__(self,is_training,batch_size,num_steps): # 記錄使用的batch大小和截斷長度 self.batch_size=batch_size self.num_steps=num_steps # 定義輸入層,維度為: batch_size * num_steps self.input_data=tf.placeholder(tf.int32,[batch_size,num_steps]) # 定義預期輸出。 self.targets=tf.placeholder(tf.int32,[batch_size,num_steps]) # 定義使用LSTM結構為循環體結構且使用dropout的深層循環神經網絡。 lstm_cell=tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE) if is_training: lstm_cell=tf.nn.rnn_cell.DropoutWrapper(lstm_cell,output_keep_prob=KEEP_PROB) cell=tf.nn.rnn_cell.MultiRNNCell([lstm_cell]*NUM_LAYERS) # 初始化最初的狀態,也就是全零的向量。 self.initial_state=cell.zero_state(batch_size,tf.float32) # 將單詞ID轉換成單詞向量。因為總共有VOCAL_SIZE個單詞,每個單詞向量的維度為HIDDEN_SIZE,所以 # embedding參數的維度為VOCAB_SIZE * HIDDEN_SIZE embedding=tf.get_variable('embedding',[VOCAB_SIZE,HIDDEN_SIZE]) # 將原本batch_size * num_steps個單詞ID轉化為單詞向量, # 轉化后的輸入層維度為:batch_size * num_steps * HIDDEN_SIZE inputs=tf.nn.embedding_lookup(embedding,self.input_data) # 只在訓練時使用dropout if is_training: inputs=tf.nn.dropout(inputs,KEEP_PROB) # 定義輸出列表。在這里先將不同時刻LSTM結構的輸出收集起來,再通過一個全連接 # 層得到最終的輸出 outputs=[] # state 存儲不同batch中LSTM的狀態,將其初始化為0。 state=self.initial_state with tf.variable_scope('RNN'): for time_step in range(num_steps): if time_step>0: tf.get_variable_scope().reuse_variables() # 從輸入數據中獲取當前時刻的輸入並傳入LSTM結構 cell_output,state=cell(inputs[:,time_step,:],state) # 將當前輸出加入輸出隊列 outputs.append(cell_output) # 將輸出隊列展開成[batch,hidden_size*num_steps]的形狀,然后再reshape # 成[batch*num_steps,hidden_size]的形狀 output = tf.reshape(tf.concat(outputs, 1), [-1, HIDDEN_SIZE]) # 將從LSTM中得到的輸出在經過一個全連接層得到最后的預測結果,最終的預測結果在 # 每一個時刻上都是一個長度為VOCAB_SIZE的數組,經過softmax層之后表示下一個位置 # 各單詞出現的概率 weight=tf.get_variable('weight',[HIDDEN_SIZE,VOCAB_SIZE]) bias=tf.get_variable('bias',[VOCAB_SIZE]) logits=tf.matmul(output,weight)+bias # 定義交叉熵損失函數。tesorlfow提供了 loss=tf.contrib.legacy_seq2seq.sequence_loss_by_example( [logits], # 預測的結果 [tf.reshape(self.targets,[-1])], # 正確的答案,這里將[batch_size,num_steps]二維數組轉換為一維數組 # 損失的權重。在這里所有的權重都為1,也就是說不同batch和不同時刻的重要程度是一樣的。 [tf.ones([batch_size*num_steps],dtype=tf.float32)] ) # 計算得到每個batch的平均損失 self.cost=tf.reduce_sum(loss)/batch_size self.final_state=state # 只在訓練模型時定義反向傳播操作 if not is_training: return trainable_variables=tf.trainable_variables() # 通過clip_by_global_norm函數控制梯度的大小,避免梯度膨脹問題。 grads,_=tf.clip_by_global_norm( tf.gradients(self.cost,trainable_variables),MAX_GRAD_NORM) # 定義優化方法 optimizer=tf.train.GradientDescentOptimizer(LEARNING_RATE) # 定義訓練步驟 self.train_op=optimizer.apply_gradients(zip(grads,trainable_variables)) # 使用給定的模型model在數據data上運行train_op並返回在全部數據上的perplexity值 def run_epoch(session,model,data,train_op,output_log,epoch_size): # 計算perplexity的輔助變量 total_costs=0.0 iters=0 state=session.run(model.initial_state) # 使用當前數據訓練或者訓練模型 for step in range(epoch_size): x, y = session.run(data) # 在當前batch上運行train_op並計算損失值。交叉熵損失函數計算的就是下一個單詞為給定 # 單詞的概率 cost,state,_=session.run([model.cost,model.final_state,train_op], {model.input_data:x,model.targets:y,model.initial_state:state}) # 將不同時刻、不同batch的概率加起來就可以得到第二個perplexity公式等號右邊的部分, # 再將這個和做指數運算就可以得到perplexity值。 total_costs+=cost iters+=model.num_steps # 只有在訓練時輸出日志。 if output_log and step % 100 ==0: print('After {} steps,perplexity is {}'.format(step,np.exp(total_costs/iters))) # 返回指定模型在給定數據上的perplexity值 return np.exp(total_costs/iters) def main(_): # 獲取原始數據 train_data,valid_data,test_data,_=reader.ptb_raw_data(DATA_PATH) # 計算一個epoch需要訓練的次數 train_data_len = len(train_data) train_batch_len = train_data_len // TRAIN_BATCH_SIZE train_epoch_size = (train_batch_len - 1) // TRAIN_NUM_STEP valid_data_len = len(valid_data) valid_batch_len = valid_data_len // EVAL_BATCH_SIZE valid_epoch_size = (valid_batch_len - 1) // EVAL_NUM_STEP test_data_len = len(test_data) test_batch_len = test_data_len // EVAL_BATCH_SIZE test_epoch_size = (test_batch_len - 1) // EVAL_NUM_STEP # 定義初始化函數 initializer=tf.random_uniform_initializer(-0.05,0.05) # 定義訓練用的循環神經網絡模型。 with tf.variable_scope('language_model',reuse=None,initializer=initializer): train_model=PTBModel(True,TRAIN_BATCH_SIZE,TRAIN_NUM_STEP) # 定義評測用的循環神經網絡模型。 with tf.variable_scope('language_model',reuse=True,initializer=initializer): eval_model=PTBModel(False,EVAL_BATCH_SIZE,EVAL_NUM_STEP) with tf.Session() as sess: tf.global_variables_initializer().run() train_queue = reader.ptb_producer(train_data, train_model.batch_size, train_model.num_steps) eval_queue = reader.ptb_producer(valid_data, eval_model.batch_size, eval_model.num_steps) test_queue = reader.ptb_producer(test_data, eval_model.batch_size, eval_model.num_steps) coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 使用訓練數據訓練模型 for i in range(NUM_EPOCH): print('In iteration: {}'.format(i+1)) # 在所有訓練數據上訓練循環神經網絡模型 run_epoch(sess,train_model,train_queue,train_model.train_op,True,train_epoch_size) # 使用驗證數據評測模型效果 valid_perplexity=run_epoch(sess,eval_model,eval_queue,tf.no_op(),False,valid_epoch_size) print('Epoch: {} Validation Perplexity: {}'.format(i+1,valid_perplexity)) # 最后使用測試數據測試模型效果 test_perplexity=run_epoch(sess,eval_model,test_queue,tf.no_op(),False,test_epoch_size) print('Test Perplexity: {}'.format(test_perplexity)) coord.request_stop() coord.join(threads) if __name__ == '__main__': tf.app.run()
語言模型評價標准:perplexity
語言模型效果好壞的常用評價指標是復雜度(perplexity)。perplexity值刻畫的是通過某一個語言模型估計一句話出現的概率。比如$(w_{1},w_{2},...,w_{m})$表示語料庫中的一句話,則語言模型預測這句話的概率越高越好,即perplexity值越小越好。公式表示如下:
$Perplexity(S)=p(w_{1},w_{2},...,w_{m})^\frac{1}{m}=\sqrt[m]{\frac{1}{p(w_{1},w_{2},...,w_{m})}}=\sqrt[m]{\prod_{i=1}^{m}\frac{1}{p(w_{i}|w_{1},w_{2},...w_{i-1})}}$
$p(w_{i}|w_{i-n+1},...w_{i-1})=\frac{C(w_{i-n+1},...,w_{i-1},w_{i})}{C(w_{i-n+1},...,w_{i-1})}$
其中C(X)表示單詞序列X在訓練語料庫中出現的次數。復雜度perplexity表示的概念其實是平均分支系數(average branch factor),即模型預測下一個詞時的平均可選擇數量。以0~9這10個數字為例組成一個長度為m的序列,由於10個數字是隨機出現的,所以每個數字出現的概率為1/10。因此,在任意時刻,模型都有10個等概率的候選答案可以選擇,於是perplexity就是10,即$Perplexity(S)=\sqrt[m]{\prod_{i=1}^{m} \frac{1}{\frac{1}{10}}}$
perplexity的另一種常用表示形式:$log(Perplexity(S))=\frac{-\sum p(w_{i}|w_{1},w_{2},...,w_{i-1})}{m}$,將乘積開根號的運算轉換為加法運算,提高運算速度,同時可以避免概率為0時導致整個計算結果為0的問題。