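A CNN (convolutional neural network) is a variant of the traditional neural network that adds convolution and pooling on top of it. Compared with a traditional neural network, a CNN is better suited to images: convolution corresponds to local image features, and pooling gives the features obtained by convolution a degree of spatial invariance.
The convolution most people have met is the Gaussian kernel, used to smooth an image or to carry out computations at different scales. The convolution here is of the same type as the Gaussian kernel: we fix the size of the kernel, and the actual kernel values are parameters of the neural network. Training the network therefore amounts to learning the parameters of several convolution kernels, and each kernel can be seen as a feature extractor for the image (a feature kernel). A CNN can thus be viewed as a network whose first few layers extract image features and whose last layer, $softmax$, classifies the extracted features. The features of a CNN are therefore learned automatically (in contrast to SIFT and SURF).
conv2d is the Theano function for computing convolutions (theano.tensor.nnet.conv.conv2d($input$, $W$)), where $W$ is the convolution kernel. $W$ must be a 4D tensor (T.tensor4), and $input$ must also be a 4D tensor.
The meaning of each dimension of $input$ and $W$ is as follows.
$input \in (batches, feature, I_h, I_w)$: batch size, number of feature maps, image height, image width
$W \in (filters, feature, f_h, f_w)$: number of filters, number of feature maps, filter height, filter width
Here $W_{shape[1]}$ must equal $input_{shape[1]}$. $W_{shape[1]} = 1$ means the filter is a 2D filter, and $W_{shape[1]} > 1$ means the filter is a 3D filter. For example, $W_{shape[1]} = 3$ means the filter spans the 3 channels of an image, and the convolution is carried out over all 3 channels.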
\begin{equation} \begin{aligned} input &\in (batches, feature, I_h, I_w) \\ W &\in (filters, feature, f_h, f_w) \end{aligned} \end{equation}
\begin{equation} \begin{aligned} output &= input \otimes W \\ output &\in (batches, filters, I_h - f_h + 1, I_w - f_w + 1) \end{aligned} \end{equation}
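The shape rule above can be checked with a small pure-Python sketch. The helper names `conv_output_shape` and `conv2d_valid` are made up for this illustration and are not part of Theano. The loop computes a plain sliding-window sum of products over all input feature maps without flipping the kernel (cross-correlation); that simplification is an assumption of this sketch and does not affect the output shape.

```python
def conv_output_shape(input_shape, filter_shape):
    """Shape of a 'valid' convolution of a 4D input with a 4D filter bank."""
    batches, in_maps, i_h, i_w = input_shape
    filters, f_maps, f_h, f_w = filter_shape
    if in_maps != f_maps:
        raise ValueError("W.shape[1] must equal input.shape[1]")
    return (batches, filters, i_h - f_h + 1, i_w - f_w + 1)


def conv2d_valid(image, kernel):
    """Apply one filter to one image (hypothetical helper, no kernel flip).

    image:  nested list indexed [feature][row][col]
    kernel: nested list indexed [feature][row][col]
    Returns a 2D nested list of size (I_h - f_h + 1, I_w - f_w + 1).
    """
    i_h, i_w = len(image[0]), len(image[0][0])
    f_h, f_w = len(kernel[0]), len(kernel[0][0])
    out = []
    for r in range(i_h - f_h + 1):
        row = []
        for c in range(i_w - f_w + 1):
            total = 0.0
            for m in range(len(kernel)):  # sum over input feature maps
                for dr in range(f_h):
                    for dc in range(f_w):
                        total += image[m][r + dr][c + dc] * kernel[m][dr][dc]
            row.append(total)
        out.append(row)
    return out


# 500 images with 1 channel of 28x28 pixels, 20 filters of size 5x5.
print(conv_output_shape((500, 1, 28, 28), (20, 1, 5, 5)))

# A 2-channel 3x3 image of ones and a 2-channel 2x2 filter of ones:
# every output value sums 2 * 2 * 2 = 8 products.
ones_image = [[[1.0] * 3 for _ in range(3)] for _ in range(2)]
ones_kernel = [[[1.0] * 2 for _ in range(2)] for _ in range(2)]
print(conv2d_valid(ones_image, ones_kernel))
```

With more than one input feature map the filter is summed over all of them, which is why the second dimension of $W$ has to match the second dimension of $input$.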
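Pooling operates in 2D space. As shown in the figure above, the features are divided by spatial position into larger blocks, and a feature is then computed within each block. $max pooling$ takes the maximum over all positions in the block as the feature, while $average pooling$ takes the mean of the features in the region.
Why is pooling needed? BOW in image classification also uses pooling. In my view, using pooling in a CNN has two main benefits:
1. Without pooling, the number of hidden nodes produced by convolution is a multiple of the number of convolution kernels. For example, with the $input$ and $W$ above, each sample in $input$ has $feature \times I_h \times I_w$ input nodes; after convolution with $W$, $output$ has $filters \times (I_h - f_h + 1) \times (I_w - f_w + 1)$ nodes. With pooling, the number of nodes in $output$ becomes $filters \times \frac{I_h - f_h + 1}{p_h} \times \frac{I_w - f_w + 1}{p_w}$, where $p_h, p_w$ is the size of each pooling region. This reduces the number of hidden nodes and lowers the computational cost.
2. The other benefit of pooling is that it gives the CNN model some invariance to translation or rotation within a local region. Many carefully designed features, such as SIFT, SURF and HOG, have these invariances. This invariance helps CNNs shine in image classification and achieve good performance.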
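The node counts in point 1 can be worked through numerically. This is a minimal sketch; the function name `hidden_nodes` is made up for illustration, and the floor division assumes that incomplete border blocks are dropped when the map size is not a multiple of the pooling size.

```python
def hidden_nodes(filters, i_h, i_w, f_h, f_w, p_h=1, p_w=1):
    """Number of output nodes per sample after convolution and optional pooling."""
    conv_h = i_h - f_h + 1
    conv_w = i_w - f_w + 1
    # Assumption: partial blocks at the border are discarded (floor division).
    return filters * (conv_h // p_h) * (conv_w // p_w)


# 20 filters of 5x5 on a 28x28 image, without and with 2x2 pooling.
print(hidden_nodes(20, 28, 28, 5, 5))        # 20 * 24 * 24
print(hidden_nodes(20, 28, 28, 5, 5, 2, 2))  # 20 * 12 * 12
```

In this example pooling cuts the number of nodes by a factor of $p_h \times p_w = 4$.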
In Theano, the function for pooling is $\text{theano.tensor.signal.downsample.max\_pool\_2d}$. For an $N$-dimensional input ($N \geq 2$), it pools the input data over the last two dimensions using the specified $p_h, p_w$.
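To make the two pooling rules concrete, here is a pure-Python sketch of non-overlapping pooling on a single 2D feature map. The function `pool_2d` is a hypothetical helper written for this note, not the Theano implementation; it assumes that rows and columns which do not fill a complete block are dropped.

```python
def pool_2d(feature_map, pool_size, mode="max"):
    """Non-overlapping pooling over a 2D nested list."""
    p_h, p_w = pool_size
    rows = len(feature_map) // p_h      # incomplete border blocks are dropped
    cols = len(feature_map[0]) // p_w
    pooled = []
    for r in range(rows):
        pooled_row = []
        for c in range(cols):
            block = [feature_map[r * p_h + dr][c * p_w + dc]
                     for dr in range(p_h) for dc in range(p_w)]
            if mode == "max":
                pooled_row.append(max(block))
            elif mode == "average":
                pooled_row.append(sum(block) / float(len(block)))
            else:
                raise ValueError("mode must be 'max' or 'average'")
        pooled.append(pooled_row)
    return pooled


grid = [[1, 2, 3, 4],
        [5, 6, 7, 8],
        [9, 10, 11, 12],
        [13, 14, 15, 16]]
print(pool_2d(grid, (2, 2), mode="max"))      # [[6, 8], [14, 16]]
print(pool_2d(grid, (2, 2), mode="average"))  # [[3.5, 5.5], [11.5, 13.5]]
```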
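The Convolutional Neural Network (LeNet) example in the Deep Learning tutorial performs character recognition on the MNIST dataset (10 classes, recognizing Arabic numerals). Each character is a $28\times28$-pixel input; 50000 samples are used for training, 10000 for validation, and another 10000 for testing. MNIST can be downloaded here. The model is optimized with mini-batch SGD.
The structure of this CNN for handwritten digit recognition is shown in the figure at the very top.
Input layer: the raw images of each mini-batch, $image shape = (batch size, 1, 28, 28)$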
layer0_input = x.reshape((batch_size, 1, 28, 28))
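Convolutional layer 1: for each mini-batch of input data, the output is the result of convolution followed by pooling. The first layer has $nkerns[0]=20$ convolution kernels, each of size $f_h = 5, f_w = 5$
The pooling size is $(2,2)$
With $filtershape=(nkerns[0],1,5,5)$, convolution changes the image size to $(I_h - f_h + 1, I_w - f_w + 1)$: $(28, 28) \to (24, 24)$
After pooling: $(24, 24) \to (24/2, 24/2) = (12, 12)$
The number of feature maps becomes the number of kernels, so $outputshape=(batch size, nkerns[0], 12, 12)$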
layer0 = LeNetConvPoolLayer(rng, input=layer0_input, image_shape=(batch_size,1,28,28), filter_shape=(nkerns[0], 1, 5, 5), poolsize=(2,2))
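Convolutional layer 2: the input is the output of convolutional layer 1, so $inputsize=(batch size, nkerns[0], 12, 12)$
With $filtershape=(nkerns[1],nkerns[0],5,5)$, convolution changes the image size to $(I_h - f_h + 1, I_w - f_w + 1)$: $(12, 12) \to (8, 8)$
After pooling: $(8, 8) \to (8/2, 8/2) = (4, 4)$
The number of feature maps becomes the number of kernels, so $outputshape=(batch size, nkerns[1], 4, 4)$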
layer1 = LeNetConvPoolLayer(rng, input=layer0.output, image_shape=(batch_size, nkerns[0], 12, 12), filter_shape=(nkerns[1], nkerns[0], 5,5), poolsize=(2,2))
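Fully connected layer: the input is the output of convolutional layer 2, flattened into a $1D$ vector per sample, so $inputsize=nkerns[1]*4*4$
This is an ordinary fully connected layer, as in an ordinary neural network: every node of the input layer is connected to every node of the output layer
The number of output nodes of this layer is set to $500$ here
Layer2_input = layer1.output.flatten(2)
# construct a fully-connected sigmoidal layer
layer2 = HiddenLayer(rng, input=Layer2_input, n_in=nkerns[1]*4*4, n_out=500, activation=T.tanh)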
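SoftMax layer: the last layer is a softmax layer used for classification; its input is the output of the previous layer, $input=500$
The number of outputs is the number of classes, which is 10 here.
# classify the values of the fully-connected sigmoidal layer
layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)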
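The layer-by-layer sizes described above can be traced with a few lines of plain Python. This is only a bookkeeping sketch under the sizes stated in the text; `conv_pool_shape` and `lenet_shapes` are hypothetical helper names, not functions from the tutorial code.

```python
def conv_pool_shape(input_shape, filter_shape, poolsize):
    """Output shape of a 'valid' convolution followed by non-overlapping pooling."""
    batches, in_maps, i_h, i_w = input_shape
    filters, f_maps, f_h, f_w = filter_shape
    assert in_maps == f_maps, "feature map counts must match"
    return (batches, filters,
            (i_h - f_h + 1) // poolsize[0],
            (i_w - f_w + 1) // poolsize[1])


def lenet_shapes(batch_size=500, nkerns=(20, 50)):
    """List the tensor shape after each stage of the network described above."""
    shapes = [(batch_size, 1, 28, 28)]                        # input layer
    shapes.append(conv_pool_shape(shapes[-1], (nkerns[0], 1, 5, 5), (2, 2)))
    shapes.append(conv_pool_shape(shapes[-1], (nkerns[1], nkerns[0], 5, 5), (2, 2)))
    _, maps, height, width = shapes[-1]
    shapes.append((batch_size, maps * height * width))        # flatten(2)
    shapes.append((batch_size, 500))                          # fully connected layer
    shapes.append((batch_size, 10))                           # softmax layer
    return shapes


for shape in lenet_shapes():
    print(shape)
```

The flattened size is $nkerns[1] \times 4 \times 4 = 800$ for $nkerns[1] = 50$, which is the `n_in` passed to the fully connected layer.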
(1) Imports
import os
import sys
import time

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.nnet import conv
from theano.tensor.signal import downsample

from LogistRegression import LogisticRegression, load_data
from mlp import HiddenLayer
(2) Definition of LeNetConvPoolLayer
$input:$ the input data
$rng:$ the random number generator used to initialize the convolution kernels
$filtershape:$ the shape of the convolution kernel parameters
$imageshape:$ the shape of the input data
The way the parameters are initialized is worth mentioning. One way is as follows:
$fanin:$ the number of inputs feeding each output node; this way does not take max pooling into account
fan_in = np.prod(filter_shape[1:])
W_values = np.asarray(rng.uniform(
    low=-np.sqrt(3./fan_in),
    high=np.sqrt(3./fan_in),
    size=filter_shape), dtype=theano.config.floatX)
self.W = theano.shared(value=W_values, name='W')
The other way is
fan_in = np.prod(filter_shape[1:])
# each unit in the lower layer receives a gradient from:
# "num output feature maps * filter height * filter width" / pooling size
fan_out = (filter_shape[0] * np.prod(filter_shape[2:])/np.prod(poolsize))
# initialize weights with random weights
W_bound = np.sqrt(6. / (fan_in + fan_out))
W_values = np.asarray(rng.uniform(
    low=-W_bound,
    high=W_bound,
    size=filter_shape), dtype=theano.config.floatX)
$fanin:$ the same as in the first way, except that $fanout$ is added here
Without pooling, $fanout=filter\_shape[0]*np.prod(filter\_shape[2:])$
With pooling, $fanout=filter\_shape[0]*np.prod(filter\_shape[2:])/np.prod(poolsize)$
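The two initialization bounds can be compared numerically. The sketch below uses only the standard library; the function names are made up for illustration, and the division by the pooling size is done in floating point, which is an assumption of this sketch (integer division would round $fanout$ down when it is not a whole number).

```python
import math


def prod(values):
    """Product of a sequence of integers."""
    result = 1
    for v in values:
        result *= v
    return result


def bound_fan_in(filter_shape):
    """First scheme: uniform bound sqrt(3 / fan_in), pooling ignored."""
    fan_in = prod(filter_shape[1:])
    return math.sqrt(3.0 / fan_in)


def bound_fan_in_out(filter_shape, poolsize):
    """Second scheme: uniform bound sqrt(6 / (fan_in + fan_out))."""
    fan_in = prod(filter_shape[1:])
    # Assumption: float division by the pooling area.
    fan_out = filter_shape[0] * prod(filter_shape[2:]) / float(prod(poolsize))
    return math.sqrt(6.0 / (fan_in + fan_out))


# First convolutional layer: 20 filters, 1 input map, 5x5 kernels, 2x2 pooling.
# fan_in = 1 * 5 * 5 = 25, fan_out = 20 * 25 / 4 = 125.
print(bound_fan_in((20, 1, 5, 5)))
print(bound_fan_in_out((20, 1, 5, 5), (2, 2)))
```

For this layer the second scheme draws weights from roughly $[-0.2, 0.2]$, a narrower interval than the first scheme's $\sqrt{3/25} \approx 0.35$.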
class LeNetConvPoolLayer(object):
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2,2)):
        """
        Alloc a LeNetConvPoolLayer with shared variable internal parameters

        :type rng: numpy.random.RandomState
        :param rng: a random number generator used to initialize weights

        :type input: theano.tensor.dtensor4
        :param input: symbolic image tensor, of shape image_shape

        :type filter_shape: tuple or list of length 4
        :param filter_shape: (number of filters, num input feature maps,
                              filter height, filter width)

        :type image_shape: tuple or list of length 4
        :param image_shape: (batch size, num input feature maps,
                             image height, image width)

        :type poolsize: tuple or list of length 2
        :param poolsize: the downsampling (pooling) factor (#rows, #cols)
        """
        # why? please see http://www.cnblogs.com/cvision/p/3276577.html
        assert image_shape[1] == filter_shape[1]
        self.input = input

        # initialize weight values: the fan-in of each hidden neuron is
        # restricted by the size of the receptive fields
        """
        fan_in = np.prod(filter_shape[1:])
        W_values = np.asarray(rng.uniform(
            low=-np.sqrt(3./fan_in),
            high=np.sqrt(3./fan_in),
            size=filter_shape), dtype=theano.config.floatX)
        self.W = theano.shared(value=W_values, name='W')
        """
        fan_in = np.prod(filter_shape[1:])
        # each unit in the lower layer receives a gradient from:
        # "num output feature maps * filter height * filter width" / pooling size
        fan_out = (filter_shape[0] * np.prod(filter_shape[2:])/np.prod(poolsize))
        # initialize weights with random weights
        W_bound = np.sqrt(6. / (fan_in + fan_out))
        W_values = np.asarray(rng.uniform(
            low=-W_bound,
            high=W_bound,
            size=filter_shape), dtype=theano.config.floatX)
        self.W = theano.shared(value=W_values, name='W')
        #print self.W.get_value()

        # the bias is a 1D tensor -- one bias per output feature map
        b_values = np.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, name='b')

        # convolve input feature maps with filters
        conv_out = conv.conv2d(input, self.W,
                               filter_shape=filter_shape, image_shape=image_shape)

        # downsample each feature map individually, using maxpooling
        pooled_out = downsample.max_pool_2d(conv_out, poolsize, ignore_border=True)

        # add the bias term. Since the bias term is a vector (1D array), we first
        # reshape it to a tensor of shape (1, n_filters, 1, 1). Each bias will
        # thus be broadcasted across mini-batches and feature map width & height
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))

        self.params = [self.W, self.b]
(3) Definition of the LeNet network structure
First the data is organized into mini-batches, which is done here through the mini-batch index
n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size
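The batch bookkeeping can be illustrated without Theano. In this sketch `n_batches` and `batch_slice` are hypothetical helpers; the slice bounds follow the `index*batch_size:(index+1)*batch_size` pattern that the training and evaluation functions in this post use.

```python
def n_batches(n_examples, batch_size):
    """Number of complete mini-batches; leftover examples are ignored."""
    return n_examples // batch_size


def batch_slice(index, batch_size):
    """Start (inclusive) and stop (exclusive) positions of mini-batch `index`."""
    return index * batch_size, (index + 1) * batch_size


# MNIST split sizes from the text, with batch_size = 500.
print(n_batches(50000, 500), n_batches(10000, 500))
print(batch_slice(0, 500), batch_slice(99, 500))
```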
The code is as follows
def evaluate_lenet5(learning_rate=0.1, n_epochs=200,
                    dataset='./data/mnist.pkl.gz',
                    nkerns=[20, 50], batch_size=500):
    """ Demonstrates lenet on MNIST dataset

    :type learning_rate: float
    :param learning_rate: learning rate used (factor for the stochastic gradient)

    :type n_epochs: int
    :param n_epochs: maximal number of epochs to run the optimizer

    :type dataset: string
    :param dataset: path to the dataset used for training / testing

    :type nkerns: list of ints
    :param nkerns: number of kernels on each LeNetConvPoolLayer

    :type batch_size: int
    :param batch_size: size of data in each batch
    """
    # used by LeNetConvPoolLayer to randomly initialize the filter weights
    rng = np.random.RandomState(23455)

    datasets = load_data(dataset)
    print >> sys.stdout, '...load data is ok'

    # get the train set, validation set and test set
    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # calculate how many batches there are
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size
    #print "n_train_batches = %d n_valid_batches = %d n_test_batches = %d" %(train_set_x.get_value(borrow=True).shape[0],
    #    valid_set_x.get_value(borrow=True).shape[0],test_set_x.get_value(borrow=True).shape[0])

    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print '...building the model'

    index = T.lscalar()   # index to [mini]batches
    x = T.matrix('x')     # images
    y = T.ivector('y')    # the labels
    ishape = (28, 28)     # the size of MNIST images

    # Reshape matrix of images of shape (batches, 28 * 28)
    # to a 4D tensor, compatible with our LeNetConvPoolLayer
    layer0_input = x.reshape((batch_size, 1, 28, 28))

    # Construct the first convolutional pooling layer:
    # filtering reduces the image size to (I_h - f_h + 1, I_w - f_w + 1)
    # here this is (28, 28) ----> (28-5+1, 28-5+1) = (24, 24)
    # maxpooling reduces this further to (24/2, 24/2) = (12, 12)
    # so the 4D output tensor is thus of shape (batch_size, nkerns[0], 12, 12)
    layer0 = LeNetConvPoolLayer(rng, input=layer0_input,
                                image_shape=(batch_size, 1, 28, 28),
                                filter_shape=(nkerns[0], 1, 5, 5),
                                poolsize=(2, 2))

    # Construct the second convolutional pooling layer:
    # filtering reduces the image size to (12-5+1, 12-5+1) = (8, 8)
    # max pooling reduces this further to (8/2, 8/2) = (4, 4)
    # 4D output tensor is thus of shape (batch_size, nkerns[1], 4, 4)
    layer1 = LeNetConvPoolLayer(rng, input=layer0.output,
                                image_shape=(batch_size, nkerns[0], 12, 12),
                                filter_shape=(nkerns[1], nkerns[0], 5, 5),
                                poolsize=(2, 2))

    # the TanhLayer being fully-connected, it operates on 2D matrices of
    # the shape (batches, num_pixels) (i.e. matrix of rasterized images)
    # This will generate a matrix of (batches, nkerns[1]*4*4)
    Layer2_input = layer1.output.flatten(2)

    # construct a fully-connected sigmoidal layer
    layer2 = HiddenLayer(rng, input=Layer2_input, n_in=nkerns[1]*4*4,
                         n_out=500, activation=T.tanh)

    # classify the values of the fully-connected sigmoidal layer
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
(4) Mini-batch SGD optimization
Define the loss function to optimize, the $NLL$ (negative log-likelihood)
\begin{equation}
\begin{aligned}
\frac{1}{|\mathcal{D}|}\mathcal{L}(\theta=\{W,b\},\mathcal{D}) &= \frac{1}{|\mathcal{D}|}\sum_{i=0}^{|\mathcal{D}|} \log{P(Y=y^{(i)}|x^{(i)}, W, b)} \\
\ell (\theta=\{W,b\},\mathcal{D}) &= - \frac{1}{|\mathcal{D}|}\mathcal{L}(\theta=\{W,b\},\mathcal{D})
\end{aligned}
\end{equation}
# the cost we minimize during training is the NLL of the model
cost = layer3.negative_log_likelihood(y)
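As a worked example of the loss $\ell$, the sketch below computes the mean negative log-likelihood from a table of predicted class probabilities. It is a plain-Python illustration, not the `negative_log_likelihood` method of the `LogisticRegression` class; the function name and the toy probabilities are made up.

```python
import math


def mean_negative_log_likelihood(probabilities, labels):
    """Mean of -log P(Y = y_i | x_i) over a batch.

    probabilities: list of rows, one row of class probabilities per example
    labels: list of correct class indices, one per example
    """
    total_log_likelihood = 0.0
    for row, label in zip(probabilities, labels):
        total_log_likelihood += math.log(row[label])
    return -total_log_likelihood / len(labels)


# Two examples, two classes; the correct classes get probability 0.5 and 0.75.
toy_probabilities = [[0.5, 0.5], [0.25, 0.75]]
toy_labels = [0, 1]
print(mean_negative_log_likelihood(toy_probabilities, toy_labels))
```

The loss is 0 only when every correct class receives probability 1, and it grows as the probability assigned to the correct class shrinks.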
Define the functions that measure the current model's performance on the validation and test sets
# create functions to compute the mistakes that are made by the model
test_model = theano.function([index], layer3.errors(y),
    givens={
        x: test_set_x[index*batch_size:(index+1)*batch_size],
        y: test_set_y[index*batch_size:(index+1)*batch_size]})

validate_model = theano.function([index], layer3.errors(y),
    givens={
        x: valid_set_x[index*batch_size:(index+1)*batch_size],
        y: valid_set_y[index*batch_size:(index+1)*batch_size]})
Collect all model parameters and their gradients
# create a list of all model parameters to be fit by gradient descent
params = layer3.params + layer2.params + layer1.params + layer0.params

# create a list of gradients for all model parameters
grads = T.grad(cost, params)
Define the SGD optimization strategy, i.e. the gradient updates
updates = []
for param_i, grad_i in zip(params, grads):
    updates.append((param_i, param_i - learning_rate * grad_i))

train_model = theano.function(inputs=[index], outputs=cost, updates=updates,
    givens={
        x: train_set_x[index*batch_size:(index + 1)*batch_size],
        y: train_set_y[index*batch_size:(index + 1)*batch_size]})
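The update rule `param - learning_rate * grad` can be tried on a toy problem. This sketch replaces Theano's symbolic gradients with a hand-written gradient of $f(w) = (w - 3)^2$; the function `sgd_step` and the toy objective are assumptions made purely for illustration.

```python
def sgd_step(params, grads, learning_rate):
    """Return the parameters after one gradient-descent update."""
    return [p - learning_rate * g for p, g in zip(params, grads)]


# Minimize f(w) = (w - 3)^2, whose gradient is 2 * (w - 3).
w = [0.0]
for _ in range(100):
    grads = [2.0 * (w[0] - 3.0)]
    w = sgd_step(w, grads, learning_rate=0.1)
print(w[0])  # approaches the minimizer w = 3
```

With a learning rate of 0.1 the distance to the minimizer shrinks by a factor of 0.8 per step, so 100 steps are more than enough here.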
The complete code is as follows
def evaluate_lenet5(learning_rate=0.1, n_epochs=200,
                    dataset='./data/mnist.pkl.gz',
                    nkerns=[20, 50], batch_size=500):
    """ Demonstrates lenet on MNIST dataset

    :type learning_rate: float
    :param learning_rate: learning rate used (factor for the stochastic gradient)

    :type n_epochs: int
    :param n_epochs: maximal number of epochs to run the optimizer

    :type dataset: string
    :param dataset: path to the dataset used for training / testing

    :type nkerns: list of ints
    :param nkerns: number of kernels on each LeNetConvPoolLayer

    :type batch_size: int
    :param batch_size: size of data in each batch
    """
    # used by LeNetConvPoolLayer to randomly initialize the filter weights
    rng = np.random.RandomState(23455)

    datasets = load_data(dataset)
    print >> sys.stdout, '...load data is ok'

    # get the train set, validation set and test set
    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # calculate how many batches there are
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size
    #print "n_train_batches = %d n_valid_batches = %d n_test_batches = %d" %(train_set_x.get_value(borrow=True).shape[0],
    #    valid_set_x.get_value(borrow=True).shape[0],test_set_x.get_value(borrow=True).shape[0])

    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print '...building the model'

    index = T.lscalar()   # index to [mini]batches
    x = T.matrix('x')     # images
    y = T.ivector('y')    # the labels
    ishape = (28, 28)     # the size of MNIST images

    # Reshape matrix of images of shape (batches, 28 * 28)
    # to a 4D tensor, compatible with our LeNetConvPoolLayer
    layer0_input = x.reshape((batch_size, 1, 28, 28))

    # Construct the first convolutional pooling layer:
    # filtering reduces the image size to (I_h - f_h + 1, I_w - f_w + 1)
    # here this is (28, 28) ----> (28-5+1, 28-5+1) = (24, 24)
    # maxpooling reduces this further to (24/2, 24/2) = (12, 12)
    # so the 4D output tensor is thus of shape (batch_size, nkerns[0], 12, 12)
    layer0 = LeNetConvPoolLayer(rng, input=layer0_input,
                                image_shape=(batch_size, 1, 28, 28),
                                filter_shape=(nkerns[0], 1, 5, 5),
                                poolsize=(2, 2))

    # Construct the second convolutional pooling layer:
    # filtering reduces the image size to (12-5+1, 12-5+1) = (8, 8)
    # max pooling reduces this further to (8/2, 8/2) = (4, 4)
    # 4D output tensor is thus of shape (batch_size, nkerns[1], 4, 4)
    layer1 = LeNetConvPoolLayer(rng, input=layer0.output,
                                image_shape=(batch_size, nkerns[0], 12, 12),
                                filter_shape=(nkerns[1], nkerns[0], 5, 5),
                                poolsize=(2, 2))

    # the TanhLayer being fully-connected, it operates on 2D matrices of
    # the shape (batches, num_pixels) (i.e. matrix of rasterized images)
    # This will generate a matrix of (batches, nkerns[1]*4*4)
    Layer2_input = layer1.output.flatten(2)

    # construct a fully-connected sigmoidal layer
    layer2 = HiddenLayer(rng, input=Layer2_input, n_in=nkerns[1]*4*4,
                         n_out=500, activation=T.tanh)

    # classify the values of the fully-connected sigmoidal layer
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)

    # the cost we minimize during training is the NLL of the model
    cost = layer3.negative_log_likelihood(y)

    # create functions to compute the mistakes that are made by the model
    test_model = theano.function([index], layer3.errors(y),
        givens={
            x: test_set_x[index*batch_size:(index+1)*batch_size],
            y: test_set_y[index*batch_size:(index+1)*batch_size]})

    validate_model = theano.function([index], layer3.errors(y),
        givens={
            x: valid_set_x[index*batch_size:(index+1)*batch_size],
            y: valid_set_y[index*batch_size:(index+1)*batch_size]})

    # create a list of all model parameters to be fit by gradient descent
    params = layer3.params + layer2.params + layer1.params + layer0.params

    # create a list of gradients for all model parameters
    grads = T.grad(cost, params)

    # train_model is a function that updates the model parameters by SGD.
    # Since this model has many parameters, it would be tedious to
    # manually create an update rule for each model parameter. We thus
    # create the updates list by automatically looping over all
    # (params[i], grads[i]) pairs
    updates = []
    for param_i, grad_i in zip(params, grads):
        updates.append((param_i, param_i - learning_rate * grad_i))

    train_model = theano.function(inputs=[index], outputs=cost, updates=updates,
        givens={
            x: train_set_x[index*batch_size:(index + 1)*batch_size],
            y: train_set_y[index*batch_size:(index + 1)*batch_size]})

    ###############
    # TRAIN MODEL #
    ###############
    print '... training'

    # early-stopping parameters
    patience = 10000               # look at this many examples regardless
    patience_increase = 2          # wait this much longer when a new best is found
    improvement_threshold = 0.995  # a relative improvement of this much is
                                   # considered significant
    validation_frequency = min(n_train_batches, patience // 2)

    best_params = None
    best_validation_loss = np.inf
    best_iter = 0
    test_score = 0
    start_time = time.clock()

    epoch = 0
    done_looping = False

    while epoch < n_epochs and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):
            minibatch_avg_cost = train_model(minibatch_index)
            iter = (epoch - 1) * n_train_batches + minibatch_index

            if (iter + 1) % validation_frequency == 0:
                validation_losses = [validate_model(i)
                                     for i in xrange(n_valid_batches)]
                this_validation_loss = np.mean(validation_losses)
                print ('epoch %i, minibatch %i/%i, validation error %f %%' %
                       (epoch, minibatch_index + 1, n_train_batches,
                        this_validation_loss * 100.))

                if this_validation_loss < best_validation_loss:
                    if this_validation_loss < best_validation_loss * improvement_threshold:
                        patience = max(patience, iter * patience_increase)

                    best_validation_loss = this_validation_loss
                    # test it on the test set
                    best_iter = iter
                    test_losses = [test_model(i) for i in xrange(n_test_batches)]
                    test_score = np.mean(test_losses)
                    print ' patience %d epoch %i, minibatch %i/%i , test error of best model %f %%' % (
                        patience, epoch, minibatch_index + 1,
                        n_train_batches, test_score * 100.)

            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print 'Optimization complete with best validation score of %f %% with the test performance %f %%' \
        % (best_validation_loss * 100., test_score * 100.)
    print 'The code ran for %d epochs with %f epochs/sec' % (
        epoch, 1. * epoch / (end_time - start_time))
    print >> sys.stderr, ('The code for file ' + os.path.split(__file__)[1] +
                          ' ran for %.1fs' % ((end_time - start_time)))
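The patience-based early stopping in the training loop can be isolated in a small sketch. It assumes one validation score per iteration (the loop above validates only every `validation_frequency` iterations), and the function name `early_stopping`, the small default `patience` and the toy loss list are made up for illustration.

```python
def early_stopping(validation_losses, patience=4, patience_increase=2,
                   improvement_threshold=0.995):
    """Replay the patience rule on a list of validation losses.

    Returns (best_iter, best_loss, last_iter), where last_iter is the
    iteration at which the loop stopped.
    """
    best_loss = float("inf")
    best_iter = 0
    last_iter = 0
    for it, loss in enumerate(validation_losses):
        last_iter = it
        if loss < best_loss:
            # a significant improvement extends the patience budget
            if loss < best_loss * improvement_threshold:
                patience = max(patience, it * patience_increase)
            best_loss = loss
            best_iter = it
        if patience <= it:
            break
    return best_iter, best_loss, last_iter


# The loss stops improving after iteration 2, so the loop ends once the
# iteration counter reaches the patience budget of 4.
toy_losses = [0.5, 0.4, 0.39, 0.39, 0.39, 0.39, 0.39, 0.39]
print(early_stopping(toy_losses))
```

As long as the loss keeps improving by more than the threshold, the budget grows to `it * patience_increase`, so training is never cut off in the middle of steady progress.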
The code is available on Baidu Netdisk: [code]