用numpy實現CNN卷積神經網絡


為了加深對卷積神經網絡底層原理的理解,本文通過使用numpy來搭建一個基礎的包含卷積層、池化層、全連接層和Softmax層的卷積神經網絡,並選擇relu作為我們的激活函數,選擇多分類交叉熵損失函數,最后使用了mnist數據集進行了訓練和測試。

關於卷積網絡的詳細原理和實現可參考下列文章:

劉建平Pinard:卷積網絡前向反向傳播算法

卷積層的反向傳播

手把手帶你 Numpy實現CNN

1、卷積層

卷積層的前向傳播輸出由卷積核和特征圖作卷積運算得到,反向傳播時需要計算kernel和bias的梯度以及delta的反向傳播誤差,kernel的梯度由原特征圖和delta作卷積得到,bias每個通道的梯度由對delta每個通道直接求和得到,delta的反向傳播誤差由delta和旋轉180度的卷積核作卷積運算得到。其中卷積運算在實現時先將特征圖的對應部分和卷積核展開成了向量的形式,再作向量乘法運算,這樣可以通過並行運算加快速度,實現代碼如下:

def img2col(x, ksize, stride):
    wx, hx, cx = x.shape                     # [width,height,channel]
    feature_w = (wx - ksize) // stride + 1   # 返回的特征圖尺寸       
    image_col = np.zeros((feature_w*feature_w, ksize*ksize*cx))
    num = 0
    for i in range(feature_w):
        for j in range(feature_w): 
            image_col[num] =  x[i*stride:i*stride+ksize, j*stride:j*stride+ksize, :].reshape(-1)
            num += 1
    return image_col
    
class Conv(object):
    def __init__(self, kernel_shape, stride=1, pad=0):
        width, height, in_channel, out_channel = kernel_shape
        self.stride = stride
        self.pad = pad
        scale = np.sqrt(3*in_channel*width*height/out_channel)   #batch=3
        self.k = np.random.standard_normal(kernel_shape) / scale
        self.b = np.random.standard_normal(out_channel) / scale
        self.k_gradient = np.zeros(kernel_shape)
        self.b_gradient = np.zeros(out_channel)

    def forward(self, x):
        self.x = x
        if self.pad != 0:
            self.x = np.pad(self.x, ((0,0),(self.pad,self.pad),(self.pad,self.pad),(0,0)), 'constant')
        bx, wx, hx, cx = self.x.shape
        wk, hk, ck, nk = self.k.shape             # kernel的寬、高、通道數、個數
        feature_w = (wx - wk) // self.stride + 1  # 返回的特征圖尺寸
        feature = np.zeros((bx, feature_w, feature_w, nk))
                       
        self.image_col = []
        kernel = self.k.reshape(-1, nk)
        for i in range(bx):
            image_col = img2col(self.x[i], wk, self.stride)                       
            feature[i] = (np.dot(image_col, kernel)+self.b).reshape(feature_w,feature_w,nk)
            self.image_col.append(image_col)
        return feature

    def backward(self, delta, learning_rate):
        bx, wx, hx, cx = self.x.shape # batch,14,14,inchannel
        wk, hk, ck, nk = self.k.shape # 5,5,inChannel,outChannel
        bd, wd, hd, cd = delta.shape  # batch,10,10,outChannel

        # 計算self.k_gradient,self.b_gradient
        delta_col = delta.reshape(bd, -1, cd)
        for i in range(bx):
            self.k_gradient += np.dot(self.image_col[i].T, delta_col[i]).reshape(self.k.shape)
        self.k_gradient /= bx
        self.b_gradient += np.sum(delta_col, axis=(0, 1))
        self.b_gradient /= bx    

        # 計算delta_backward
        delta_backward = np.zeros(self.x.shape)
        k_180 = np.rot90(self.k, 2, (0,1))      # numpy矩陣旋轉180度
        k_180 = k_180.swapaxes(2, 3)
        k_180_col = k_180.reshape(-1,ck)

        if hd-hk+1 != hx:
            pad = (hx-hd+hk-1) // 2
            pad_delta = np.pad(delta, ((0,0),(pad,pad),(pad,pad),(0,0)), 'constant')
        else:
            pad_delta = delta

        for i in range(bx):
            pad_delta_col = img2col(pad_delta[i], wk, self.stride)
            delta_backward[i] = np.dot(pad_delta_col, k_180_col).reshape(wx,hx,ck)

        # 反向傳播
        self.k -=  self.k_gradient * learning_rate
        self.b -=  self.b_gradient * learning_rate

        return delta_backward

在這里順便給出relu的實現代碼:

class Relu(object):        
    def forward(self, x):
        self.x = x
        return np.maximum(x, 0)
    
    def backward(self, delta):
        delta[self.x<0] = 0
        return delta

2、池化層

池化層實現了ksize=2、stride=2的最大池化,前向傳播時取對應核的最大值作為輸出,並記錄最大值的位置,反向傳播時先將特征圖按原值擴充一次,再將非最大值位置置0即可。

class Pool(object):
    def forward(self, x):
        b, w, h, c = x.shape
        feature_w = w // 2
        feature = np.zeros((b, feature_w, feature_w, c))
        self.feature_mask = np.zeros((b, w, h, c))   # 記錄最大池化時最大值的位置信息用於反向傳播
        for bi in range(b):
            for ci in range(c):
                for i in range(feature_w):
                    for j in range(feature_w):
                        feature[bi, i, j, ci] = np.max(x[bi,i*2:i*2+2,j*2:j*2+2,ci])
                        index = np.argmax(x[bi,i*2:i*2+2,j*2:j*2+2,ci])
                        self.feature_mask[bi, i*2+index//2, j*2+index%2, ci] = 1                    
        return feature

    def backward(self, delta):
        return np.repeat(np.repeat(delta, 2, axis=1), 2, axis=2) * self.feature_mask

3、全連接層

全連接層的實現前文已經給出,這里給出了封裝成單獨的類后的形式,增強了復用性:

class Linear(object):
    def __init__(self, inChannel, outChannel):
        scale = np.sqrt(inChannel/2)
        self.W = np.random.standard_normal((inChannel, outChannel)) / scale
        self.b = np.random.standard_normal(outChannel) / scale
        self.W_gradient = np.zeros((inChannel, outChannel))
        self.b_gradient = np.zeros(outChannel)

    def forward(self, x):
        self.x = x
        x_forward = np.dot(self.x, self.W) + self.b
        return x_forward

    def backward(self, delta, learning_rate):
        ## 梯度計算
        batch_size = self.x.shape[0]
        self.W_gradient = np.dot(self.x.T, delta) / batch_size  # bxin bxout
        self.b_gradient = np.sum(delta, axis=0) / batch_size 
        delta_backward = np.dot(delta, self.W.T)                # bxout inxout
        ## 反向傳播
        self.W -= self.W_gradient * learning_rate
        self.b -= self.b_gradient * learning_rate 

        return delta_backward

4、Softmax層

一般分類模型在全連接層給出每個類別的預測值后會再經過softmax層來得到最終的預測值,其前向傳播公式如下:

\[a_j=\frac{e^{z_j}}{\sum\limits_k{e^{z_k}}} \]

在將標簽onehot編碼后,反向傳播公式可給出向量形式如下:

\[\delta=a-y \]

對單個樣本,其多分類交叉熵loss計算公式給出向量形式如下:

\[loss=-\sum y\log a \]

最后給出代碼實現:

class Softmax(object):
    def cal_loss(self, predict, label):
        batchsize, classes = predict.shape
        self.predict(predict)
        loss = 0
        delta = np.zeros(predict.shape)
        for i in range(batchsize):
            delta[i] = self.softmax[i] - label[i]
            loss -= np.sum(np.log(self.softmax[i]) * label[i])
        loss /= batchsize
        return loss, delta

    def predict(self, predict):
        batchsize, classes = predict.shape
        self.softmax = np.zeros(predict.shape)
        for i in range(batchsize):
            predict_tmp = predict[i] - np.max(predict[i])
            predict_tmp = np.exp(predict_tmp)
            self.softmax[i] = predict_tmp / np.sum(predict_tmp)
        return self.softmax

5、訓練和測試

訓練和測試是直接使用的torchvision集成的mnist數據集,訓練后將權重參數通過numpy提供的接口保存到本地文件中,測試時再從文件中讀取權重參數,在只訓練了兩個epoch的情況下測試集的准確率達到了98.05%,相比使用全連接的神經網絡提高了不少。訓練和測試的代碼如下:

def train():
    # Mnist手寫數字集
    dataset_path = "D://datasets//mnist"
    train_data = torchvision.datasets.MNIST(root=dataset_path, train=True, download=True)
    train_data.data = train_data.data.numpy()  # [60000,28,28]
    train_data.targets = train_data.targets.numpy()  # [60000]
    train_data.data = train_data.data.reshape(60000, 28, 28, 1) / 255.   # 輸入向量處理
    train_data.targets = onehot(train_data.targets, 60000) # 標簽one-hot處理 (60000, 10) 
    
    conv1 = Conv(kernel_shape=(5,5,1,6))   # 24x24x6
    relu1 = Relu()
    pool1 = Pool()                         # 12x12x6
    conv2 = Conv(kernel_shape=(5,5,6,16))  # 8x8x16
    relu2 = Relu()
    pool2 = Pool()                         # 4x4x16
    nn = Linear(256, 10)
    softmax = Softmax()
   
    lr = 0.01
    batch = 3
    for epoch in range(10):        
        for i in range(0, 60000, batch):
            X = train_data.data[i:i+batch]
            Y = train_data.targets[i:i+batch]

            predict = conv1.forward(X)
            predict = relu1.forward(predict)
            predict = pool1.forward(predict)
            predict = conv2.forward(predict)
            predict = relu2.forward(predict)
            predict = pool2.forward(predict)
            predict = predict.reshape(batch, -1)
            predict = nn.forward(predict)

            loss, delta = softmax.cal_loss(predict, Y)

            delta = nn.backward(delta, lr)
            delta = delta.reshape(batch,4,4,16)
            delta = pool2.backward(delta)
            delta = relu2.backward(delta)
            delta = conv2.backward(delta, lr)
            delta = pool1.backward(delta)
            delta = relu1.backward(delta)
            conv1.backward(delta, lr)

            print("Epoch-{}-{:05d}".format(str(epoch), i), ":", "loss:{:.4f}".format(loss))

        lr *= 0.95**(epoch+1)
        np.savez("data2.npz", k1=conv1.k, b1=conv1.b, k2=conv2.k, b2=conv2.b, w3=nn.W, b3=nn.b)

def eval():
    r = np.load("data2.npz")

    # Mnist手寫數字集
    dataset_path = "D://datasets//mnist"
    test_data = torchvision.datasets.MNIST(root=dataset_path, train=False)
    test_data.data = test_data.data.numpy()        # [10000,28,28]
    test_data.targets = test_data.targets.numpy()  # [10000]

    test_data.data = test_data.data.reshape(10000, 28, 28, 1) / 255.

    conv1 = Conv(kernel_shape=(5, 5, 1, 6))  # 24x24x6
    relu1 = Relu()
    pool1 = Pool()  # 12x12x6
    conv2 = Conv(kernel_shape=(5, 5, 6, 16))  # 8x8x16
    relu2 = Relu()
    pool2 = Pool()  # 4x4x16
    nn = Linear(256, 10)
    softmax = Softmax()

    conv1.k = r["k1"]
    conv1.b = r["b1"]
    conv2.k = r["k2"]
    conv2.b = r["b2"]
    nn.W = r["w3"]
    nn.n = r["b3"]

    num = 0
    for i in range(10000):
        X = test_data.data[i]
        X = X[np.newaxis, :]
        Y = test_data.targets[i]

        predict = conv1.forward(X)
        predict = relu1.forward(predict)
        predict = pool1.forward(predict)
        predict = conv2.forward(predict)
        predict = relu2.forward(predict)
        predict = pool2.forward(predict)
        predict = predict.reshape(1, -1)
        predict = nn.forward(predict)

        predict = softmax.predict(predict)

        if np.argmax(predict) == Y:
            num += 1

    print("TEST-ACC: ", num/10000*100, "%")

6、完整代碼

import numpy as np
import torchvision
import time, functools
import logging
np.set_printoptions(threshold=np.inf)  


def onehot(targets, num):
    result = np.zeros((num, 10))
    for i in range(num):
        result[i][targets[i]] = 1
    return result


def img2col(x, ksize, stride):
    wx, hx, cx = x.shape                     # [width,height,channel]
    feature_w = (wx - ksize) // stride + 1   # 返回的特征圖尺寸       
    image_col = np.zeros((feature_w*feature_w, ksize*ksize*cx))
    num = 0
    for i in range(feature_w):
        for j in range(feature_w): 
            image_col[num] =  x[i*stride:i*stride+ksize, j*stride:j*stride+ksize, :].reshape(-1)
            num += 1
    return image_col
    

## nn
class Linear(object):
    def __init__(self, inChannel, outChannel):
        scale = np.sqrt(inChannel/2)
        self.W = np.random.standard_normal((inChannel, outChannel)) / scale
        self.b = np.random.standard_normal(outChannel) / scale
        self.W_gradient = np.zeros((inChannel, outChannel))
        self.b_gradient = np.zeros(outChannel)

    def forward(self, x):
        self.x = x
        x_forward = np.dot(self.x, self.W) + self.b
        return x_forward

    def backward(self, delta, learning_rate):
        ## 梯度計算
        batch_size = self.x.shape[0]
        self.W_gradient = np.dot(self.x.T, delta) / batch_size  # bxin bxout
        self.b_gradient = np.sum(delta, axis=0) / batch_size 
        delta_backward = np.dot(delta, self.W.T)                # bxout inxout
        ## 反向傳播
        self.W -= self.W_gradient * learning_rate
        self.b -= self.b_gradient * learning_rate 

        return delta_backward

## conv
class Conv(object):
    def __init__(self, kernel_shape, stride=1, pad=0):
        width, height, in_channel, out_channel = kernel_shape
        self.stride = stride
        self.pad = pad
        scale = np.sqrt(3*in_channel*width*height/out_channel)
        self.k = np.random.standard_normal(kernel_shape) / scale
        self.b = np.random.standard_normal(out_channel) / scale
        self.k_gradient = np.zeros(kernel_shape)
        self.b_gradient = np.zeros(out_channel)

    def forward(self, x):
        self.x = x
        if self.pad != 0:
            self.x = np.pad(self.x, ((0,0),(self.pad,self.pad),(self.pad,self.pad),(0,0)), 'constant')
        bx, wx, hx, cx = self.x.shape
        wk, hk, ck, nk = self.k.shape             # kernel的寬、高、通道數、個數
        feature_w = (wx - wk) // self.stride + 1  # 返回的特征圖尺寸
        feature = np.zeros((bx, feature_w, feature_w, nk))
                       
        self.image_col = []
        kernel = self.k.reshape(-1, nk)
        for i in range(bx):
            image_col = img2col(self.x[i], wk, self.stride)                       
            feature[i] = (np.dot(image_col, kernel)+self.b).reshape(feature_w,feature_w,nk)
            self.image_col.append(image_col)
        return feature

    def backward(self, delta, learning_rate):
        bx, wx, hx, cx = self.x.shape # batch,14,14,inchannel
        wk, hk, ck, nk = self.k.shape # 5,5,inChannel,outChannel
        bd, wd, hd, cd = delta.shape  # batch,10,10,outChannel

        # 計算self.k_gradient,self.b_gradient
        delta_col = delta.reshape(bd, -1, cd)
        for i in range(bx):
            self.k_gradient += np.dot(self.image_col[i].T, delta_col[i]).reshape(self.k.shape)
        self.k_gradient /= bx
        self.b_gradient += np.sum(delta_col, axis=(0, 1))
        self.b_gradient /= bx    

        # 計算delta_backward
        delta_backward = np.zeros(self.x.shape)
        k_180 = np.rot90(self.k, 2, (0,1))      # numpy矩陣旋轉180度
        k_180 = k_180.swapaxes(2, 3)
        k_180_col = k_180.reshape(-1,ck)

        if hd-hk+1 != hx:
            pad = (hx-hd+hk-1) // 2
            pad_delta = np.pad(delta, ((0,0),(pad,pad),(pad,pad),(0,0)), 'constant')
        else:
            pad_delta = delta

        for i in range(bx):
            pad_delta_col = img2col(pad_delta[i], wk, self.stride)
            delta_backward[i] = np.dot(pad_delta_col, k_180_col).reshape(wx,hx,ck)

        # 反向傳播
        self.k -=  self.k_gradient * learning_rate
        self.b -=  self.b_gradient * learning_rate

        return delta_backward

## pool
class Pool(object):
    def forward(self, x):
        b, w, h, c = x.shape
        feature_w = w // 2
        feature = np.zeros((b, feature_w, feature_w, c))
        self.feature_mask = np.zeros((b, w, h, c))   # 記錄最大池化時最大值的位置信息用於反向傳播
        for bi in range(b):
            for ci in range(c):
                for i in range(feature_w):
                    for j in range(feature_w):
                        feature[bi, i, j, ci] = np.max(x[bi,i*2:i*2+2,j*2:j*2+2,ci])
                        index = np.argmax(x[bi,i*2:i*2+2,j*2:j*2+2,ci])
                        self.feature_mask[bi, i*2+index//2, j*2+index%2, ci] = 1                    
        return feature

    def backward(self, delta):
        return np.repeat(np.repeat(delta, 2, axis=1), 2, axis=2) * self.feature_mask

## Relu
class Relu(object):        
    def forward(self, x):
        self.x = x
        return np.maximum(x, 0)
    
    def backward(self, delta):
        delta[self.x<0] = 0
        return delta

## Softmax
class Softmax(object):
    def cal_loss(self, predict, label):
        batchsize, classes = predict.shape
        self.predict(predict)
        loss = 0
        delta = np.zeros(predict.shape)
        for i in range(batchsize):
            delta[i] = self.softmax[i] - label[i]
            loss -= np.sum(np.log(self.softmax[i]) * label[i])
        loss /= batchsize
        return loss, delta

    def predict(self, predict):
        batchsize, classes = predict.shape
        self.softmax = np.zeros(predict.shape)
        for i in range(batchsize):
            predict_tmp = predict[i] - np.max(predict[i])
            predict_tmp = np.exp(predict_tmp)
            self.softmax[i] = predict_tmp / np.sum(predict_tmp)
        return self.softmax

def train():
    # Mnist手寫數字集
    dataset_path = "D://datasets//mnist"
    train_data = torchvision.datasets.MNIST(root=dataset_path, train=True, download=True)
    train_data.data = train_data.data.numpy()  # [60000,28,28]
    train_data.targets = train_data.targets.numpy()  # [60000]
    train_data.data = train_data.data.reshape(60000, 28, 28, 1) / 255.   # 輸入向量處理
    train_data.targets = onehot(train_data.targets, 60000) # 標簽one-hot處理 (60000, 10) 
    
    conv1 = Conv(kernel_shape=(5,5,1,6))   # 24x24x6
    relu1 = Relu()
    pool1 = Pool()                         # 12x12x6
    conv2 = Conv(kernel_shape=(5,5,6,16))  # 8x8x16
    relu2 = Relu()
    pool2 = Pool()                         # 4x4x16
    nn = Linear(256, 10)
    softmax = Softmax()
   
    lr = 0.01
    batch = 3
    for epoch in range(10):        
        for i in range(0, 60000, batch):
            X = train_data.data[i:i+batch]
            Y = train_data.targets[i:i+batch]

            predict = conv1.forward(X)
            predict = relu1.forward(predict)
            predict = pool1.forward(predict)
            predict = conv2.forward(predict)
            predict = relu2.forward(predict)
            predict = pool2.forward(predict)
            predict = predict.reshape(batch, -1)
            predict = nn.forward(predict)

            loss, delta = softmax.cal_loss(predict, Y)

            delta = nn.backward(delta, lr)
            delta = delta.reshape(batch,4,4,16)
            delta = pool2.backward(delta)
            delta = relu2.backward(delta)
            delta = conv2.backward(delta, lr)
            delta = pool1.backward(delta)
            delta = relu1.backward(delta)
            conv1.backward(delta, lr)

            print("Epoch-{}-{:05d}".format(str(epoch), i), ":", "loss:{:.4f}".format(loss))

        lr *= 0.95**(epoch+1)
        np.savez("data2.npz", k1=conv1.k, b1=conv1.b, k2=conv2.k, b2=conv2.b, w3=nn.W, b3=nn.b)

def eval():
    r = np.load("data2.npz")

    # Mnist手寫數字集
    dataset_path = "D://datasets//mnist"
    test_data = torchvision.datasets.MNIST(root=dataset_path, train=False)
    test_data.data = test_data.data.numpy()        # [10000,28,28]
    test_data.targets = test_data.targets.numpy()  # [10000]

    test_data.data = test_data.data.reshape(10000, 28, 28, 1) / 255.

    conv1 = Conv(kernel_shape=(5, 5, 1, 6))  # 24x24x6
    relu1 = Relu()
    pool1 = Pool()  # 12x12x6
    conv2 = Conv(kernel_shape=(5, 5, 6, 16))  # 8x8x16
    relu2 = Relu()
    pool2 = Pool()  # 4x4x16
    nn = Linear(256, 10)
    softmax = Softmax()

    conv1.k = r["k1"]
    conv1.b = r["b1"]
    conv2.k = r["k2"]
    conv2.b = r["b2"]
    nn.W = r["w3"]
    nn.n = r["b3"]

    num = 0
    for i in range(10000):
        X = test_data.data[i]
        X = X[np.newaxis, :]
        Y = test_data.targets[i]

        predict = conv1.forward(X)
        predict = relu1.forward(predict)
        predict = pool1.forward(predict)
        predict = conv2.forward(predict)
        predict = relu2.forward(predict)
        predict = pool2.forward(predict)
        predict = predict.reshape(1, -1)
        predict = nn.forward(predict)

        predict = softmax.predict(predict)

        if np.argmax(predict) == Y:
            num += 1

    print("TEST-ACC: ", num/10000*100, "%")

if __name__ == '__main__':
    #train()
    eval()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM