(數據科學學習手札40)tensorflow實現LSTM時間序列預測


一、簡介

  上一篇中我們較為詳細地鋪墊了關於RNN及其變種LSTM的一些基本知識,也提到了LSTM在時間序列預測上優越的性能,本篇就將對如何利用tensorflow,在實際時間序列預測任務中搭建模型來完成任務,若你對RNN及LSTM不甚了解,請移步上一篇數據科學學習手札39;

 

二、數據說明及預處理

2.1 數據說明

  我們本文使用到的第一個數據來自R中自帶的數據集AirPassengers,這個數據集記錄了Box & Jenkins航空公司1949-1960年共144個觀測值(對應每個月的國際航線乘客數),是一個經典的時間序列數據集,你可以從R中導出或去uci的網站下載;

 

2.2 數據預處理

  我們都知道,RNN最終經由tanh激活后輸出的值位於[-1,1]內,若為分類任務則可以經由softmax進行處理,但我們這里要做的是對連續數值的預測,因此需要的輸出即為tanh的輸出,因此需要將原始數據進行尺度放縮,而尺度放縮的方法主要有兩種,一種是極差規格化,即將原數據通過下面的公式無損地映射到[0,1]之間:

  另一種是標准化,將原數據通過下面的公式轉換為均值為0,標准差為1的服從正態分布的隨機變量:

我們這里選擇標准化(選極差規格化也可以,讀者們可以自己嘗試,我懶得寫了。。。);

 

三、模型建立及訓練

數據預處理部分:

  這一部分,我們完成原始數據的導入和預處理,為了配合之后的采樣過程,這里選擇列表作為預處理后原始數據的儲存對象:

import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
import matplotlib.pyplot as plt
from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat
from matplotlib import style
import pandas as pd

'''讀入原始數據並轉為list'''
path = 'C:\\Users\\windows\\Desktop\\'

data = pd.read_csv(path+'AirPassenger.csv')

data = data.iloc[:,0].tolist()

'''自定義數據尺度縮放函數'''
def data_processing(raw_data,scale=True):
    if scale == True:
        return (raw_data-np.mean(raw_data))/np.std(raw_data)#標准化
    else:
        return (raw_data-np.min(raw_data))/(np.max(raw_data)-np.min(raw_data))#極差規格化

 

數據觀察部分:

  這一部分,我們需要初步觀察到原數據的一些基本特性,以便確定之后的一些參數,如LSTM單元內一個時間步內的遞歸次數:

'''觀察數據'''

'''設置繪圖風格'''
style.use('ggplot')

plt.plot(data)

可以看出,我們的數據集具有很明顯的周期性與上升趨勢,下面就基於此,對LSTM的一些基本參數進行設置;

LSTM基本參數設置:

  這里我們需要設置的參數有隱層層數,因為數據集比較簡單,我們設置為1;隱層神經元個數,這里我隨意設置為40個;時間步中遞歸次數,這里根據上面觀察的結論,設置為12;訓練輪數,這里也是隨意設置的不宜過少,2000;訓練批尺寸,這里隨意設置為20,表示每一輪從訓練集中抽出20組序列樣本進行訓練:

'''設置隱層神經元個數'''
HIDDEN_SIZE = 40
'''設置隱層層數'''
NUM_LAYERS = 1
'''設置一個時間步中折疊的遞歸步數'''
TIMESTEPS = 12
'''設置訓練輪數'''
TRAINING_STEPS = 2000
'''設置訓練批尺寸'''
BATCH_SIZE = 20

 

生成訓練集數據:

  這里為了將原始的單變量時序數據處理成LSTM可以接受的數據類型(有X輸入,有真實標簽Y),我們通過自編函數,將原數據(144個)從第一個開始,依次采樣長度為12的連續序列作為一個時間步內部的輸入序列X,並采樣其之后一期的數據作為一個Y,具體過程如下:

'''樣本數據生成函數'''
def generate_data(seq):
    X = []#初始化輸入序列X
    Y= []#初始化輸出序列Y
    '''生成連貫的時間序列類型樣本集,每一個X內的一行對應指定步長的輸入序列,Y內的每一行對應比X滯后一期的目標數值'''
    for i in range(len(seq) - TIMESTEPS - 1):
        X.append([seq[i:i + TIMESTEPS]])#從輸入序列第一期出發,等步長連續不間斷采樣
        Y.append([seq[i + TIMESTEPS]])#對應每個X序列的滯后一期序列值
    return np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32)

 

構造LSTM模型主體

'''定義LSTM cell組件,該組件將在訓練過程中被不斷更新參數'''
def LstmCell():
    lstm_cell = rnn.BasicLSTMCell(HIDDEN_SIZE, state_is_tuple=True)#
    return lstm_cell

'''定義LSTM模型'''
def lstm_model(X, y):
    '''以前面定義的LSTM cell為基礎定義多層堆疊的LSTM,我們這里只有1層'''
    cell = rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)])

    '''將已經堆疊起的LSTM單元轉化成動態的可在訓練過程中更新的LSTM單元'''
    output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)

    '''根據預定義的每層神經元個數來生成隱層每個單元'''
    output = tf.reshape(output, [-1, HIDDEN_SIZE])

    '''通過無激活函數的全連接層計算線性回歸,並將數據壓縮成一維數組結構'''
    predictions = tf.contrib.layers.fully_connected(output, 1, None)

    '''統一預測值與真實值的形狀'''
    labels = tf.reshape(y, [-1])
    predictions = tf.reshape(predictions, [-1])

    '''定義損失函數,這里為正常的均方誤差'''
    loss = tf.losses.mean_squared_error(predictions, labels)

    '''定義優化器各參數'''
    train_op = tf.contrib.layers.optimize_loss(loss,
                                               tf.contrib.framework.get_global_step(),
                                               optimizer='Adagrad',
                                               learning_rate=0.6)
    '''返回預測值、損失函數及優化器'''
    return predictions, loss, train_op

'''載入tf中仿sklearn訓練方式的模塊'''
learn = tf.contrib.learn

'''初始化我們的LSTM模型,並保存到工作目錄下以方便進行增量學習'''
regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='Models/model_2'))

 

訓練部分:

'''對原數據進行尺度縮放'''
data = data_processing(data)

'''將所有樣本來作為訓練樣本'''
train_X, train_y = generate_data(data)

'''將所有樣本作為測試樣本'''
test_X, test_y = generate_data(data)

'''以仿sklearn的形式訓練模型,這里指定了訓練批尺寸和訓練輪數'''
regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS)

 

評價部分:

  這里我們將原數據(尺度縮放之后的)feed進我們已經訓練好的模型中,得到對應的預測值:

'''利用已訓練好的LSTM模型,來生成對應測試集的所有預測值'''
predicted = np.array([pred for pred in regressor.predict(test_X)])

'''繪制反標准化之前的真實值與預測值對比圖'''
plt.plot(predicted, label='預測值')
plt.plot(test_y, label='真實值')
plt.title('反標准化之前')
plt.legend()
plt.show()

可以看到,預測值與真實值非常的吻合,但這並不是我們需要的形式,我們需要的是反標准化后的真實數值,下面進行相關操作;

'''自定義反標准化函數'''
def scale_inv(raw_data,scale=True):
    '''讀入原始數據並轉為list'''
    path = 'C:\\Users\\windows\\Desktop\\'

    data = pd.read_csv(path + 'AirPassenger.csv')

    data = data.iloc[:, 0].tolist()

    if scale == True:
        return raw_data*np.std(data)+np.mean(data)
    else:
        return raw_data*(np.max(data)-np.min(data))+np.min(data)


'''繪制反標准化之前的真實值與預測值對比圖'''
plt.figure()
plt.plot(scale_inv(predicted), label='預測值')
plt.plot(scale_inv(test_y), label='真實值')
plt.title('反標准化之后')
plt.legend()
plt.show()

實際使用中,若想利用已訓練好的LSTM模型來預測未出現的下一期,則直接輸入最后12步(這里是12步)即可得到未來的一步預測值,若想要獲得更遠更多期的預測值,則可以逐步將預測值積累起來,相當於用預測值當作真實發生的值進行預測,這樣的壞處是越往后可能越不准,以上這個過程的完整代碼如下:

import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
import matplotlib.pyplot as plt
from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat
from matplotlib import style
import pandas as pd

'''讀入原始數據並轉為list'''
path = 'C:\\Users\\windows\\Desktop\\'

data = pd.read_csv(path+'AirPassenger.csv')

data = data.iloc[:,0].tolist()

'''自定義數據尺度縮放函數'''
def data_processing(raw_data,scale=True):
    if scale == True:
        return (raw_data-np.mean(raw_data))/np.std(raw_data)#標准化
    else:
        return (raw_data-np.min(raw_data))/(np.max(raw_data)-np.min(raw_data))#極差規格化

'''觀察數據'''

'''設置繪圖風格'''
style.use('ggplot')

plt.plot(data)

'''設置隱層神經元個數'''
HIDDEN_SIZE = 40
'''設置隱層層數'''
NUM_LAYERS = 1
'''設置一個時間步中折疊的遞歸步數'''
TIMESTEPS = 12
'''設置訓練輪數'''
TRAINING_STEPS = 10000
'''設置訓練批尺寸'''
BATCH_SIZE = 20




'''樣本數據生成函數'''
def generate_data(seq):
    X = []#初始化輸入序列X
    Y= []#初始化輸出序列Y
    '''生成連貫的時間序列類型樣本集,每一個X內的一行對應指定步長的輸入序列,Y內的每一行對應比X滯后一期的目標數值'''
    for i in range(len(seq) - TIMESTEPS - 1):
        X.append([seq[i:i + TIMESTEPS]])#從輸入序列第一期出發,等步長連續不間斷采樣
        Y.append([seq[i + TIMESTEPS]])#對應每個X序列的滯后一期序列值
    return np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32)


'''定義LSTM cell組件,該組件將在訓練過程中被不斷更新參數'''
def LstmCell():
    lstm_cell = rnn.BasicLSTMCell(HIDDEN_SIZE, state_is_tuple=True)#
    return lstm_cell

'''定義LSTM模型'''
def lstm_model(X, y):
    '''以前面定義的LSTM cell為基礎定義多層堆疊的LSTM,我們這里只有1層'''
    cell = rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)])

    '''將已經堆疊起的LSTM單元轉化成動態的可在訓練過程中更新的LSTM單元'''
    output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)

    '''根據預定義的每層神經元個數來生成隱層每個單元'''
    output = tf.reshape(output, [-1, HIDDEN_SIZE])

    '''通過無激活函數的全連接層計算線性回歸,並將數據壓縮成一維數組結構'''
    predictions = tf.contrib.layers.fully_connected(output, 1, None)

    '''統一預測值與真實值的形狀'''
    labels = tf.reshape(y, [-1])
    predictions = tf.reshape(predictions, [-1])

    '''定義損失函數,這里為正常的均方誤差'''
    loss = tf.losses.mean_squared_error(predictions, labels)

    '''定義優化器各參數'''
    train_op = tf.contrib.layers.optimize_loss(loss,
                                               tf.contrib.framework.get_global_step(),
                                               optimizer='Adagrad',
                                               learning_rate=0.6)
    '''返回預測值、損失函數及優化器'''
    return predictions, loss, train_op

'''載入tf中仿sklearn訓練方式的模塊'''
learn = tf.contrib.learn

'''初始化我們的LSTM模型,並保存到工作目錄下以方便進行增量學習'''
regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='Models/model_2'))

'''對原數據進行尺度縮放'''
data = data_processing(data)

'''將所有樣本來作為訓練樣本'''
train_X, train_y = generate_data(data)

'''將所有樣本作為測試樣本'''
test_X, test_y = generate_data(data)

'''以仿sklearn的形式訓練模型,這里指定了訓練批尺寸和訓練輪數'''
regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS)

'''利用已訓練好的LSTM模型,來生成對應測試集的所有預測值'''
predicted = np.array([pred for pred in regressor.predict(test_X)])

'''繪制反標准化之前的真實值與預測值對比圖'''
plt.figure()
plt.plot(predicted, label='預測值')
plt.plot(test_y, label='真實值')
plt.title('反標准化之前')
plt.legend()
plt.show()


'''自定義反標准化函數'''
def scale_inv(raw_data,scale=True):
    '''讀入原始數據並轉為list'''
    path = 'C:\\Users\\windows\\Desktop\\'

    data = pd.read_csv(path + 'AirPassenger.csv')

    data = data.iloc[:, 0].tolist()

    if scale == True:
        return raw_data*np.std(data)+np.mean(data)
    else:
        return raw_data*(np.max(data)-np.min(data))+np.min(data)


'''繪制反標准化之前的真實值與預測值對比圖'''
plt.figure()
plt.plot(scale_inv(predicted), label='預測值')
plt.plot(scale_inv(test_y), label='真實值')
plt.title('反標准化之后')
plt.legend()
plt.show()

 

  以上就是本篇文章的全部內容,如有筆誤或混淆不清之處,望指出。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM