import theano
import numpy
import os

from theano import tensor as T
from collections import OrderedDict


class model(object):

    def __init__(self, nh, nc, ne, de, cs):
        '''
        nh :: dimension of the hidden layer
        nc :: number of classes
        ne :: number of word embeddings in the vocabulary
        de :: dimension of the word embeddings
        cs :: word window context size
        '''
        # parameters of the model
        self.emb = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
                   (ne + 1, de)).astype(theano.config.floatX))  # add one for PADDING at the end
        self.Wx = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
                  (de * cs, nh)).astype(theano.config.floatX))
        self.Wh = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
                  (nh, nh)).astype(theano.config.floatX))
        self.W = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
                 (nh, nc)).astype(theano.config.floatX))
        self.bh = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))
        self.b = theano.shared(numpy.zeros(nc, dtype=theano.config.floatX))
        self.h0 = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))

        # bundle
        self.params = [self.emb, self.Wx, self.Wh, self.W, self.bh, self.b, self.h0]
        self.names = ['embeddings', 'Wx', 'Wh', 'W', 'bh', 'b', 'h0']
        idxs = T.imatrix()  # as many columns as the context window size, as many lines as words in the sentence
        x = self.emb[idxs].reshape((idxs.shape[0], de * cs))
        y = T.iscalar('y')  # label

        def recurrence(x_t, h_tm1):
            h_t = T.nnet.sigmoid(T.dot(x_t, self.Wx) + T.dot(h_tm1, self.Wh) + self.bh)
            s_t = T.nnet.softmax(T.dot(h_t, self.W) + self.b)
            return [h_t, s_t]

        [h, s], _ = theano.scan(fn=recurrence,
                                sequences=x, outputs_info=[self.h0, None],
                                n_steps=x.shape[0])

        p_y_given_x_lastword = s[-1, 0, :]
        p_y_given_x_sentence = s[:, 0, :]
        y_pred = T.argmax(p_y_given_x_sentence, axis=1)

        # cost and gradients and learning rate
        lr = T.scalar('lr')
        nll = -T.log(p_y_given_x_lastword)[y]
        gradients = T.grad(nll, self.params)
        updates = OrderedDict((p, p - lr * g) for p, g in zip(self.params, gradients))

        # theano functions
        self.classify = theano.function(inputs=[idxs], outputs=y_pred)

        self.train = theano.function(inputs=[idxs, y, lr],
                                     outputs=nll,
                                     updates=updates)

        self.normalize = theano.function(inputs=[],
                         updates={self.emb:
                                  self.emb / T.sqrt((self.emb ** 2).sum(axis=1)).dimshuffle(0, 'x')})

    def save(self, folder):
        for param, name in zip(self.params, self.names):
            numpy.save(os.path.join(folder, name + '.npy'), param.get_value())
The code above is the RNN from the Deep Learning Tutorial; let's walk through it piece by piece.
self.emb = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
           (ne + 1, de)).astype(theano.config.floatX))  # add one for PADDING at the end
self.Wx = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
          (de * cs, nh)).astype(theano.config.floatX))
self.Wh = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
          (nh, nh)).astype(theano.config.floatX))
self.W = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,
         (nh, nc)).astype(theano.config.floatX))
self.bh = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))
self.b = theano.shared(numpy.zeros(nc, dtype=theano.config.floatX))
self.h0 = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))
This block clearly initializes the model's parameters. emb is the word-embedding matrix: ne + 1 rows (one per vocabulary word, plus one for padding), each of dimension de, a hyperparameter that must be supplied. In elman-forward the corresponding setup is:
s = {'fold': 3,            # 5 folds 0,1,2,3,4
     'lr': 0.0627142536696559,
     'verbose': 1,
     'decay': False,       # decay on the learning rate if improvement stops
     'win': 7,             # number of words in the context window
     'bs': 9,              # number of backprop through time steps
     'nhidden': 100,       # number of hidden units
     'seed': 345,
     'emb_dimension': 100, # dimension of word embedding
     'nepochs': 50}

folder = os.path.basename(__file__).split('.')[0]
if not os.path.exists(folder):
    os.mkdir(folder)

# load the dataset
train_set, valid_set, test_set, dic = load.atisfold(s['fold'])
idx2label = dict((k, v) for v, k in dic['labels2idx'].iteritems())
idx2word = dict((k, v) for v, k in dic['words2idx'].iteritems())

train_lex, train_ne, train_y = train_set
valid_lex, valid_ne, valid_y = valid_set
test_lex, test_ne, test_y = test_set

vocsize = len(dic['words2idx'])
nclasses = len(dic['labels2idx'])
nsentences = len(train_lex)

# instantiate the model
numpy.random.seed(s['seed'])
random.seed(s['seed'])
rnn = model(nh=s['nhidden'],
            nc=nclasses,
            ne=vocsize,
            de=s['emb_dimension'],
            cs=s['win'])
From

train_set, valid_set, test_set, dic = load.atisfold(s['fold'])

and vocsize = len(dic['words2idx']) we can see that emb has one row per word in the vocabulary (plus the padding row); the embeddings themselves are learned during training. Wx is a (de*cs) × nh matrix holding the input-to-hidden weights: each word is expanded to a context window of cs words, each represented by a de-dimensional embedding, so a single input vector has length de*cs. bh is the bias of the hidden layer. Wh is the nh × nh hidden-to-hidden matrix, and h0 is the initial hidden state (not a bias). W is the nh × nc hidden-to-output matrix and b its bias. A quick shape check follows; after that, the training loop.
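As a sanity check, the snippet below (not part of the tutorial; a hypothetical check that just instantiates the model with the ATIS settings above) prints each parameter's shape:

# Hypothetical shape check: nh=100, nc=127 (ATIS label count), de=100, cs=7.
rnn = model(nh=100, nc=127, ne=vocsize, de=100, cs=7)
print(rnn.emb.get_value().shape)  # (vocsize + 1, 100): one row per word, plus PADDING
print(rnn.Wx.get_value().shape)   # (700, 100): de * cs = 100 * 7 inputs per hidden unit
print(rnn.Wh.get_value().shape)   # (100, 100): hidden to hidden
print(rnn.W.get_value().shape)    # (100, 127): hidden to output classes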
for e in xrange(s['nepochs']):
    # shuffle
    shuffle([train_lex, train_ne, train_y], s['seed'])
    s['ce'] = e
    tic = time.time()
    for i in xrange(nsentences):
        cwords = contextwin(train_lex[i], s['win'])
        words = map(lambda x: numpy.asarray(x).astype('int32'),
                    minibatch(cwords, s['bs']))
        labels = train_y[i]
        for word_batch, label_last_word in zip(words, labels):
            rnn.train(word_batch, label_last_word, s['clr'])  # s['clr'] is the current learning rate
            rnn.normalize()
cwords = contextwin(train_lex[i], s['win']) expands each training sentence into context windows. With a window of 7, for instance, [0, 1, 2, 3, 4] becomes a 5×7 matrix whose center column is 0, 1, 2, 3, 4, padded with -1 wherever the window runs off the sentence: [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1], [1, 2, 3, 4, -1, -1, -1]]. minibatch then splits this list into groups of 1 up to bs rows (or up to the sentence length); after expansion:
[[[-1, -1, -1, 0, 1, 2, 3]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1], [1, 2, 3, 4, -1, -1, -1]]]
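The tutorial ships its own contextwin and minibatch helpers; a minimal reconstruction consistent with the behaviour described above could look like this (an illustrative sketch, not necessarily the tutorial's exact source):

def contextwin(l, win):
    # win must be odd so each word sits at the center of its window
    assert (win % 2) == 1
    assert win >= 1
    l = list(l)
    # pad with -1 (the PADDING embedding row) on both sides
    lpadded = win // 2 * [-1] + l + win // 2 * [-1]
    out = [lpadded[i:(i + win)] for i in range(len(l))]
    assert len(out) == len(l)
    return out

def minibatch(l, bs):
    # growing prefixes of l, capped at bs rows each
    out = [l[:i] for i in range(1, min(bs, len(l)) + 1)]
    out += [l[i - bs:i] for i in range(bs + 1, len(l) + 1)]
    assert len(l) == len(out)
    return out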
labels = train_y[i] is the list of per-word labels for one sentence; [0, 1, 2, 3, 4], for example, might correspond to [126, 126, 45, 126, 55] (idx2word maps 0, 1, 2, 3, 4 back to words, and idx2label maps 126, 126, 45, 126, 55 back to labels). So the first (word_batch, label_last_word) pair is [[-1, -1, -1, 0, 1, 2, 3]] and 126, and so on.
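Using the sketched helpers above, this pairing can be checked directly (the labels here are the hypothetical ones from the example):

# Each word_batch grows by one context-window row; label_last_word
# labels the batch's last row (the newest word).
words = minibatch(contextwin([0, 1, 2, 3, 4], 7), 9)
labels = [126, 126, 45, 126, 55]
for word_batch, label_last_word in zip(words, labels):
    print('%d %d' % (len(word_batch), label_last_word))
# prints: 1 126 / 2 126 / 3 45 / 4 126 / 5 55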
self.train = theano.function(inputs=[idxs, y, lr],
                             outputs=nll,
                             updates=updates)
The idxs passed in is the word_batch above. It is first turned into word vectors via x = self.emb[idxs].reshape((idxs.shape[0], de*cs)); for the second batch, for example, x is a 2×700 matrix (de * cs = 100 * 7 = 700). Then:
def recurrence(x_t, h_tm1):
    h_t = T.nnet.sigmoid(T.dot(x_t, self.Wx) + T.dot(h_tm1, self.Wh) + self.bh)
    s_t = T.nnet.softmax(T.dot(h_t, self.W) + self.b)
    return [h_t, s_t]

[h, s], _ = theano.scan(fn=recurrence,
                        sequences=x, outputs_info=[self.h0, None],
                        n_steps=x.shape[0])
Each step takes the 700-dimensional vector for one word's context window (the window's word embeddings simply concatenated), multiplies it by Wx, adds the previous hidden state (h0 at the first step) multiplied by Wh plus the bias bh, and applies a sigmoid to obtain the new hidden state h_t; multiplying h_t by W, adding b, and applying softmax yields the output s_t. If x has more than two rows the recurrence simply continues: n rows means n words, so the RNN unrolls for n steps. [h, s] collects every step's hidden state and output, and s comes out as a 3-D tensor; the sketch below shows why.
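The reason s is three-dimensional is that T.nnet.softmax always returns a matrix, so each s_t has shape (1, nc), and scan stacks n_words of them into (n_words, 1, nc). A plain-numpy mirror of the loop makes this concrete (random stand-in weights; sizes follow the hyperparameters above):

import numpy

de, cs, nh, nc, n_words = 100, 7, 100, 127, 5
x = numpy.random.randn(n_words, de * cs)  # one concatenated context window per row
Wx = numpy.random.randn(de * cs, nh)
Wh = numpy.random.randn(nh, nh)
W = numpy.random.randn(nh, nc)
bh = numpy.zeros(nh)
b = numpy.zeros(nc)

def softmax(v):
    e = numpy.exp(v - v.max())
    return e / e.sum()

h_tm1 = numpy.zeros(nh)   # plays the role of h0
ss = []
for t in range(n_words):  # what theano.scan unrolls
    h_t = 1.0 / (1.0 + numpy.exp(-(x[t].dot(Wx) + h_tm1.dot(Wh) + bh)))
    s_t = softmax(h_t.dot(W) + b)[None, :]  # keep the row axis, like T.nnet.softmax: (1, nc)
    ss.append(s_t)
    h_tm1 = h_t

s = numpy.array(ss)  # shape (n_words, 1, nc) -- hence the s[-1, 0, :] indexing below
print(s.shape)       # (5, 1, 127)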
p_y_given_x_lastword = s[-1, 0, :]
p_y_given_x_sentence = s[:, 0, :]
y_pred = T.argmax(p_y_given_x_sentence, axis=1)
p_y_given_x_lastword is the vector of probabilities that the last word belongs to each of the nc classes (127 in this dataset), while p_y_given_x_sentence is the matrix of those per-class probabilities for every word in the sentence.
lr = T.scalar('lr')
nll = -T.log(p_y_given_x_lastword)[y]
gradients = T.grad(nll, self.params)
updates = OrderedDict((p, p - lr * g) for p, g in zip(self.params, gradients))

# theano functions
self.classify = theano.function(inputs=[idxs], outputs=y_pred)
So in this code nll is the negative log of the probability the model assigns to the correct class of the last word; its gradient is taken with respect to the parameters, where params is
self.params = [ self.emb, self.Wx, self.Wh, self.W, self.bh, self.b, self.h0 ]
and each parameter is updated by gradient descent. In effect one call trains on a single word (while exploiting the information of the preceding n-1 words), sweeping from the first word to the last, i.e. over
[[[-1, -1, -1, 0, 1, 2, 3]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1], [1, 2, 3, 4, -1, -1, -1]]]
Each step trains on one row of this list: after the emb lookup it is an n×700 matrix, and the cost is computed only for the last word. The classify function, in contrast, covers every word of a sentence, taking for each word the class with the largest of the 127 class probabilities as that word's predicted label:
predictions_test = [map(lambda x: idx2label[x],
                        rnn.classify(numpy.asarray(contextwin(x, s['win'])).astype('int32')))
                    for x in test_lex]
groundtruth_test = [map(lambda x: idx2label[x], y) for y in test_y]
words_test = [map(lambda x: idx2word[x], w) for w in test_lex]
train_lex, train_ne, train_y = train_set
valid_lex, valid_ne, valid_y = valid_set
test_lex, test_ne, test_y = test_set
I am not sure what test_ne holds (this script never uses the *_ne fields), but train_lex and test_lex are both 2-D: each row is one sentence. Looking back at the code above, predictions_test takes each sentence in test_lex, expands it into an n×7 matrix with one context window per row, feeds it to the classifier to obtain each word's label ID, and converts those IDs back into labels; groundtruth_test holds each word's true label, and words_test the original words of each sentence.
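For concreteness, here is what a single test sentence goes through (an illustrative sketch reusing the objects defined above):

# End-to-end pass for one test sentence: expand to context windows,
# classify, then map label IDs back to label strings.
sent = test_lex[0]                                                   # word indices of one sentence
windows = numpy.asarray(contextwin(sent, s['win'])).astype('int32')  # n x 7 context windows
pred = [idx2label[i] for i in rnn.classify(windows)]                 # one predicted label per word
truth = [idx2label[i] for i in test_y[0]]                            # the ground-truth labels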
The final output is a file containing each word, its true label, and its predicted label.
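A hedged sketch of how such a file could be written (the file name and exact layout here are assumptions; the tutorial feeds a file in this word/true/predicted format to the conlleval scoring script):

def write_results(filename, words, groundtruth, predictions):
    with open(filename, 'w') as f:
        for sw, sl, sp in zip(words, groundtruth, predictions):
            for w, l, p in zip(sw, sl, sp):
                f.write('%s %s %s\n' % (w, l, p))  # word, true label, predicted label
            f.write('\n')  # blank line separates sentences

write_results('current.test.txt', words_test, groundtruth_test, predictions_test)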