TensorFlow學習筆記(六)循環神經網絡


 

 

 

一、循環神經網絡簡介

  循環神經網絡的主要用途是處理和預測序列數據。循環神經網絡刻畫了一個序列當前的輸出與之前信息的關系。從網絡結構上,循環神經網絡會記憶之前的信息,並利用之前的信息影響后面節點的輸出。

下圖展示了一個典型的循環神經網絡。

循環神經網絡的一個重要的概念就是時刻。上圖中循環神經網絡的主體結構A的輸入除了來自輸入層的Xt,還有一個自身當前時刻的狀態St。

在每一個時刻,A會讀取t時刻的輸入Xt,並且得到一個輸出Ht。同時還會得到一個當前時刻的狀態St,傳遞給下一時刻t+1。

因此,循環神經網絡理論上可看作同一神經結構被無限重復的過程。(無限重復目前還是不可行的)

將循環神經網絡按照時間序列展開,如下圖所示

 

 xt是t時刻的輸入

St是t時刻的“記憶”,St = f(WSt-1 + Uxt),f是tanh等激活函數

Ot 是t時刻的輸出

下圖給出一個最簡單的循環體或者叫記憶體的結構圖

 

下圖展示了一個循環神經網絡的前向傳播算法的具體計算過程。

在得到前向傳播計算結果之后,可以和其他網絡類似的定義損失函數。神經網絡的唯一區別在於它每一個時刻都有一個輸出,所以循環神經網絡的總損失為前面所有時刻的損失函數的總和。

我們利用代碼來實現這個簡單的前向傳播過程。

import numpy as np

X = [1,2]
state = [0.0,0.0]
#定義不同輸入部分的權重
w_cell_state = np.asarray([[0.1,0.2],[0.3,0.4]])
w_cell_input = np.asarray([0.5,0.6])
b_cell = np.asarray([0.1,-0.1])
#定義輸出層的權重
w_output = np.asarray([[0.1],[0.2]])
b_output = 0.1
#按照時間順序執行循環神經網絡的前向傳播過程
for i in range(len(X)):
    before_activetion = np.dot(state,w_cell_state) + X[i] * w_cell_input + b_cell
    state = np.tanh(before_activetion)
    #計算當前時刻的最終輸出
    final_output = np.dot(state,w_output) + b_output
    #輸出每一時刻的信息
    print("before_activation",before_activetion)
    print("state",state)
    print("final_output",final_output)

 

二、長短時記憶網絡(LSTM)結構

循環神經網絡工作的關鍵點就是使用歷史的信息來幫助當前的決策。循環神經網絡能很好的利用傳統的神經網絡不能建模的信息,但同時,也帶來了更大的挑戰——長期依賴的問題。

  在有些問題中,模型僅僅需要短期內的信息來執行當前的任務。但同時也會有一些上下文場景更加復雜的情況。當間隔不斷增大時,簡單的循環神經網絡可能會喪失學習到如此遠的信息的能力。或者在復雜的語言場景中,有用的信息的間隔有大有小,長短不一,循環神經網絡的性能也會受限。

  為了解決這類問題,設計了LSTM。與單一tanh循環結構不同,LSTM擁有三個門:“輸入門”、“輸出門”、“遺忘門”。 

 

  LSTM靠這些“門”的結構信息有選擇的影響循環神經網絡中每個時刻的狀態。所謂的“門”就是一個sigmod網絡和一個按位做乘法的操作。當sigmod輸出為1時,全部信息通過;為0時,信息無法通過。為了使循環神經網絡更有效的保持長期記憶。“遺忘門“和”輸入門”就至關重要。“遺忘門”就是讓神經網絡忘記之前沒有用的信息。從當前的輸入補充新的“記憶”是“輸入門”作用。

使用LSTM結構的循環神經網絡的前向傳播時一個比較復雜的計算過程。在TensorFlow中可以被很簡單的實現。例如下面的偽代碼:

import tensorflow as tf

#定義一個LSTM結構。TF通過一句簡單的命令就可以定義一個LSTM循環體
#LSTM中使用的變量也會自動聲明

lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_hidden_size)
#將LSTM中的狀態初始化問哦全0數組。
#BasicLSTMCell類提供了zero_state函數來生成全0 的初始狀態
state = lstm.zero_state(batch_size,tf.float32)
current_input = "hello"
#定義損失函數
loss = 0.0
#雖然rnn理論上可以處理任意長度的序列,但是在訓練時為了避免梯度消散的問題,會規定一個最大的循環長度num_temps
for i in range(num_temps):
    #在第一個時刻聲明LSTM結構中使用的變量,在之后的時刻都需要服用之前的定義好的變量。
    if i > 0:
        tf.get_variable_scope().reuse_variables()
    #每一步處理時間序列中的一個時刻
    lstm_output,state = lstm(current_input,state)
    #將當前時刻LSTM結構的輸出傳入一個全連接層得到最后的輸出
    final_output = full_connected(lstm_output)
    #計算當前時刻的輸出的損失
    loss += calc_loss(final_output,expected_output)

#利用BP后向傳播算法訓練模型

三、循環神經網絡的變種

1、雙向循環神經網絡和深層循環神經網絡

在經典的循環神經網絡中,狀態的傳輸時從前向后單向的。然而,在有些問題中,當前時刻的輸出不僅和之前的狀態有關,也和之后的轉台有關。只是后就需要使用雙向循環神經網絡來解決此類問題。雙向循環神經網絡時由連個神經網絡上下疊加在一起組成的。輸出有這兩個神經網絡的轉台共同決定的。下圖展示了一個雙向循環神經網絡。

深層循環神經網絡是循環神經網絡的另外一種變體。為了增強模型的表達能力,可以將每一時刻上的循環體重復多次。深層循環神經網絡在每一時刻上將循環體結構重復了多次。 每一層循環體中的參數是一致的,不同層的循環體參數可以不一致。TF提供了MultiRNNCell類來實現深層循環神經網絡的前向傳播過程。

 

import tensorflow as tf

#定義一個基本的LSTM結構作為循環體的基礎結構,深層循環神經網絡也可以支持其他的循環提結構
lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)

#通過MultiRNNCell類來實現深層循環神經網絡中每一時刻的前向傳播過程。其中。number_of_layers 表示了有多少層,也就是圖
#中從xi到hi需要經過多少個LSTM結構。
stacked_lstm = tf.nn.rnn_cell.MultiRNNCell([lstm]*number_of_layers)
#和經典神經網絡一樣,可以通過zero_state函數獲得初始狀態。
state = stacked_lstm.zero_state(batch_size,tf.float32)
#計算每一時刻的前向傳播過程
for i in range(num_steps):
    if i > 0:
        tf.get_variable_scope().reuse_variables()
    stacked_lstm_output  ,state = stacked_lstm(current_input,state)
    final_output =  fully_connected(stacked_lstm_output)
    loss += calc_loss(final_output,expected_output)
    

2、循環神經網絡的dropout

  dropout可以樣循環神經網絡更加的健壯。dropout一般只在不同層循環體之間使用。也就是說從t-1時刻傳遞到時刻t,RNN不會進行狀態的dropout,而在同一時刻t,不同層循環體之間會使用dropout。

在TF中,使用tf.nn.rnn_cell.DropoutWrapper類可以很容易實現dropout功能。

#定義LSTM結構
lstm  = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
#通過DropoutWrapper來實現dropout功能。input_keep_drop參數用來控制輸入的dropout的概率,output_keep_drop參數用來控制輸出的dropout的概率,
dropout_lstm = tf.nn.rnn_cell.DropoutWrapper(lstm,input_keep_prob=0.5,output_keep_prob=0.5)
#在使用了dropout的基礎上定義深層RNN
stacked_lstm = tf.nn.rnn_cell.MultiRNNCell([dropout_lstm]* 5)

 

四、循環神經網絡的樣例應用

1、自然語言建模

  簡單的說,語言模型的目的就是為了計算一個句子的出現概率。在這里把句子看成單詞的序列S = (w1,w2,w3....wm),其中m為句子的長度,它的概率可以表示為

P(S) = p(w1,w2,w3.....wm) = p(w1)p(w2|w1)p(w3|w1,w2)p(wm| w1,w2...wm)

等式右邊的每一項都是語言模型中的一個參數。為了估計這些參數的取值,常用的方法有n-gram、決策樹、最大熵模型、條件隨機場、神經網絡模型。

  語言模型效果的好壞的常用的評價指標是復雜度(perplexity)。簡單來說,perplexity刻畫的就是通過某一語言模型估計一句話出現的概率。值越小越好。復雜度的計算公式:

下面就利用語言模型來處理PTB數據集。

為了讓PTB數據集使用更方便,TF提供了兩個函數來預處理PTB數據集。ptb_raw_data用來讀取原始數據,並將原始數據的單詞轉化為單詞ID,形成一個非常長的序列。ptb_iterator將序列按照某固定的長度來截斷,並將數據組成batch。

 

使用循環神經網絡實現語言模型

# -*- coding:utf-8 -*-

import numpy as np
import tensorflow as tf
from tensorflow.models.rnn.ptb import reader
from tensorflow.contrib.legacy_seq2seq import sequence_loss_by_example
DATA_PATH = "path/to/ptb/data"
HIDDEN_SIZE = 200 #隱藏層的規模
NUM_LAYERS = 2 #DRNN中LSTM結構的層數
VOCAB_SIZE = 10000 #詞典規模,加上語句結束符和稀有單詞結束符總共10000
LEARNING_RATE = 1.0
TRAIN_BATCH_SIZE = 20  #訓練數據BATCH大小
TRAIN_NUM_STEPS = 35    #訓練數據截斷長度
#在測試的時候不需要使用截斷
EVAL_BATCH_SIZE = EVAL_NUM_STEP = 1
NUM_EPOCH = 2 #使用訓練數據的輪數
KEEP_DROP =0.5 #節點不被dropout的概率
MAX_GRAD_NORM =5 #用於控制梯度膨脹的參數


#定義一個PTBMODEL類來描述模型,方便維護循環神經網絡中的狀態
class PTBMODEL:
    def __init__(self,batch_size,num_steps,is_training = True):
        self.batch_size = batch_size
        self.num_steps = num_steps
        #定義輸入層,維度為batch_size* num_steps
        self.input_data = tf.placeholder(tf.int32,shape=[batch_size,num_steps])
        #定義預期輸出。它的維度和ptb_iterrattor輸出的正確答案維度是一樣的。
        self.targets = tf.placeholder(tf.int32,[batch_size,num_steps])
        #定義使用LSTM結構為循環體結構且使用dropout的深層循環神經網絡
        lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
        if is_training:
            lstm_cell = tf.nn.rnn_cell.DropoutWrapper(lstm_cell,output_keep_prob=KEEP_DROP)
        cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
        #初始化初始狀態
        self.initial_state = cell.zero_state(batch_size,tf.float32)
        #將單詞ID轉換為單詞向量,總共有VOCAB_SIZE個單詞,每個單詞向量的維度為HIDDEN_SIZE,所以embedding參數的維度為
        #VOCAB_SIZE*HIDDEN_SIZE
        embedding = tf.get_variable("embedding",[VOCAB_SIZE,HIDDEN_SIZE])
        #將原本batch_size * num_steps個單詞ID轉化為單詞向量,轉化后的輸入層維度為batch_size * num_steps * HIDDEN_SIZE
        inputs = tf.nn.embedding_lookup(embedding,self.input_data)
        #只在訓練時使用dropout
        if is_training:
            inputs  = tf.nn.dropout(inputs,KEEP_DROP)
        #定義輸出列表,在這里現將不同時刻LSTM結構的輸出收集起來,再通過一個全連接層得到最終輸出
        output = []
        #state 存儲不同batch中LSTM的狀態,並且初始化為0.
        state = self.initial_state
        with tf.variable_scope("RNN"):
            for time_step  in range(num_steps):
                if time_step > 0 :
                    tf.get_variable_scope().reuse_variables()
                cell_output,state = cell(inputs[:,time_step,:],state)
                #將當前輸出加入輸出隊列
                output.append(cell_output)
        #把輸出隊列展開成[batch,hidden_size*num_steps]的形狀,然后再reshape成【batch*num_steps,hidden_size】的形狀。
        output = tf.reshape(tf.concat(output,1),[-1,HIDDEN_SIZE])
        #將從LSTM中得到的輸出再經過一個全連接層得到最后的預測結果,最終的預測結果在每一時刻上都是一個長度為VOCAB_SIZE的數組
        #經過SoftMax層之后表示下一個位置是不同單詞的概率。
        weight = tf.get_variable("weight",[HIDDEN_SIZE,VOCAB_SIZE])
        baias  =  tf.get_variable("bias",[VOCAB_SIZE])
        logits = tf.matmul(output,weight) + baias
        #定義交叉熵損失函數
        loss  = sequence_loss_by_example([logits],[tf.reshape(self.targets,[-1])],
                                                                   [tf.ones([batch_size*num_steps],dtype=tf.float32)]
                                                                   )
        #計算得到每個batch的平均損失
        self.cost = tf.reduce_sum(loss)/batch_size
        self.final_state = state
        #只在訓練模型是定義反向傳播操作
        if not is_training:
            return

        trainable_variables = tf.trainable_variables()
        #通過clip_by_global_norm函數控制梯度的大小,避免梯度膨脹的問題
        grads,_ = tf.clip_by_global_norm(tf.gradients(self.cost,trainable_variables),MAX_GRAD_NORM)
        #定義優化方法
        optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)
        #定義訓練步驟
        self.train_op = optimizer.apply_gradients(zip(grads,trainable_variables))

#使用給定的模型model在數據data上運行train_op並返回全部數據上的perplexity值

def run_epoch(session,model,data,train_op,output_log):
    #計算perplexity的輔助變量
    total_costs = 0.0
    iters = 0
    state = session.run(model.initial_state)
    #使用當前數據訓練或者測試模型
    for step ,(x,y) in  enumerate(reader.ptb_iterator( data,model.batch_size,model.num_steps)):
        cost,state,_ = session.run([model.cost,model.final_output,model.train_op],{
            model.input_data:x,model.targets:y,
            model.initial_state:state
        })
        total_costs += cost
        iters += model.num_steps
        #只有在訓練時輸出日志
        if output_log and step % 100 == 0:
            print("After %s steps ,perplexity is %.3f"%(step,np.exp(total_costs/iters)))

    #返回給定模型在給定數據上的perplexity
    return np.exp(total_costs/iters)


def main(_):
    #獲取原始數據
    train_data,valid_data,test_data = reader.ptb_raw_data(DATA_PATH)
    #定義初始化函數
    initializer = tf.random_uniform_initializer(-0.05,0.05)
    #定義訓練用的循環神經網絡模型
    with tf.variable_scope("language_model",reuse=True,initializer=initializer):
        train_model = PTBMODEL(TRAIN_BATCH_SIZE,TRAIN_NUM_STEPS,is_training=True)
    #定義評估用的循環神經網絡模型
    with tf.variable_scope("language_model",reuse=True,initializer=initializer):
        eval_model = PTBMODEL(EVAL_BATCH_SIZE,EVAL_NUM_STEP,is_training=False)
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        #使用訓練數據訓練模型
        for i in range(NUM_EPOCH):
            print("In iteration:%s"%(i+1))
            #在所有訓練數據上訓練RNN
            run_epoch(sess,train_model,train_data,train_model.train_op,True)
            #使用驗證集評測模型效果
            valid_perplexity = run_epoch(sess,eval_model,valid_data,tf.no_op(),False)
            print("Epoch %s ,Validation perplexity :%.3f"%(i+1,valid_perplexity))
        # 最后使用測試集驗證模型效果
        test_perplexity = run_epoch(sess,eval_model,valid_data,tf.no_op(),False)
        print("TEST perplexity :%.3f"%(test_perplexity))

if __name__ == '__main__':
    tf.app.run()

 四、時間序列預測

   怎么用循環神經網絡來預測正弦函數,可利用TF的高級封裝--TFLearn.

  1、使用TFLearn自定義模型

  

from sklearn  import cross_validation
from sklearn import datasets
from sklearn import metrics
import tensorflow as tf
from tensorflow.contrib.learn import models,Estimator,SKCompat
from tensorflow.contrib import layers,framework
import numpy as np
#導入TFLearn

#自定義模型,對於給定的輸入數據以及其對應的正確答案,返回在這些輸入上的預測值、損失值以及訓練步驟
def my_model(feature,target):
    #將預測的模型轉換為one-hot編碼的形式,因為共有三個類別,所以向量長度為3.經過轉化后,三個個類別(1,0,0),(0,1,0),(0,0,1)
    target = tf.one_hot(target,3,1,0)
    #定義模型以及其在給定數據上的損失函數。TFLearn通過logistic_regression封裝了一個單層全鏈接神經網絡
    logits,loss = models.logistic_regression(feature,target)
    #創建模型的優化器,並得到優化步驟
    train_op = layers.optimize_loss(loss,   #損失函數
                                    framework.get_global_step(), #獲取訓練步數並在訓練時更新
                                    optimizer="Adagrad",  #定義優化器
                                    learning_rate=0.1 #定義學習率
                                    )
    #返回在給定數據上的預測結果、損失值以及優化步驟
    return tf.argmax(logits,1) ,loss,train_op

#加載iris數據集,並划分為訓練集合和測試集合
iris  = datasets.load_iris()
x_train,x_test,y_train,y_test = cross_validation.train_test_split(iris.data,iris.target,test_size=0.2,random_state=0)
#對自定義的模型進行封裝
classifier =Estimator(model_fn=my_model)
classifier = SKCompat(classifier)
#使用封裝好的模型和訓練數據執行100輪的迭代
classifier.fit(x_train,y_train,steps=100)
#使用訓練好的模型進行預測
y_predicted = classifier.predict(x_test)



#計算模型的准確度
score  = metrics.accuracy_score(y_test,y_predicted)
print("Accuracy: %.2f %%"%(score * 100))

2、預測正選函數

  因為標准的RNN預測的是離散值,所以程序需要將連續的sin函數曲線離散化。

  每個SAMPLE_ITERVAL對sin函數進行一次采樣,采樣得到的序列就是sin函數離散化之后的結果

 

import numpy as np
import tensorflow as tf
import matplotlib as mpl
from matplotlib import pyplot as plt
from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat

# TensorFlow的高層封裝TFLearn
learn = tf.contrib.learn

# 神經網絡參數
HIDDEN_SIZE = 30  # LSTM隱藏節點個數
NUM_LAYERS = 2  # LSTM層數
TIMESTEPS = 10  # 循環神經網絡截斷長度
BATCH_SIZE = 32  # batch大小

# 數據參數
TRAINING_STEPS = 3000  # 訓練輪數
TRAINING_EXAMPLES = 10000  # 訓練數據個數
TESTING_EXAMPLES = 1000  # 測試數據個數
SAMPLE_GAP = 0.01  # 采樣間隔


def generate_data(seq):
    # 序列的第i項和后面的TIMESTEPS-1項合在一起作為輸入,第i+TIMESTEPS項作為輸出
    X = []
    y = []
    for i in range(len(seq) - TIMESTEPS - 1):
        X.append([seq[i:i + TIMESTEPS]])
        y.append([seq[i + TIMESTEPS]])
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)


# LSTM結構單元
def LstmCell():
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(HIDDEN_SIZE)
    return lstm_cell


def lstm_model(X, y):
    # 使用多層LSTM,不能用lstm_cell*NUM_LAYERS的方法,會導致LSTM的tensor名字都一樣
    cell = tf.contrib.rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)])

    # 將多層LSTM結構連接成RNN網絡並計算前向傳播結果
    output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
    output = tf.reshape(output, [-1, HIDDEN_SIZE])

    # 通過無激活函數的全聯接層計算線性回歸,並將數據壓縮成一維數組的結構
    predictions = tf.contrib.layers.fully_connected(output, 1, None)

    # 將predictions和labels調整為統一的shape
    y = tf.reshape(y, [-1])
    predictions = tf.reshape(predictions, [-1])

    # 計算損失值
    loss = tf.losses.mean_squared_error(predictions, y)

    # 創建模型優化器並得到優化步驟
    train_op = tf.contrib.layers.optimize_loss(
        loss,
        tf.train.get_global_step(),
        optimizer='Adagrad',
        learning_rate=0.1)

    return predictions, loss, train_op


# 用sin生成訓練和測試數據集
test_start = TRAINING_EXAMPLES * SAMPLE_GAP
test_end = (TRAINING_EXAMPLES + TESTING_EXAMPLES) * SAMPLE_GAP
train_X, train_y = generate_data(
    np.sin(np.linspace(0, test_start, TRAINING_EXAMPLES, dtype=np.float32)))
test_X, test_y = generate_data(
    np.sin(
        np.linspace(test_start, test_end, TESTING_EXAMPLES, dtype=np.float32)))

# 建立深層循環網絡模型
regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='model/'))

# 調用fit函數訓練模型
regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS)

# 使用訓練好的模型對測試集進行預測
predicted = [[pred] for pred in regressor.predict(test_X)]

# 計算rmse作為評價指標
rmse = np.sqrt(((predicted - test_y)**2).mean(axis=0))
print('Mean Square Error is: %f' % (rmse[0]))

# 對預測曲線繪圖,並存儲到sin.jpg
fit = plt.figure()
plot_predicted = plt.plot(predicted,label = "predicted")
plot_test = plt.plot(test_y,label = "real_sin")
plt.legend([plot_predicted, plot_test], ['predicted', 'real_sin'])

plt.savefig("sin.png")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM