一、簡介
上一篇中我們較為詳細地鋪墊了關於RNN及其變種LSTM的一些基本知識,也提到了LSTM在時間序列預測上優越的性能,本篇就將對如何利用tensorflow,在實際時間序列預測任務中搭建模型來完成任務,若你對RNN及LSTM不甚了解,請移步上一篇數據科學學習手札39;
二、數據說明及預處理
2.1 數據說明
我們本文使用到的第一個數據來自R中自帶的數據集AirPassengers,這個數據集記錄了Box & Jenkins航空公司1949-1960年共144個觀測值(對應每個月的國際航線乘客數),是一個經典的時間序列數據集,你可以從R中導出或去uci的網站下載;
2.2 數據預處理
我們都知道,RNN最終經由tanh激活后輸出的值位於[-1,1]內,若為分類任務則可以經由softmax進行處理,但我們這里要做的是對連續數值的預測,因此需要的輸出即為tanh的輸出,因此需要將原始數據進行尺度放縮,而尺度放縮的方法主要有兩種,一種是極差規格化,即將原數據通過下面的公式無損地映射到[0,1]之間:
另一種是標准化,將原數據通過下面的公式轉換為均值為0,標准差為1的服從正態分布的隨機變量:
我們這里選擇標准化(選極差規格化也可以,讀者們可以自己嘗試,我懶得寫了。。。);
三、模型建立及訓練
數據預處理部分:
這一部分,我們完成原始數據的導入和預處理,為了配合之后的采樣過程,這里選擇列表作為預處理后原始數據的儲存對象:
import numpy as np import tensorflow as tf from tensorflow.contrib import rnn import matplotlib.pyplot as plt from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat from matplotlib import style import pandas as pd '''讀入原始數據並轉為list''' path = 'C:\\Users\\windows\\Desktop\\' data = pd.read_csv(path+'AirPassenger.csv') data = data.iloc[:,0].tolist() '''自定義數據尺度縮放函數''' def data_processing(raw_data,scale=True): if scale == True: return (raw_data-np.mean(raw_data))/np.std(raw_data)#標准化 else: return (raw_data-np.min(raw_data))/(np.max(raw_data)-np.min(raw_data))#極差規格化
數據觀察部分:
這一部分,我們需要初步觀察到原數據的一些基本特性,以便確定之后的一些參數,如LSTM單元內一個時間步內的遞歸次數:
'''觀察數據''' '''設置繪圖風格''' style.use('ggplot') plt.plot(data)
可以看出,我們的數據集具有很明顯的周期性與上升趨勢,下面就基於此,對LSTM的一些基本參數進行設置;
LSTM基本參數設置:
這里我們需要設置的參數有隱層層數,因為數據集比較簡單,我們設置為1;隱層神經元個數,這里我隨意設置為40個;時間步中遞歸次數,這里根據上面觀察的結論,設置為12;訓練輪數,這里也是隨意設置的不宜過少,2000;訓練批尺寸,這里隨意設置為20,表示每一輪從訓練集中抽出20組序列樣本進行訓練:
'''設置隱層神經元個數''' HIDDEN_SIZE = 40 '''設置隱層層數''' NUM_LAYERS = 1 '''設置一個時間步中折疊的遞歸步數''' TIMESTEPS = 12 '''設置訓練輪數''' TRAINING_STEPS = 2000 '''設置訓練批尺寸''' BATCH_SIZE = 20
生成訓練集數據:
這里為了將原始的單變量時序數據處理成LSTM可以接受的數據類型(有X輸入,有真實標簽Y),我們通過自編函數,將原數據(144個)從第一個開始,依次采樣長度為12的連續序列作為一個時間步內部的輸入序列X,並采樣其之后一期的數據作為一個Y,具體過程如下:
'''樣本數據生成函數''' def generate_data(seq): X = []#初始化輸入序列X Y= []#初始化輸出序列Y '''生成連貫的時間序列類型樣本集,每一個X內的一行對應指定步長的輸入序列,Y內的每一行對應比X滯后一期的目標數值''' for i in range(len(seq) - TIMESTEPS - 1): X.append([seq[i:i + TIMESTEPS]])#從輸入序列第一期出發,等步長連續不間斷采樣 Y.append([seq[i + TIMESTEPS]])#對應每個X序列的滯后一期序列值 return np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32)
構造LSTM模型主體:
'''定義LSTM cell組件,該組件將在訓練過程中被不斷更新參數''' def LstmCell(): lstm_cell = rnn.BasicLSTMCell(HIDDEN_SIZE, state_is_tuple=True)# return lstm_cell '''定義LSTM模型''' def lstm_model(X, y): '''以前面定義的LSTM cell為基礎定義多層堆疊的LSTM,我們這里只有1層''' cell = rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)]) '''將已經堆疊起的LSTM單元轉化成動態的可在訓練過程中更新的LSTM單元''' output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32) '''根據預定義的每層神經元個數來生成隱層每個單元''' output = tf.reshape(output, [-1, HIDDEN_SIZE]) '''通過無激活函數的全連接層計算線性回歸,並將數據壓縮成一維數組結構''' predictions = tf.contrib.layers.fully_connected(output, 1, None) '''統一預測值與真實值的形狀''' labels = tf.reshape(y, [-1]) predictions = tf.reshape(predictions, [-1]) '''定義損失函數,這里為正常的均方誤差''' loss = tf.losses.mean_squared_error(predictions, labels) '''定義優化器各參數''' train_op = tf.contrib.layers.optimize_loss(loss, tf.contrib.framework.get_global_step(), optimizer='Adagrad', learning_rate=0.6) '''返回預測值、損失函數及優化器''' return predictions, loss, train_op '''載入tf中仿sklearn訓練方式的模塊''' learn = tf.contrib.learn '''初始化我們的LSTM模型,並保存到工作目錄下以方便進行增量學習''' regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='Models/model_2'))
訓練部分:
'''對原數據進行尺度縮放''' data = data_processing(data) '''將所有樣本來作為訓練樣本''' train_X, train_y = generate_data(data) '''將所有樣本作為測試樣本''' test_X, test_y = generate_data(data) '''以仿sklearn的形式訓練模型,這里指定了訓練批尺寸和訓練輪數''' regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS)
評價部分:
這里我們將原數據(尺度縮放之后的)feed進我們已經訓練好的模型中,得到對應的預測值:
'''利用已訓練好的LSTM模型,來生成對應測試集的所有預測值''' predicted = np.array([pred for pred in regressor.predict(test_X)]) '''繪制反標准化之前的真實值與預測值對比圖''' plt.plot(predicted, label='預測值') plt.plot(test_y, label='真實值') plt.title('反標准化之前') plt.legend() plt.show()
可以看到,預測值與真實值非常的吻合,但這並不是我們需要的形式,我們需要的是反標准化后的真實數值,下面進行相關操作;
'''自定義反標准化函數''' def scale_inv(raw_data,scale=True): '''讀入原始數據並轉為list''' path = 'C:\\Users\\windows\\Desktop\\' data = pd.read_csv(path + 'AirPassenger.csv') data = data.iloc[:, 0].tolist() if scale == True: return raw_data*np.std(data)+np.mean(data) else: return raw_data*(np.max(data)-np.min(data))+np.min(data) '''繪制反標准化之前的真實值與預測值對比圖''' plt.figure() plt.plot(scale_inv(predicted), label='預測值') plt.plot(scale_inv(test_y), label='真實值') plt.title('反標准化之后') plt.legend() plt.show()
實際使用中,若想利用已訓練好的LSTM模型來預測未出現的下一期,則直接輸入最后12步(這里是12步)即可得到未來的一步預測值,若想要獲得更遠更多期的預測值,則可以逐步將預測值積累起來,相當於用預測值當作真實發生的值進行預測,這樣的壞處是越往后可能越不准,以上這個過程的完整代碼如下:
import numpy as np import tensorflow as tf from tensorflow.contrib import rnn import matplotlib.pyplot as plt from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat from matplotlib import style import pandas as pd '''讀入原始數據並轉為list''' path = 'C:\\Users\\windows\\Desktop\\' data = pd.read_csv(path+'AirPassenger.csv') data = data.iloc[:,0].tolist() '''自定義數據尺度縮放函數''' def data_processing(raw_data,scale=True): if scale == True: return (raw_data-np.mean(raw_data))/np.std(raw_data)#標准化 else: return (raw_data-np.min(raw_data))/(np.max(raw_data)-np.min(raw_data))#極差規格化 '''觀察數據''' '''設置繪圖風格''' style.use('ggplot') plt.plot(data) '''設置隱層神經元個數''' HIDDEN_SIZE = 40 '''設置隱層層數''' NUM_LAYERS = 1 '''設置一個時間步中折疊的遞歸步數''' TIMESTEPS = 12 '''設置訓練輪數''' TRAINING_STEPS = 10000 '''設置訓練批尺寸''' BATCH_SIZE = 20 '''樣本數據生成函數''' def generate_data(seq): X = []#初始化輸入序列X Y= []#初始化輸出序列Y '''生成連貫的時間序列類型樣本集,每一個X內的一行對應指定步長的輸入序列,Y內的每一行對應比X滯后一期的目標數值''' for i in range(len(seq) - TIMESTEPS - 1): X.append([seq[i:i + TIMESTEPS]])#從輸入序列第一期出發,等步長連續不間斷采樣 Y.append([seq[i + TIMESTEPS]])#對應每個X序列的滯后一期序列值 return np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32) '''定義LSTM cell組件,該組件將在訓練過程中被不斷更新參數''' def LstmCell(): lstm_cell = rnn.BasicLSTMCell(HIDDEN_SIZE, state_is_tuple=True)# return lstm_cell '''定義LSTM模型''' def lstm_model(X, y): '''以前面定義的LSTM cell為基礎定義多層堆疊的LSTM,我們這里只有1層''' cell = rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)]) '''將已經堆疊起的LSTM單元轉化成動態的可在訓練過程中更新的LSTM單元''' output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32) '''根據預定義的每層神經元個數來生成隱層每個單元''' output = tf.reshape(output, [-1, HIDDEN_SIZE]) '''通過無激活函數的全連接層計算線性回歸,並將數據壓縮成一維數組結構''' predictions = tf.contrib.layers.fully_connected(output, 1, None) '''統一預測值與真實值的形狀''' labels = tf.reshape(y, [-1]) predictions = tf.reshape(predictions, [-1]) '''定義損失函數,這里為正常的均方誤差''' loss = tf.losses.mean_squared_error(predictions, labels) '''定義優化器各參數''' train_op = tf.contrib.layers.optimize_loss(loss, tf.contrib.framework.get_global_step(), optimizer='Adagrad', learning_rate=0.6) '''返回預測值、損失函數及優化器''' return predictions, loss, train_op '''載入tf中仿sklearn訓練方式的模塊''' learn = tf.contrib.learn '''初始化我們的LSTM模型,並保存到工作目錄下以方便進行增量學習''' regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='Models/model_2')) '''對原數據進行尺度縮放''' data = data_processing(data) '''將所有樣本來作為訓練樣本''' train_X, train_y = generate_data(data) '''將所有樣本作為測試樣本''' test_X, test_y = generate_data(data) '''以仿sklearn的形式訓練模型,這里指定了訓練批尺寸和訓練輪數''' regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS) '''利用已訓練好的LSTM模型,來生成對應測試集的所有預測值''' predicted = np.array([pred for pred in regressor.predict(test_X)]) '''繪制反標准化之前的真實值與預測值對比圖''' plt.figure() plt.plot(predicted, label='預測值') plt.plot(test_y, label='真實值') plt.title('反標准化之前') plt.legend() plt.show() '''自定義反標准化函數''' def scale_inv(raw_data,scale=True): '''讀入原始數據並轉為list''' path = 'C:\\Users\\windows\\Desktop\\' data = pd.read_csv(path + 'AirPassenger.csv') data = data.iloc[:, 0].tolist() if scale == True: return raw_data*np.std(data)+np.mean(data) else: return raw_data*(np.max(data)-np.min(data))+np.min(data) '''繪制反標准化之前的真實值與預測值對比圖''' plt.figure() plt.plot(scale_inv(predicted), label='預測值') plt.plot(scale_inv(test_y), label='真實值') plt.title('反標准化之后') plt.legend() plt.show()
以上就是本篇文章的全部內容,如有筆誤或混淆不清之處,望指出。