本文轉載自:https://dongzhixiao.github.io/2018/07/21/so-hot/
今天周六,早晨出門吃飯,全身汗濕透。天氣真的是太熱了!我決定一天不出門,在屋子里面休息!
晚上,騰飛給我說了他暑假的計划,決定去長沙、成都去轉一圈,並邀請我去,還順便叫我晚上去吃飯。
最后我們就一起吃了一頓飯,不過我估計我休息的時間是下下周,因此可能不能和他一起去了。
今天總結一下本周學習到的知識:
周一
在進行神經網絡序列輸入的時候,發現了一個很好的文件代碼用來數據預處理。
注意:后面使用“數據單元”代表數據的一個最小單元,比如訓練英文數據就可以代表一個字符——’a’,訓練中文數據就可以代表一個漢字——’王’etc
文件名字叫做read_utils.py
。該文件中實現了一個類TextConverter
和一個工具函數batch_generator
。
該文件是一個工具類,用於把一個輸入文件根據編碼輸出對應的一批一批的數據用於RNN/LSTM之類的文本處理神經網絡訓練, 用法是先使用TextConverter類編碼所有的內容為數據單元對應數字,然后使用batch_generator函數將編碼好的數字分批返回 比如:
text = f.read() #f是open后得到的文件指針
converter = TextConverter(text)
arr = converter.text_to_arr(text)
g = batch_generator(arr,num_seqs,num_steps) #如果輸入本來就是編碼好的數據,則直接使用這個函數即可
下面讓我們來一個一個學習一下。
TextConverter類
TextConverter類是用來將傳入的文件中所有數據 首先,我們看看該類的構造函數。
class TextConverter(object): def __init__(self, text=None, max_vocab=5000, filename=None): if filename is not None: with open(filename, 'rb') as f: self.vocab = pickle.load(f) else: vocab = set(text) #存儲讀取文件中的數據單元所有類型的集合,比如英文文件會是:{'\n','A','b',...,'\r'} print(len(vocab)) #打印數據單元的種類的數目 # max_vocab_process vocab_count = {} #存儲每一個數據單元在整個讀入的文本中出現的次數的字典 for word in vocab: vocab_count[word] = 0 for word in text: vocab_count[word] += 1 vocab_count_list = [] #存儲元組(數據單元,對應數量)組成的列表,然后按照數量的大小排序,比如[('a',100),('d',20),...,('x',3)] for word in vocab_count: vocab_count_list.append((word, vocab_count[word])) vocab_count_list.sort(key=lambda x: x[1], reverse=True) if len(vocab_count_list) > max_vocab: #根據傳入的最大數量的數據單元數截斷前max_vocab大的數據單元,基本上不可能,除非遇到漢字之類的文本 vocab_count_list = vocab_count_list[:max_vocab] vocab = [x[0] for x in vocab_count_list] self.vocab = vocab #vocab僅僅存儲數據單元按照出現數量從大到小的列表,例如:['a','d',...,'x'] self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)} # 數據單元到數字字典{' ':0,'e':1,...,'c':20,...} self.int_to_word_table = dict(enumerate(self.vocab)) # 數字到數據單元字典{0:‘ ’,1:'e',...,20:'c',..}
可以看出,該構造函數的輸入是(文本內容,最大詞限制,文件名)。可以看出最后一個關鍵字參數filename是用來判斷文件是否為空,從而直接讀取 不用進入后面的處理環節,這個地方跟后面的保存模塊對應的:
def save_to_file(self, filename): #僅僅存儲數據單元按照出現數量從大到小的列表到指定文件filename處,例如:['a','d',...,'x'] with open(filename, 'wb') as f: pickle.dump(self.vocab, f)
保存后,以后就可以直接使用這個詞表了。 如果沒有傳入文件名,則說明需要進行后續的處理,我們仔細看一下后面的代碼,發現實際上做的工作就是:
- 找到所有數據中“數據單元”
- 遍歷文件記錄每個“數據單元”出現的次數,根據次數大小對“數據單元”排序
- 根據傳入參數max_vocab截斷數據單元,只去前max_vocab個“數據單元”
- 將留下的數據單元一一映射到自然數0,1,2…上
注意傳入的text
是一個列表或者列表生成器之類的數據結構,因為后面的代碼把它這樣子用了(比如去text的集合,用for迭代text等)。
在構造函數中已經實現了“數據單元”到自然數列的映射,因此互相轉換的函數就顯而易見了,如下所示:
def word_to_int(self, word): #返回數據單元對應的整數 if word in self.word_to_int_table: return self.word_to_int_table[word] else: return len(self.vocab) #如果出現了沒有出現的詞,則變為<unk>對應的標記 def int_to_word(self, index): #返回整數對應的數據單元 if index == len(self.vocab): return '<unk>' #沒有出現的詞被標記為unknown的縮寫 elif index < len(self.vocab): return self.int_to_word_table[index] else: raise Exception('Unknown index!')
由上面的函數可知,在映射的時候如果詞沒有出現在詞表中,則標記為<unk>
返回,這個是非常重要的一個處理,因為在實際進行數據 輸入的時候,由於截斷引起的超出數據記錄的詞,或者在進行測試集的時候很有可能出現這種情況!
既然有了單個“數據單元”和自然數的映射,多個“數據單元”組成的列表當然也能相互轉化:
def text_to_arr(self, text): #將輸入的text根據word_to_int返回得到對應的編碼數,並構成np.ndarray並返回,例如:輸入' a\n',則返回類似array([ 0, 0, 4, 10]) arr = [] for word in text: arr.append(self.word_to_int(word)) return np.array(arr) def arr_to_text(self, arr): #輸入列表類型的數據,返回對應的數據單元的組合 words = [] for index in arr: words.append(self.int_to_word(index)) return "".join(words)
batch_generator
有了數據編碼的類,下面就需要一個樣本生成的函數了。 根據輸入的數據(這個輸入一般就是全部樣本組成的文本,並且已經根據所有數據單元編碼成為了數字列表), 返回對應的生成器,滿足輸入的序列個數和序列長度
def batch_generator(arr, n_seqs, n_steps): #根據輸入的arr(這個輸入一般就是全部樣本組成的文本,並且已經根據所有數據單元編碼成為了數字列表),返回對應的生成器,滿足輸入的序列個數和序列長度 arr = copy.copy(arr) batch_size = n_seqs * n_steps #計算沒次輸入需要使用的數據單元 n_batches = int(len(arr) / batch_size) #一共可以得到多少組輸入數據 arr = arr[:batch_size * n_batches] #直接忽略了后面不能構成一組輸入的數據! arr = arr.reshape((n_seqs, -1)) while True: np.random.shuffle(arr) #將所有行打亂順序 for n in range(0, arr.shape[1], n_steps): x = arr[:, n:n + n_steps] #每次選擇對應n_seqs行,n_steps列的數據 y = np.zeros_like(x) #返回跟x同形狀的n維數組,數據全部都是0 y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0] yield x, y
可以看出,該函數根據輸入的所有訓練數據,和對應的序列一批的個數(n_seqs)和每個輸入的序列的長度(n_steps),然后 通過生成器函數不斷的迭代取出來數據用於訓練。每一個輸入和輸出的序列剛錯開一位,比如:
#如果輸入的x是[[48 49 50]
# [ 0 1 2]]
# 則輸出的y是[[49 50 48]
# [ 1 2 0]]
周二
文件名字叫做model.py
。該文件中實現了一個類CharRNN
和一個工具函數pick_top_n
。
CharRNN
下面介紹模型類,這個模型使用的是TensorFlow模塊,然后進行網絡的搭建,首先看構造函數:
class CharRNN: def __init__(self, num_classes, num_seqs=64, num_steps=50, lstm_size=128, num_layers=2, learning_rate=0.001, grad_clip=5, sampling=False, train_keep_prob=0.5, use_embedding=False, embedding_size=128): if sampling is True: num_seqs, num_steps = 1, 1 else: num_seqs, num_steps = num_seqs, num_steps self.num_classes = num_classes self.num_seqs = num_seqs #序列個數 self.num_steps = num_steps #序列長度 self.lstm_size = lstm_size self.num_layers = num_layers self.learning_rate = learning_rate self.grad_clip = grad_clip self.train_keep_prob = train_keep_prob self.use_embedding = use_embedding self.embedding_size = embedding_size tf.reset_default_graph() self.build_inputs() #構建輸入層 self.build_lstm() #構建LSTM層 self.build_loss() #構建損失函數 self.build_optimizer() #構建優化器 self.saver = tf.train.Saver() #保存設置 #下面測試,增加總結 tf.summary.scalar('loss',self.loss) for var in tf.trainable_variables(): tf.summary.histogram(var.op.name, var) self.merge_summary = tf.summary.merge_all() self.train_writer = tf.summary.FileWriter('./model') self.train_writer.add_graph(tf.get_default_graph())
可以看出,該構造函數根據輸入的參數,搭建了一個R輸入-R輸出的神經網絡,隱狀態用的是LSTM模型。 首先先保存各個輸入的設定,然后分別構建各個層和優化保存相關的設置,我們一個一個看:
def build_inputs(self): with tf.name_scope('inputs'): self.inputs = tf.placeholder(tf.int32, shape=( self.num_seqs, self.num_steps), name='inputs') self.targets = tf.placeholder(tf.int32, shape=( self.num_seqs, self.num_steps), name='targets') self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') # 對於中文,需要使用embedding層 # 英文字母沒有必要用embedding層 if self.use_embedding is False: self.lstm_inputs = tf.one_hot(self.inputs, self.num_classes) else: with tf.device("/cpu:0"): embedding = tf.get_variable('embedding', [self.num_classes, self.embedding_size]) self.lstm_inputs = tf.nn.embedding_lookup(embedding, self.inputs)
上面的函數就是輸入層,可以看出,根據輸入的參數embedding
來確定輸入層是否增加一個嵌入層,顯然,如果數據的詞表 比較大,比如中文,就需要嵌入層降維,如果比較小,就可以不用嵌入層。
然后是LSTM層:
def build_lstm(self): # 創建單個cell並堆疊多層 def get_a_cell(lstm_size, keep_prob): lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size) drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob) return drop with tf.name_scope('lstm'): cell = tf.nn.rnn_cell.MultiRNNCell( [get_a_cell(self.lstm_size, self.keep_prob) for _ in range(self.num_layers)] ) self.initial_state = cell.zero_state(self.num_seqs, tf.float32) # 通過dynamic_rnn對cell展開時間維度 self.lstm_outputs, self.final_state = tf.nn.dynamic_rnn(cell, self.lstm_inputs, initial_state=self.initial_state) # 通過lstm_outputs得到概率 seq_output = tf.concat(self.lstm_outputs, 1) x = tf.reshape(seq_output, [-1, self.lstm_size]) with tf.variable_scope('softmax'): softmax_w = tf.Variable(tf.truncated_normal([self.lstm_size, self.num_classes], stddev=0.1)) softmax_b = tf.Variable(tf.zeros(self.num_classes)) self.logits = tf.matmul(x, softmax_w) + softmax_b self.proba_prediction = tf.nn.softmax(self.logits, name='predictions')
可以看出,LSTM層使用的是多層,層數根據參數self.num_layers
確定LSTM的隱層的層數。然后得到輸出使用的是softmax
激活函數,可以 得到輸出的每一個類別的概率。
之后是損失和優化:
def build_loss(self): with tf.name_scope('loss'): y_one_hot = tf.one_hot(self.targets, self.num_classes) y_reshaped = tf.reshape(y_one_hot, self.logits.get_shape()) loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=y_reshaped) self.loss = tf.reduce_mean(loss) def build_optimizer(self): # 使用clipping gradients tvars = tf.trainable_variables() grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, tvars), self.grad_clip) train_op = tf.train.AdamOptimizer(self.learning_rate) self.optimizer = train_op.apply_gradients(zip(grads, tvars))
損失使用的就是一般常用的交叉熵損失,優化則使用的是比較著名的自適應優化器adam
之后就可以開始訓練了:
def train(self, batch_generator, max_steps, save_path, save_every_n, log_every_n): self.session = tf.Session() with self.session as sess: sess.run(tf.global_variables_initializer()) # Train network step = 0 new_state = sess.run(self.initial_state) for x, y in batch_generator: step += 1 start = time.time() feed = {self.inputs: x, self.targets: y, self.keep_prob: self.train_keep_prob, self.initial_state: new_state} batch_loss, new_state, _ , train_summary = sess.run([self.loss, self.final_state, self.optimizer, self.merge_summary], feed_dict=feed) end = time.time() # control the print lines if step % log_every_n == 0: print('step: {}/{}... '.format(step, max_steps), 'loss: {:.4f}... '.format(batch_loss), '{:.4f} sec/batch'.format((end - start))) self.train_writer.add_summary(train_summary, step) if (step % save_every_n == 0): self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step) if step >= max_steps: break self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
可以看出,訓練就是根據前面搭建的網絡和生成的樣本,往里面不斷的喂數據。然后將結果不斷保存。
訓練好模型后,我們就可以讀取保存好的模型:
def load(self, checkpoint): self.session = tf.Session() self.saver.restore(self.session, checkpoint) print('Restored from: {}'.format(checkpoint))
讀取了保存好模型中的各種參數后,就看一通過這個網絡生成樣本:
def sample(self, n_samples, prime, vocab_size): #n_samples:一共輸出多少個基本單元;prime:開始的幾個基本單元;vocab_size:一共有多少個類型的基本單元+1(未知數據編碼) samples = [c for c in prime] sess = self.session new_state = sess.run(self.initial_state) preds = np.ones((vocab_size, )) # for prime=[] for c in prime: #根據輸入的“基本單元”的多少,不斷更新狀態,直到最后的輸入為止!真好的實現! x = np.zeros((1, 1)) # 輸入單個字符 x[0, 0] = c feed = {self.inputs: x, self.keep_prob: 1., self.initial_state: new_state} #每次輸入時更新狀態即可達到連續的效果,對應LSTM狀態是元組(c,h) preds, new_state = sess.run([self.proba_prediction, self.final_state], feed_dict=feed) c = pick_top_n(preds, vocab_size) # 添加字符到samples中 samples.append(c) # 不斷生成字符,直到達到指定數目 for i in range(n_samples): x = np.zeros((1, 1)) x[0, 0] = c feed = {self.inputs: x, self.keep_prob: 1., self.initial_state: new_state} preds, new_state = sess.run([self.proba_prediction, self.final_state], feed_dict=feed) c = pick_top_n(preds, vocab_size) samples.append(c) return np.array(samples)
注意這個函數是根據輸入的前幾個自然數序列(已經通過“基本單元”映射為自然數了),預測下一個輸出的對應自然數。 其中第一個for循環出色的使用了權重共享的思想,使用sample這個函數的時候使得在構造函數時sample
這個參數為True
。 然后一個一個的將“基本單元”映射后的自然數輸入,這樣每次僅更新隱狀態輸出的狀態參數。 之后第二個for循環依次生成后續的一個一個自然數。
pick_top_n
在上一小節的最后一個sample
函數中,用到了pick_top_n
函數,這個函數的內容如下:
def pick_top_n(preds, vocab_size, top_n=5): p = np.squeeze(preds) #squeeze函數從數組的形狀中刪除單維度條目,即把shape中為1的維度去掉 # 將除了top_n個預測值的位置都置為0 p[np.argsort(p,kind = 'mergesort')[:-top_n]] = 0 #argsort函數可以按照給定方法排序 # 歸一化概率 p = p / np.sum(p) # 隨機選取一個字符 c = np.random.choice(vocab_size, 1, p=p)[0] return c
可以看出,該函數通過輸入的各個序列的概率,然后根據n
取得概率前幾個最大的概率,之后通過這些概率進行歸一化,然后得到留下來 的數字序列對應的概率分布律,最后通過np.random.choice
按照各個字符的分布律來隨機選擇一個字符並返回。
周三
預測和精度
今天,通過前兩天的代碼的學習,我今天將我需要用到的數據序列通過read_utils.py
預處理,之后放到model.py
里面進行訓練。 之后設置了20000步的訓練,結果發現可以成功運行並根據輸入生成一系列新的輸出,但是我希望能夠直接得到下一個字符的概率,因此 可以按照如下的方式進行實現:
也可以通過這個網絡預測下一個出現的“數據單元”的概率:
def prediction_next_n(self,prime,vocab_size,next_n =3 , **k): #prime:開始的幾個基本單元;vocab_size:一共有多少個類型的基本單元+1(未知數據編碼) # samples = [c for c in prime] sess = self.session new_state = sess.run(self.initial_state) preds = np.ones((vocab_size,)) # for prime=[] for c in prime: # 根據輸入的“基本單元”的多少,不斷更新狀態,直到最后的輸入為止!真好的實現! x = np.zeros((1, 1)) # 輸入單個字符 x[0, 0] = c feed = {self.inputs: x, self.keep_prob: 1., self.initial_state: new_state} # 每次輸入時更新狀態即可達到連續的效果,對應LSTM狀態是元組(c,h) preds, new_state = sess.run([self.proba_prediction, self.final_state], feed_dict=feed) p = np.squeeze(preds) #squeeze函數從數組的形狀中刪除單維度條目,即把shape中為1的維度去掉 # 將next_n個最大的概率的位置得到 next_n_num = np.argsort(p,kind = 'mergesort')[-next_n:] #argsort函數可以按照給定方法排序 #返回的應該是標號和對應的概率值 s_p_d = [] for i in next_n_num: s_p_d.append((i,p[i])) return s_p_d
返回的這個各個自然數的概率,就可以進行預測生成新的數據對應的結果了。
為了后續的測試,我需要得到精度,因此實現一個計算精度的函數:
def get_accuracy(self,dualList,vocab_size,next_n =3): #輸入的序列滿足有開始的標記,沒有結尾的標記 success_num = 0 for one_session in dualList: if one_session[-1] in self.prediction_next_n(one_session[:-1],vocab_size,next_n): success_num = success_num + 1 print(success_num,len(dualList)) print('精度是:%.4f' % (success_num/len(dualList)) )
TensorBoard的使用
為了將所有數據都顯示出來,我使用了TensorBoard進行顯示。