1、測試數據下載
https://datamarket.com/data/set/22w6/portland-oregon-average-monthly-bus-ridership-100-january-1973-through-june-1982-n114#!ds=22w6&display=line
2、LSTM預測
# Forecast Portland (Oregon) monthly bus ridership with a small Keras LSTM.
#
# Run as a script, this file loads the ridership CSV, standardizes the series,
# fits an LSTM on sliding windows of TIMESTEPS months, plots the in-sample fit
# and then rolls the model forward FORECAST_STEPS months.  The helper
# functions below only need NumPy, so they can be imported on their own.
import datetime

import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta

# Raw string: the previous literal contained the invalid escape "\A".
CSV_PATH = r"C:\Users\Administrator\Downloads\portland-oregon-average-monthly-.csv"
N_OBSERVATIONS = 114   # monthly rows, starting at 1973-01-01
TIMESTEPS = 12         # length of each input window (months)
FORECAST_STEPS = 50    # number of months to forecast recursively
EPOCHS = 1000

# Ridership DataFrame; assigned when the file is run as a script.  scale_inv()
# falls back to it when no explicit ``reference`` series is passed.
df = None


def data_processing(raw_data, scale=True):
    """Rescale a 1-D series.

    Args:
        raw_data: sequence of numbers.
        scale: if truthy, standardize (subtract the mean, divide by the
            population standard deviation); otherwise min-max scale to [0, 1].

    Returns:
        A float64 ``numpy.ndarray`` with the same length as ``raw_data``.
        A constant series has zero spread, so the division yields NaN.
    """
    values = np.asarray(raw_data, dtype=np.float64)
    if scale:
        return (values - values.mean()) / values.std()
    low, high = values.min(), values.max()
    return (values - low) / (high - low)


def generate_data(seq, timesteps=TIMESTEPS):
    """Build supervised samples from a series with a sliding window.

    Sample ``i`` uses ``seq[i:i + timesteps]`` as input and
    ``seq[i + timesteps]`` (the value right after the window) as target.

    Args:
        seq: 1-D sequence of numbers.
        timesteps: window length; defaults to the module-level ``TIMESTEPS``.

    Returns:
        ``(X, Y)`` as float32 arrays with shapes ``(n, 1, timesteps)`` and
        ``(n, 1)``, where ``n = max(len(seq) - timesteps, 0)``.
    """
    values = np.asarray(seq, dtype=np.float32)
    # Every start index whose target still lies inside the series.  The
    # earlier bound of ``len(seq) - timesteps - 1`` dropped the final sample.
    n_samples = max(len(values) - timesteps, 0)
    windows = [values[i:i + timesteps] for i in range(n_samples)]
    targets = [values[i + timesteps] for i in range(n_samples)]
    # reshape() keeps the rank stable even when there are no samples at all.
    X = np.array(windows, dtype=np.float32).reshape(-1, 1, timesteps)
    Y = np.array(targets, dtype=np.float32).reshape(-1, 1)
    return X, Y


def scale_inv(raw_data, scale=True, reference=None):
    """Undo ``data_processing`` using the statistics of the original series.

    Args:
        raw_data: scaled values (array-like).
        scale: must match the ``scale`` flag used when scaling.
        reference: the unscaled series the statistics are taken from.  When
            omitted, the first column of the module-level ``df`` is used.

    Returns:
        ``numpy.ndarray`` of values mapped back to the original units.

    Raises:
        ValueError: if ``reference`` is omitted and ``df`` has not been loaded.
    """
    if reference is None:
        if df is None:
            raise ValueError("scale_inv needs `reference` when `df` is not loaded")
        reference = df.iloc[:, 0].tolist()
    ref = np.asarray(reference, dtype=np.float64)
    scaled = np.asarray(raw_data)
    if scale:
        return scaled * ref.std() + ref.mean()
    return scaled * (ref.max() - ref.min()) + ref.min()


def generate_predata(seq):
    """Wrap one input window in a leading batch axis as a float32 array."""
    return np.array([seq], dtype=np.float32)


if __name__ == "__main__":
    # Plotting and deep-learning packages are only needed when the script
    # actually runs, so the helpers above stay importable without them.
    import matplotlib.pyplot as plt
    from tensorflow.keras.layers import LSTM, Dense
    from tensorflow.keras.models import Sequential

    # ---- load and tidy the data ------------------------------------------
    df = pd.read_csv(CSV_PATH, index_col=0)
    df.index.name = None  # clear the index name
    df.reset_index(inplace=True)
    # NOTE(review): row 114 is presumably the non-data footer line of the
    # exported CSV -- confirm against the downloaded file.
    df.drop(df.index[N_OBSERVATIONS], inplace=True)
    start = datetime.datetime.strptime("1973-01-01", "%Y-%m-%d")
    # One timestamp per month, starting at 1973-01-01.
    date_list = [start + relativedelta(months=x) for x in range(N_OBSERVATIONS)]
    df['index'] = date_list
    df.set_index(['index'], inplace=True)
    df.index.name = None
    df.columns = ['riders']
    df['riders'] = df['riders'].apply(lambda x: int(x) * 100)
    df.riders.plot(figsize=(12, 8), title='Monthly Ridership', fontsize=14)
    plt.show()

    # ---- build samples ---------------------------------------------------
    raw_riders = df.iloc[:, 0].tolist()
    data = data_processing(raw_riders)  # standardize the series
    # The whole series is used both for training and for evaluation.
    train_X, train_y = generate_data(data)
    test_X, test_y = generate_data(data)

    # ---- model -----------------------------------------------------------
    model = Sequential()
    model.add(LSTM(16, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dense(train_y.shape[1]))
    # Regression task: track mean absolute error; classification accuracy
    # carries no meaning for continuous targets.
    model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    model.fit(train_X, train_y, epochs=EPOCHS, batch_size=len(train_X),
              verbose=2, shuffle=False)
    result = model.predict(train_X, verbose=0)

    # ---- in-sample fit, mapped back to original units --------------------
    plt.figure()
    plt.plot(scale_inv(result, reference=raw_riders), label='predict data')
    plt.plot(scale_inv(test_y, reference=raw_riders), label='true data')
    plt.title('none-normalized')
    plt.legend()
    plt.show()

    # ---- recursive multi-step forecast -----------------------------------
    datalist = data.tolist()
    pre_result = []
    for _ in range(FORECAST_STEPS):
        # Feed the most recent TIMESTEPS values (observed or predicted).
        pre_x = generate_predata(datalist[-TIMESTEPS:])
        pre_x = np.reshape(pre_x, (1, 1, TIMESTEPS))
        pre_y = model.predict(pre_x, verbose=0)
        step = pre_y.tolist()[0]
        pre_result.append(step)
        datalist.append(step[0])  # the prediction becomes part of the history

    # In-sample predictions followed by the forecast.
    combined = result.tolist()
    combined.extend(pre_result)

    # ---- fit plus forecast versus the true series, in original units -----
    plt.figure()
    plt.plot(scale_inv(np.array(combined), reference=raw_riders), label='predict data')
    plt.plot(scale_inv(test_y, reference=raw_riders), label='true data')
    plt.title('none-normalized')
    plt.legend()
    plt.show()
3、運行效果

