In this chapter we will learn how to implement the most basic classifier, logistic regression, using Theano. We start with a quick review of the model; along the way you will see how mathematical expressions are mapped onto Theano's symbolic graph.
The Model
Logistic regression is a probabilistic, linear classifier. It is parametrized by a weight matrix $W$ and a bias vector $b$. Classification is done by projecting the input onto a set of hyperplanes, one per class; the distance from the input to a hyperplane reflects the probability that the input belongs to the corresponding class. Mathematically, the probability that an input vector $x$ belongs to class $i$ is written as:
$$P(Y=i|x, W,b) = \mathrm{softmax}_i(W x + b) = \frac{e^{W_i x + b_i}}{\sum_j e^{W_j x + b_j}}$$
The model's prediction is the class with the highest probability:
$$y_{pred} = \operatorname{argmax}_i P(Y=i|x,W,b)$$
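To make the two formulas concrete, here is a small, self-contained NumPy sketch of the same computation for a single input; the dimensions (3 features, 4 classes) and random values are made up purely for illustration, and it follows the $Wx + b$ convention of the formula (the Theano code below stores $W$ transposed and computes $xW + b$ instead).

```python
import numpy

# made-up dimensions: 3 input features, 4 classes
W = numpy.random.randn(4, 3)   # row i of W (and entry i of b) belongs to class i
b = numpy.random.randn(4)
x = numpy.random.randn(3)

scores = numpy.dot(W, x) + b                                # W x + b
p_y_given_x = numpy.exp(scores) / numpy.exp(scores).sum()   # softmax
y_pred = numpy.argmax(p_y_given_x)                          # most probable class

print p_y_given_x.sum()   # 1.0: a proper probability distribution over classes
print y_pred
```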
In Theano, this is accomplished by the following code:
```python
# generate symbolic variables for input (x and y represent a
# minibatch)
x = T.fmatrix('x')
y = T.lvector('y')

# allocate shared variables model params
b = theano.shared(numpy.zeros((10,)), name='b')
W = theano.shared(numpy.zeros((784, 10)), name='W')

# symbolic expression for computing the vector of
# class-membership probabilities
p_y_given_x = T.nnet.softmax(T.dot(x, W) + b)

# compiled Theano function that returns the vector of class-membership
# probabilities
get_p_y_given_x = theano.function(inputs=[x], outputs=p_y_given_x)

# print the probability of some example represented by x_value
# x_value is not a symbolic variable but a numpy array describing the
# datapoint
print 'Probability that x is of class %i is %f' % (i, get_p_y_given_x(x_value)[i])

# symbolic description of how to compute prediction as class whose probability
# is maximal
y_pred = T.argmax(p_y_given_x, axis=1)

# compiled theano function that returns this value
classify = theano.function(inputs=[x], outputs=y_pred)
```
In the code above, we first define the input variables $x$ and $y$. Since the model parameters must persist throughout training, $W$ and $b$ are defined as shared variables; declaring them this way both makes them available as symbolic variables and initializes their values. The dot product and softmax are then used to compute $P(Y|x,W,b)$, and the result is stored in the symbolic variable p_y_given_x.

So far we have only defined the graph of computations that Theano should carry out. To obtain actual values of $P(Y|x,W,b)$ we compile the function get_p_y_given_x, which takes x as input and returns p_y_given_x; we can iterate over its output to read off the probability of each class.

Now let us finish building the Theano graph. To obtain the model's prediction we apply the T.argmax operator, which returns the index of the largest value in each row of p_y_given_x.

Similarly, to get predictions for a given input we compile the function classify. It takes as argument a matrix $x$ of inputs and returns a vector containing the predicted class of each example.
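As a usage illustration, assuming the symbolic variables and the compiled functions get_p_y_given_x and classify from the listing above are in scope, they can be called directly on ordinary NumPy arrays; the random minibatch below is purely hypothetical.

```python
import numpy

# a hypothetical minibatch of two 784-dimensional inputs (flattened 28x28 images)
x_value = numpy.random.rand(2, 784).astype('float32')

print get_p_y_given_x(x_value).shape        # (2, 10): one probability vector per example
print get_p_y_given_x(x_value).sum(axis=1)  # each row sums to 1
print classify(x_value)                     # [0 0]: with zero-initialized W and b every
                                            # class has probability 0.1, so argmax is 0
```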
Of course, the model is of no use yet, since its parameters are still at their initial values. The following sections show how to train it.
Defining a Loss Function
Training the model amounts to minimizing a loss function. For multi-class logistic regression the usual choice of loss is the negative log-likelihood, which is equivalent to maximizing the likelihood of the training data $\mathcal{D}$ under the model parametrized by $\theta$. We define the likelihood $\mathcal{L}$ and the loss $\ell$ as follows:
$$\mathcal{L} (\theta=\{W,b\}, \mathcal{D}) = \sum_{i=0}^{|\mathcal{D}|} \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\ \ell (\theta=\{W,b\}, \mathcal{D}) = - \mathcal{L} (\theta=\{W,b\}, \mathcal{D})$$
The following code shows how to compute the loss for one minibatch:
```python
loss = -T.mean(T.log(p_y_given_x)[T.arange(y.shape[0]), y])
# note on syntax: T.arange(y.shape[0]) is a vector of integers [0,1,2,...,len(y)].
# Indexing a matrix M by the two vectors [0,1,...,K], [a,b,...,k] returns the
# elements M[0,a], M[1,b], ..., M[K,k] as a vector.  Here, we use this
# syntax to retrieve the log-probability of the correct labels, y.
```
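This fancy-indexing trick works exactly as in NumPy. The following self-contained sketch, with made-up numbers, shows what `[T.arange(y.shape[0]), y]` picks out and how the mean negative log-likelihood is formed:

```python
import numpy

# made-up log-probabilities for a minibatch of 3 examples and 4 classes
log_p = numpy.log(numpy.array([[0.7, 0.1, 0.1, 0.1],
                               [0.2, 0.5, 0.2, 0.1],
                               [0.1, 0.1, 0.2, 0.6]]))
y = numpy.array([0, 1, 3])   # correct label of each example

picked = log_p[numpy.arange(y.shape[0]), y]   # [log 0.7, log 0.5, log 0.6]
loss = -picked.mean()                         # mean negative log-likelihood

print picked
print loss
```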
Creating a LogisticRegression Class

We now have all the pieces needed to define a LogisticRegression class, which wraps up the functionality described so far. Its code is as follows:
```python
class LogisticRegression(object):

    def __init__(self, input, n_in, n_out):
        """ Initialize the parameters of the logistic regression

        :type input: theano.tensor.TensorType
        :param input: symbolic variable that describes the input of the
                      architecture (e.g., one minibatch of input images)

        :type n_in: int
        :param n_in: number of input units, the dimension of the space in
                     which the datapoint lies

        :type n_out: int
        :param n_out: number of output units, the dimension of the space in
                      which the target lies
        """
        # initialize with 0 the weights W as a matrix of shape (n_in, n_out)
        self.W = theano.shared(value=numpy.zeros((n_in, n_out),
                                                 dtype=theano.config.floatX),
                               name='W')
        # initialize the biases b as a vector of n_out 0s
        self.b = theano.shared(value=numpy.zeros((n_out,),
                                                 dtype=theano.config.floatX),
                               name='b')

        # compute vector of class-membership probabilities in symbolic form
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

        # compute prediction as class whose probability is maximal in
        # symbolic form
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

    def negative_log_likelihood(self, y):
        """Return the mean of the negative log-likelihood of the prediction
        of this model under a given target distribution.

        .. math::

            \frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) =
            \frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|}
                \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\
            \ell (\theta=\{W,b\}, \mathcal{D})

        :param y: corresponds to a vector that gives for each example the
                  correct label

        Note: we use the mean instead of the sum so that the learning rate
        is less dependent on the batch size
        """
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
```
The class can be instantiated as follows:
```python
# allocate symbolic variables for the data
x = T.fmatrix()  # the data is presented as rasterized images (each being a 1-D row vector in x)
y = T.lvector()  # the labels are presented as 1D vector of [long int] labels

# construct the logistic regression class
classifier = LogisticRegression(
    input=x.reshape((batch_size, 28 * 28)), n_in=28 * 28, n_out=10)
```
Finally, we define the cost to be minimized:
```python
cost = classifier.negative_log_likelihood(y)
```
Training the Model

To implement MSGD (minibatch stochastic gradient descent) in most programming languages we would have to derive the gradient of the loss with respect to the parameters by hand, which quickly becomes difficult and error-prone for complex models.

In Theano this work is done automatically, as in the following code:
```python
# compute the gradient of cost with respect to theta = (W,b)
g_W = T.grad(cost, classifier.W)
g_b = T.grad(cost, classifier.b)
```
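If T.grad is new to you, this self-contained toy snippet (unrelated to the model above) shows the idea: Theano differentiates the symbolic expression itself, so no derivative needs to be worked out by hand.

```python
import theano
import theano.tensor as T

v = T.dscalar('v')
dv = T.grad(v ** 2, v)        # symbolic derivative: 2 * v
f = theano.function([v], dv)

print f(3.0)   # 6.0
```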
g_W and g_b are themselves symbolic variables and can be used in the computation graph. The following code shows how to perform one step of gradient descent:
```python
# compute the gradient of cost with respect to theta = (W,b)
g_W = T.grad(cost=cost, wrt=classifier.W)
g_b = T.grad(cost=cost, wrt=classifier.b)

# specify how to update the parameters of the model as a list of
# (variable, update expression) pairs
updates = [(classifier.W, classifier.W - learning_rate * g_W),
           (classifier.b, classifier.b - learning_rate * g_b)]

# compiling a Theano function `train_model` that returns the cost, but in
# the same time updates the parameter of the model based on the rules
# defined in `updates`
train_model = theano.function(inputs=[index],
        outputs=cost,
        updates=updates,
        givens={
            x: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]})
```
The updates list pairs each parameter with its stochastic-gradient-descent update expression, and the givens dictionary maps symbolic variables of the graph to the data they should take their values from. The compiled train_model is thus defined by:

- its input: the mini-batch index, which together with the batch size selects the inputs $x$ and their corresponding labels $y$;
- its return value: the cost/loss on that mini-batch;
- its side effect: on every call it first fetches the slices of $x$ and $y$ selected by index, then evaluates the cost on that minibatch and applies the operations in the updates list to the model parameters.

Each call train_model(index) therefore computes and returns the cost of one minibatch while performing one MSGD step. The whole learning process consists of looping over the dataset and calling this function repeatedly, as sketched below.
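A minimal sketch of that outer loop might look as follows; it assumes train_model, n_train_batches and an epoch count n_epochs are already defined as above, and it omits the early-stopping logic shown in the complete listing at the end.

```python
for epoch in xrange(n_epochs):
    for minibatch_index in xrange(n_train_batches):
        # one MSGD step: updates W and b in place and returns the minibatch cost
        minibatch_avg_cost = train_model(minibatch_index)
    print 'epoch %i, last minibatch cost %f' % (epoch, minibatch_avg_cost)
```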
Testing the Model

As explained in the first section, when testing the model we are interested in the number of misclassified examples, not just the likelihood. The LogisticRegression class therefore needs an extra method that builds the symbolic graph for the number of misclassified examples in each minibatch. The code is as follows:
```python
class LogisticRegression(object):

    def errors(self, y):
        """Return a float representing the number of errors in the minibatch
        over the total number of examples of the minibatch; zero one loss
        over the size of the minibatch
        """
        return T.mean(T.neq(self.y_pred, y))
```
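T.neq compares the predicted and true labels elementwise, so the mean of the resulting 0/1 vector is the fraction of misclassified examples. A quick NumPy analogue with made-up labels:

```python
import numpy

y_pred = numpy.array([1, 3, 5, 2])   # hypothetical predictions
y = numpy.array([1, 3, 4, 2])        # hypothetical true labels

print numpy.mean(y_pred != y)   # 0.25: one error out of four examples
```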
Next we compile the functions test_model and validate_model, which let us evaluate this quantity; validate_model is the key ingredient of early stopping (see the previous section). Both take a minibatch index and compute the number of misclassified examples in that minibatch; they differ only in that one draws its minibatches from the test set and the other from the validation set:
```python
# compiling a Theano function that computes the mistakes that are made by
# the model on a minibatch
test_model = theano.function(inputs=[index],
        outputs=classifier.errors(y),
        givens={
            x: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]})

validate_model = theano.function(inputs=[index],
        outputs=classifier.errors(y),
        givens={
            x: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]})
```
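To turn these per-minibatch error rates into a single score over the whole validation (or test) set, we simply average them across all minibatches, exactly as the complete program below does. A short sketch, assuming n_valid_batches is defined as in the full listing:

```python
import numpy

# mean zero-one error over the whole validation set
validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
this_validation_loss = numpy.mean(validation_losses)

print 'validation error %f %%' % (this_validation_loss * 100.)
```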
Putting It All Together

Combining all of the pieces above yields the following complete program:
""" This tutorial introduces logistic regression using Theano and stochastic gradient descent. Logistic regression is a probabilistic, linear classifier. It is parametrized by a weight matrix :math:`W` and a bias vector :math:`b`. Classification is done by projecting data points onto a set of hyperplanes, the distance to which is used to determine a class membership probability. Mathematically, this can be written as: .. math:: P(Y=i|x, W,b) &= softmax_i(W x + b) \\ &= \frac {e^{W_i x + b_i}} {\sum_j e^{W_j x + b_j}} The output of the model or prediction is then done by taking the argmax of the vector whose i'th element is P(Y=i|x). .. math:: y_{pred} = argmax_i P(Y=i|x,W,b) This tutorial presents a stochastic gradient descent optimization method suitable for large datasets, and a conjugate gradient optimization method that is suitable for smaller datasets. References: - textbooks: "Pattern Recognition and Machine Learning" - Christopher M. Bishop, section 4.3.2 """ __docformat__ = 'restructedtext en' import cPickle import gzip import os import sys import time import numpy import theano import theano.tensor as T class LogisticRegression(object): """Multi-class Logistic Regression Class The logistic regression is fully described by a weight matrix :math:`W` and bias vector :math:`b`. Classification is done by projecting data points onto a set of hyperplanes, the distance to which is used to determine a class membership probability. """ def __init__(self, input, n_in, n_out): """ Initialize the parameters of the logistic regression :type input: theano.tensor.TensorType :param input: symbolic variable that describes the input of the architecture (one minibatch) :type n_in: int :param n_in: number of input units, the dimension of the space in which the datapoints lie :type n_out: int :param n_out: number of output units, the dimension of the space in which the labels lie """ # initialize with 0 the weights W as a matrix of shape (n_in, n_out) self.W = theano.shared(value=numpy.zeros((n_in, n_out), dtype=theano.config.floatX), name='W', borrow=True) # initialize the baises b as a vector of n_out 0s self.b = theano.shared(value=numpy.zeros((n_out,), dtype=theano.config.floatX), name='b', borrow=True) # compute vector of class-membership probabilities in symbolic form self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b) # compute prediction as class whose probability is maximal in # symbolic form self.y_pred = T.argmax(self.p_y_given_x, axis=1) # parameters of the model self.params = [self.W, self.b] def negative_log_likelihood(self, y): """Return the mean of the negative log-likelihood of the prediction of this model under a given target distribution. .. math:: \frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) = \frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|} \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\ \ell (\theta=\{W,b\}, \mathcal{D}) :type y: theano.tensor.TensorType :param y: corresponds to a vector that gives for each example the correct label Note: we use the mean instead of the sum so that the learning rate is less dependent on the batch size """ # y.shape[0] is (symbolically) the number of rows in y, i.e., # number of examples (call it n) in the minibatch # T.arange(y.shape[0]) is a symbolic vector which will contain # [0,1,2,... 
n-1] T.log(self.p_y_given_x) is a matrix of # Log-Probabilities (call it LP) with one row per example and # one column per class LP[T.arange(y.shape[0]),y] is a vector # v containing [LP[0,y[0]], LP[1,y[1]], LP[2,y[2]], ..., # LP[n-1,y[n-1]]] and T.mean(LP[T.arange(y.shape[0]),y]) is # the mean (across minibatch examples) of the elements in v, # i.e., the mean log-likelihood across the minibatch. return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]) def errors(self, y): """Return a float representing the number of errors in the minibatch over the total number of examples of the minibatch ; zero one loss over the size of the minibatch :type y: theano.tensor.TensorType :param y: corresponds to a vector that gives for each example the correct label """ # check if y has same dimension of y_pred if y.ndim != self.y_pred.ndim: raise TypeError('y should have the same shape as self.y_pred', ('y', target.type, 'y_pred', self.y_pred.type)) # check if y is of the correct datatype if y.dtype.startswith('int'): # the T.neq operator returns a vector of 0s and 1s, where 1 # represents a mistake in prediction return T.mean(T.neq(self.y_pred, y)) else: raise NotImplementedError() def load_data(dataset): ''' Loads the dataset :type dataset: string :param dataset: the path to the dataset (here MNIST) ''' ############# # LOAD DATA # ############# # Download the MNIST dataset if it is not present data_dir, data_file = os.path.split(dataset) if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz': import urllib origin = 'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz' print 'Downloading data from %s' % origin urllib.urlretrieve(origin, dataset) print '... loading data' # Load the dataset f = gzip.open(dataset, 'rb') train_set, valid_set, test_set = cPickle.load(f) f.close() #train_set, valid_set, test_set format: tuple(input, target) #input is an numpy.ndarray of 2 dimensions (a matrix) #witch row's correspond to an example. target is a #numpy.ndarray of 1 dimensions (vector)) that have the same length as #the number of rows in the input. It should give the target #target to the example with the same index in the input. def shared_dataset(data_xy, borrow=True): """ Function that loads the dataset into shared variables The reason we store our dataset in shared variables is to allow Theano to copy it into the GPU memory (when code is run on GPU). Since copying data into the GPU is slow, copying a minibatch everytime is needed (the default behaviour if the data is not in a shared variable) would lead to a large decrease in performance. """ data_x, data_y = data_xy shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX), borrow=borrow) shared_y = theano.shared(numpy.asarray(data_y, dtype=theano.config.floatX), borrow=borrow) # When storing data on the GPU it has to be stored as floats # therefore we will store the labels as ``floatX`` as well # (``shared_y`` does exactly that). But during our computations # we need them as ints (we use labels as index, and if they are # floats it doesn't make sense) therefore instead of returning # ``shared_y`` we will have to cast it to int. 
This little hack # lets ous get around this issue return shared_x, T.cast(shared_y, 'int32') test_set_x, test_set_y = shared_dataset(test_set) valid_set_x, valid_set_y = shared_dataset(valid_set) train_set_x, train_set_y = shared_dataset(train_set) rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y), (test_set_x, test_set_y)] return rval def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, dataset='../data/mnist.pkl.gz', batch_size=600): """ Demonstrate stochastic gradient descent optimization of a log-linear model This is demonstrated on MNIST. :type learning_rate: float :param learning_rate: learning rate used (factor for the stochastic gradient) :type n_epochs: int :param n_epochs: maximal number of epochs to run the optimizer :type dataset: string :param dataset: the path of the MNIST dataset file from http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz """ datasets = load_data(dataset) train_set_x, train_set_y = datasets[0] valid_set_x, valid_set_y = datasets[1] test_set_x, test_set_y = datasets[2] # compute number of minibatches for training, validation and testing n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size ###################### # BUILD ACTUAL MODEL # ###################### print '... building the model' # allocate symbolic variables for the data index = T.lscalar() # index to a [mini]batch x = T.matrix('x') # the data is presented as rasterized images y = T.ivector('y') # the labels are presented as 1D vector of # [int] labels # construct the logistic regression class # Each MNIST image has size 28*28 classifier = LogisticRegression(input=x, n_in=28 * 28, n_out=10) # the cost we minimize during training is the negative log likelihood of # the model in symbolic format cost = classifier.negative_log_likelihood(y) # compiling a Theano function that computes the mistakes that are made by # the model on a minibatch test_model = theano.function(inputs=[index], outputs=classifier.errors(y), givens={ x: test_set_x[index * batch_size: (index + 1) * batch_size], y: test_set_y[index * batch_size: (index + 1) * batch_size]}) validate_model = theano.function(inputs=[index], outputs=classifier.errors(y), givens={ x: valid_set_x[index * batch_size:(index + 1) * batch_size], y: valid_set_y[index * batch_size:(index + 1) * batch_size]}) # compute the gradient of cost with respect to theta = (W,b) g_W = T.grad(cost=cost, wrt=classifier.W) g_b = T.grad(cost=cost, wrt=classifier.b) # specify how to update the parameters of the model as a list of # (variable, update expression) pairs. updates = [(classifier.W, classifier.W - learning_rate * g_W), (classifier.b, classifier.b - learning_rate * g_b)] # compiling a Theano function `train_model` that returns the cost, but in # the same time updates the parameter of the model based on the rules # defined in `updates` train_model = theano.function(inputs=[index], outputs=cost, updates=updates, givens={ x: train_set_x[index * batch_size:(index + 1) * batch_size], y: train_set_y[index * batch_size:(index + 1) * batch_size]}) ############### # TRAIN MODEL # ############### print '... 
training the model' # early-stopping parameters patience = 5000 # look as this many examples regardless patience_increase = 2 # wait this much longer when a new best is # found improvement_threshold = 0.995 # a relative improvement of this much is # considered significant validation_frequency = min(n_train_batches, patience / 2) # go through this many # minibatche before checking the network # on the validation set; in this case we # check every epoch best_params = None best_validation_loss = numpy.inf test_score = 0. start_time = time.clock() done_looping = False epoch = 0 while (epoch < n_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index in xrange(n_train_batches): minibatch_avg_cost = train_model(minibatch_index) # iteration number iter = (epoch - 1) * n_train_batches + minibatch_index if (iter + 1) % validation_frequency == 0: # compute zero-one loss on validation set validation_losses = [validate_model(i) for i in xrange(n_valid_batches)] this_validation_loss = numpy.mean(validation_losses) print('epoch %i, minibatch %i/%i, validation error %f %%' % \ (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100.)) # if we got the best validation score until now if this_validation_loss < best_validation_loss: #improve patience if loss improvement is good enough if this_validation_loss < best_validation_loss * \ improvement_threshold: patience = max(patience, iter * patience_increase) best_validation_loss = this_validation_loss # test it on the test set test_losses = [test_model(i) for i in xrange(n_test_batches)] test_score = numpy.mean(test_losses) print((' epoch %i, minibatch %i/%i, test error of best' ' model %f %%') % (epoch, minibatch_index + 1, n_train_batches, test_score * 100.)) if patience <= iter: done_looping = True break end_time = time.clock() print(('Optimization complete with best validation score of %f %%,' 'with test performance %f %%') % (best_validation_loss * 100., test_score * 100.)) print 'The code run for %d epochs, with %f epochs/sec' % ( epoch, 1. * epoch / (end_time - start_time)) print >> sys.stderr, ('The code for file ' + os.path.split(__file__)[1] + ' ran for %.1fs' % ((end_time - start_time))) if __name__ == '__main__': sgd_optimization_mnist()
This program learns the classifier using stochastic gradient descent. From the DeepLearningTutorials folder it can be run with the following command:
```
python code/logistic_sgd.py
```