在“手寫數字識別”案例的快速入門中,我們調用飛槳提供的API(paddle.dataset.mnist)加載MNIST數據集。但在工業實踐中,我們面臨的任務和數據環境千差萬別,需要編寫適合當前任務的數據處理程序。
但是編寫自定義的數據加載函數,一般會涉及以下四個部分:
- 數據讀取與數據集划分
- 定義數據讀取器
- 校驗數據的有效性
- 異步數據讀取
在數據讀取與處理前,首先要加載飛槳平台和數據處理庫,可能使用的庫都需要加載進來:
1 #數據處理部分之前的代碼,加入部分數據處理的庫 2 import paddle 3 import paddle.fluid as fluid 4 from paddle.fluid.dygraph.nn import FC 5 import numpy as np 6 import os 7 import gzip 8 import json 9 import random
1. 數據讀取與數據集划分
實際保存到的數據存儲格式多種多樣,本節使用的mnist數據集以json格式存儲在本地。
在'./work/'目錄下讀取文件名稱為'mnist.json.gz'的MINST手寫數字識別數據,文件格式是壓縮后的json文件。文件內容包括:訓練數據、驗證數據、測試數據三部分,分別包含50000、10000、10000條手寫數字數據和兩個元素列表。
以訓練集數據為例,它為兩個元素的列表為[traim_imgs, train_labels]。
- train_imgs:一個維度為[50000, 784]的二維列表,包含50000張圖片。每張圖片用一個長度為784的向量表示,內容是28*28尺寸的像素灰度值(黑白圖片)。
- train_labels:一個維度為[50000, ]的列表,表示這些圖片對應的分類標簽,即0-9之間的一個數字。接下來我們將數據讀取出來。
1 # 聲明數據集文件位置 2 datafile = './work/mnist.json.gz' 3 print('loading mnist dataset from {} ......'.format(datafile)) 4 # 加載json數據文件 5 data = json.load(gzip.open(datafile)) 6 print('mnist dataset load done') 7 # 讀取到的數據區分訓練集,驗證集,測試集 8 train_set, val_set, eval_set = data 9 10 # 數據集相關參數,圖片高度IMG_ROWS, 圖片寬度IMG_COLS 11 IMG_ROWS = 28 12 IMG_COLS = 28 13 14 # 打印數據信息 15 imgs, labels = train_set[0], train_set[1] 16 print("訓練數據集數量: ", len(imgs),len(labels)) 17 18 # 觀察驗證集數量 19 imgs, labels = val_set[0], val_set[1] 20 print("驗證數據集數量: ", len(imgs),len(labels)) 21 22 # 觀察測試集數量 23 imgs, labels = val= eval_set[0], eval_set[1] 24 print("測試數據集數量: ", len(imgs),len(labels))
loading mnist dataset from ./work/mnist.json.gz ...... mnist dataset load done 訓練數據集數量: 50000 50000 驗證數據集數量: 10000 10000 測試數據集數量: 10000 10000
2. 定義數據讀取函數
飛槳提供分批次讀取數據函數paddle.batch,該接口是一個reader的裝飾器,返回的reader將輸入的reader的數據打包成指定的batch_size大小的批處理數據(batched.data)
在定義數據讀取函數中,我們需要做很多事情,包括但不限於:
- 打亂數據,保證每輪訓練讀取的數據順序不同。
- 數據類型轉換。
1 def load_data(mode='train'): 2 3 datafile = './work/mnist.json.gz' 4 print('loading mnist dataset from {} ......'.format(datafile)) 5 # 加載json數據文件 6 data = json.load(gzip.open(datafile)) 7 print('mnist dataset load done') 8 # 讀取到的數據區分訓練集,驗證集,測試集 9 train_set, val_set, eval_set = data 10 if mode=='train': 11 # 獲得訓練數據集 12 imgs, labels = train_set[0], train_set[1] 13 elif mode=='valid': 14 # 獲得驗證數據集 15 imgs, labels = val_set[0], val_set[1] 16 elif mode=='eval': 17 # 獲得測試數據集 18 imgs, labels = eval_set[0], eval_set[1] 19 else: 20 raise Exception("mode can only be one of ['train', 'valid', 'eval']") 21 print("訓練數據集數量: ", len(imgs)) 22 # 獲得數據集長度 23 imgs_length = len(imgs) 24 # 定義數據集每個數據的序號,根據序號讀取數據 25 index_list = list(range(imgs_length)) 26 # 讀入數據時用到的批次大小 27 BATCHSIZE = 100 28 29 # 定義數據生成器 30 def data_generator(): 31 if mode == 'train': 32 # 訓練模式下打亂數據 33 random.shuffle(index_list) 34 imgs_list = [] 35 labels_list = [] 36 for i in index_list: 37 # 將數據處理成希望的格式,比如類型為float32,shape為[1, 28, 28] 38 img = np.reshape(imgs[i], [1, IMG_ROWS, IMG_COLS]).astype('float32') 39 label = np.reshape(labels[i], [1]).astype('float32') 40 imgs_list.append(img) 41 labels_list.append(label) 42 if len(imgs_list) == BATCHSIZE: 43 # 獲得一個batchsize的數據,並返回 44 yield np.array(imgs_list), np.array(labels_list) 45 # 清空數據讀取列表 46 imgs_list = [] 47 labels_list = [] 48 49 # 如果剩余數據的數目小於BATCHSIZE, 50 # 則剩余數據一起構成一個大小為len(imgs_list)的mini-batch 51 if len(imgs_list) > 0: 52 yield np.array(imgs_list), np.array(labels_list) 53 return data_generator
上面代碼中mode參數可以取三個值中的一個,分別是train、valid、eval,選擇的模式不同,讀取的數據集也不同,為了兼容后面的代碼,讀取后的變量都相同,都是imgs、labels;
在數據生成器中,只有在mode為train的情況下我們才考慮把讀取的數據打亂;接下來是數據格式處理,目標類型是shape[1,28,28],1表示灰度圖,數據類型為float32; 通過yield關鍵字返回一個batch的數據;在最后一個index_list中,如果imgs_list長度不滿足一個batch,這時imgs_list長度不為零,會直接跳出for循環,被后面的len(imgs_list)攔截。
3. 數據校驗
實際任務原始的數據可能存在數據很“臟”的情況,這里的“臟”多指數據標注不准確,或者是數據雜亂,格式不統一等等。
因此,在完成數據處理函數時,我們需要執行數據校驗和清理的操作。
數據校驗一般有兩種方式:
- 機器校驗:加入一些校驗和清理數據的操作。
- 人工校驗:先打印數據輸出結果,觀察是否是設置的格式。再從訓練的結果驗證數據處理和讀取的有效性。
機器校驗
如下代碼所示,如果數據集中的圖片數量和標簽數量不等,說明數據邏輯存在問題,可使用assert語句校驗圖像數量和標簽數據是否一致。
1 imgs_length = len(imgs) 2 3 assert len(imgs) == len(labels), \ 4 "length of train_imgs({}) should be the same as train_labels({})".format(len(imgs), len(label))
人工校驗
人工校驗分兩步,首先打印數據輸出結果,觀察是否是設置的格式。再從訓練的結果驗證數據處理和讀取的有效性。
實現數據處理和加載函數后,我們可以調用它讀取一次數據,觀察數據的shape和類型是否與函數中設置的一致。
1 # 聲明數據讀取函數,從訓練集中讀取數據 2 train_loader = load_data('train') 3 # 以迭代的形式讀取數據 4 for batch_id, data in enumerate(train_loader()): 5 image_data, label_data = data 6 if batch_id == 0: 7 # 打印數據shape和類型 8 print(image_data.shape, label_data.shape, type(image_data), type(label_data)) 9 break
loading mnist dataset from ./work/mnist.json.gz ...... mnist dataset load done 訓練數據集數量: 50000 (100, 1, 28, 28) (100, 1) <class 'numpy.ndarray'> <class 'numpy.ndarray'>
觀察訓練結果
數據處理部分后的代碼多數保持不變,僅在讀取數據時候調用新編寫的load_data函數。由於數據格式的轉換工作在load_data函數中做了一部分,所以向模型輸入數據的代碼變得更加簡潔。下面我們使用自己實現的數據加載函數重新訓練我們的神經網絡。
1 #數據處理部分之后的代碼,數據讀取的部分調用Load_data函數 2 # 定義網絡結構,同上一節所使用的網絡結構 3 class MNIST(fluid.dygraph.Layer): 4 def __init__(self, name_scope): 5 super(MNIST, self).__init__(name_scope) 6 name_scope = self.full_name() 7 self.fc = FC(name_scope, size=1, act=None) 8 9 def forward(self, inputs): 10 outputs = self.fc(inputs) 11 return outputs 12 13 # 訓練配置,並啟動訓練過程 14 with fluid.dygraph.guard(): 15 model = MNIST("mnist") 16 model.train() 17 #調用加載數據的函數 18 train_loader = load_data('train') 19 optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.001) 20 EPOCH_NUM = 10 21 for epoch_id in range(EPOCH_NUM): 22 for batch_id, data in enumerate(train_loader()): 23 #准備數據,變得更加簡潔 24 image_data, label_data = data 25 image = fluid.dygraph.to_variable(image_data) 26 label = fluid.dygraph.to_variable(label_data) 27 28 #前向計算的過程 29 predict = model(image) 30 31 #計算損失,取一個批次樣本損失的平均值 32 loss = fluid.layers.square_error_cost(predict, label) 33 avg_loss = fluid.layers.mean(loss) 34 35 #每訓練了100批次的數據,打印下當前Loss的情況 36 if batch_id % 100 == 0: 37 print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy())) 38 39 #后向傳播,更新參數的過程 40 avg_loss.backward() 41 optimizer.minimize(avg_loss) 42 model.clear_gradients() 43 44 #保存模型參數 45 fluid.save_dygraph(model.state_dict(), 'mnist')
loading mnist dataset from ./work/mnist.json.gz ...... mnist dataset load done 訓練數據集數量: 50000 epoch: 0, batch: 0, loss is: [24.632648] epoch: 0, batch: 100, loss is: [4.4261494] epoch: 0, batch: 200, loss is: [5.5177183] epoch: 0, batch: 300, loss is: [3.5427954] epoch: 0, batch: 400, loss is: [2.7455132] epoch: 1, batch: 0, loss is: [3.4030478] epoch: 1, batch: 100, loss is: [3.3895369] epoch: 1, batch: 200, loss is: [4.0297785] epoch: 1, batch: 300, loss is: [3.658723] epoch: 1, batch: 400, loss is: [3.7493572] epoch: 2, batch: 0, loss is: [3.4815173] epoch: 2, batch: 100, loss is: [3.566256] epoch: 2, batch: 200, loss is: [4.150691] epoch: 2, batch: 300, loss is: [3.3143735] epoch: 2, batch: 400, loss is: [2.8981738] epoch: 3, batch: 0, loss is: [2.9376304] epoch: 3, batch: 100, loss is: [3.322153] epoch: 3, batch: 200, loss is: [4.5626388] epoch: 3, batch: 300, loss is: [3.1342642] epoch: 3, batch: 400, loss is: [3.2983096] epoch: 4, batch: 0, loss is: [4.223956] epoch: 4, batch: 100, loss is: [2.982598] epoch: 4, batch: 200, loss is: [2.719622] epoch: 4, batch: 300, loss is: [3.712464] epoch: 4, batch: 400, loss is: [4.1207376] epoch: 5, batch: 0, loss is: [2.5053217] epoch: 5, batch: 100, loss is: [2.8577585] epoch: 5, batch: 200, loss is: [2.9564447] epoch: 5, batch: 300, loss is: [3.4296014] epoch: 5, batch: 400, loss is: [4.3093677] epoch: 6, batch: 0, loss is: [4.5576763] epoch: 6, batch: 100, loss is: [3.20943] epoch: 6, batch: 200, loss is: [3.327529] epoch: 6, batch: 300, loss is: [2.5192072] epoch: 6, batch: 400, loss is: [3.4901175] epoch: 7, batch: 0, loss is: [3.998215] epoch: 7, batch: 100, loss is: [4.351076] epoch: 7, batch: 200, loss is: [3.8231916] epoch: 7, batch: 300, loss is: [2.151733] epoch: 7, batch: 400, loss is: [2.995807] epoch: 8, batch: 0, loss is: [3.6070685] epoch: 8, batch: 100, loss is: [4.0988545] epoch: 8, batch: 200, loss is: [3.0984952] epoch: 8, batch: 300, loss is: [3.0793695] epoch: 8, batch: 400, loss is: [2.7344913] epoch: 9, batch: 0, loss is: [3.7788324] epoch: 9, batch: 100, loss is: [3.706921] epoch: 9, batch: 200, loss is: [2.7320113] epoch: 9, batch: 300, loss is: [3.2809222] epoch: 9, batch: 400, loss is: [3.8385432]
batch size=100,數據總量為50000,所以有500個batch(0,100,200,300,400);epoch num=10,所以有10次循環(0,1,2,3,4,5,6,7,8,9)。
最后,將上述幾部分操作合並到load_data函數,方便后續調用。下面代碼為完整的數據讀取函數,可以通過數據加載函數load_data的輸入參數mode為'train', 'valid', 'eval'選擇返回的數據是訓練集,驗證集,測試集。
1 #數據處理部分的展開代碼 2 # 定義數據集讀取器 3 def load_data(mode='train'): 4 5 # 數據文件 6 datafile = './work/mnist.json.gz' 7 print('loading mnist dataset from {} ......'.format(datafile)) 8 data = json.load(gzip.open(datafile)) 9 # 讀取到的數據可以直接區分訓練集,驗證集,測試集 10 train_set, val_set, eval_set = data 11 12 # 數據集相關參數,圖片高度IMG_ROWS, 圖片寬度IMG_COLS 13 IMG_ROWS = 28 14 IMG_COLS = 28 15 # 獲得數據 16 if mode == 'train': 17 imgs = train_set[0] 18 labels = train_set[1] 19 elif mode == 'valid': 20 imgs = val_set[0] 21 labels = val_set[1] 22 elif mode == 'eval': 23 imgs = eval_set[0] 24 labels = eval_set[1] 25 else: 26 raise Exception("mode can only be one of ['train', 'valid', 'eval']") 27 28 imgs_length = len(imgs) 29 30 assert len(imgs) == len(labels), \ 31 "length of train_imgs({}) should be the same as train_labels({})".format( 32 len(imgs), len(labels)) 33 34 index_list = list(range(imgs_length)) 35 36 # 讀入數據時用到的batchsize 37 BATCHSIZE = 100 38 39 # 定義數據生成器 40 def data_generator(): 41 if mode == 'train': 42 # 訓練模式下,將訓練數據打亂 43 random.shuffle(index_list) 44 imgs_list = [] 45 labels_list = [] 46 47 for i in index_list: 48 img = np.reshape(imgs[i], [1, IMG_ROWS, IMG_COLS]).astype('float32') 49 label = np.reshape(labels[i], [1]).astype('float32') 50 imgs_list.append(img) 51 labels_list.append(label) 52 if len(imgs_list) == BATCHSIZE: 53 # 產生一個batch的數據並返回 54 yield np.array(imgs_list), np.array(labels_list) 55 # 清空數據讀取列表 56 imgs_list = [] 57 labels_list = [] 58 59 # 如果剩余數據的數目小於BATCHSIZE, 60 # 則剩余數據一起構成一個大小為len(imgs_list)的mini-batch 61 if len(imgs_list) > 0: 62 yield np.array(imgs_list), np.array(labels_list) 63 return data_generator
4. 異步數據讀取
上面提到的數據讀取是同步數據讀取方式,針對於樣本量較大、數據讀取較慢的場景,建議采用異步數據讀取方式,可以讓數據讀取和模型訓練並行化,加快數據讀取速度,犧牲一小部分內存換取數據讀取效率的提升。
說明:
- 同步數據讀取:每當模型需要數據的時候,運行數據讀取函數獲得當前批次的數據。在讀取數據期間,模型一直在等待數據讀取結束,獲得數據后才會進行計算。
- 異步數據讀取:數據讀取和模型訓練過程異步進行,讀取到的數據先放入緩存區。模型訓練完一個批次后,不用等待數據讀取過程,直接從緩存區獲得下一批次數據進行訓練。
使用飛槳實現異步數據讀取非常簡單,代碼如下所示。
1 # 定義數據讀取后存放的位置,CPU或者GPU,這里使用CPU 2 #place = fluid.CUDAPlace(0) 時,數據讀到GPU上 3 place = fluid.CPUPlace() 4 with fluid.dygraph.guard(place): 5 # 聲明數據加載函數,使用訓練模式 6 train_loader = load_data(mode='train') 7 # 定義DataLoader對象用於加載Python生成器產生的數據 8 data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True) 9 # 設置數據生成器 10 data_loader.set_batch_generator(train_loader, places=place) 11 # 迭代的讀取數據並打印數據的形狀 12 for i, data in enumerate(data_loader): 13 image_data, label_data = data 14 print(i, image_data.shape, label_data.shape) 15 if i>=5: 16 break
上面的capacity=5,表示異步list的最大長度。
loading mnist dataset from ./work/mnist.json.gz ...... 0 [100, 1, 28, 28] [100, 1] 1 [100, 1, 28, 28] [100, 1] 2 [100, 1, 28, 28] [100, 1] 3 [100, 1, 28, 28] [100, 1] 4 [100, 1, 28, 28] [100, 1] 5 [100, 1, 28, 28] [100, 1]
與同步數據讀取相比,異步數據讀取僅增加了三行代碼,如下所示。
1 place = fluid.CPUPlace() 2 data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True) 3 data_loader.set_batch_generator(train_loader, place)
我們展開解讀一下:
- 第一行代碼: 設置讀取的數據是放在CPU還是GPU上。
- 第二行代碼: 創建一個DataLoader對象用於加載Python生成器產生的數據。數據會由Python線程預先讀取,並異步送入一個隊列中。fluid.io.DataLoader.from_generator參數名稱、參數含義、默認值如下:
參數名和默認值如下:
- feed_list=None,
- capacity=None,
- use_double_buffer=True,
- iterable=True,
- return_list=False
參數含義如下:
- feed_list 僅在paddle靜態圖中使用,動態圖中設置為None,本教程默認使用動態圖的建模方式。
- capacity 表示在DataLoader中維護的隊列容量,如果讀取數據的速度很快,建議設置為更大的值。
- use_double_buffer 是一個布爾型的參數,設置為True時Dataloader會預先異步讀取下一個batch的數據放到緩存區。
- iterable 表示創建的Dataloader對象是否是可迭代的,一般設置為True。
- return_list 在動態圖下需要設置為True。
- 第三行代碼: 用創建的DataLoader對象設置一個數據生成器set_batch_generator,輸入的參數是一個Python數據生成器train_loader和服務器資源類型place(標明CPU還是GPU)。
異步數據讀取並訓練的完整案例代碼如下:
1 with fluid.dygraph.guard(): 2 model = MNIST("mnist") 3 model.train() 4 #調用加載數據的函數 5 train_loader = load_data('train') 6 # 創建異步數據讀取器 7 place = fluid.CPUPlace() 8 data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True) 9 data_loader.set_batch_generator(train_loader, places=place) 10 11 optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.001) 12 EPOCH_NUM = 3 13 for epoch_id in range(EPOCH_NUM): 14 for batch_id, data in enumerate(data_loader): 15 image_data, label_data = data 16 image = fluid.dygraph.to_variable(image_data) 17 label = fluid.dygraph.to_variable(label_data) 18 19 predict = model(image) 20 21 loss = fluid.layers.square_error_cost(predict, label) 22 avg_loss = fluid.layers.mean(loss) 23 24 if batch_id % 200 == 0: 25 print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy())) 26 27 avg_loss.backward() 28 optimizer.minimize(avg_loss) 29 model.clear_gradients() 30 31 fluid.save_dygraph(model.state_dict(), 'mnist')
loading mnist dataset from ./work/mnist.json.gz ...... epoch: 0, batch: 0, loss is: [41.8419] epoch: 0, batch: 200, loss is: [4.8599553] epoch: 0, batch: 400, loss is: [3.949173] epoch: 1, batch: 0, loss is: [3.6606312] epoch: 1, batch: 200, loss is: [3.593772] epoch: 1, batch: 400, loss is: [3.3966932] epoch: 2, batch: 0, loss is: [3.3882492] epoch: 2, batch: 200, loss is: [3.512473] epoch: 2, batch: 400, loss is: [3.8485198]
從異步數據讀取的訓練結果來看,損失函數下降與同步數據讀取訓練結果基本一致。
