1. Obtaining the training data
1. Generating the adjacency matrix
Running gen_adj_mx.py generates the file adj_mx.pkl, which stores a single list object: [sensor_ids (the list of sensor ids), sensor_id_to_ind (a dict mapping sensor id to sensor index), adj_mx (the adjacency matrix, a numpy array of shape [207, 207])]. Note that the script requires the node-distance file distances_la_2012.csv and the node-id file graph_sensor_ids.txt. The annotated code of this file is as follows:
```python
import argparse
import pickle

import numpy as np
import pandas as pd


def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
    """
    :param distance_df: DataFrame of distances with three columns: [from, to, distance].
    :param sensor_ids: list of sensor ids.
    :param normalized_k: threshold; entries of the adjacency matrix below it are set to 0.
    :return:
        1. sensor_ids: list of sensor ids.
        2. sensor_id_to_ind: dict mapping sensor id -> sensor index.
        3. adj_mx: adjacency matrix, a numpy array of shape [207, 207].
    """
    num_sensors = len(sensor_ids)
    dist_mx = np.zeros((num_sensors, num_sensors), dtype=np.float32)
    dist_mx[:] = np.inf
    # Build the sensor id -> sensor index dict, e.g. {'773869': 0, '767541': 1, '767542': 2}.
    sensor_id_to_ind = {}
    for i, sensor_id in enumerate(sensor_ids):
        sensor_id_to_ind[sensor_id] = i

    # Fill the distance matrix dist_mx. An entry is filled only when both the "from"
    # sensor id and the "to" sensor id appear in the id list.
    for row in distance_df.values:
        if row[0] not in sensor_id_to_ind or row[1] not in sensor_id_to_ind:
            continue
        dist_mx[sensor_id_to_ind[row[0]], sensor_id_to_ind[row[1]]] = row[2]

    # Build the adjacency matrix adj_mx using the standard deviation of all finite
    # entries of the distance matrix. Note that the result is asymmetric, and each
    # node's weight to itself is 1, e.g.:
    """
    [[1.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00]
     [0.0000000e+00 1.0000000e+00 3.9095539e-01 1.7041542e-05 1.6665361e-05]
     [0.0000000e+00 7.1743792e-01 1.0000000e+00 6.9196528e-04 6.8192312e-04]
     [0.0000000e+00 9.7330502e-04 2.4218364e-06 1.0000000e+00 6.3372165e-01]
     [0.0000000e+00 1.4520076e-03 4.0022201e-06 6.2646437e-01 1.0000000e+00]]
    """
    distances = dist_mx[~np.isinf(dist_mx)].flatten()
    std = distances.std()
    adj_mx = np.exp(-np.square(dist_mx / std))
    # Make the adjacent matrix symmetric by taking the max.
    # adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])

    # Set entries of the adjacency matrix below the threshold to 0, e.g.:
    """
    [[1.         0.         0.         0.         0.        ]
     [0.         1.         0.3909554  0.         0.        ]
     [0.         0.7174379  1.         0.         0.        ]
     [0.         0.         0.         1.         0.63372165]
     [0.         0.         0.         0.62646437 1.        ]]
    """
    adj_mx[adj_mx < normalized_k] = 0
    print(adj_mx[:5, :5])
    return sensor_ids, sensor_id_to_ind, adj_mx


if __name__ == '__main__':
    # Arguments:
    # sensor_ids_filename: path of the sensor-id file.
    # distances_filename: path of the sensor-distance file.
    # output_pkl_filename: path of the output adjacency-matrix file.
    # normalized_k: sparsity threshold.
    parser = argparse.ArgumentParser()
    parser.add_argument('--sensor_ids_filename', type=str, default='data/sensor_graph/graph_sensor_ids.txt',
                        help='File containing sensor ids separated by comma.')
    parser.add_argument('--distances_filename', type=str, default='data/sensor_graph/distances_la_2012.csv',
                        help='CSV file containing sensor distances with three columns: [from, to, distance].')
    parser.add_argument('--normalized_k', type=float, default=0.1,
                        help='Entries that become lower than normalized_k after normalization are set to zero for sparsity.')
    parser.add_argument('--output_pkl_filename', type=str, default='data/sensor_graph/adj_mx.pkl',
                        help='Path of the output file.')
    args = parser.parse_args()

    # Read the sensor ids into the sensor_ids list.
    with open(args.sensor_ids_filename) as f:
        sensor_ids = f.read().strip().split(',')
    # Read the CSV distance file; the data looks like:
    """
           from       to    cost
    0   1201054  1201054     0.0
    1   1201054  1201066  2610.9
    2   1201054  1201076  2822.7
    3   1201054  1201087  2911.5
    4   1201054  1201100  7160.1
    """
    distance_df = pd.read_csv(args.distances_filename, dtype={'from': 'str', 'to': 'str'})

    normalized_k = args.normalized_k
    _, sensor_id_to_ind, adj_mx = get_adjacency_matrix(distance_df, sensor_ids, normalized_k)
    # Save the three return values as one list object. protocol=2 pickles in a binary
    # format; the pickle module handles serialization and deserialization of objects.
    with open(args.output_pkl_filename, 'wb') as f:
        pickle.dump([sensor_ids, sensor_id_to_ind, adj_mx], f, protocol=2)
```
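In short, each directed edge weight is a thresholded Gaussian kernel of the road-network distance: w_ij = exp(−(d_ij/σ)²) if that value reaches normalized_k and 0 otherwise, where σ is the standard deviation of all finite pairwise distances. As a quick sanity check, here is a minimal sketch, assuming the script was run with its default paths, that loads adj_mx.pkl back and inspects the three stored objects:

```python
import pickle

import numpy as np

# Load the list [sensor_ids, sensor_id_to_ind, adj_mx] written by gen_adj_mx.py;
# the path below is the script's default --output_pkl_filename.
with open('data/sensor_graph/adj_mx.pkl', 'rb') as f:
    sensor_ids, sensor_id_to_ind, adj_mx = pickle.load(f)

print(len(sensor_ids), adj_mx.shape)      # 207 (207, 207)
print(sensor_id_to_ind[sensor_ids[0]])    # 0: indices follow list order
print(np.allclose(np.diag(adj_mx), 1.0))  # True: self-distance 0 gives exp(0) = 1
print((adj_mx > 0).mean())                # fraction of edges kept after thresholding
```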
2. Generating the training, test, and validation sets
Run the following commands in a terminal:
```bash
# METR-LA
python generate_training_data.py --output_dir=data/METR-LA --traffic_df_filename=data/metr-la.h5

# PEMS-BAY
python generate_training_data.py --output_dir=data/PEMS-BAY --traffic_df_filename=data/pems-bay.h5
```
The dimensions of the generated datasets are as follows.
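For METR-LA (34,272 five-minute steps across 207 sensors), the sliding window yields 34272 − 11 − 12 = 34249 samples, split 0.7 : 0.1 : 0.2 into train/val/test. Under those assumptions, the script's print statements report shapes along these lines:

```
x shape:  (34249, 12, 207, 2) , y shape:  (34249, 12, 207, 2)
train x:  (23974, 12, 207, 2) y: (23974, 12, 207, 2)
val x:  (3425, 12, 207, 2) y: (3425, 12, 207, 2)
test x:  (6850, 12, 207, 2) y: (6850, 12, 207, 2)
```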
The annotated code of this file is as follows:
```python
import argparse
import os

import numpy as np
import pandas as pd


def generate_graph_seq2seq_io_data(
        df, x_offsets, y_offsets, add_time_in_day=True, add_day_in_week=False, scaler=None
):
    """
    Generate input and output data of shape
    [num_samples, sequence length (12), num_nodes (207), num_features (2)];
    see generate_train_val_test for an explanation.
    :param df:
    :param x_offsets: index offsets of the input data.
    :param y_offsets: index offsets of the output data.
    :param add_time_in_day: encode the timestamp as a fraction of a day; see generate_train_val_test.
    :param add_day_in_week:
    :param scaler:
    :return: input data and output data
    # x: (epoch_size, input_length, num_nodes, input_dim)
    # y: (epoch_size, output_length, num_nodes, output_dim)
    """
    num_samples, num_nodes = df.shape
    # Expand the 2-D array [M, 207] into a 3-D array [M, 207, 1]; the data looks like:
    """
    [[[64.375     ],
      [67.125     ],
      ...,
      [61.875     ]],
     ...,
     [[55.88888889],
      [68.44444444],
      [62.875     ]]]
    """
    data = np.expand_dims(df.values, axis=-1)
    feature_list = [data]
    if add_time_in_day:
        # Convert each timestamp to a fraction of a day, e.g.
        # [0. 0.00347222 0.00694444 0.01041667 0.01388889 0.01736111
        #  0.02083333 0.02430556 0.02777778 0.03125]
        time_ind = (df.index.values - df.index.values.astype("datetime64[D]")) / np.timedelta64(1, "D")
        time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
        feature_list.append(time_in_day)
    if add_day_in_week:
        dow = df.index.dayofweek
        dow_tiled = np.tile(dow, [1, num_nodes, 1]).transpose((2, 1, 0))
        feature_list.append(dow_tiled)

    # The sample's time of day is now part of the features, so every node carries
    # speed + time and the shape becomes [M, 207, 2], e.g.:
    """
    [[[6.43750000e+01 0.0000],
      [6.76250000e+01 0.0000],
      [6.71250000e+01 0.0000],
      ...,
      [5.92500000e+01 0.0000]],
     ...,
     [[6.90000000e+01 0.9653],
      [6.18750000e+01 0.99653],
      ...,
      [6.26666667e+01 0.99653]]]
    """
    data = np.concatenate(feature_list, axis=-1)
    x, y = [], []
    min_t = abs(min(x_offsets))
    max_t = abs(num_samples - abs(max(y_offsets)))  # Exclusive
    for t in range(min_t, max_t):  # t is the index of the last observation.
        x.append(data[t + x_offsets, ...])
        y.append(data[t + y_offsets, ...])
    x = np.stack(x, axis=0)
    y = np.stack(y, axis=0)
    return x, y


def generate_train_val_test(args):
    """
    Generate the training, validation and test sets at a ratio of 0.7 : 0.1 : 0.2 and
    save them in the METR-LA folder.
    The final datasets have shape [M, 12, 207, 2], where M is the number of samples and
    [12, 207, 2] is a single input or output sample: 12 is the sequence length, 207 the
    number of nodes, and 2 the number of node features (speed plus the time of the
    observation). The time is expressed as a fraction of a day, so 00:05 on August 12
    and 00:05 on August 13 are encoded identically as 5/60/24 = 0.0034722.
    :param args: arguments, including the history sequence length and the prediction
                 horizon, in units of 5 minutes.
    :return: None
    """
    # Input sequence length and output (prediction) sequence length.
    seq_length_x, seq_length_y = args.seq_length_x, args.seq_length_y
    # Read metr-la.h5; columns are indexed by sensor id and rows by timestamp at
    # 5-minute intervals, e.g.:
    """
                            773869     767541  ...     718141   769373
    2012-03-01 00:00:00  64.375000  67.625000  ...  69.000000   61.875
    2012-03-01 00:05:00  62.666667  68.555556  ...  68.444444   62.875
    2012-03-01 00:10:00  64.000000  63.750000  ...  69.857143   62.000
    2012-03-01 00:15:00   0.000000   0.000000  ...   0.000000    0.000
    2012-03-01 00:20:00   0.000000   0.000000  ...   0.000000    0.000
    """
    df = pd.read_hdf(args.traffic_df_filename)
    # Index offsets of the input data, a 1-D numpy array:
    # [-11 -10 -9 -8 -7 -6 -5 -4 -3 -2 -1 0].
    # This need not be so convoluted; np.arange(-(seq_length_x - 1), 1, 1) alone would do.
    x_offsets = np.sort(np.concatenate((np.arange(-(seq_length_x - 1), 1, 1),)))
    # Index offsets of the output (label) data, a 1-D numpy array: [1 2 ... 12].
    y_offsets = np.sort(np.arange(args.y_start, (seq_length_y + 1), 1))
    # Build the input data x and the corresponding label data y.
    # x: (num_samples, input_length, num_nodes, input_dim)
    # y: (num_samples, output_length, num_nodes, output_dim)
    x, y = generate_graph_seq2seq_io_data(
        df,
        x_offsets=x_offsets,
        y_offsets=y_offsets,
        add_time_in_day=True,
        add_day_in_week=args.dow,
    )

    print("x shape: ", x.shape, ", y shape: ", y.shape)
    # Save the splits into .npz files with numpy. Each file contains the input data,
    # the label data, and the two offset arrays described above, the latter reshaped
    # into [12, 1] 2-D arrays.
    num_samples = x.shape[0]
    num_test = round(num_samples * 0.2)
    num_train = round(num_samples * 0.7)
    num_val = num_samples - num_test - num_train
    x_train, y_train = x[:num_train], y[:num_train]
    x_val, y_val = (
        x[num_train: num_train + num_val],
        y[num_train: num_train + num_val],
    )
    x_test, y_test = x[-num_test:], y[-num_test:]

    for cat in ["train", "val", "test"]:
        _x, _y = locals()["x_" + cat], locals()["y_" + cat]
        print(cat, "x: ", _x.shape, "y:", _y.shape)
        np.savez_compressed(
            os.path.join(args.output_dir, f"{cat}.npz"),
            x=_x,
            y=_y,
            x_offsets=x_offsets.reshape(list(x_offsets.shape) + [1]),
            y_offsets=y_offsets.reshape(list(y_offsets.shape) + [1]),
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Arguments:
    # output_dir: output directory for the generated data.
    # traffic_df_filename: path of the traffic data file. It resembles a spreadsheet
    #   with 207 columns labelled by sensor id and 34,272 rows indexed by timestamp at
    #   5-minute intervals; each cell holds the measured speed.
    # seq_length_x: input sequence length, default 12, i.e. one hour of history before
    #   the current time step.
    # seq_length_y: output sequence length, default 12, i.e. the next hour is predicted.
    parser.add_argument("--output_dir", type=str, default="data/METR-LA", help="Output directory.")
    parser.add_argument("--traffic_df_filename", type=str, default="data/metr-la.h5",
                        help="Raw traffic readings.")
    parser.add_argument("--seq_length_x", type=int, default=12, help="Sequence Length.")
    parser.add_argument("--seq_length_y", type=int, default=12, help="Sequence Length.")
    parser.add_argument("--y_start", type=int, default=1, help="Y pred start")
    parser.add_argument("--dow", action='store_true')
    args = parser.parse_args()
    # Ask before overwriting an existing output directory; not essential.
    if os.path.exists(args.output_dir):
        reply = str(input(f'{args.output_dir} exists. Do you want to overwrite it? (y/n)')).lower().strip()
        if reply[0] != 'y':
            exit()
    else:
        os.makedirs(args.output_dir)
    # Generate the training data.
    generate_train_val_test(args)
```
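The generated .npz files can be read straight back with numpy. Below is a minimal sketch, assuming the default data/METR-LA output directory, that verifies the window layout described above:

```python
import numpy as np

# Each .npz file stores the sample windows plus the two offset arrays saved above.
data = np.load('data/METR-LA/train.npz')
x, y = data['x'], data['y']

print(x.shape, y.shape)            # (num_train, 12, 207, 2) for both
print(data['x_offsets'].ravel())   # [-11 -10 ... -1  0]
print(data['y_offsets'].ravel())   # [  1   2 ... 11 12]

# Consecutive samples are shifted by one 5-minute step, so their windows overlap:
# sample t takes readings t-11..t as input and t+1..t+12 as labels.
assert np.allclose(x[1, :-1], x[0, 1:])
```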