這一節使用TensorFlow中的函數搭建一個簡單的RNN網絡,使用一串隨機的模擬數據作為原始信號,讓RNN網絡來擬合其對應的回聲信號。
樣本數據為一串隨機的由0,1組成的數字,將其當成發射出去的一串信號。當碰到阻擋被反彈回來時,會收到原始信號的回聲。
如果步長為3,那么輸入和輸出的序列如下圖所示:
原序列 | 0 | 1 | 1 | 0 | 1 | 0 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | 1 |
回聲序列 | null | null | null | 0 | 1 | 1 | 0 | 1 | 0 | 1 | 1 | 0 | 0 | 1 | 1 |
如上表所示,回聲序列的前三項是null,原序列的第一個信號為0,對應的是回聲序列的第四項,即回聲序列的每一個數都比原序列滯后3個時序。本例的任務就是把序列截取出來,對於每個原序列來預測它的回聲序列。。
構建的網絡結構如下圖所示:
上圖中,初始的輸入有5個,xt個為t時刻輸入序列值,另外4個為t-1時刻隱藏層的輸出值ht-1。通過一層具有4個節點的RNN網絡,再接一個全連接輸出兩個類別,分別表示輸出0,和1類別的概率。這樣每個序列都會有一個對應的預測分類值,最終將整個序列生成了預測序列。
下面我們會演示一個例子,這里隨機生成一個具有50000個序列樣本數據,然后根據原序列生成50000個回聲序列樣本數據。我們每個訓練截取15個序列作為一個樣本,我們設置小批量大小batch_size為5。
- 我們把50000個序列,轉換為5x10000的數組。
- 對數組的每一行按長度為15進行分割,每一個小批量含有5x15個序列。
- 針對每一小批量的序列,我們使用RNN網絡開始迭代,迭代每一個批次中的每一組序列(5x1)。
注意這里面的5就是我們設置的batch_size大小,這和我們之前在CNN以及DNN網絡中的batch_size是一樣的,即一次訓練使用batch_size個樣本。
下面是一個小批量的原序列數據和回聲序列數據,這里回聲序列的前三個序列值是無效的,這主要是與我們原序列切割方式有關的。
一 定義參數並生成樣本數據
np.random.seed(0) ''' 一 定義參數生成樣本數據 ''' num_epochs = 5 #迭代輪數 total_series_length = 50000 #序列樣本數據長度 truncated_backprop_length = 15 #測試時截取數據長度 state_size = 4 #中間狀態長度 num_classes = 2 #輸出類別個數 echo_step = 3 #回聲步長 batch_size = 5 #小批量大小 learning_rate = 0.4 #學習率 num_batches =total_series_length//batch_size//truncated_backprop_length #計算一輪可以分為多少批 def generate_date(): ''' 生成原序列和回聲序列數據,回聲序列滯后原序列echo_step個步長 返回原序列和回聲序列組成的元組 ''' #生成原序列樣本數據 random.choice()隨機選取內容從0和1中選取total_series_length個數據,0,1數據的概率都是0.5 x = np.array(np.random.choice(2,total_series_length,p=[0.5,0.5])) #向右循環移位 如11110000->00011110 y =np.roll(x,echo_step) #回聲序列,前echo_step個數據清0 y[0:echo_step] = 0 x = x.reshape((batch_size,-1)) #5x10000 #print(x) y = y.reshape((batch_size,-1)) #5x10000 #print(y) return (x,y)
二 定義占位符處理輸入數據
定義三個占位符,batch_x為原始序列,batch_y為回聲序列真實值,init_state為循環節點的初始值。batch_x是逐個輸入網絡的,所以需要將輸進去的數據打散,按照時間序列變成15個數組,每個數組有batch_size個元素,進行統一批處理。
''' 二 定義占位符處理輸入數據 ''' batch_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,truncated_backprop_length]) #原始序列 batch_y = tf.placeholder(dtype=tf.int32,shape=[batch_size,truncated_backprop_length]) #回聲序列 作為標簽 init_state = tf.placeholder(dtype=tf.float32,shape=[batch_size,state_size]) #循環節點的初始狀態值 #將batch_x沿axis = 1(列)的軸進行拆分 返回一個list 每個元素都是一個數組 [(5,),(5,)....] 一共15個元素,即15個序列 inputs_series = tf.unstack(batch_x,axis=1) labels_series = tf.unstack(batch_y,axis=1)
三 定義網絡結構
定義一層循環與一層全網絡連接。由於數據是一個二維數組序列,所以需要通過循環將輸入數據按照原有序列逐個輸入網絡,並輸出對應的predictions序列,同樣的,對於每個序列值都要對其做loss計算,在loss計算使用了spare_softmax_cross_entropy_with_logits函數,因為label的最大值正好是1,而且是一位的,就不需要在使用one_hot編碼了,最終將所有的loss均值放入優化器中。
''' 三 定義RNN網絡結構 一個輸入樣本由15個輸入序列組成 一個小批量包含5個輸入樣本 ''' current_state = init_state #存放當前的狀態 predictions_series = [] #存放一個小批量中每個輸入樣本的預測序列值 每個元素為5x2 共有15個元素 losses = [] #存放一個小批量中每個輸入樣本訓練的損失值 每個元素是一個標量,共有15個元素 #使用一個循環,按照序列逐個輸入 for current_input,labels in zip(inputs_series,labels_series): #確定形狀為batch_size x 1 current_input = tf.reshape(current_input,[batch_size,1]) ''' 加入初始狀態 5 x 1序列值和 5 x 4中間狀態 按列連接,得到 5 x 5數組 構成輸入數據 ''' input_and_state_concatenated = tf.concat([current_input,current_state],1) #隱藏層激活函數選擇tanh 5x4 next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated,state_size,activation_fn = tf.tanh) current_state = next_state #輸出層 激活函數選擇None,即直接輸出 5x2 logits = tf.contrib.layers.fully_connected(next_state,num_classes,activation_fn = None) #計算代價 loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,logits = logits)) losses.append(loss) #經過softmax計算預測值 5x2 注意這里並不是標簽值 這里是one_hot編碼 predictions = tf.nn.softmax(logits) predictions_series.append(predictions) total_loss = tf.reduce_mean(losses) train_step = tf.train.AdagradOptimizer(learning_rate).minimize(total_loss)
四 建立session訓練數據並可視化輸出
建立session,初始化RNN循環節點的值為0。總樣本迭代5輪,每一輪迭代完調用plot函數生成圖像。
''' 四 建立session訓練數據 ''' with tf.Session() as sess: sess.run(tf.global_variables_initializer()) loss_list = [] #list 存放每一小批量的代價值 #開始迭代每一輪 for epoch_idx in range(num_epochs): #生成原序列和回聲序列數據 x,y = generate_date() #初始化循環節點狀態值 _current_state = np.zeros((batch_size,state_size)) print('New date,epoch',epoch_idx) #迭代每一小批量 for batch_idx in range(num_batches): #計算當前batch的起始索引 start_idx = batch_idx * truncated_backprop_length #計算當前batch的結束索引 end_idx = start_idx + truncated_backprop_length #當前批次的原序列值 batchx = x[:,start_idx:end_idx] #當前批次的回聲序列值 batchy = y[:,start_idx:end_idx] #開始訓練當前批次樣本 _total_loss,_train_step,_current_state,_predictions_series = sess.run( [total_loss,train_step,current_state,predictions_series], feed_dict = { batch_x:batchx, batch_y:batchy, init_state:_current_state }) loss_list.append(_total_loss) if batch_idx % 100 == 0: print('Step {0} Loss {1}'.format(batch_idx,_total_loss)) #可視化輸出 plot(loss_list,_predictions_series,batchx,batchy) #print(batchx) #print(batchy)
def plot(loss_list, predictions_series, batchx, batchy): ''' 繪制一個小批量中每一個原序列樣本,回聲序列樣本,預測序列樣本圖像 args: loss_list:list 存放每一個批次訓練的代價值 predictions_series:list長度為5 存放一個批次中每個輸入序列的預測序列值 注意這里每個元素(5x2)都是one_hot編碼 batchx:當前批次的原序列 5x15 batchy:當前批次的回聲序列 5x15 ''' plt.figure(figsize=(3.2*3,2.4*2)) #創建子圖 2行3列選擇第一個 繪制代價值 plt.subplot(2, 3, 1) plt.cla() plt.plot(loss_list) #迭代每一個序列 循環5次 for batch_series_idx in range(batch_size): #獲取第batch_series_idx個序列的預測值(one_hot編碼) 15x2 one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :] #轉換為標簽值 (15,) single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series]) #繪制第batch_series_idx + 2個子圖 plt.subplot(2, 3, batch_series_idx + 2) plt.cla() #設置x軸 y軸坐標值范圍 plt.axis([0, truncated_backprop_length, 0, 2]) #獲取原序列x坐標值 left_offset = range(truncated_backprop_length) #獲取回聲序列x坐標值 滯后3個步長 left_offset2 = range(echo_step,truncated_backprop_length + echo_step) label1 = "past values" label2 = "True echo values" label3 = "Predictions" #繪制原序列 plt.plot(left_offset2, batchx[batch_series_idx, :]*0.2+1.5, "o--b", label=label1) #繪制真實回聲序列 plt.plot(left_offset, batchy[batch_series_idx, :]*0.2+0.8,"x--b", label=label2) #繪制預測回聲序列 plt.plot(left_offset, single_output_series*0.2+0.1 , "o--r", label=label3) plt.legend(loc='best') plt.show()
函數中將輸入的原序列,回聲序列和預測的序列同時輸出在圖像中。按照小批量樣本的個數生成圖像。為了讓三個序列看起來更明顯,將其縮放0.2,並且調節每個圖像的高度。同時將原始序列在顯示中滯后echo_step個序列,將三個圖像放在同一序列順序比較。
如上圖,最下面的是預測的序列,中間的為回聲序列,從圖像上可以看出預測序列和回聲序列幾乎相同,表明RNN網絡已經完全可以學習到回聲的規則。
完整代碼:

# -*- coding: utf-8 -*- """ Created on Tue May 8 08:45:40 2018 @author: zy """ ''' 使用RNN網絡擬合回聲信號序列 使用一串隨機的模擬數據作為原始信號,讓RNN網絡來擬合其對應的回聲信號 樣本數據為一串隨機的由0,1組成的數字,將其當成發射出去的一串信號。當碰到阻擋被反彈回來時,會收到原始信號的回音 如果步長為3,那么輸入和輸出的序列序列如下: 原序列 0 1 1 | 0 1 0 ..... 1 回聲序列 null null null | 0 1 1 ..... 0 ''' import tensorflow as tf import numpy as np import matplotlib.pyplot as plt np.random.seed(0) ''' 一 定義參數生成樣本數據 ''' num_epochs = 5 #迭代輪數 total_series_length = 50000 #序列樣本數據長度 truncated_backprop_length = 15 #測試時截取數據長度 state_size = 4 #中間狀態長度 num_classes = 2 #輸出類別個數 echo_step = 3 #回聲步長 batch_size = 5 #小批量大小 learning_rate = 0.4 #學習率 num_batches =total_series_length//batch_size//truncated_backprop_length #計算一輪可以分為多少批 def generate_date(): ''' 生成原序列和回聲序列數據,回聲序列滯后原序列echo_step個步長 返回原序列和回聲序列組成的元組 ''' #生成原序列樣本數據 random.choice()隨機選取內容從0和1中選取total_series_length個數據,0,1數據的概率都是0.5 x = np.array(np.random.choice(2,total_series_length,p=[0.5,0.5])) #向右循環移位 如11110000->00011110 y =np.roll(x,echo_step) #回聲序列,前echo_step個數據清0 y[0:echo_step] = 0 x = x.reshape((batch_size,-1)) #5x10000 #print(x) y = y.reshape((batch_size,-1)) #5x10000 #print(y) return (x,y) ''' 二 定義占位符處理輸入數據 ''' batch_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,truncated_backprop_length]) #原始序列 batch_y = tf.placeholder(dtype=tf.int32,shape=[batch_size,truncated_backprop_length]) #回聲序列 作為標簽 init_state = tf.placeholder(dtype=tf.float32,shape=[batch_size,state_size]) #循環節點的初始狀態值 #將batch_x沿axis = 1(列)的軸進行拆分 返回一個list 每個元素都是一個數組 [(5,),(5,)....] 一共15個元素,即15個序列 inputs_series = tf.unstack(batch_x,axis=1) labels_series = tf.unstack(batch_y,axis=1) ''' 三 定義RNN網絡結構 一個輸入樣本由15個輸入序列組成 一個小批量包含5個輸入樣本 ''' current_state = init_state #存放當前的狀態 predictions_series = [] #存放一個小批量中每個輸入樣本的預測序列值 每個元素為5x2 共有15個元素 losses = [] #存放一個小批量中每個輸入樣本訓練的損失值 每個元素是一個標量,共有15個元素 #使用一個循環,按照序列逐個輸入 for current_input,labels in zip(inputs_series,labels_series): #確定形狀為batch_size x 1 current_input = tf.reshape(current_input,[batch_size,1]) ''' 加入初始狀態 5 x 1序列值和 5 x 4中間狀態 按列連接,得到 5 x 5數組 構成輸入數據 ''' input_and_state_concatenated = tf.concat([current_input,current_state],1) #隱藏層激活函數選擇tanh 5x4 next_state = tf.contrib.layers.fully_connected(input_and_state_concatenated,state_size,activation_fn = tf.tanh) current_state = next_state #輸出層 激活函數選擇None,即直接輸出 5x2 logits = tf.contrib.layers.fully_connected(next_state,num_classes,activation_fn = None) #計算代價 loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels,logits = logits)) losses.append(loss) #經過softmax計算預測值 5x2 注意這里並不是標簽值 這里是one_hot編碼 predictions = tf.nn.softmax(logits) predictions_series.append(predictions) total_loss = tf.reduce_mean(losses) train_step = tf.train.AdagradOptimizer(learning_rate).minimize(total_loss) def plot(loss_list, predictions_series, batchx, batchy): ''' 繪制一個小批量中每一個原序列樣本,回聲序列樣本,預測序列樣本圖像 args: loss_list:list 存放每一個批次訓練的代價值 predictions_series:list長度為5 存放一個批次中每個輸入序列的預測序列值 注意這里每個元素(5x2)都是one_hot編碼 batchx:當前批次的原序列 5x15 batchy:當前批次的回聲序列 5x15 ''' plt.figure(figsize=(3.2*3,2.4*2)) #創建子圖 2行3列選擇第一個 繪制代價值 plt.subplot(2, 3, 1) plt.cla() plt.plot(loss_list) #迭代每一個序列 循環5次 for batch_series_idx in range(batch_size): #獲取第batch_series_idx個序列的預測值(one_hot編碼) 15x2 one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :] #轉換為標簽值 (15,) single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series]) #繪制第batch_series_idx + 2個子圖 plt.subplot(2, 3, batch_series_idx + 2) plt.cla() #設置x軸 y軸坐標值范圍 plt.axis([0, truncated_backprop_length, 0, 2]) #獲取原序列x坐標值 left_offset = range(truncated_backprop_length) #獲取回聲序列x坐標值 滯后3個步長 left_offset2 = range(echo_step,truncated_backprop_length + echo_step) label1 = "past values" label2 = "True echo values" label3 = "Predictions" #繪制原序列 plt.plot(left_offset2, batchx[batch_series_idx, :]*0.2+1.5, "o--b", label=label1) #繪制真實回聲序列 plt.plot(left_offset, batchy[batch_series_idx, :]*0.2+0.8,"x--b", label=label2) #繪制預測回聲序列 plt.plot(left_offset, single_output_series*0.2+0.1 , "o--r", label=label3) plt.legend(loc='best') plt.show() ''' 四 建立session訓練數據 ''' with tf.Session() as sess: sess.run(tf.global_variables_initializer()) loss_list = [] #list 存放每一小批量的代價值 #開始迭代每一輪 for epoch_idx in range(num_epochs): #生成原序列和回聲序列數據 x,y = generate_date() #初始化循環節點狀態值 _current_state = np.zeros((batch_size,state_size)) print('New date,epoch',epoch_idx) #迭代每一小批量 for batch_idx in range(num_batches): #計算當前batch的起始索引 start_idx = batch_idx * truncated_backprop_length #計算當前batch的結束索引 end_idx = start_idx + truncated_backprop_length #當前批次的原序列值 batchx = x[:,start_idx:end_idx] #當前批次的回聲序列值 batchy = y[:,start_idx:end_idx] #開始訓練當前批次樣本 _total_loss,_train_step,_current_state,_predictions_series = sess.run( [total_loss,train_step,current_state,predictions_series], feed_dict = { batch_x:batchx, batch_y:batchy, init_state:_current_state }) loss_list.append(_total_loss) if batch_idx % 100 == 0: print('Step {0} Loss {1}'.format(batch_idx,_total_loss)) #可視化輸出 plot(loss_list,_predictions_series,batchx,batchy) #print(batchx) #print(batchy)