RNN-theano代碼解析


import theano
import numpy
import os
import pdb
from theano import tensor as T
from collections import OrderedDict

class model(object):
    
    def __init__(self, nh, nc, ne, de, cs):
        '''
        nh :: dimension of the hidden layer
        nc :: number of classes
        ne :: number of word embeddings in the vocabulary
        de :: dimension of the word embeddings
        cs :: word window context size 
        '''
        # parameters of the model
        self.emb = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (ne+1, de)).astype(theano.config.floatX)) # add one for PADDING at the end
        self.Wx  = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (de * cs, nh)).astype(theano.config.floatX))
        self.Wh  = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (nh, nh)).astype(theano.config.floatX))
        self.W   = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (nh, nc)).astype(theano.config.floatX))
        self.bh  = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))
        self.b   = theano.shared(numpy.zeros(nc, dtype=theano.config.floatX))
        self.h0  = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))

        # bundle
        self.params = [ self.emb, self.Wx, self.Wh, self.W, self.bh, self.b, self.h0 ]
        self.names  = ['embeddings', 'Wx', 'Wh', 'W', 'bh', 'b', 'h0']
        idxs = T.imatrix() # as many columns as context window size/lines as words in the sentence
        x = self.emb[idxs].reshape((idxs.shape[0], de*cs))
        y    = T.iscalar('y') # label

        def recurrence(x_t, h_tm1):
            h_t = T.nnet.sigmoid(T.dot(x_t, self.Wx) + T.dot(h_tm1, self.Wh) + self.bh)
            s_t = T.nnet.softmax(T.dot(h_t, self.W) + self.b)
            return [h_t, s_t]

        [h, s], _ = theano.scan(fn=recurrence, \
            sequences=x, outputs_info=[self.h0, None], \
            n_steps=x.shape[0])
        p_y_given_x_lastword = s[-1,0,:]
        p_y_given_x_sentence = s[:,0,:]
        y_pred = T.argmax(p_y_given_x_sentence, axis=1)

        # cost and gradients and learning rate
        lr = T.scalar('lr')
        nll = -T.log(p_y_given_x_lastword)[y]
        gradients = T.grad( nll, self.params )
        updates = OrderedDict(( p, p-lr*g ) for p, g in zip( self.params , gradients))
        
        # theano functions
        self.classify = theano.function(inputs=[idxs], outputs=y_pred)

        self.train = theano.function( inputs  = [idxs, y, lr],
                                      outputs = nll,
                                      updates = updates )

        self.normalize = theano.function( inputs = [],
                         updates = {self.emb:\
                         self.emb/T.sqrt((self.emb**2).sum(axis=1)).dimshuffle(0,'x')})

    def save(self, folder):   
        for param, name in zip(self.params, self.names):
            numpy.save(os.path.join(folder, name + '.npy'), param.get_value())

上述是RNN在deep learning tutorial上的代碼,我們來逐層解釋一下。

 self.emb = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (ne+1, de)).astype(theano.config.floatX)) # add one for PADDING at the end
        self.Wx  = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (de * cs, nh)).astype(theano.config.floatX))
        self.Wh  = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (nh, nh)).astype(theano.config.floatX))
        self.W   = theano.shared(0.2 * numpy.random.uniform(-1.0, 1.0,\
                   (nh, nc)).astype(theano.config.floatX))
        self.bh  = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))
        self.b   = theano.shared(numpy.zeros(nc, dtype=theano.config.floatX))
        self.h0  = theano.shared(numpy.zeros(nh, dtype=theano.config.floatX))

這一段很明顯是初始化參數,emb是詞向量,一共ne+1個詞,de是維度,是超參數,需要給定。在elman-forward中有這樣對應的輸入:

 s = {'fold':3, # 5 folds 0,1,2,3,4
         'lr':0.0627142536696559,
         'verbose':1,
         'decay':False, # decay on the learning rate if improvement stops
         'win':7, # number of words in the context window
         'bs':9, # number of backprop through time steps
         'nhidden':100, # number of hidden units
         'seed':345,
         'emb_dimension':100, # dimension of word embedding
         'nepochs':50}

    folder = os.path.basename(__file__).split('.')[0]
    if not os.path.exists(folder): os.mkdir(folder)

    # load the dataset
    train_set, valid_set, test_set, dic = load.atisfold(s['fold'])
    idx2label = dict((k,v) for v,k in dic['labels2idx'].iteritems())
    idx2word  = dict((k,v) for v,k in dic['words2idx'].iteritems())

    train_lex, train_ne, train_y = train_set
    valid_lex, valid_ne, valid_y = valid_set
    test_lex,  test_ne,  test_y  = test_set

    vocsize = len(dic['words2idx'])
    nclasses = len(dic['labels2idx'])
    nsentences = len(train_lex)

    # instanciate the model
    numpy.random.seed(s['seed'])
    random.seed(s['seed'])
    rnn = model(    nh = s['nhidden'],
                    nc = nclasses,
                    ne = vocsize,
                    de = s['emb_dimension'],
                    cs = s['win'] )

我們可以看到在

train_set, valid_set, test_set, dic = load.atisfold(s['fold'])

 以及 vocsize = len(dic['words2idx'])可知emb的行是總單詞的個數。emb也是需要訓練得到的。wx是(de*cs)*h的矩陣,是輸入到隱藏層之間的參數,每個單詞擴充到cs窗口大小,每個單詞維度是詞向量維度de,所以一個單詞長度就是de*cs,bh為這兩層之間的bias,wh是h*h的矩陣,隱藏層到隱藏層,h0是bias,w是h*c隱藏層到輸出層,b為bias。由

for e in xrange(s['nepochs']):
        # shuffle
        shuffle([train_lex, train_ne, train_y], s['seed'])
        s['ce'] = e
        tic = time.time()
        for i in xrange(nsentences):
            cwords = contextwin(train_lex[i], s['win'])

      words = map(lambda x: numpy.asarray(x).astype('int32'),\ minibatch(cwords, s['bs']))

          labels = train_y[i]

          for word_batch , label_last_word in zip(words, labels):

             rnn.train(word_batch, label_last_word, s['clr'])

             rnn.normalize()

cwords = contextwin(train_lex[i], s['win'])是將每一條訓練句子擴充成窗口,比如此時窗口為7,則[0,1,2,3,4]將變為5行7列的矩陣,中心為0,1,2,3,4,不足處用-1填充,[[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4,-1], [ 0, 1, 2, 3, 4,-1,-1], [ 1, 2, 3, 4,-1,-1,-1]],minibatch是將list分組,每組1~bs(或最大長度)行,擴充后

[[[-1, -1, -1, 0, 1, 2, 3]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4]], 
[[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1]], 
[[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1], [1, 2, 3, 4, -1, -1, -1]]]

labels = train_y[i] 則labels就是一條句子的每個單詞標簽list,比如[0,1,2,3,4]對應的可能是[126,126,45,126,55],(在idxtowords中0,1,2,3,4可以轉換為word,在idxtolabels中126,126,45,126,55可以轉變為labels,所以word_batch,label_last_word為[[-1, -1, -1, 0, 1, 2, 3]]和126,以此類推。

self.train = theano.function( inputs  = [idxs, y, lr],
                                      outputs = nll,
                                      updates = updates )

idxs傳入后也就是這里的word_batch,先初始化為詞向量x = self.emb[idxs].reshape((idxs.shape[0], de*cs)),比如第二個batch處理后就是2*700的x,然后

        def recurrence(x_t, h_tm1):
            h_t = T.nnet.sigmoid(T.dot(x_t, self.Wx) + T.dot(h_tm1, self.Wh) + self.bh)
            s_t = T.nnet.softmax(T.dot(h_t, self.W) + self.b)
            return [h_t, s_t]

        [h, s], _ = theano.scan(fn=recurrence, \
            sequences=x, outputs_info=[self.h0, None], \
            n_steps=x.shape[0])

相當於前一個單詞的context window組成的700維詞向量(直接拼接)與wx相乘加上初始h0乘以wh加上偏置bh得到第二個隱藏層h_t,通過h_t與W相乘加上偏置得到輸出s_t,如果x不只兩列,就是如此循環下去,n列相當於考慮了n個單詞,rnn循環了n次,[h,s]是每一層的隱藏層與輸出層,都是三維矩陣。(此處不太明白s具體為什么)

        p_y_given_x_lastword = s[-1,0,:]
        p_y_given_x_sentence = s[:,0,:]
        y_pred = T.argmax(p_y_given_x_sentence, axis=1)
p_y_given_x_lastword是最后一個單詞分為變成nc(這個數據集里是127類)類對應於每一類的概率(向量),而 p_y_given_x_sentence是這個句子里每個單詞對應每一類的概率
(矩陣)
        lr = T.scalar('lr')
        nll = -T.log(p_y_given_x_lastword)[y]
        gradients = T.grad( nll, self.params )
        updates = OrderedDict(( p, p-lr*g ) for p, g in zip( self.params , gradients))
        
        # theano functions
        self.classify = theano.function(inputs=[idxs], outputs=y_pred)

所以上面這段代碼nll是最后一個單詞正確分類的概率,取-log函數,求導,此處params有

self.params = [ self.emb, self.Wx, self.Wh, self.W, self.bh, self.b, self.h0 ]

修改每個參數,梯度下降法,相當於一次訓練一個單詞,當然利用到了前面n-1個單詞的信息,從第一個訓練到最后一個單詞,即

[[[-1, -1, -1, 0, 1, 2, 3]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4]], 
[[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1]],
 [[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1]], 
[[-1, -1, -1, 0, 1, 2, 3], [-1, -1, 0, 1, 2, 3, 4], [-1, 0, 1, 2, 3, 4, -1], [0, 1, 2, 3, 4, -1, -1], [1, 2, 3, 4, -1, -1, -1]
這里面一次訓練一行經過emb處理后的n×700維矩陣,只對最后一個單詞求代價cost,而分類classify里面包含了一個句子的所有單詞,取每個單詞最終127個分類的最大概率作為
單詞分類(標簽)
        predictions_test = [ map(lambda x: idx2label[x], \
                             rnn.classify(numpy.asarray(contextwin(x, s['win'])).astype('int32')))\
                             for x in test_lex ]
        groundtruth_test = [ map(lambda x: idx2label[x], y) for y in test_y ]
        words_test = [ map(lambda x: idx2word[x], w) for w in test_lex]
 
        
train_lex, train_ne, train_y = train_set
    valid_lex, valid_ne, valid_y = valid_set
    test_lex,  test_ne,  test_y  = test_set

這里面不知道test_ne是啥,不過train_lex,test_lex都是二維矩陣,每一行是一個句子,我們再看上上面那段代碼,predictions_test相當於取出每個test_lex中的句子,先擴充成n×7的矩陣,每一行是一個單詞的context window,放入classify分類器里面得到的是每個單詞的label ID,再轉化成label,groundtruth_test是真正每個單詞的label,words_test是每個句子原本的句子。

最后輸出是一個文件,包括單詞,真實標簽,預測標簽。












免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM