thumbnail: https://image.zhangxiann.com/jeison-higuita-W19AQY42rUk-unsplash.jpg
toc: true
date: 2020/2/19 20:17:25
disqusId: zhangxian
categories:
- PyTorch
tags:
- AI
- Deep Learning
本章代碼:https://github.com/zhangxiann/PyTorch_Practice/blob/master/lesson2/rmb_classification/
人民幣 二分類
實現 1 元人民幣和 100 元人民幣的圖片二分類。前面講過 PyTorch 的五大模塊:數據、模型、損失函數、優化器和迭代訓練。
數據模塊又可以細分為 4 個部分:
- 數據收集:樣本和標簽。
- 數據划分:訓練集、驗證集和測試集
- 數據讀取:對應於PyTorch 的 DataLoader。其中 DataLoader 包括 Sampler 和 DataSet。Sampler 的功能是生成索引, DataSet 是根據生成的索引讀取樣本以及標簽。
- 數據預處理:對應於 PyTorch 的 transforms

# DataLoader 與 DataSet
torch.utils.data.DataLoader()
torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)
功能:構建可迭代的數據裝載器
- dataset: Dataset 類,決定數據從哪里讀取以及如何讀取
- batchsize: 批大小
- num_works:num_works: 是否多進程讀取數據
- sheuffle: 每個 epoch 是否亂序
- drop_last: 當樣本數不能被 batchsize 整除時,是否舍棄最后一批數據
Epoch, Iteration, Batchsize
- Epoch: 所有訓練樣本都已經輸入到模型中,稱為一個 Epoch
- Iteration: 一批樣本輸入到模型中,稱為一個 Iteration
- Batchsize: 批大小,決定一個 iteration 有多少樣本,也決定了一個 Epoch 有多少個 Iteration
假設樣本總數有 80,設置 Batchsize 為 8,則共有 $80 \div 8=10$ 個 Iteration。這里 $1 Epoch = 10 Iteration$。
假設樣本總數有 86,設置 Batchsize 為 8。如果drop_last=True
則共有 10 個 Iteration;如果drop_last=False
則共有 11 個 Iteration。
torch.utils.data.Dataset
功能:Dataset 是抽象類,所有自定義的 Dataset 都需要繼承該類,並且重寫__getitem()__
方法和__len__()
方法 。__getitem()__
方法的作用是接收一個索引,返回索引對應的樣本和標簽,這是我們自己需要實現的邏輯。__len__()
方法是返回所有樣本的數量。
數據讀取包含 3 個方面
- 讀取哪些數據:每個 Iteration 讀取一個 Batchsize 大小的數據,每個 Iteration 應該讀取哪些數據。
- 從哪里讀取數據:如何找到硬盤中的數據,應該在哪里設置文件路徑參數
- 如何讀取數據:不同的文件需要使用不同的讀取方法和庫。
這里的路徑結構如下,有兩類人民幣圖片:1 元和 100 元,每一類各有 100 張圖片。
- RMB_data
- 1
- 100
首先划分數據集為訓練集、驗證集和測試集,比例為 8:1:1。
數據划分好后的路徑構造如下:
- rmb_split
- train
- 1
- 100
- valid
- 1
- 100
- test
- 1
- 100
- train
實現讀取數據的 Dataset,編寫一個get_img_info()
方法,讀取每一個圖片的路徑和對應的標簽,組成一個元組,再把所有的元組作為 list 存放到self.data_info
變量中,這里需要注意的是標簽需要映射到 0 開始的整數: rmb_label = {"1": 0, "100": 1}
。
@staticmethod
def get_img_info(data_dir):
data_info = list()
# data_dir 是訓練集、驗證集或者測試集的路徑
for root, dirs, _ in os.walk(data_dir):
# 遍歷類別
# dirs ['1', '100']
for sub_dir in dirs:
# 文件列表
img_names = os.listdir(os.path.join(root, sub_dir))
# 取出 jpg 結尾的文件
img_names = list(filter(lambda x: x.endswith('.jpg'), img_names))
# 遍歷圖片
for i in range(len(img_names)):
img_name = img_names[i]
# 圖片的絕對路徑
path_img = os.path.join(root, sub_dir, img_name)
# 標簽,這里需要映射為 0、1 兩個類別
label = rmb_label[sub_dir]
# 保存在 data_info 變量中
data_info.append((path_img, int(label)))
return data_info
然后在Dataset
的初始化函數中調用get_img_info()
方法。
def __init__(self, data_dir, transform=None):
"""
rmb面額分類任務的Dataset
:param data_dir: str, 數據集所在路徑
:param transform: torch.transform,數據預處理
"""
# data_info存儲所有圖片路徑和標簽,在DataLoader中通過index讀取樣本
self.data_info = self.get_img_info(data_dir)
self.transform = transform
然后在__getitem__()
方法中根據index
讀取self.data_info
中路徑對應的數據,並在這里做 transform 操作,返回的是樣本和標簽。
def __getitem__(self, index):
# 通過 index 讀取樣本
path_img, label = self.data_info[index]
# 注意這里需要 convert('RGB')
img = Image.open(path_img).convert('RGB') # 0~255
if self.transform is not None:
img = self.transform(img) # 在這里做transform,轉為tensor等等
# 返回是樣本和標簽
return img, label
在__len__()
方法中返回self.data_info
的長度,即為所有樣本的數量。
# 返回所有樣本的數量
def __len__(self):
return len(self.data_info)
在train_lenet.py
中,分 5 步構建模型。
第 1 步設置數據。首先定義訓練集、驗證集、測試集的路徑,定義訓練集和測試集的transforms
。然后構建訓練集和驗證集的RMBDataset
對象,把對應的路徑和transforms
傳進去。再構建DataLoder
,設置 batch_size,其中訓練集設置shuffle=True
,表示每個 Epoch 都打亂樣本。
# 構建MyDataset實例train_data = RMBDataset(data_dir=train_dir, transform=train_transform)valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)
# 構建DataLoder
# 其中訓練集設置 shuffle=True,表示每個 Epoch 都打亂樣本
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)
第 2 步構建模型,這里采用經典的 Lenet 圖片分類網絡。
net = LeNet(classes=2)
net.initialize_weights()
第 3 步設置損失函數,這里使用交叉熵損失函數。
criterion = nn.CrossEntropyLoss()
第 4 步設置優化器。這里采用 SGD 優化器。
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9) # 選擇優化器
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) # 設置學習率下降策略
第 5 步迭代訓練模型,在每一個 epoch 里面,需要遍歷 train_loader 取出數據,每次取得數據是一個 batchsize 大小。這里又分為 4 步。第 1 步進行前向傳播,第 2 步進行反向傳播求導,第 3 步使用optimizer
更新權重,第 4 步統計訓練情況。每一個 epoch 完成時都需要使用scheduler
更新學習率,和計算驗證集的准確率、loss。
for epoch in range(MAX_EPOCH):
loss_mean = 0.
correct = 0.
total = 0.
net.train()
# 遍歷 train_loader 取數據
for i, data in enumerate(train_loader):
# forward
inputs, labels = data
outputs = net(inputs)
# backward
optimizer.zero_grad()
loss = criterion(outputs, labels)
loss.backward()
# update weights
optimizer.step()
# 統計分類情況
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).squeeze().sum().numpy()
# 打印訓練信息
loss_mean += loss.item()
train_curve.append(loss.item())
if (i+1) % log_interval == 0:
loss_mean = loss_mean / log_interval
print("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
epoch, MAX_EPOCH, i+1, len(train_loader), loss_mean, correct / total))
loss_mean = 0.
scheduler.step() # 更新學習率
# 每個 epoch 計算驗證集得准確率和loss
...
...
我們可以看到每個 iteration,我們是從train_loader
中取出數據的。
def __iter__(self):
if self.num_workers == 0:
return _SingleProcessDataLoaderIter(self)
else:
return _MultiProcessingDataLoaderIter(self)
這里我們沒有設置多進程,會執行_SingleProcessDataLoaderIter
的方法。我們以_SingleProcessDataLoaderIter
為例。在_SingleProcessDataLoaderIter
里只有一個方法_next_data()
,如下:
def _next_data(self):
index = self._next_index() # may raise StopIteration
data = self._dataset_fetcher.fetch(index) # may raise StopIteration
if self._pin_memory:
data = _utils.pin_memory.pin_memory(data)
return data
在該方法中,self._next_index()
是獲取一個 batchsize 大小的 index 列表,代碼如下:
def _next_index(self):
return next(self._sampler_iter) # may raise StopIteration
其中調用的sampler
類的__iter__()
方法返回 batch_size 大小的隨機 index 列表。
def __iter__(self):
batch = []
for idx in self.sampler:
batch.append(idx)
if len(batch) == self.batch_size:
yield batch
batch = []
if len(batch) > 0 and not self.drop_last:
yield batch
然后再返回看 dataloader
的_next_data()
方法:
def _next_data(self):
index = self._next_index() # may raise StopIteration
data = self._dataset_fetcher.fetch(index) # may raise StopIteration
if self._pin_memory:
data = _utils.pin_memory.pin_memory(data)
return data
在第二行中調用了self._dataset_fetcher.fetch(index)
獲取數據。這里會調用_MapDatasetFetcher
中的fetch()
函數:
def fetch(self, possibly_batched_index):
if self.auto_collation:
data = [self.dataset[idx] for idx in possibly_batched_index]
else:
data = self.dataset[possibly_batched_index]
return self.collate_fn(data)
這里調用了self.dataset[idx]
,這個函數會調用dataset.__getitem__()
方法獲取具體的數據,所以__getitem__()
方法是我們必須實現的。我們拿到的data
是一個 list,每個元素是一個 tunple,每個 tunple 包括樣本和標簽。所以最后要使用self.collate_fn(data)
把 data 轉換為兩個 list,第一個 元素 是樣本的batch 形式,形狀為 [16, 3, 32, 32] (16 是 batch size,[3, 32, 32] 是圖片像素);第二個元素是標簽的 batch 形式,形狀為 [16]。
所以在代碼中,我們使用inputs, labels = data
來接收數據。
PyTorch 數據讀取流程圖

首先在 for 循環中遍歷`DataLoader`,然后根據是否采用多進程,決定使用單進程或者多進程的`DataLoaderIter`。在`DataLoaderIter`里調用`Sampler`生成`Index`的 list,再調用`DatasetFetcher`根據`index`獲取數據。在`DatasetFetcher`里會調用`Dataset`的`__getitem__()`方法獲取真正的數據。這里獲取的數據是一個 list,其中每個元素是 (img, label) 的元組,再使用 `collate_fn()`函數整理成一個 list,里面包含兩個元素,分別是 img 和 label 的`tenser`。
下圖是我們的訓練過程的 loss 曲線:

**參考資料**
如果你覺得這篇文章對你有幫助,不妨點個贊,讓我有更多動力寫出好文章。
我的文章會首發在公眾號上,歡迎掃碼關注我的公眾號張賢同學。
