TensorFlow學習筆記13-循環、遞歸神經網絡


循環神經網絡(RNN)

卷積網絡專門處理網格化的數據,而循環網絡專門處理序列化的數據。
一般的神經網絡結構為:

一般的神經網絡結構的前提假設是:元素之間是相互獨立的,輸入、輸出都是獨立的
現實世界中的輸入並不完全獨立,如股票隨時間的變化,這就需要循環網絡

循環神經網絡的本質

循環神經網絡的本質是有記憶能力,能將前一時刻的輸出量('記憶')作為下一時刻的輸入量。

RNN的結構與原理

結構如下:

設某個神經元的

\[X_t:表示t時刻的輸入,h_t:表示t時刻的輸出,S_t:表示t時刻的狀態(state) \]

\(S_t = f(U*X_t + W*S_{t-1})\) 表示\(t\)時刻的記憶, 其中函數\(f\)就是神經網絡的激活函數,常用\(\tanh()\)

可見,\(t\)時刻的記憶是\(t-1\)時刻記憶與\(t\)時刻輸入的加權疊加。

神經元\(t\)時刻的輸出基於之前所有的記憶\(S_t\)做出,表示為

\[h_t=softmax(VS_t) \]

將輸出\(o_t\)與標簽label比較得到誤差,用梯度下降(Gradient Descent)和Back-Propagation
Through Time(BPTT)方法對網絡進行訓練。

例如,在視頻中的目標識別案例中, Xi就是一幀圖像.

LSTM基本原理

早期時,RNN被設計成可以處理整個時間序列信息,但記憶最深的還是最后輸入的信號,而之前的信號
強度則越來越低,這個缺陷導致RNN在當時的作用並不明顯。后來發現了Long-Short Term memory(LSTM)
,循環神經網絡可以記住長期的信息。如圖。它包括4層神經網絡。信息流從t-1時刻流向t時刻時,LSTM
單元可對其增加或刪減信息,這些修改的操作由LSTM單元中的Gates控制。在圖中,由於Sigmoid函數
的輸出在(0,1)之間,通過將sigmoid的輸出與信息流點乘,可以控制信息流是允許信息流通過(sigmoid=1)
或不允許通過(sigmoid=0)。這里的state就是LSTM單元中上面的那條直線,它貫穿了串聯在一起的
LSTM單元。

和卷積神經網絡的共享參數方法一樣,這里的每個神經元都共享了一組參數(U,V,W), 這樣能降低計算量。
其具體原理如下圖。

同樣地,設某個神經元的

\[X_t:表示t時刻的輸入,h_t:表示t時刻的輸出,S_t:表示t時刻的狀態(state) \]

  1. 忘記門(決定忘記多少舊記憶),當有新的輸入到來,我們希望據此對舊記憶進行忘記,保存的比例為:

\[f_t=\sigma(W_f\cdot [h_{t-1},x_t]+b_f) \]

  1. 輸入門(決定更新什么信息,並用\(\bar{C_t}\)對其篩選)

\[i_t=\sigma(W_i\cdot [h_{t-1},x_t]+b_i) \]

\[\bar{C_t}=\tanh (W_C\cdot [h_{t-1},x_t]+b_C) \]

然后把要更新的信息加入到

\[S_t=f_t *S_{t-1}+i_t *\bar{C_t} \]

  1. 輸出門(根據更新后的記憶進行輸出)

\[o_t=\sigma(W_o\cdot [h_{t-1},x_t]+b_o) \]

\[h_t=o_t *\tanh (S_t) \]

LSTM變體

  1. peephole連接

  2. coupled忘記門與輸入門

  3. GRU(Gated Recurrent Unit)


下面是用LSTM實現的語言模型。

import reader
import numpy as np
import tensorflow as tf

# 數據參數
DATA_PATH = 'simple-examples/data/'  # 數據存放路徑
VOCAB_SIZE = 10000  # 單詞數量

# 神經網絡參數
HIDDEN_SIZE = 200  # LSTM隱藏層規模
NUM_LAYERS = 2  # LSTM結構層數
LEARNING_RATE = 1.0  # 學習速率
KEEP_PROB = 0.5  # 節點不被dropout的概率
MAX_GRAD_NORM = 5  # 用於控制梯度膨脹的參數

# 訓練參數
TRAIN_BATCH_SIZE = 20  # 訓練數據batch大小
TRAIN_NUM_STEP = 35  # 訓練數據截斷長度

# 測試參數
EVAL_BATCH_SIZE = 1  # 測試數據batch大小
EVAL_NUM_STEP = 1  # 測試數據截斷
NUM_EPOCH = 2  # 使用訓練數據的輪數


# 通過PTBModel描述模型,方便維護循環神經網絡中的狀態
class PTBModel():
    def __init__(self, is_training, batch_size, num_steps):
        # 記錄batch和截斷長度
        self.batch_size = batch_size
        self.num_steps = num_steps

        # 定義輸入層
        self.input_data = tf.placeholder(tf.int32, [batch_size, num_steps])

        # 定義預期輸出
        self.targets = tf.placeholder(tf.int32, [batch_size, num_steps])

        # 定義LSTM為使用dropout的兩層網絡
        lstm_cell = tf.contrib.rnn.BasicLSTMCell(HIDDEN_SIZE)
        if is_training:
            lstm_cell = tf.contrib.rnn.DropoutWrapper(
                lstm_cell, output_keep_prob=KEEP_PROB)
        cell = tf.contrib.rnn.MultiRNNCell([lstm_cell] * NUM_LAYERS)

        # 初始化state
        self.initial_state = cell.zero_state(batch_size, tf.float32)

        # 將單詞ID轉為單詞向量。每個單詞都是HIDDEN_SIZE維
        embedding = tf.get_variable('embedding', [VOCAB_SIZE, HIDDEN_SIZE])

        # 將原本batch_size*num_steps的輸入層轉化為batch_size*num_steps*HIDDEN_SIZE
        inputs = tf.nn.embedding_lookup(embedding, self.input_data)

        # 只在訓練時使用dropout
        if is_training:
            inputs = tf.nn.dropout(inputs, KEEP_PROB)

        # 定義輸出列表
        outputs = []
        state = self.initial_state
        with tf.variable_scope('RNN'):
            for time_step in range(num_steps):
                if time_step > 0:
                    tf.get_variable_scope().reuse_variables()
                cell_output, state = cell(inputs[:, time_step, :],
                                          state)  # 將當前時刻的數據和狀態傳入LSTM
                outputs.append(cell_output)  # 將當前輸出加入輸出列表

        # 將輸出列表展開成[batch,hidden_size*num_steps]
        # 再reshape成[batch*num_steps,hidden_size]
        output = tf.reshape(tf.concat(outputs, 1), [-1, HIDDEN_SIZE])

        # 將輸出傳入全連接層,每個時刻的輸出都是長度為VOCAB_SIZE的數組
        weight = tf.get_variable('weight', [HIDDEN_SIZE, VOCAB_SIZE])
        bias = tf.get_variable('bias', [VOCAB_SIZE])
        logits = tf.matmul(output, weight) + bias

        # 定義交叉熵損失函數,sequence_loss_by_example計算一個序列的交叉熵的和
        loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
            [logits],  # 預測結果
            [tf.reshape(self.targets, [-1])
             ],  # 預期結果。將[batch_size,num_steps]壓縮成一維
            [tf.ones([batch_size * num_steps], dtype=tf.float32)
             ]  # 損失的權重。這里所有的權重都為1,表示不同batch和不同時刻的重要程度都一樣
        )

        # 計算得到每個batch的平均損失
        self.cost = tf.reduce_sum(loss) / batch_size
        self.final_state = state

        # 只在訓練時反向傳播
        if not is_training:
            return
        trainable_variables = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(
            tf.gradients(self.cost, trainable_variables),
            MAX_GRAD_NORM)  # 控制梯度大小。避免梯度膨脹

        # 定義優化方法
        optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)

        # 定義訓練步驟
        self.train_op = optimizer.apply_gradients(
            zip(grads, trainable_variables))


# 使用給定的model在data上運行train_op並返回在全部數據上的perplexity
def run_epoch(session, model, data_queue, train_op, output_log, epoch_size):
    # 計算perplexity的輔助變量
    total_costs = 0.0
    iters = 0
    state = session.run(model.initial_state)

    # 使用當前數據訓練或測試模型
    for step in range(epoch_size):
        # 生成輸入和答案
        feed_dict = {}
        x, y = session.run(data_queue)
        feed_dict[model.input_data] = x
        feed_dict[model.targets] = y

        # 將狀態轉為字典
        for i, (c, h) in enumerate(model.initial_state):
            feed_dict[c] = state[i].c
            feed_dict[h] = state[i].h

        # 獲取損失值和下一個狀態
        cost, state, _ = session.run(
            [model.cost, model.final_state, train_op], feed_dict=feed_dict
        )  # 在當前batch上運行train_op並計算損失值。交叉熵損失函數計算的是下一個單詞為給定單詞的概率
        total_costs += cost
        iters += model.num_steps

        # 訓練時輸出日志
        if output_log and step % 100 == 0:
            print('After %d steps,perplexity is %.3f' %
                  (step, np.exp(total_costs / iters)))

    return np.exp(total_costs / iters)


def main(_):
    # 原始數據
    train_data, valid_data, test_data, _ = reader.ptb_raw_data(DATA_PATH)

    # 計算一個epoch需要訓練的次數
    train_data_len = len(train_data)  # 數據集的大小
    train_batch_len = train_data_len // TRAIN_BATCH_SIZE  # batch的個數
    train_epoch_size = (train_batch_len - 1) // TRAIN_NUM_STEP  # 該epoch的訓練次數

    valid_data_len = len(valid_data)
    valid_batch_len = valid_data_len // EVAL_BATCH_SIZE
    valid_epoch_size = (valid_batch_len - 1) // EVAL_NUM_STEP

    test_data_len = len(test_data)
    test_batch_len = test_data_len // EVAL_BATCH_SIZE
    test_epoch_size = (test_batch_len - 1) // EVAL_NUM_STEP

	# 生成數據隊列,必須放在開啟多線程之前
    train_queue = reader.ptb_producer(train_data, train_model.batch_size,
                                      train_model.num_steps)
    valid_queue = reader.ptb_producer(valid_data, eval_model.batch_size,
                                      eval_model.num_steps)
    test_queue = reader.ptb_producer(test_data, eval_model.batch_size,
                                     eval_model.num_steps)

    # 定義初始化函數
    initializer = tf.random_uniform_initializer(-0.05, 0.05)

    # 定義訓練用的模型
    with tf.variable_scope(
            'language_model', reuse=None, initializer=initializer):
        train_model = PTBModel(True, TRAIN_BATCH_SIZE, TRAIN_NUM_STEP)

    # 定義評估用的模型
    with tf.variable_scope(
            'language_model', reuse=True, initializer=initializer):
        eval_model = PTBModel(False, EVAL_BATCH_SIZE, EVAL_NUM_STEP)

    with tf.Session() as sess:
        tf.global_variables_initializer().run()

        # 開啟多線程從而支持ptb_producer()使用tf.train.range_input_producer()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        # 使用訓練數據訓練模型
        for i in range(NUM_EPOCH):
            print('In iteration: %d' % (i + 1))
            run_epoch(sess, train_model, train_queue, train_model.train_op,
                      True, train_epoch_size)  # 訓練模型
            valid_perplexity = run_epoch(sess, eval_model, valid_queue,
                                         tf.no_op(), False,
                                         valid_epoch_size)  # 使用驗證數據評估模型
            print('Epoch: %d Validation Perplexity: %.3f' % (i + 1,
                                                             valid_perplexity))

        # 使用測試數據測試模型
        test_perplexity = run_epoch(sess, eval_model, test_queue,
                                    tf.no_op(), False, test_epoch_size)
        print('Test Perplexity: %.3f' % test_perplexity)

        # 停止所有線程
        coord.request_stop()
        coord.join(threads)


if __name__ == '__main__':
    tf.app.run()

Github上有更新的代碼。

雙向循環網絡Bi-RNN的結構與原理

#coding:utf-8
#代碼主要是使用Bidirectional LSTM Classifier對MNIST數據集上進行測試
#導入常用的數據庫,並下載對應的數據集
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data", one_hot = True)

#設置對應的訓練參數
learning_rate = 0.01
max_samples = 400000
batch_size = 128
display_step = 10

n_input = 28
n_steps = 28
n_hidden = 256
n_classes = 10

#創建輸入x和學習目標y的placeholder,這里我們的樣本被理解為一個時間序列,第一個維度是時間點n_step,第二個維度是每個時間點的數據n_inpt。同時,在最后創建Softmax層的權重和偏差
x = tf.placeholder("float", [None, n_steps, n_input])
y = tf.placeholder("float", [None, n_classes])

weights = tf.Variable(tf.random_normal([2 * n_hidden, n_classes]))
biases = tf.Variable(tf.random_normal([n_classes]))

#定義Bidirectional LSTM網絡的生成函數
def BiRNN(x, weights, biases):

    x = tf.transpose(x, [1, 0, 2])
    x = tf.reshape(x, [-1, n_input])
    x = tf.split(x, n_steps)

    lstm_fw_cell = tf.contrib.rnn.BasicLSTMCell(n_hidden, forget_bias = 1.0)
    lstm_bw_cell = tf.contrib.rnn.BasicLSTMCell(n_hidden, forget_bias = 1.0)

    outputs, _, _ = tf.contrib.rnn.static_bidirectional_rnn(lstm_fw_cell,
                                                            lstm_bw_cell, x,
                                                            dtype = tf.float32)
    return tf.matmul(outputs[-1], weights) + biases

#使用tf.nn.softmax_cross_entropy_with_logits進行softmax處理並計算損失
pred = BiRNN(x, weights, biases)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = pred, labels = y))
optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate).minimize(cost)

correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

init = tf.global_variables_initializer()

#開始執行訓練和測試操作
with tf.Session() as sess:
    sess.run(init)
    step = 1
    while step * batch_size < max_samples:
        batch_x, batch_y = mnist.train.next_batch(batch_size)
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        sess.run(optimizer, feed_dict = {x: batch_x, y: batch_y})
        if step % display_step == 0:
            acc = sess.run(accuracy, feed_dict = {x: batch_x, y: batch_y})
            loss = sess.run(cost, feed_dict = {x: batch_x, y: batch_y})
            print("Iter" + str(step * batch_size) + ", Minibatch Loss = " + \
                "{:.6f}".format(loss) + ", Training Accuracy = " + \
                "{:.5f}".format(acc))
        step += 1
    print("Optimization Finished!")

    test_len = 10000
    test_data = mnist.test.assets[:test_len].reshape((-1, n_steps, n_input))
    test_label = mnist.test.labels[:test_len]
    print("Testing Accuracy:", sess.run(accuracy, feed_dict = {x: test_data, y: test_label}))

案例

RNN與CNN的結合

在圖像處理中,目前做的最好的是CNN,而自然語言處理中,表現比較好的是RNN,因此,我們能否把他們結合起來,一起用呢?那就是看圖說話了,這個原理也比較簡單,舉個小栗子:假設我們有CNN的模型訓練了一個網絡結構,比如是這個

最后我們不是要分類嘛,那在分類前,是不是已經拿到了圖像的特征呀,那我們能不能把圖像的特征拿出來,放到RNN的輸入里,讓他學習呢?

之前的RNN是這樣的:

\[S_t=\tanh(U∗X_t+W∗S_{t−1}) \]

我們把圖像的特征加在里面,可以得到:

\[S_t=\tanh(U∗X_t+W∗S_{t−1}+V∗X) \]

其中的X就是圖像的特征。如果用的是上面的CNN網絡,X應該是一個4096X1的向量。
注:這個公式只在第一步做,后面每次更新就沒有V了,因為給RNN數據只在第一次迭代的時候給。


參考(如不允許轉載請及時聯系我郵箱charleechan@163.com)

1 循環神經網絡(RNN)原理通俗解釋


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM