第二十節,使用RNN網絡擬合回聲信號序列


這一節使用TensorFlow中的函數搭建一個簡單的RNN網絡,使用一串隨機的模擬數據作為原始信號,讓RNN網絡來擬合其對應的回聲信號。

樣本數據為一串隨機的由0,1組成的數字,將其當成發射出去的一串信號。當碰到阻擋被反彈回來時,會收到原始信號的回聲。

如果步長為3,那么輸入和輸出的序列如下圖所示:

原序列 0 1 1 0 1 0 1 1 0 0 1 1 0 1 1
回聲序列 null null null 0 1 1 0 1 0 1 1 0 0 1 1

 

 

如上表所示,回聲序列的前三項是null,原序列的第一個信號為0,對應的是回聲序列的第四項,即回聲序列的每一個數都比原序列滯后3個時序。本例的任務就是把序列截取出來,對於每個原序列來預測它的回聲序列。。

構建的網絡結構如下圖所示:

上圖中,初始的輸入有5個,xt個為t時刻輸入序列值,另外4個為t-1時刻隱藏層的輸出值ht-1。通過一層具有4個節點的RNN網絡,再接一個全連接輸出兩個類別,分別表示輸出0,和1類別的概率。這樣每個序列都會有一個對應的預測分類值,最終將整個序列生成了預測序列。

下面我們會演示一個例子,這里隨機生成一個具有50000個序列樣本數據,然后根據原序列生成50000個回聲序列樣本數據。我們每個訓練截取15個序列作為一個樣本,我們設置小批量大小batch_size為5。

  • 我們把50000個序列,轉換為5x10000的數組。
  • 對數組的每一行按長度為15進行分割,每一個小批量含有5x15個序列。
  • 針對每一小批量的序列,我們使用RNN網絡開始迭代,迭代每一個批次中的每一組序列(5x1)。

注意這里面的5就是我們設置的batch_size大小,這和我們之前在CNN以及DNN網絡中的batch_size是一樣的,即一次訓練使用batch_size個樣本。

下面是一個小批量的原序列數據和回聲序列數據,這里回聲序列的前三個序列值是無效的,這主要是與我們原序列切割方式有關的。

 

一 定義參數並生成樣本數據

np.random.seed(0)
'''
一 定義參數生成樣本數據
'''

num_epochs = 5                   #迭代輪數
total_series_length = 50000      #序列樣本數據長度
truncated_backprop_length = 15   #測試時截取數據長度
state_size = 4                   #中間狀態長度
num_classes = 2                  #輸出類別個數
echo_step =  3                   #回聲步長
batch_size = 5                   #小批量大小
learning_rate = 0.4              #學習率
num_batches =total_series_length//batch_size//truncated_backprop_length   #計算一輪可以分為多少批

def generate_date():
    '''
    生成原序列和回聲序列數據,回聲序列滯后原序列echo_step個步長
    
    返回原序列和回聲序列組成的元組    
    '''
    #生成原序列樣本數據  random.choice()隨機選取內容從0和1中選取total_series_length個數據,0,1數據的概率都是0.5
    x = np.array(np.random.choice(2,total_series_length,p=[0.5,0.5]))
    #向右循環移位  如11110000->00011110
    y =np.roll(x,echo_step)
    #回聲序列,前echo_step個數據清0
    y[0:echo_step] = 0

    x = x.reshape((batch_size,-1))    #5x10000
    #print(x)
    y = y.reshape((batch_size,-1))    #5x10000
    #print(y)
    return (x,y)

 

二 定義占位符處理輸入數據

定義三個占位符,batch_x為原始序列,batch_y為回聲序列真實值,init_state為循環節點的初始值。batch_x是逐個輸入網絡的,所以需要將輸進去的數據打散,按照時間序列變成15個數組,每個數組有batch_size個元素,進行統一批處理。

'''
二 定義占位符處理輸入數據
'''
batch_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,truncated_backprop_length])    #原始序列  
batch_y = tf.placeholder(dtype=tf.int32,shape=[batch_size,truncated_backprop_length])      #回聲序列 作為標簽
init_state = tf.placeholder(dtype=tf.float32,shape=[batch_size,state_size])                #循環節點的初始狀態值 


#將batch_x沿axis = 1(列)的軸進行拆分    返回一個list 每個元素都是一個數組   [(5,),(5,)....] 一共15個元素,即15個序列
inputs_series = tf.unstack(batch_x,axis=1)     
labels_series = tf.unstack(batch_y,axis=1)

 

三 定義網絡結構

定義一層循環與一層全網絡連接。由於數據是一個二維數組序列,所以需要通過循環將輸入數據按照原有序列逐個輸入網絡,並輸出對應的predictions序列,同樣的,對於每個序列值都要對其做loss計算,在loss計算使用了spare_softmax_cross_entropy_with_logits函數,因為label的最大值正好是1,而且是一位的,就不需要在使用one_hot編碼了,最終將所有的loss均值放入優化器中。

'''
三 定義RNN網絡結構

一個輸入樣本由15個輸入序列組成 一個小批量包含5個輸入樣本
'''
current_state = init_state     #存放當前的狀態
predictions_series = []        #存放一個小批量中每個輸入樣本的預測序列值 每個元素為5x2 共有15個元素
losses = []                    #存放一個小批量中每個輸入樣本訓練的損失值 每個元素是一個標量,共有15個元素  

#使用一個循環,按照序列逐個輸入   
for current_input,labels in zip(inputs_series,labels_series):
    #確定形狀為batch_size x 1
    current_input = tf.reshape(current_input,[batch_size,1])
    '''
       加入初始狀態 
       5 x 1序列值和 5 x 4中間狀態 按列連接,得到 5 x 5數組 構成輸入數據
    '''
    input_and_state_concatenated = tf.concat([current_input,current_state],1)
    #隱藏層激活函數選擇tanh 5x4
    next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated,state_size,activation_fn = tf.tanh)
    current_state = next_state
    #輸出層 激活函數選擇None,即直接輸出 5x2
    logits = tf.contrib.layers.fully_connected(next_state,num_classes,activation_fn = None)
    #計算代價 
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,logits = logits))
    losses.append(loss)
    #經過softmax計算預測值 5x2  注意這里並不是標簽值 這里是one_hot編碼
    predictions =  tf.nn.softmax(logits)        
    predictions_series.append(predictions)    

total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(learning_rate).minimize(total_loss)

 

四  建立session訓練數據並可視化輸出

建立session,初始化RNN循環節點的值為0。總樣本迭代5輪,每一輪迭代完調用plot函數生成圖像。

'''
四 建立session訓練數據
'''
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    loss_list = []            #list 存放每一小批量的代價值
    
    #開始迭代每一輪
    for epoch_idx in range(num_epochs):
        #生成原序列和回聲序列數據
        x,y = generate_date()
        
        #初始化循環節點狀態值
        _current_state = np.zeros((batch_size,state_size))
        
        print('New date,epoch',epoch_idx)
        
        #迭代每一小批量
        for batch_idx in range(num_batches):
            #計算當前batch的起始索引
            start_idx = batch_idx * truncated_backprop_length
            #計算當前batch的結束索引
            end_idx = start_idx + truncated_backprop_length
            
            #當前批次的原序列值
            batchx = x[:,start_idx:end_idx]
            #當前批次的回聲序列值
            batchy = y[:,start_idx:end_idx]
            
            #開始訓練當前批次樣本
            _total_loss,_train_step,_current_state,_predictions_series = sess.run(
                    [total_loss,train_step,current_state,predictions_series],
                    feed_dict = {
                            batch_x:batchx,
                            batch_y:batchy,
                            init_state:_current_state
                            })
    
            loss_list.append(_total_loss)        
        
            if batch_idx % 100 == 0:
                print('Step {0} Loss {1}'.format(batch_idx,_total_loss))
        #可視化輸出
        plot(loss_list,_predictions_series,batchx,batchy)
        #print(batchx)
        #print(batchy)                    
def plot(loss_list, predictions_series, batchx, batchy):
    '''
    繪制一個小批量中每一個原序列樣本,回聲序列樣本,預測序列樣本圖像
    
    args:
        loss_list:list 存放每一個批次訓練的代價值 
        predictions_series:list長度為5  存放一個批次中每個輸入序列的預測序列值 注意這里每個元素(5x2)都是one_hot編碼 
        batchx:當前批次的原序列 5x15
        batchy:當前批次的回聲序列 5x15
    '''   
    plt.figure(figsize=(3.2*3,2.4*2)) 
    #創建子圖 2行3列選擇第一個  繪制代價值
    plt.subplot(2, 3, 1)
    plt.cla()
    plt.plot(loss_list)


    #迭代每一個序列  循環5次
    for batch_series_idx in range(batch_size):
        #獲取第batch_series_idx個序列的預測值(one_hot編碼) 15x2
        one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]        
        #轉換為標簽值 (15,)
        single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])
        

        #繪制第batch_series_idx + 2個子圖
        plt.subplot(2, 3, batch_series_idx + 2)
        plt.cla()
        #設置x軸 y軸坐標值范圍
        plt.axis([0, truncated_backprop_length, 0, 2])
        #獲取原序列x坐標值
        left_offset = range(truncated_backprop_length)
        #獲取回聲序列x坐標值 滯后3個步長
        left_offset2 = range(echo_step,truncated_backprop_length + echo_step)
        
        label1 = "past values"
        label2 = "True echo values" 
        label3 = "Predictions"      
        #繪制原序列
        plt.plot(left_offset2, batchx[batch_series_idx, :]*0.2+1.5, "o--b", label=label1)
        #繪制真實回聲序列
        plt.plot(left_offset, batchy[batch_series_idx, :]*0.2+0.8,"x--b", label=label2)
        #繪制預測回聲序列
        plt.plot(left_offset,  single_output_series*0.2+0.1 , "o--r", label=label3)
    
    plt.legend(loc='best')
    plt.show()

函數中將輸入的原序列,回聲序列和預測的序列同時輸出在圖像中。按照小批量樣本的個數生成圖像。為了讓三個序列看起來更明顯,將其縮放0.2,並且調節每個圖像的高度。同時將原始序列在顯示中滯后echo_step個序列,將三個圖像放在同一序列順序比較。

如上圖,最下面的是預測的序列,中間的為回聲序列,從圖像上可以看出預測序列和回聲序列幾乎相同,表明RNN網絡已經完全可以學習到回聲的規則。

完整代碼:

# -*- coding: utf-8 -*-
"""
Created on Tue May  8 08:45:40 2018

@author: zy
"""

'''
使用RNN網絡擬合回聲信號序列

使用一串隨機的模擬數據作為原始信號,讓RNN網絡來擬合其對應的回聲信號
樣本數據為一串隨機的由0,1組成的數字,將其當成發射出去的一串信號。當碰到阻擋被反彈回來時,會收到原始信號的回音

如果步長為3,那么輸入和輸出的序列序列如下:
原序列     0   1    1   |   0   1    0   .....     1
回聲序列 null null null |   0   1    1   .....     0
'''

import tensorflow  as tf
import numpy as np
import matplotlib.pyplot as plt

np.random.seed(0)
'''
一 定義參數生成樣本數據
'''

num_epochs = 5                   #迭代輪數
total_series_length = 50000      #序列樣本數據長度
truncated_backprop_length = 15   #測試時截取數據長度
state_size = 4                   #中間狀態長度
num_classes = 2                  #輸出類別個數
echo_step =  3                   #回聲步長
batch_size = 5                   #小批量大小
learning_rate = 0.4              #學習率
num_batches =total_series_length//batch_size//truncated_backprop_length   #計算一輪可以分為多少批

def generate_date():
    '''
    生成原序列和回聲序列數據,回聲序列滯后原序列echo_step個步長
    
    返回原序列和回聲序列組成的元組    
    '''
    #生成原序列樣本數據  random.choice()隨機選取內容從0和1中選取total_series_length個數據,0,1數據的概率都是0.5
    x = np.array(np.random.choice(2,total_series_length,p=[0.5,0.5]))
    #向右循環移位  如11110000->00011110
    y =np.roll(x,echo_step)
    #回聲序列,前echo_step個數據清0
    y[0:echo_step] = 0

    x = x.reshape((batch_size,-1))    #5x10000
    #print(x)
    y = y.reshape((batch_size,-1))    #5x10000
    #print(y)
    return (x,y)


'''
二 定義占位符處理輸入數據
'''
batch_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,truncated_backprop_length])    #原始序列  
batch_y = tf.placeholder(dtype=tf.int32,shape=[batch_size,truncated_backprop_length])      #回聲序列 作為標簽
init_state = tf.placeholder(dtype=tf.float32,shape=[batch_size,state_size])                #循環節點的初始狀態值 


#將batch_x沿axis = 1(列)的軸進行拆分    返回一個list 每個元素都是一個數組   [(5,),(5,)....] 一共15個元素,即15個序列
inputs_series = tf.unstack(batch_x,axis=1)     
labels_series = tf.unstack(batch_y,axis=1)


'''
三 定義RNN網絡結構

一個輸入樣本由15個輸入序列組成 一個小批量包含5個輸入樣本
'''
current_state = init_state     #存放當前的狀態
predictions_series = []        #存放一個小批量中每個輸入樣本的預測序列值 每個元素為5x2 共有15個元素
losses = []                    #存放一個小批量中每個輸入樣本訓練的損失值 每個元素是一個標量,共有15個元素  

#使用一個循環,按照序列逐個輸入   
for current_input,labels in zip(inputs_series,labels_series):
    #確定形狀為batch_size x 1
    current_input = tf.reshape(current_input,[batch_size,1])
    '''
       加入初始狀態 
       5 x 1序列值和 5 x 4中間狀態 按列連接,得到 5 x 5數組 構成輸入數據
    '''
    input_and_state_concatenated = tf.concat([current_input,current_state],1)
    #隱藏層激活函數選擇tanh 5x4
    next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated,state_size,activation_fn = tf.tanh)
    current_state = next_state
    #輸出層 激活函數選擇None,即直接輸出 5x2
    logits = tf.contrib.layers.fully_connected(next_state,num_classes,activation_fn = None)
    #計算代價 
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,logits = logits))
    losses.append(loss)
    #經過softmax計算預測值 5x2  注意這里並不是標簽值 這里是one_hot編碼
    predictions =  tf.nn.softmax(logits)    
    
    predictions_series.append(predictions)
    

total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(learning_rate).minimize(total_loss)


def plot(loss_list, predictions_series, batchx, batchy):
    '''
    繪制一個小批量中每一個原序列樣本,回聲序列樣本,預測序列樣本圖像
    
    args:
        loss_list:list 存放每一個批次訓練的代價值 
        predictions_series:list長度為5  存放一個批次中每個輸入序列的預測序列值 注意這里每個元素(5x2)都是one_hot編碼 
        batchx:當前批次的原序列 5x15
        batchy:當前批次的回聲序列 5x15
    '''   
    plt.figure(figsize=(3.2*3,2.4*2)) 
    #創建子圖 2行3列選擇第一個  繪制代價值
    plt.subplot(2, 3, 1)
    plt.cla()
    plt.plot(loss_list)


    #迭代每一個序列  循環5次
    for batch_series_idx in range(batch_size):
        #獲取第batch_series_idx個序列的預測值(one_hot編碼) 15x2
        one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]        
        #轉換為標簽值 (15,)
        single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])
        

        #繪制第batch_series_idx + 2個子圖
        plt.subplot(2, 3, batch_series_idx + 2)
        plt.cla()
        #設置x軸 y軸坐標值范圍
        plt.axis([0, truncated_backprop_length, 0, 2])
        #獲取原序列x坐標值
        left_offset = range(truncated_backprop_length)
        #獲取回聲序列x坐標值 滯后3個步長
        left_offset2 = range(echo_step,truncated_backprop_length + echo_step)
        
        label1 = "past values"
        label2 = "True echo values" 
        label3 = "Predictions"      
        #繪制原序列
        plt.plot(left_offset2, batchx[batch_series_idx, :]*0.2+1.5, "o--b", label=label1)
        #繪制真實回聲序列
        plt.plot(left_offset, batchy[batch_series_idx, :]*0.2+0.8,"x--b", label=label2)
        #繪制預測回聲序列
        plt.plot(left_offset,  single_output_series*0.2+0.1 , "o--r", label=label3)
    
    plt.legend(loc='best')
    plt.show()

'''
四 建立session訓練數據
'''
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    loss_list = []            #list 存放每一小批量的代價值
    
    #開始迭代每一輪
    for epoch_idx in range(num_epochs):
        #生成原序列和回聲序列數據
        x,y = generate_date()
        
        #初始化循環節點狀態值
        _current_state = np.zeros((batch_size,state_size))
        
        print('New date,epoch',epoch_idx)
        
        #迭代每一小批量
        for batch_idx in range(num_batches):
            #計算當前batch的起始索引
            start_idx = batch_idx * truncated_backprop_length
            #計算當前batch的結束索引
            end_idx = start_idx + truncated_backprop_length
            
            #當前批次的原序列值
            batchx = x[:,start_idx:end_idx]
            #當前批次的回聲序列值
            batchy = y[:,start_idx:end_idx]
            
            #開始訓練當前批次樣本
            _total_loss,_train_step,_current_state,_predictions_series = sess.run(
                    [total_loss,train_step,current_state,predictions_series],
                    feed_dict = {
                            batch_x:batchx,
                            batch_y:batchy,
                            init_state:_current_state
                            })
    
            loss_list.append(_total_loss)        
        
            if batch_idx % 100 == 0:
                print('Step {0} Loss {1}'.format(batch_idx,_total_loss))
        #可視化輸出
        plot(loss_list,_predictions_series,batchx,batchy)
        #print(batchx)
        #print(batchy)                    
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM