卷積神經網絡:下面要說的這個網絡,由下面三層所組成
卷積網絡:卷積層 + 激活層relu+ 池化層max_pool組成
神經網絡:線性變化 + 激活層relu
神經網絡: 線性變化(獲得得分值)
代碼說明:
代碼主要有三部分組成
第一部分: 數據讀入
第二部分:模型的構建,用於生成loss和梯度值
第三部分:將數據和模型輸入,使用batch_size數據進行模型參數的訓練
第一部分:數據讀入
第一步:輸入文件的地址
第二步: 創建列表,用於文件數據的保存
第三步:使用pickle.load進行數據的讀取
第四步: 創建mask索引,用於生成驗證集,訓練集,測試集
第五步: 對圖像進行均值化操作
第六步:使用.transpose將圖像的維度進行轉換
第七步:創建字典將data數據進行返回
代碼:data_utils.py
import os import pickle import sys import importlib importlib.reload(sys) import numpy as np def load_single_data(dir): # 第三步:打開文件,進行數據的讀取 with open(dir, 'rb') as f: # 文件的讀取 data_dict = pickle.load(f, encoding='latin1') # 獲得數據 X = data_dict['data'] # 獲得標簽 y = data_dict['labels'] # 進行數據的維度重構 X = X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype('float') # 將標簽轉換為np.array格式 y = np.array(y) return X, y def load_CIFAR10_data(root): # 第二步:創建列表,用於文件數據的保存 xs = [] ys = [] for i in range(1, 2): filename = os.path.join(root, 'data_batch_%d'%i) X, y = load_single_data(filename) xs.append(X) ys.append(y) xtr = np.concatenate(xs) ytr = np.concatenate(ys) # 獲得測試數據 xte, yte = load_single_data(os.path.join(root, 'test_batch')) return xtr, ytr, xte, yte def get_CIFAR10_data(num_train=5000, num_val=500, num_test=500): # 第一步:輸入文件的地址 filename_dir = 'D://BaiduNetdiskDownload//神經網絡入門基礎(PPT,代碼)//紲炵粡緗戠粶鍏ラ棬鍩虹錛圥PT錛屼唬鐮侊級//cifar-10-batches-py//' # 獲得訓練數據和測試數據 train_X, train_y, test_X, test_y = load_CIFAR10_data(filename_dir) # 第四步:創建一個mask索引,用於生成500個驗證集val數據, 5000個訓練數據和500個測試數據 mask = np.arange(num_train, num_train+num_val) val_X = train_X[mask] val_y = train_y[mask] mask = np.arange(num_train) train_X = train_X[mask] train_y = train_y[mask] mask = np.arange(num_test) test_X = test_X[mask] test_y = test_y[mask] # 第五步:對圖像進行均值化操作 mean_img = np.mean(train_X, axis=0, keepdims=True) train_X -= mean_img val_X -= mean_img test_X -= mean_img # 第六步:將圖片的維度進行轉換 train_X = train_X.transpose(0, 3, 1, 2) val_X = val_X.transpose(0, 3, 1, 2) test_X = test_X.transpose(0, 3, 1, 2) # 第七步:創建一個字典返回數據 return {'train_X':train_X, 'train_y':train_y, 'val_X':val_X, 'val_y':val_y, 'test_X':test_X, 'test_y':test_y}
第二部分:進行模型的構建,用於生成lscores, loss和grads
有標簽值y輸出loss和grads
沒有標簽值y 輸出scores得分
第一步:輸入參數的初始化,包括輸入圖片維度,filter卷積核個數, filter_size卷積核的大小, num_hidden: 隱藏層個數, num_classes:分類的結果,weight_scale表示權重參數的偏置,reg表示正則化懲罰項的力度
第二步:初始化構造卷積的參數維度,構建字典self.params進行存放,將卷積參數進行數據類型的轉換,轉換為np.float32
第三步:構造loss函數,獲得各個層的參數值
第四步:前向傳播:
第一步:卷積 + relu激活 + pool的前向傳播, 保存輸入的cache用於后續的反向傳播
第二步: 第一次全連接層的前向傳播,這里使用.reshape(N, -1)進行層的維度變化
第三步:第二次全連接層的前向傳播,用於計算各個類別的得分值
第四步:如果沒有標簽值,就返回scores得分值
第五步:反向傳播
第一步:計算損失值loss和softmax概率的反向傳播dout
第二步:計算第二次全連接的反向傳播
第三步:計算第一次全連接的relu和線性的反向傳播,這是輸入結果與輸出結果的維度相同,用於進行(N, 32, 32, 32)的矩陣變換
第四步:pool + relu + conv 的反向傳播
第六步:在dw的基礎上,加上正則化的梯度求導結果,同時對於加上正則化的損失值
第七步:構建grads字典,返回grads和loss
代碼:主函數:cnn.py
import numpy as np from layer_utils import * class ThreeLayerConvNet(object): # 第一步:輸入參數的初始化 def __init__(self, input_image=(3, 32, 32), filter_size=7, num_hidden=100, filter_num = 32, num_classes=10, weight_scale=1e-3, reg=0.0, dtype=np.float32): # 第二步:建立存放參數的字典,並對參數進行零值化的構造其維度,使用.astype進行類型轉換 # 正則化懲罰項 self.reg = reg # 數據類型 self.dtype = dtype # 用於存放參數 self.params = {} # 輸入樣本的維度 C, H, W = input_image # 對w參數,進行正態化的初始化,W1卷積層的維度, F卷積核個數, C上一通道的維度(圖片維度), HH(卷積核長), WW(卷積核寬), self.params['W1'] = weight_scale * np.random.randn(filter_num, C, filter_size, filter_size) # 對b參數,使用零值初始化,b1卷積核的常熟項,F表示b的個數,每一個卷積核對應一個b self.params['b1'] = np.zeros(filter_num) # 全連接層,池化后的數據維度為N, int(H*W*filter_num/4) 構造w2.shape(int(H*W*filter_num/4, num_hidden)) self.params['W2'] = weight_scale * np.random.randn(int(H*W*filter_num/4), num_hidden) # 全連接層b的參數為num_hidden隱藏層的個數 self.params['b2'] = np.zeros(num_hidden) # 全連接層w3:用於進行得分值得計算,維度為(num_hidden, num_classes) self.params['W3'] = weight_scale * np.random.randn(num_hidden, num_classes) # 全連接層b3,得分層計算的偏置項b self.params['b3'] = np.zeros(num_classes) # 將參數轉換為np.float32 for k, v in self.params.items(): self.params[k] = v.astype(dtype=self.dtype) def loss(self, X, y=None): # 第三步:從self.params獲得各個層的參數 W1, b1 = self.params['W1'], self.params['b1'] W2, b2 = self.params['W2'], self.params['b2'] W3, b3 = self.params['W3'], self.params['b3'] # 第四步:進行前向傳播 # 卷積的步長 stride = 1 # 補零的維度,為了保證卷積后的維度不變 pad = int((W1.shape[2] - 1) / 2) # 組合成卷積的參數 conv_params = {'stride': stride, 'pad': pad} # 組合成池化層的參數 pool_params = {'pool_height':2, 'pool_width':2, 'stride':2} # 進行卷積,激活層和pool池化層的前向傳播,保存cache用於后續的反向傳播 a1, cache1 = conv_relu_pool_forward(X, W1, b1, conv_params, pool_params) # 進行全連接層的前向傳播,線性變化和relu層 a2, cache2 = affine_relu_forward(a1, W2, b2) # 進行全連接層的前向傳播,線性變化獲得得分值 scores, cache3 = affine_forward(a2, W3, b3) # 如果沒有標簽值,返回得分 if y is None: return scores # 第五步:計算反向傳播的結果 # 計算data的損失值loss,以及反向傳輸softmax的結果,即dloss/dprob的求導結果 data_loss, dscores = softmax_loss(scores, y) # 進行第二層全連接的反向傳播 da2, dW3, db3 = affine_backward(dscores, cache3) # 進行第一層全連接層的反向傳播,包括relu層和線性變換層 da1, dW2, db2 = affine_relu_backward(da2, cache2) # 進行卷積層的反向傳播,包括pool, relu, conv卷積層的反向傳播,輸出梯度值 dx, dW1, db1 = conv_relu_pool_backward(da1, cache1) # 第六步:加上正則化的損失值,同時梯度dw加上w正則化的求導值 reg_loss = 0.5 * np.sum(W1*W1) + 0.5 * np.sum(W2*W2) + 0.5 * np.sum(W3*W3) loss = reg_loss + data_loss dW1 += self.reg * W1 dW2 += self.reg * W2 dW3 += self.reg * W3 # 第七步:構造grads梯度字典,並返回梯度值和損失值 grads = {'W1': dW1, 'b1': db1, 'W2': dW2, 'b2': db2, 'W3': dW3, 'b3': db3} return loss, grads
副函數:layer_utils.py
from layers import * # 卷積層,激活層,池化層的前向傳播 def conv_relu_pool_forward(x, w, b, conv_params, pool_params): # 卷積層的前向傳播使用對每個filterF, np.sum對C通道乘積進行加和,再加上b偏置項 a, cache_conv = conv_forward(x, w, b, conv_params) # relu層的前向傳播, np.maxmuim(0, x) 小於零的值使用零表示 r, cache_relu = relu_forward(a) # pool層的前向傳播,對卷積部分的圖像求出最大值,作為pool池化后的大小 out, cache_pool = pool_forward(r, pool_params) # 將各個輸入組合成一個cache,用於反向傳播 cache = (cache_conv, cache_relu, cache_pool) return out, cache # pool,relu, conv的反向傳播 def conv_relu_pool_backward(dout, cache): # 獲得三個層的輸入參數 cache_conv, cache_relu, cache_pool = cache # 進行池化層的反向傳播,構造最大值的[[false, false], [false, True]]列表,最大值部分不變,其他部位使用0值填充 dpool = pool_backward(dout, cache_pool) # 進行relu層的反向傳播,dout[x<0] = 0, 將輸入小於0的dout置為0 drelu = relu_backward(dpool, cache_relu) # 卷積層的反向傳播,對dx, dw, db進行反向傳播,dx[i, :, j*s] += dout * w[f], dw[f] += windows * dout, db[f] += dout dconv, dw, db = conv_backward(drelu, cache_conv) return dconv, dw, db # 線性傳播和池化層的前向傳播,即全連接層的前向傳播 def affine_relu_forward(x, w, b): a, cache_affine = affine_forward(x, w, b) r, cache_relu = relu_forward(a) cache = (cache_affine, cache_relu) return r, cache # 線性傳播和池化層的反向傳播,即全連接層的反向傳播 def affine_relu_backward(dout, cache): affine_cache, relu_cache = cache r = relu_backward(dout, relu_cache) dx, dw, db = affine_backward(r, affine_cache) return dx, dw, db
副副函數:layers.py
import numpy as np # 卷積的前向傳播 def conv_forward(x, w, b, conv_params): N, C, H, W = x.shape F, C, HH, WW = w.shape pad = conv_params['pad'] stride = conv_params['stride'] # 進行補零操作 x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant') # 進行卷積后的H和W的維度計算 H_new = int((H - HH + 2*pad) / stride + 1) W_new = int((W - WW + 2*pad) / stride + 1) s = stride # 構造輸出矩陣 out = np.zeros((N, F, H_new, W_new)) for i in range(N): for f in range(F): for j in range(H_new): for k in range(W_new): # 將C通道分別進行相乘,和最后的相加操作,再加上一個b值,作為最后的輸出 out[i, f, j, k] = np.sum(x_pad[i, :, j*s:j*s+HH, k*s:k*s+WW] * w[f]) + b[f] cache = (x, w, b, conv_params) return out, cache # 卷積的反向傳播 def conv_backward(dout, cache): (x, w, b, conv_params) = cache N, C, H, W = x.shape F, C, HH, WW = w.shape pad = conv_params['pad'] stride = conv_params['stride'] s = stride H_new = dout.shape[2] W_new = dout.shape[3] # 進行補零操作 x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant') # 構造dw, dx, db的輸出矩陣,即與輸入矩陣的維度相同 dw = np.zeros_like(w) dx = np.zeros_like(x_pad) db = np.zeros_like(b) for i in range(N): for f in range(F): for j in range(H_new): for k in range(W_new): # 獲得前向傳播的x windows = x_pad[i, :, j*s:j*s+HH, k*s:k*s+WW] # dw[f] = dout[i, f, j, k] * x dw[f] += dout[i, f, j, k] * windows # dx = dout * w dx[i, :, j*s:j*s+HH, k*s:k*s+WW] += dout[i, f, j, k] * w[f] # db[f] += dout[i, f, j, k] db[f] += dout[i, f, j, k] # 進行裁剪,去除補零部分 dx = dx[:, :, int(pad):pad + H, pad:pad + W] return dx, dw, db # 池化的前向傳播 def pool_forward(x, pool_params): N, C, H, W = x.shape # 池化的維度 pool_height, pool_width = pool_params['pool_height'], pool_params['pool_width'] pool_stride = pool_params['stride'] # 池化后的維度 H_new = int((H - pool_height) / pool_stride + 1) W_new = int((W - pool_width) / pool_stride + 1) s = pool_stride HH = pool_height WW = pool_width out = np.zeros((N, C, H_new, W_new)) for i in range(N): for c in range(C): for j in range(H_new): for k in range(W_new): # 將圖像上卷積區域的最大值,賦值給池化后的數據 out[i, c, j, k] = np.max(x[i, c, j*s:j*s+HH, k*s:k*s+WW]) cache = (x, pool_params) return out, cache # 池化層的反向傳播 def pool_backward(dout, cache): # 獲得輸入層的輸入 (x, pool_params) = cache HH = pool_params['pool_height'] WW = pool_params['pool_width'] stride = pool_params['stride'] s = stride N, C, H, W = x.shape # 迭代的次數,這與池化層的前向傳播的次數是相同的 H_new = int((H - HH) / stride + 1) W_new = int((W - WW) / stride + 1) # 構造輸出矩陣 out = np.zeros_like(x) for i in range(N): for c in range(C): for j in range(H_new): for k in range(W_new): # 生成[[false, false],[false, True]] window = (np.array(x[i, c, j*s:j*s+HH, k*s:k*s+WW]) == dout[i, c, j, k]) # [[false, false],[false, True]] * dout[i, c, j, k] = [[0, 0], [0, dout[i, c, j, k]] out[i, c, j*s:j*s+HH, k*s:k*s+WW] = window * dout[i, c, j, k] return out # 線性變化的前向傳播 def affine_forward(x, w, b): N = x.shape[0] x_row = x.reshape(N, -1) out = np.dot(x_row, w) + b cache = (x, w, b) return out, cache # 線性變化的反向傳播 def affine_backward(dout, cache): x, w, b = cache dx = np.dot(dout, w.T) dx = dx.reshape(x.shape) x_row = x.reshape(x.shape[0], -1) dw = np.dot(x_row.T, dout) db = np.sum(dout, axis=0, keepdims=True) return dx, dw, db # relu層的前向傳播 def relu_forward(x): out = None cache = x out = ReLU(x) return out, cache # relu層的反向傳播 def relu_backward(dout, cache): out = None x = cache out = dout.copy() out[x < 0] = 0 return out # relu激活函數 def ReLU(x): return np.maximum(0, x) # 計算損失值,即dloss/dprob的損失函數對概率的反導 def softmax_loss(scores, y): N = scores.shape[0] #softmax概率值 probs = np.exp(scores) probs /= np.sum(probs, axis=1, keepdims=True) # 計算損失值函數 loss = -np.sum(np.log(probs[np.arange(N), y])) / N # 損失值對softmax概率值求導 dout = probs.copy() dout[np.arange(N), y] -= 1 dout /= N return loss, dout
第三部分:將數據data和模型model輸入,使用batch_size數據進行self.model.loss進行損失值得計算和參數的更新
第一步:獲得data中的訓練數據和驗證集的數據
第二步:使用kwargs.pop()獲得傳入字典中的參數
第三步: 進行部分參數初始化,同時構造參數對應的學習率和momentum字典,即后續的v
第四步:使用num_data和batch_size, 即num_epoches構造出迭代的次數
第五步:進行循環,使用動量梯度sgd,即self.model.loss計算損失值和更新self.model.params的參數
第六步:每一個print_every 打印損失值loss
第七步:每一個epoch值,進行學習率的衰減
第八步:在開始或者結束,以及每一個epoch值,打印准確率
第九步:對於最好的驗證集的准確率,保存當前的self.model.params,迭代結束,將最好的驗證集參數賦值給self.model.params
主函數:Solver.py
import numpy as np import optim class Solver(object): def __init__(self, data, model, **kwargs): # 第一步:從data中獲得訓練數據和驗證集數據 self.train_X = data['train_X'] self.train_y = data['train_y'] self.val_X = data['val_X'] self.val_y = data['val_y'] self.model = model # 第二步:獲得傳入的參數,學習率衰減值,訓練的batch_size, epoch的大小,學習率和momentum, 梯度下降的方式 self.lr_decay = kwargs.pop('lr_decay', 1.0) self.print_every = kwargs.pop('print_every', 10) self.num_epochs = kwargs.pop('num_epochs', 2) self.batch_size = kwargs.pop('batch_size', 2) self.update_rule = kwargs.pop('update_rule', 'sgd') self.optim_config = kwargs.pop('optim_config', {}) self.verbose = kwargs.pop('verbose', True) # 如果存在未知的輸入則報錯 if len(kwargs) > 0: extra = ','.join('%s'% k for k in kwargs) raise ValueError('Unrecognized arguments %s' % extra) # 如果optim中不存在,梯度下降的方式就報錯 if not hasattr(optim, self.update_rule): raise ValueError('Unrecognized arguments %s' % self.update_rule) # 將optim中的函數功能賦予函數名self.update_rule self.update_rule = getattr(optim, self.update_rule) # 第三步:進行部分初始化操作 self._reset() def _reset(self): # 迭代epoch的次數 self.epoch = 0 # 損失值的list self.loss_history = [] # 准確率的list self.acc_train = [] self.acc_val = [] # 最好的驗證集的准確率 self.best_val = 0 # 每個dw和db對應的學習率和momentum self.optim_configs = {} # 建立每個參數對應的學習率和momentum for p in self.model.params: d = {k:v for k, v in self.optim_config.items()} self.optim_configs[p] = d def train(self): # 第四步:使用樣本數和batch_size,即epoch_num,構造迭代的次數 num_data = self.train_X.shape[0] num_every_epoch = max(num_data / self.batch_size, 1) num_iterations = num_every_epoch * self.num_epochs for t in range(int(num_iterations)): # 第五步:循環,計算損失值和梯度值,並使用sgd_momentum進行參數更新 self._step() # 第六步:每一個print_every打印損失值 if self.verbose and t % self.print_every == 0: print('%d / %d %f'%(t+1, num_iterations, self.loss_history[-1])) # 第七步:每一個循環進行一次學習率的下降 epoch_end = (t + 1) % self.batch_size == 0 if epoch_end: self.epoch += 1 for p in self.optim_configs: self.optim_configs[p]['learning_rate'] *= self.lr_decay start = 0 end = num_iterations - 1 # 第八步:開始或者結束,或者每一個epoch計算准確率,同時獲得驗證集最好的參數 if t == start or t == end or t+1 % num_every_epoch == 0: train_acc = self.check_accuracy(self.train_X, self.train_y, num_sample=4) val_acc = self.check_accuracy(self.val_X, self.val_y, num_sample=4) print('%d/%d train acc%.2f'%(self.epoch, self.num_epochs, train_acc)) print('%d/%d val acc%.2f' % (self.epoch, self.num_epochs, val_acc)) self.acc_train.append(train_acc) self.acc_val.append(val_acc) if val_acc > self.best_val: self.best_params = {} for k, v in self.model.params.items(): self.best_params[k] = v self.best_val = val_acc # 將驗證集最好的參數賦予給當前的模型參數 self.model.params = self.best_params # 進行准確率的計算 def check_accuracy(self, X, y, num_sample=None, batch_size=2): # num_sample表示使用多少個數據計算准確率 N = X.shape[0] if N > num_sample: # 隨機從N個樣本中,抽取num_sample個樣本 mask = np.random.choice(N, num_sample) X = X[mask] y = y[mask] num_batch = num_sample / batch_size if num_batch % batch_size != 0: num_batch += 1 y_pred = [] for i in range(int(num_batch)): start_id = i * batch_size end_id = (i + 1) * batch_size # 不傳入y,獲得scores得分 scores = self.model.loss(X[start_id:end_id]) y_pred.append(np.argmax(scores, axis=1)) # 將數據進行橫向排列 y_pred = np.hstack(y_pred) # 計算結果的平均值 accr = np.mean((y_pred == y)) return accr # 計算loss和進行參數更新 def _step(self): # 獲得當前樣本的個數 self.num_data = self.train_X.shape[0] # 隨機抽取2個樣本,用於進行參數的更新 batch_mask = np.random.choice(self.num_data, self.batch_size) X_batch = self.train_X[batch_mask] y_batch = self.train_y[batch_mask] # 計算損失值和梯度方向 loss, grads = self.model.loss(X_batch, y_batch) # 將損失值的結果進行添加 self.loss_history.append(loss) # 對每個參數進行循環 for p, v in self.model.params.items(): # 獲得當前的學習率和momentum, 以及后續加入的v config = self.optim_configs[p] # 獲得w和dw梯度值 w, dw = self.model.params[p], grads[p] # 將w,dw, config傳入到動量梯度算法,進行參數更新 next_w, next_config = self.update_rule(w, dw, config) # 將更新后的config替代字典中的config self.optim_configs[p] = next_config # 將更新后的參數替換成模型中的參數 self.model.params[p] = next_w
副函數:optim.py
import numpy as np # sgd_momentum 計算動量梯度下降 def sgd_momentum(w, dw, configs=None): # 傳入每一個參數對應的學習率和momentum if configs is None: configs = {} # 如果不存在該屬性,使用默認值 learning_rate = configs.setdefault('learning_rate', 1e-2) momentum = configs.setdefault('momentum', 1.0) # 獲得前一次傳播的v,沒有就使用構造全零 v = configs.get('velocity', np.zeros_like(w)) # 進行當前v的更新,即v*momentum - dw * learning_rate v = v*momentum - dw * learning_rate # 進行w參數更新 w = v + w # 將v替換,最為下一次的前一次傳播v configs['velocity'] = v return w, configs
上述代碼的主要函數:start.py
import numpy as np from data_utils import get_CIFAR10_data from cnn import ThreeLayerConvNet from solver import Solver import matplotlib.pyplot as plt # 第一步數據讀取 data = get_CIFAR10_data() # 第二步:建立model用於進行loss和grads的計算 model = ThreeLayerConvNet(reg=0.9) # 第三步:使用batch_size進行參數的更新 solver = Solver(data, model, lr_decay=0.95, print_every=10, num_epochs=5, batch_size=2, update_rule='sgd_momentum', optim_config={'learning_rate': 5e-4, 'momentum': 0.9}) solver.train() # 畫出loss圖 plt.subplot(2, 1, 1) plt.title('Training loss') plt.plot(solver.loss_history, 'o') plt.xlabel('Iteration') # 畫出准確率的圖 plt.subplot(2, 1, 2) plt.title('Accuracy') plt.plot(solver.train_acc_history, '-o', label='train') plt.plot(solver.val_acc_history, '-o', label='val') plt.plot([0.5]*len(solver.val_acc_history), 'k--') plt.xlabel('Epoch') plt.legend(loc='lower right') plt.show() # 計算測試值得准確率 best_model = model test_X, test_y = data['test_X'], data['test_y'] print('test_value', np.mean(np.argmax(model.loss(test_X), axis=1) == test_y))