PyTorch Study Notes 9 -- Cases 4 & 5: PyTorch LSTM Time Series Prediction


Time Series Prediction Case 1: Sine Waves

PyTorch provides an official time series prediction example:
https://github.com/pytorch/examples/tree/master/time_sequence_prediction

This is a good example for beginners to get started with: it helps with learning both PyTorch and time series prediction. It uses two LSTMCell units to learn sine wave signals that start from different phases. After learning the sine waves, the network tries to predict future signal values. The results are shown below.

The initial signals and the prediction results are shown in the figure: we first feed in some initial signal (solid lines), and the network then produces its predictions (dashed lines). We can conclude that the network is able to predict the time series.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib
# Non-interactive backend, you can't call plt.show() to see the figure interactively
# matplotlib.use('Agg') must be placed before import matplotlib.pyplot
matplotlib.use('Agg') 
import matplotlib.pyplot as plt

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def generateSineWave(): 
    np.random.seed(2)
    T = 20
    L = 1000
    N = 100
    x = np.empty((N, L), 'int64')  # the dataset has N=100 items, each of length L=1000
    x[:] = np.array(range(L)) + np.random.randint(-4 * T, 4 * T, N).reshape(N, 1)
    data = np.sin(x / 1.0 / T).astype('float64')
    torch.save(data, open('traindata.pt', 'wb'))

class Sequence(nn.Module):
    def __init__(self):
        super(Sequence, self).__init__()
        self.lstm1 = nn.LSTMCell(1, 51)
        self.lstm2 = nn.LSTMCell(51, 51)
        self.linear = nn.Linear(51, 1)

    def forward(self, input, future = 0):
        outputs = []
        h_t = torch.zeros(input.size(0), 51, dtype=torch.double)
        c_t = torch.zeros(input.size(0), 51, dtype=torch.double)
        h_t2 = torch.zeros(input.size(0), 51, dtype=torch.double)
        c_t2 = torch.zeros(input.size(0), 51, dtype=torch.double)

        h_t = h_t.to(device)
        c_t = c_t.to(device)
        h_t2 = h_t2.to(device)
        c_t2 = c_t2.to(device)

        for i, input_t in enumerate(input.chunk(input.size(1), dim=1)):
            h_t, c_t = self.lstm1(input_t, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)  # output.shape:[batch,1]
            outputs += [output] # outputs.shape:[[batch,1],...[batch,1]], list composed of n [batch,1],
        for i in range(future):# if we should predict the future
            h_t, c_t = self.lstm1(output, (h_t, c_t)) 
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2) # output.shape:[batch,1]
            outputs += [output]  # outputs.shape:[[batch,1],...[batch,1]], list composed of n [batch,1],
        outputs = torch.stack(outputs, 1).squeeze(2) # shape after stack:[batch, n, 1], shape after squeeze: [batch,n]
        return outputs


if __name__ == '__main__':
    # 1. generate sine wave data
    generateSineWave()
    # set random seed to 0
    np.random.seed(0)
    torch.manual_seed(0)
    # load data and make training set
    data = torch.load('traindata.pt')
    input = torch.from_numpy(data[3:, :-1])
    target = torch.from_numpy(data[3:, 1:])
    test_input = torch.from_numpy(data[:3, :-1])
    test_target = torch.from_numpy(data[:3, 1:])
    input = input.to(device)
    target = target.to(device)
    test_input = test_input.to(device)
    test_target = test_target.to(device)
    # 2. build the model
    seq = Sequence()
    seq.double()
    print(seq)
    # move to cuda
    # if torch.cuda.device_count()>1:
    #     seq = nn.DataParallel(seq)
    seq = seq.to(device)
    
    # 3 loss function
    criterion = nn.MSELoss()
    # 4 use LBFGS as the optimizer since we can load the whole dataset to train on;
    #   note that LBFGS re-evaluates the model several times per step, so optimizer.step()
    #   must be given a closure that recomputes the loss
    optimizer = optim.LBFGS(seq.parameters(), lr=0.8)
    # 5 begin to train
    for i in range(1):  # a single optimization step for this demo; increase for a better fit
        print('STEP: ', i)
        def closure():
            # forward
            out = seq(input)
            loss = criterion(out, target)
            print('loss:', loss.item())
            # backward
            optimizer.zero_grad()
            loss.backward()
            return loss
        optimizer.step(closure)
        # begin to predict, no need to track gradient here
        with torch.no_grad():
            future = 1000
            pred = seq(test_input, future=future)
            loss = criterion(pred[:, :-future], test_target)
            print('test loss:', loss.item())

            y = pred.detach().cpu()
            y = y.numpy()
        # draw the result
        plt.figure(figsize=(30,10))
        plt.title('Predict future values for time sequences\n(Dashlines are predicted values)', fontsize=30)
        plt.xlabel('x', fontsize=20)
        plt.ylabel('y', fontsize=20)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)
        def draw(yi, color):
            plt.plot(np.arange(input.size(1)), yi[:input.size(1)], color, linewidth = 2.0)
            plt.plot(np.arange(input.size(1), input.size(1) + future), yi[input.size(1):], color + ':', linewidth = 2.0)
        draw(y[0], 'r')
        draw(y[1], 'g')
        draw(y[2], 'b')
        plt.savefig('predict%d.pdf'%i)
        plt.close()
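
As a standalone shape check (an added sketch, not part of the official example), the Sequence model can be run on dummy data to see how the future argument extends the output:

# assumes Sequence and device are defined as above
seq_check = Sequence().double().to(device)
dummy = torch.zeros(3, 999, dtype=torch.double, device=device)  # 3 sequences, 999 steps each
out = seq_check(dummy, future=10)
print(out.shape)  # torch.Size([3, 1009]): 999 one-step-ahead outputs plus 10 extrapolated steps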

Time Series Prediction Case 2: Stock Prediction

Original article: https://www.7forz.com/3319/

This case shows how to use an LSTM to predict a time series; here the closing price of the SSE Composite Index is used.

First, download the K-line (candlestick) data of the SSE Composite Index with tushare, then normalize it.

import numpy as np
import torch
import tushare as ts

data_close = ts.get_k_data('000001', start='2018-01-01', index=True)['close'].values  # closing prices of the SSE Composite Index from 2018-01-01 onward, as an np.ndarray
data_close = data_close.astype('float32')  # convert the data type
# normalize the prices to the range 0-1 (min-max scaling)
max_value = np.max(data_close)
min_value = np.min(data_close)
data_close = (data_close - min_value) / (max_value - min_value)

Raw data: closing prices of the SSE Composite Index from 2018-01-01 to 2019-05-24 (before normalization).

Split the K-line data so that every DAYS_FOR_TRAIN consecutive closing prices map to 1 future closing price. For example, if the series is [1,2,3,4,5] and DAYS_FOR_TRAIN=3, two samples are generated (a quick check of this toy example appears in the code below):

The first sample's input is [1,2,3], with target 4;

the second sample's input is [2,3,4], with target 5.

Then only the first 70% of the data is used for training; the rest is held out and later compared against the real data.

DAYS_FOR_TRAIN = 10

def create_dataset(data, days_for_train=5) -> (np.array, np.array):
    """
        Build a dataset from the given sequence `data`.

        Each input has length days_for_train and each output has length 1;
        i.e. days_for_train days of data are used to predict the next day.

        For a sequence of length d, this produces (d - days_for_train) input/output pairs.
    """
    dataset_x, dataset_y = [], []
    for i in range(len(data) - days_for_train):
        _x = data[i:(i + days_for_train)]
        dataset_x.append(_x)
        dataset_y.append(data[i + days_for_train])
    return (np.array(dataset_x), np.array(dataset_y))

dataset_x, dataset_y = create_dataset(data_close, DAYS_FOR_TRAIN)
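
# Quick check of the toy example described above (an added sketch, not in the original post):
# a series [1,2,3,4,5] with a 3-day window yields exactly two input/output pairs.
toy_x, toy_y = create_dataset(np.array([1, 2, 3, 4, 5], dtype='float32'), days_for_train=3)
print(toy_x)  # [[1. 2. 3.] [2. 3. 4.]]
print(toy_y)  # [4. 5.]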

# split into training and test sets; the first 70% is used for training
train_size = int(len(dataset_x) * 0.7)

train_x = dataset_x[:train_size]
train_y = dataset_y[:train_size]

# Reshape the data; nn.LSTM expects input of shape (seq_len, batch_size, feature_size).
# Here each DAYS_FOR_TRAIN-day window is treated as the feature vector of a single
# time step, with batch_size=1 and all samples stacked along the sequence dimension.
train_x = train_x.reshape(-1, 1, DAYS_FOR_TRAIN)
train_y = train_y.reshape(-1, 1, 1)


# convert to PyTorch tensors
train_x = torch.from_numpy(train_x)
train_y = torch.from_numpy(train_y)

Define the network, the optimizer, and the loss function.

import torch
from torch import nn
class LSTM_Regression(nn.Module):
    """
        使用LSTM進行回歸
        
        參數:
        - input_size: feature size
        - hidden_size: number of hidden units
        - output_size: number of output
        - num_layers: layers of LSTM to stack
    """
    def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)
    def forward(self, _x):
        x, _ = self.lstm(_x)  # _x is input, size (seq_len, batch, input_size)
        s, b, h = x.shape  # x is output, size (seq_len, batch, hidden_size)
        x = x.view(s*b, h)
        x = self.fc(x)
        x = x.view(s, b, -1)  # reshape back to (seq_len, batch, output_size)
        return x
model = LSTM_Regression(DAYS_FOR_TRAIN, 8, output_size=1, num_layers=2)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
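
Before training, a dummy batch can be pushed through the model to confirm the shapes described above (a small added sanity check, not in the original post):

# dummy input of shape (seq_len=100, batch=1, feature=DAYS_FOR_TRAIN)
dummy = torch.randn(100, 1, DAYS_FOR_TRAIN)
print(model(dummy).shape)  # expected: torch.Size([100, 1, 1])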

Training

for i in range(1000):                   
    out = model(train_x)
    loss = loss_function(out, train_y)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    if (i+1) % 100 == 0:
        print('Epoch: {}, Loss:{:.5f}'.format(i+1, loss.item()))

Testing

import matplotlib.pyplot as plt
model = model.eval()  # switch to evaluation mode
# Note: the full dataset is used here. The model output is DAYS_FOR_TRAIN shorter
# than the original data, so it is padded to equal length before plotting.
dataset_x = dataset_x.reshape(-1, 1, DAYS_FOR_TRAIN)  # (seq_len, batch_size, feature_size)
dataset_x = torch.from_numpy(dataset_x)
pred_test = model(dataset_x)  # model output on the full dataset, shape (seq_len, batch_size, output_size)
pred_test = pred_test.view(-1).data.numpy()
pred_test = np.concatenate((np.zeros(DAYS_FOR_TRAIN), pred_test))  # pad with zeros so the lengths match
assert len(pred_test) == len(data_close)
plt.plot(pred_test, 'r', label='prediction')
plt.plot(data_close, 'b', label='real')
plt.plot((train_size, train_size), (0, 1), 'g--')  # vertical line marking the train/test boundary
plt.legend(loc='best')
plt.show()
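
The plot above is in the normalized 0-1 range. If actual index points are needed, the min-max scaling can be inverted with the max_value and min_value computed earlier (a small added sketch):

# invert the min-max normalization to recover index points
pred_price = pred_test * (max_value - min_value) + min_value
real_price = data_close * (max_value - min_value) + min_value
# note: the first DAYS_FOR_TRAIN entries of pred_test are zero padding,
# so they map to min_value and should be ignored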

