Hung-yi Lee Machine Learning HW3 (CNN)


Problem Introduction

 

We use a CNN to classify food images. In the provided dataset, the filename of each image starts with its class label, followed by an index (for example, 3_123.jpg is image 123 of class 3).

 

Data Processing

First, read in the images and resize them so that every image has the same number of pixels.

def readfile(path, flag):
    """
    :param path: 圖片所在文件夾位置
    :param flag: 1:訓練集或驗證集   0:測試集
    :return: 圖片數值化后的數據
    """
    image_dir = os.listdir(path)
    x = np.zeros((len(image_dir), 128, 128, 3), dtype=np.uint8)  # 因為是圖片,所以這里設為uint8
    y = np.zeros((len(image_dir)))
    # print(x.shape)
    # print(y.shape)
    for i, file in enumerate(image_dir):             # 遍歷每一張圖片
        img = cv2.imread(os.path.join(path, file))   # cv2.imread()返回多維數組,前兩維表示像素,后一維表示通道數
        x[i, :, :, :] = cv2.resize(img, (128, 128))  # 因為每張圖片的大小不一樣,所以先統一大小,每張圖片的大小為(128,128,3)
        # cv2.imshow('new_image', x[i])
        # cv2.waitKey(0)
        if flag:
            y[i] = file.split('_')[0]

    if flag:
        return x, y
    else:
        return x
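
One thing worth noting (an observation about OpenCV, not something the original code handles): cv2.imread loads images in BGR channel order, and nothing in this pipeline converts to RGB. Since training, validation, and test images all go through the same pipeline, the model still trains fine; if you prefer RGB, a one-line change inside the loop would do it:

img = cv2.cvtColor(cv2.imread(os.path.join(path, file)), cv2.COLOR_BGR2RGB)  # optional: convert BGR to RGB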

 

PyTorch's Dataset and DataLoader make data handling very convenient.

Dataset encapsulates the data. After subclassing Dataset, we need to override two methods, __len__ and __getitem__: __len__ reports the size of the dataset, and __getitem__ supports indexing from 0 to len(self) - 1, which is also why __len__ is needed.

DataLoader fetches individual samples through __getitem__ and then combines them into batches.

class ImgDataset(Dataset):
    def __init__(self, x, y=None, transform=None):
        self.x = x
        # label is required to be a LongTensor
        self.y = y
        if y is not None:
            self.y = torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        res_x = self.x[index]
        if self.transform is not None:
            res_x = self.transform(res_x)
        if self.y is not None:
            res_y = self.y[index]
            return res_x, res_y
        else:  # if there are no labels, return only x
            return res_x

Before training, we can also apply some data augmentation, and the data needs to be converted into tensors.

train_transform = transforms.Compose([
    transforms.ToPILImage(),
    # data augmentation
    transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
    transforms.RandomRotation(15),      # randomly rotate the image by up to 15 degrees
    transforms.ToTensor(),              # convert the image to a Tensor
])
# no data augmentation is needed at testing time
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])
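
As a quick illustration (a small sketch, not part of the original code, and assuming the imports from the full code below): applying train_transform to one uint8 image in the (H, W, C) layout produced by readfile yields a float tensor with the channels first and the values scaled to [0, 1]:

dummy_img = np.zeros((128, 128, 3), dtype=np.uint8)   # hypothetical single image, same layout as readfile output
tensor_img = train_transform(dummy_img)
print(tensor_img.shape)   # torch.Size([3, 128, 128]) -- ToTensor moves the channels to the front
print(tensor_img.dtype)   # torch.float32, values in [0, 1]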

The definitions above can then be used as follows:

train_set = ImgDataset(train_x, train_y, train_transform)
val_set = ImgDataset(val_x, val_y, test_transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
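
As a sanity check (a minimal sketch, assuming the loaders above are in scope), you can peek at one batch to see what DataLoader assembles from the individual __getitem__ calls:

images, labels = next(iter(train_loader))   # fetch one batch
print(images.shape)   # torch.Size([50, 3, 128, 128]) with batch_size = 50
print(labels.shape)   # torch.Size([50])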

 

Network Construction

To build the network, we subclass nn.Module and call nn.Module's constructor.

class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()    # call nn.Module's constructor

        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        self.cnn = nn.Sequential(           # the layers inside Sequential are executed in order
            # convolution block 1
            nn.Conv2d(3, 64, 3, 1, 1),      # output: 64 * 128 * 128
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 64 * 64 * 64

            # convolution block 2
            nn.Conv2d(64, 128, 3, 1, 1),    # output: 128 * 64 * 64
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 128 * 32 * 32

            # convolution block 3
            nn.Conv2d(128, 256, 3, 1, 1),   # output: 256 * 32 * 32
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 256 * 16 * 16

            # convolution block 4
            nn.Conv2d(256, 512, 3, 1, 1),   # output: 512 * 16 * 16
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 512 * 8 * 8

            # convolution block 5
            nn.Conv2d(512, 512, 3, 1, 1),   # output: 512 * 8 * 8
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 512 * 4 * 4
        )

        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)   # flatten the feature maps to (batch_size, 512 * 4 * 4)
        return self.fc(out)
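
As a quick check of the architecture (a small sketch, not part of the assignment code): five 2x2 poolings shrink 128 down to 4, so the flattened feature size is 512 * 4 * 4, and a dummy 128x128 RGB image should come out as 11 class scores:

net = Classifier()
dummy = torch.randn(1, 3, 128, 128)   # (batch, channels, height, width)
print(net(dummy).shape)               # expected: torch.Size([1, 11])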

 

Model Training

def training(train_loader, val_loader):
    model = Classifier().cuda()
    loss = nn.CrossEntropyLoss()            # torch.nn already provides the common loss functions
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epoch = 30                          # number of training epochs

    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # make sure the BN layers use the mean and variance of each training batch
        for i, data in enumerate(train_loader):
            optimizer.zero_grad()                                 # clear the gradients from the previous step
            train_pred = model(data[0].cuda())                    # data[0] = x, data[1] = y
            batch_loss = loss(train_pred, data[1].cuda())
            batch_loss.backward()
            optimizer.step()
            # .data extracts the underlying Tensor from the Variable
            # train_pred has shape (batch_size, 11); np.argmax(..., axis=1) returns, for each row, the index of the largest score,
            # which is exactly the predicted label; comparing it with the true labels gives the number of correct predictions
            train_acc += np.sum(np.argmax(train_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
            train_loss += batch_loss.item()            # item() reads the value of a tensor that contains a single element

        model.eval()  # freeze the BN statistics: use the running mean and variance accumulated during training
        with torch.no_grad():      # validation only, so no gradient tracking is needed
            for i, data in enumerate(val_loader):
                val_pred = model(data[0].cuda())
                batch_loss = loss(val_pred, data[1].cuda())

                val_acc += np.sum(np.argmax(val_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += batch_loss.item()

        print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' % \
              (epoch + 1, num_epoch, time.time() - epoch_start_time, \
               train_acc / train_set.__len__(), train_loss / train_set.__len__(), val_acc / val_set.__len__(),
               val_loss / val_set.__len__()))
    return model
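
To make the accuracy bookkeeping concrete, here is a tiny standalone example (illustrative numbers, not from the assignment) of how np.argmax over axis=1 turns per-class scores into predicted labels, and how comparing them with the true labels counts the correct predictions:

import numpy as np

scores = np.array([[0.1, 2.0, 0.3],    # row 0: largest score at index 1 -> predicted class 1
                   [3.0, 0.5, 0.2]])   # row 1: largest score at index 0 -> predicted class 0
labels = np.array([1, 2])              # true labels
pred = np.argmax(scores, axis=1)       # array([1, 0])
print(np.sum(pred == labels))          # 1, i.e. one sample classified correctly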

 

I was confused for a long time about why train_pred = model(data[0].cuda()) runs correctly even though forward is never called explicitly. After looking it up, I figured it out:

nn.Module's __call__ method calls forward(). So what is __call__ for? It allows us to call an instance as if it were a function. Here is a simple example:

class test():
    def __call__(self):
        return 1

    def forward(self):
        return 2


if __name__ == '__main__':
    t = test()
    print(t())

The code above prints 1, because calling t() dispatches to __call__, not forward.

Next, let's look at __call__ in nn.Module, where we can see that it calls forward:

def __call__(self, *input, **kwargs):
    for hook in self._forward_pre_hooks.values():
        result = hook(self, input)
        if result is not None:
            if not isinstance(result, tuple):
                result = (result,)
            input = result
    if torch._C._get_tracing_state():
        result = self._slow_forward(*input, **kwargs)
    else:
        result = self.forward(*input, **kwargs)
    for hook in self._forward_hooks.values():
        hook_result = hook(self, input, result)
        if hook_result is not None:
            result = hook_result
    if len(self._backward_hooks) > 0:
        var = result
        while not isinstance(var, torch.Tensor):
            if isinstance(var, dict):
                var = next((v for v in var.values() if isinstance(v, torch.Tensor)))
            else:
                var = var[0]
        grad_fn = var.grad_fn
        if grad_fn is not None:
            for hook in self._backward_hooks.values():
                wrapper = functools.partial(hook, self)
                functools.update_wrapper(wrapper, hook)
                grad_fn.register_hook(wrapper)
    return result

 

In the end the training results were not great: the accuracy on the validation set was not very high.

 

Prediction

def predict(test_loader, model):
    model.eval()
    result = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            test_pred = model(data.cuda())
            test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
            for y in test_label:
                result.append(y)
    return result


def writefile(result):
    # 'w' rather than 'a' so that re-running the script does not append a duplicate header and rows
    with open('result.csv', 'w') as f:
        f.write('Id,Category\n')
        for i, res in enumerate(result):
            f.write('{},{}\n'.format(i, res))

 

Full Code

import os
import torch
import cv2
import time
import numpy as np
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset


train_transform = transforms.Compose([
    transforms.ToPILImage(),
    # data augmentation
    transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
    transforms.RandomRotation(15),      # randomly rotate the image by up to 15 degrees
    transforms.ToTensor(),              # convert the image to a Tensor
])
# no data augmentation is needed at testing time
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])


class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()    # call nn.Module's constructor

        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        self.cnn = nn.Sequential(           # the layers inside Sequential are executed in order
            # convolution block 1
            nn.Conv2d(3, 64, 3, 1, 1),      # output: 64 * 128 * 128
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 64 * 64 * 64

            # convolution block 2
            nn.Conv2d(64, 128, 3, 1, 1),    # output: 128 * 64 * 64
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 128 * 32 * 32

            # convolution block 3
            nn.Conv2d(128, 256, 3, 1, 1),   # output: 256 * 32 * 32
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 256 * 16 * 16

            # convolution block 4
            nn.Conv2d(256, 512, 3, 1, 1),   # output: 512 * 16 * 16
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 512 * 8 * 8

            # convolution block 5
            nn.Conv2d(512, 512, 3, 1, 1),   # output: 512 * 8 * 8
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # output: 512 * 4 * 4
        )

        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)   # flatten the feature maps to (batch_size, 512 * 4 * 4)
        return self.fc(out)


class ImgDataset(Dataset):
    def __init__(self, x, y=None, transform=None):
        self.x = x
        # label is required to be a LongTensor
        self.y = y
        if y is not None:
            self.y = torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        res_x = self.x[index]
        if self.transform is not None:
            res_x = self.transform(res_x)
        if self.y is not None:
            res_y = self.y[index]
            return res_x, res_y
        else:  # if there are no labels, return only x
            return res_x


def readfile(path, flag):
    """
    :param path: 圖片所在文件夾位置
    :param flag: 1:訓練集或驗證集   0:測試集
    :return: 圖片數值化后的數據
    """
    image_dir = os.listdir(path)
    x = np.zeros((len(image_dir), 128, 128, 3), dtype=np.uint8)  # 因為是圖片,所以這里設為uint8
    y = np.zeros((len(image_dir)))
    # print(x.shape)
    # print(y.shape)
    for i, file in enumerate(image_dir):             # 遍歷每一張圖片
        img = cv2.imread(os.path.join(path, file))   # cv2.imread()返回多維數組,前兩維表示像素,后一維表示通道數
        x[i, :, :, :] = cv2.resize(img, (128, 128))  # 因為每張圖片的大小不一樣,所以先統一大小,每張圖片的大小為(128,128,3)
        # cv2.imshow('new_image', x[i])
        # cv2.waitKey(0)
        if flag:
            y[i] = file.split('_')[0]

    if flag:
        return x, y
    else:
        return x


def training(train_loader, val_loader):
    model = Classifier().cuda()
    loss = nn.CrossEntropyLoss()            # torch.nn already provides the common loss functions
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epoch = 30                          # number of training epochs

    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # make sure the BN layers use the mean and variance of each training batch
        for i, data in enumerate(train_loader):
            optimizer.zero_grad()                                 # clear the gradients from the previous step
            train_pred = model(data[0].cuda())                    # data[0] = x, data[1] = y
            batch_loss = loss(train_pred, data[1].cuda())
            batch_loss.backward()
            optimizer.step()
            # .data extracts the underlying Tensor from the Variable
            # train_pred has shape (batch_size, 11); np.argmax(..., axis=1) returns, for each row, the index of the largest score,
            # which is exactly the predicted label; comparing it with the true labels gives the number of correct predictions
            train_acc += np.sum(np.argmax(train_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
            train_loss += batch_loss.item()            # item() reads the value of a tensor that contains a single element

        model.eval()  # freeze the BN statistics: use the running mean and variance accumulated during training
        with torch.no_grad():      # validation only, so no gradient tracking is needed
            for i, data in enumerate(val_loader):
                val_pred = model(data[0].cuda())
                batch_loss = loss(val_pred, data[1].cuda())

                val_acc += np.sum(np.argmax(val_pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += batch_loss.item()

        print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' % \
              (epoch + 1, num_epoch, time.time() - epoch_start_time, \
               train_acc / train_set.__len__(), train_loss / train_set.__len__(), val_acc / val_set.__len__(),
               val_loss / val_set.__len__()))
    return model


def predict(test_loader, model):
    model.eval()
    result = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            test_pred = model(data.cuda())
            test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
            for y in test_label:
                result.append(y)
    return result


def writefile(result):
    # 'w' rather than 'a' so that re-running the script does not append a duplicate header and rows
    with open('result.csv', 'w') as f:
        f.write('Id,Category\n')
        for i, res in enumerate(result):
            f.write('{},{}\n'.format(i, res))


if __name__ == '__main__':
    train_x, train_y = readfile('./data/food-11/food-11/training', True)
    val_x, val_y = readfile('./data/food-11/food-11/validation', True)
    test_x = readfile('./data/food-11/food-11/testing', False)
    batch_size = 50
    train_set = ImgDataset(train_x, train_y, train_transform)
    val_set = ImgDataset(val_x, val_y, test_transform)
    test_set = ImgDataset(x=test_x, transform=test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
    model = training(train_loader, val_loader)
    result = predict(test_loader, model)
    writefile(result)

  

 

 
