Problem Introduction
We classify food images with a CNN. Sample images are shown above; in each filename, the part before the underscore is the class label and the part after is the image index (so, for example, a file named 3_100.jpg would be image 100 of class 3).
Data Processing
First, read in the images and resize them so they all have the same pixel dimensions.
def readfile(path, flag):
    """
    :param path: directory containing the images
    :param flag: 1 for the training/validation sets, 0 for the test set
    :return: the images as numeric arrays (plus labels when flag is 1)
    """
    image_dir = os.listdir(path)
    x = np.zeros((len(image_dir), 128, 128, 3), dtype=np.uint8)  # uint8, since these are image pixels
    y = np.zeros((len(image_dir)))
    for i, file in enumerate(image_dir):  # iterate over every image
        img = cv2.imread(os.path.join(path, file))  # cv2.imread() returns an array: the first two dims are pixels, the last is channels
        x[i, :, :, :] = cv2.resize(img, (128, 128))  # image sizes vary, so resize each one to (128, 128, 3)
        if flag:
            y[i] = file.split('_')[0]  # the filename prefix before '_' is the class label
    if flag:
        return x, y
    else:
        return x
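For reference, a minimal sketch of how readfile might be called (the paths below are the ones used in the full code at the end of this post):

train_x, train_y = readfile('./data/food-11/food-11/training', True)
print(train_x.shape)  # (N, 128, 128, 3) -- one resized BGR image per file
print(train_y[:5])    # class labels parsed from the filename prefixes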
PyTorch's Dataset and DataLoader make data handling very convenient.
Dataset wraps the data. When we subclass Dataset, we must override two methods, __len__ and __getitem__: __len__ reports the size of the dataset, and __getitem__ supports indexing from 0 up to len(self) - 1, which is why __len__ is needed.
DataLoader fetches individual samples through __getitem__ and assembles them into batches.
class ImgDataset(Dataset):
    def __init__(self, x, y=None, transform=None):
        self.x = x
        # labels are required to be a LongTensor
        self.y = y
        if y is not None:
            self.y = torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        res_x = self.x[index]
        if self.transform is not None:
            res_x = self.transform(res_x)
        if self.y is not None:
            res_y = self.y[index]
            return res_x, res_y
        else:
            # no labels, so return only x
            return res_x
Before training we can also apply some data augmentation, and the data must be converted to tensor form.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
    transforms.RandomRotation(15),      # randomly rotate the image by up to 15 degrees
    transforms.ToTensor(),              # convert the image to a Tensor
])
# no data augmentation at test time
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])
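As a quick sanity check (a sketch using a dummy uint8 image in OpenCV's height-width-channel layout), the training pipeline turns a 128x128x3 numpy array into a (3, 128, 128) float tensor with values in [0, 1]:

dummy = np.random.randint(0, 256, (128, 128, 3), dtype=np.uint8)  # fake HWC image
out = train_transform(dummy)
print(out.shape)             # torch.Size([3, 128, 128]) -- ToTensor moves channels first
print(out.min(), out.max())  # ToTensor also scales pixel values into [0, 1]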
The definitions above can then be wired together as follows:
train_set = ImgDataset(train_x, train_y, train_transform)
val_set = ImgDataset(val_x, val_y, test_transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
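Iterating over a loader yields (x, y) batches whose first dimension is the batch size; a quick check (a sketch, assuming batch_size = 50 as in the full code):

x_batch, y_batch = next(iter(train_loader))
print(x_batch.shape)  # torch.Size([50, 3, 128, 128])
print(y_batch.shape)  # torch.Size([50]) -- LongTensor of labels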
Network Construction
To build a network, subclass nn.Module and call nn.Module's constructor.
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()  # must call nn.Module's constructor
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        self.cnn = nn.Sequential(  # the model runs the layers in Sequential in order
            # conv block 1
            nn.Conv2d(3, 64, 3, 1, 1),     # output: 64 * 128 * 128
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 64 * 64 * 64
            # conv block 2
            nn.Conv2d(64, 128, 3, 1, 1),   # output: 128 * 64 * 64
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 128 * 32 * 32
            # conv block 3
            nn.Conv2d(128, 256, 3, 1, 1),  # output: 256 * 32 * 32
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 256 * 16 * 16
            # conv block 4
            nn.Conv2d(256, 512, 3, 1, 1),  # output: 512 * 16 * 16
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 512 * 8 * 8
            # conv block 5
            nn.Conv2d(512, 512, 3, 1, 1),  # output: 512 * 8 * 8
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 512 * 4 * 4
        )
        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)  # flatten to (batch, 512 * 4 * 4) for the fully connected layers
        return self.fc(out)
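To confirm the shape comments above, a dummy forward pass (a sketch with a single blank image, run in eval mode so BatchNorm uses its running statistics) should yield one 11-way score vector:

model = Classifier()
model.eval()  # use BN running stats so a batch of one is fine
dummy = torch.zeros(1, 3, 128, 128)  # one fake RGB image
logits = model(dummy)
print(logits.shape)  # torch.Size([1, 11]) -- one score per food class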
Model Training
def training(train_loader, val_loader):
    model = Classifier().cuda()
    loss = nn.CrossEntropyLoss()  # torch.nn ships ready-made loss functions
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epoch = 30  # number of epochs
    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # ensures BN layers use each batch's own mean and variance
        for i, data in enumerate(train_loader):
            optimizer.zero_grad()  # clear the gradients from the previous step
            train_pred = model(data[0].cuda())  # data[0] = x, data[1] = y
            batch_loss = loss(train_pred, data[1].cuda())
            batch_loss.backward()
            optimizer.step()
            # .data extracts the underlying Tensor
            # train_pred has shape (50, 11); np.argmax(..., axis=1) returns the index of the max in each row,
            # which is exactly the predicted label; comparing with the true labels counts the correct predictions
            train_acc += np.sum(np.argmax(train_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
            train_loss += batch_loss.item()  # item() reads the value of a one-element tensor

        model.eval()  # fix mean and variance to the running averages accumulated during training
        with torch.no_grad():  # validation: no gradient tracking
            for i, data in enumerate(val_loader):
                val_pred = model(data[0].cuda())
                batch_loss = loss(val_pred, data[1].cuda())
                val_acc += np.sum(np.argmax(val_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += batch_loss.item()

            print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' %
                  (epoch + 1, num_epoch, time.time() - epoch_start_time,
                   train_acc / len(train_set), train_loss / len(train_set),
                   val_acc / len(val_set), val_loss / len(val_set)))
    return model
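training returns the trained model so it can be passed to prediction later. If you also want to keep the weights between runs, a small addition (a sketch; classifier.pth is a hypothetical filename, not from the original code) is:

model = training(train_loader, val_loader)
torch.save(model.state_dict(), 'classifier.pth')  # hypothetical filename

# later, restore the weights into a fresh model
model = Classifier().cuda()
model.load_state_dict(torch.load('classifier.pth'))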
I was long puzzled why train_pred = model(data[0].cuda()) runs fine even though forward is never mentioned. After looking it up, the answer became clear:
nn.Module's __call__ method is what invokes forward(). So what does __call__ do? It lets us call an instance as if it were a function. A simple example:
class test():
    def __call__(self):
        return 1

    def forward(self):
        return 2

if __name__ == '__main__':
    t = test()
    print(t())
The code above prints 1.
Next, let's look at __call__ in nn.Module, where you can see it calls forward.
def __call__(self, *input, **kwargs):
    for hook in self._forward_pre_hooks.values():
        result = hook(self, input)
        if result is not None:
            if not isinstance(result, tuple):
                result = (result,)
            input = result
    if torch._C._get_tracing_state():
        result = self._slow_forward(*input, **kwargs)
    else:
        result = self.forward(*input, **kwargs)
    for hook in self._forward_hooks.values():
        hook_result = hook(self, input, result)
        if hook_result is not None:
            result = hook_result
    if len(self._backward_hooks) > 0:
        var = result
        while not isinstance(var, torch.Tensor):
            if isinstance(var, dict):
                var = next((v for v in var.values() if isinstance(v, torch.Tensor)))
            else:
                var = var[0]
        grad_fn = var.grad_fn
        if grad_fn is not None:
            for hook in self._backward_hooks.values():
                wrapper = functools.partial(hook, self)
                functools.update_wrapper(wrapper, hook)
                grad_fn.register_hook(wrapper)
    return result
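The key line is result = self.forward(*input, **kwargs): calling a module dispatches to its forward, with pre- and post-hooks run around it. A minimal sketch of the consequence:

class Double(nn.Module):
    def forward(self, x):
        return x * 2

m = Double()
x = torch.tensor([1.0, 2.0])
print(m(x))          # tensor([2., 4.]) -- __call__ routes to forward
print(m.forward(x))  # same result, but bypasses the hook machinery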
The final training results were not great; the accuracy on the validation set stayed fairly low.
Prediction
def predict(test_loader, model):
    model.eval()
    result = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            test_pred = model(data.cuda())
            test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
            for y in test_label:
                result.append(y)
    return result

def writefile(result):
    # open in 'w' mode so rerunning doesn't append a second header and duplicate rows
    with open('result.csv', 'w') as f:
        f.write('Id,Category\n')
        for i, res in enumerate(result):
            f.write('{},{}\n'.format(i, res))
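Equivalently (a sketch, not from the original code), Python's csv module can write the same file and handles quoting for free:

import csv

def writefile_csv(result, path='result.csv'):  # path default is an assumption
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['Id', 'Category'])
        for i, res in enumerate(result):
            writer.writerow([i, res])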
Full Code
import os
import torch
import cv2
import time
import numpy as np
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset

train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
    transforms.RandomRotation(15),      # randomly rotate the image by up to 15 degrees
    transforms.ToTensor(),              # convert the image to a Tensor
])
# no data augmentation at test time
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])


class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()  # must call nn.Module's constructor
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        self.cnn = nn.Sequential(  # the layers run in order
            nn.Conv2d(3, 64, 3, 1, 1),     # output: 64 * 128 * 128
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 64 * 64 * 64
            nn.Conv2d(64, 128, 3, 1, 1),   # output: 128 * 64 * 64
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 128 * 32 * 32
            nn.Conv2d(128, 256, 3, 1, 1),  # output: 256 * 32 * 32
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 256 * 16 * 16
            nn.Conv2d(256, 512, 3, 1, 1),  # output: 512 * 16 * 16
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 512 * 8 * 8
            nn.Conv2d(512, 512, 3, 1, 1),  # output: 512 * 8 * 8
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # output: 512 * 4 * 4
        )
        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)
        return self.fc(out)


class ImgDataset(Dataset):
    def __init__(self, x, y=None, transform=None):
        self.x = x
        # labels are required to be a LongTensor
        self.y = y
        if y is not None:
            self.y = torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        res_x = self.x[index]
        if self.transform is not None:
            res_x = self.transform(res_x)
        if self.y is not None:
            res_y = self.y[index]
            return res_x, res_y
        else:
            # no labels, so return only x
            return res_x


def readfile(path, flag):
    """
    :param path: directory containing the images
    :param flag: 1 for the training/validation sets, 0 for the test set
    :return: the images as numeric arrays (plus labels when flag is 1)
    """
    image_dir = os.listdir(path)
    x = np.zeros((len(image_dir), 128, 128, 3), dtype=np.uint8)  # uint8, since these are image pixels
    y = np.zeros((len(image_dir)))
    for i, file in enumerate(image_dir):
        img = cv2.imread(os.path.join(path, file))
        x[i, :, :, :] = cv2.resize(img, (128, 128))  # unify image sizes to (128, 128, 3)
        if flag:
            y[i] = file.split('_')[0]
    if flag:
        return x, y
    else:
        return x


def training(train_loader, val_loader):
    model = Classifier().cuda()
    loss = nn.CrossEntropyLoss()  # torch.nn ships ready-made loss functions
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epoch = 30  # number of epochs
    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # BN layers use each batch's own mean and variance
        for i, data in enumerate(train_loader):
            optimizer.zero_grad()  # clear the gradients from the previous step
            train_pred = model(data[0].cuda())  # data[0] = x, data[1] = y
            batch_loss = loss(train_pred, data[1].cuda())
            batch_loss.backward()
            optimizer.step()
            train_acc += np.sum(np.argmax(train_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
            train_loss += batch_loss.item()  # item() reads the value of a one-element tensor

        model.eval()  # BN layers use the running statistics collected during training
        with torch.no_grad():  # validation: no gradient tracking
            for i, data in enumerate(val_loader):
                val_pred = model(data[0].cuda())
                batch_loss = loss(val_pred, data[1].cuda())
                val_acc += np.sum(np.argmax(val_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += batch_loss.item()

            print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' %
                  (epoch + 1, num_epoch, time.time() - epoch_start_time,
                   train_acc / len(train_set), train_loss / len(train_set),
                   val_acc / len(val_set), val_loss / len(val_set)))
    return model


def predict(test_loader, model):
    model.eval()
    result = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            test_pred = model(data.cuda())
            test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
            for y in test_label:
                result.append(y)
    return result


def writefile(result):
    # 'w' mode so rerunning doesn't append a duplicate header and rows
    with open('result.csv', 'w') as f:
        f.write('Id,Category\n')
        for i, res in enumerate(result):
            f.write('{},{}\n'.format(i, res))


if __name__ == '__main__':
    train_x, train_y = readfile('./data/food-11/food-11/training', True)
    val_x, val_y = readfile('./data/food-11/food-11/validation', True)
    test_x = readfile('./data/food-11/food-11/testing', False)
    batch_size = 50
    train_set = ImgDataset(train_x, train_y, train_transform)
    val_set = ImgDataset(val_x, val_y, test_transform)
    test_set = ImgDataset(x=test_x, transform=test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
    model = training(train_loader, val_loader)
    result = predict(test_loader, model)
    writefile(result)
References:
[1] Hung-yi Lee (李宏毅), Machine Learning course, Homework 3.