本文轉載自:https://www.jianshu.com/p/1a4f7f5b05ae
致謝以及參考
最近在做序列化標注項目,試着理解rnn的設計結構以及tensorflow中的具體實現方法。在知乎中找到這篇文章,具有很大的幫助作用,感謝作者為分享知識做出的努力。
學習目標定位
我主要重點在於理解文中連接所提供的在github上的project代碼,一句句理解數據的預處理過程以及rnn網絡搭建過程(重點在於代碼注釋,代碼改動很小,實用python3)。(進入下面環節之前,假設你已經閱讀了知乎上的關於rnn知識講解篇幅,project的readme文檔)
數據預處理
- 理解模型大概需要的重要參數:/Char-RNN-TensorFlow-master/train.py
# encoding: utf-8 import tensorflow as tf from model import CharRNN import os import codecs # 相比自帶的open函數 讀取寫入進行自我轉碼 from read_utils import TextConverter, batch_generator FLAGS = tf.flags.FLAGS # 變量定義 以及 默認值 tf.flags.DEFINE_string('name', 'default', 'name of the model') tf.flags.DEFINE_integer('num_seqs', 100, 'number of seqs in one batch') # 一個 batch 可以組成num_seqs個輸入信號序列 tf.flags.DEFINE_integer('num_steps', 100, 'length of one seq') # 一個輸入信號序列的長度, rnn網絡會更具輸入進行自動調整 tf.flags.DEFINE_integer('lstm_size', 128, 'size of hidden state of lstm') # 隱藏層節點數量,即lstm 的 cell中state數量 tf.flags.DEFINE_integer('num_layers', 2, 'number of lstm layers') # rnn的深度 tf.flags.DEFINE_boolean('use_embedding', False, 'whether to use embedding') # 如果中文字符則需要一個word2vec, 字母字符直接采用onehot編碼 tf.flags.DEFINE_integer('embedding_size', 128, 'size of embedding') # 使用word2vec的 中文字符的嵌入維度選取 tf.flags.DEFINE_float('learning_rate', 0.001, 'learning_rate') tf.flags.DEFINE_float('train_keep_prob', 0.5, 'dropout rate during training') tf.flags.DEFINE_string('input_file', '', 'utf8 encoded text file') # --input_file data/shakespeare.txt tf.flags.DEFINE_integer('max_steps', 100000, 'max steps to train') tf.flags.DEFINE_integer('save_every_n', 1000, 'save the model every n steps') tf.flags.DEFINE_integer('log_every_n', 10, 'log to the screen every n steps') # 不同於英文字符比較短幾十個就能解決,中文字符比較多,word2vec層之前輸入需要進行onehot編碼,根據字符頻數降序排列取前面的3500個編碼 tf.flags.DEFINE_integer('max_vocab', 3500, 'max char number')
- 理解main函數中數據預處理部分, 數據預處理主要采用TextConverter類
def main(_): model_path = os.path.join('model', FLAGS.name) print("模型保存位置(根據模型命名)", model_path) if os.path.exists(model_path) is False: os.makedirs(model_path) with codecs.open(FLAGS.input_file, encoding='utf-8') as f: print("建模訓練數據來源:", FLAGS.input_file) text = f.read() converter = TextConverter(text, # string # 返回一個整理文本詞典的類 FLAGS.max_vocab) print("構建該文本的字符集合數量(包含未登錄詞:):", converter.vocab_size) print("建模所用字符保存地址位置(list): ", os.path.join(model_path, 'converter.pkl')) # 用來建模詞匯的 前max_vocab個 converter.save_to_file(os.path.join(model_path, 'converter.pkl')) arr = converter.text_to_arr(text) # batch生成函數:返回一個生成器
- TextConverter類:\Char-RNN-TensorFlow-master\read_utils.py
比如 莎士比亞訓練數據用vocab組成:{v} {'} {[} {t} {u} {R} {W} {x} {?} { } {F} {I} {G} {O} {E} {$} {y} {e} {:} {L} {s} {c} {g} {Y} {]} {h} {w} {-} {a} {S} {J} {q} {V} {3} {X} {p} {T} {!} {C} {n} {;} {r} {M} {j} {f} {U} {d} {Q} {K} {b} {m} {H} {Z} {o} {i} {P} {D} {.} {l} {&} {N} {z} {A} {,} {
} {B} {k}
class TextConverter(object): def __init__(self, text=None, max_vocab=5000, filename=None): """ :param text: string :param max_vocab: :param filename: """ if filename is not None: # 如果存在 字典文件,即將字符集合進行編號的字典 with open(filename, 'rb') as f: self.vocab = pickle.load(f) else: vocab = set(text) # 組成text的所有字符,比如, i see you, 那么就是 i s e y o u logging.info('組成文本的字符集合:') logging.info("數量: %d" % len(vocab)) s = ' '.join(["{%s}" % v for v in vocab]) logging.info("vocab: %s" % s) # max_vocab_process vocab_count = defaultdict(int) # 這里相對原始代碼做了小小優化 # 統計所有字符的頻數 for word in text: vocab_count[word] += 1 vocab_count_list = list(vocab_count.items()) vocab_count_list.sort(key=lambda x: x[1], reverse=True) # 根據頻數降序排序 if len(vocab_count_list) > max_vocab: vocab_count_list = vocab_count_list[:max_vocab] # 截取最大允許部分 vocab = [x[0] for x in vocab_count_list] # 截取 前max_vocab self.vocab = vocab # 對vocab進行編序 self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)} # 詞匯進行編序號 self.int_to_word_table = dict(enumerate(self.vocab)) @property # 這個實現直接,將vocab_size作為一個變量成員調用而不是方法 def vocab_size(self): return len(self.vocab) + 1 # 加上一個未登錄詞 def word_to_int(self, word): # 更具給定的字符返回index if word in self.word_to_int_table: return self.word_to_int_table[word] else: # 未登錄詞 就是最后一個序號 return len(self.vocab) def int_to_word(self, index): # 根據給定indx返回字符 if index == len(self.vocab): return '<unk>' # 未登錄詞 elif index < len(self.vocab): return self.int_to_word_table[index] else: raise Exception('Unknown index!') def text_to_arr(self, text): # 將文本序列化:字符轉化為index arr = [] for word in text: arr.append(self.word_to_int(word)) return np.array(arr) def arr_to_text(self, arr): # 反序列化 words = [] for index in arr: words.append(self.int_to_word(index)) return "".join(words) def save_to_file(self, filename): # 存儲詞典 with open(filename, 'wb') as f: pickle.dump(self.vocab, f)
- 准備batch用於訓練
# batch生成函數:返回一個生成器 print("訓練文本長度:", len(arr)) print("num_seqs:", FLAGS.num_seqs) print("num_steps", FLAGS.num_steps) g = batch_generator(arr, # 輸入信號文本序列 FLAGS.num_seqs, # batch 信號序列數量 FLAGS.num_steps) # 一個信號序列的長度
-
重點在於理解batch_generator函數, 這個過程的理解需要理解生成文本的rnn的輸出和輸入是什么樣的(N vs N, 輸出和輸入數目是一致的)
-
- 一個單層的展開如下: 展開后h的節點個數取決於你的輸入序列向量的長度,即輸入文本的長度,圖片來源於簡書,這個鏈接可以幫助你很好從數學公式上理解。
image.png
- 一個單層的展開如下: 展開后h的節點個數取決於你的輸入序列向量的長度,即輸入文本的長度,圖片來源於簡書,這個鏈接可以幫助你很好從數學公式上理解。
-
-
一個文本序列輸入展示(這里為了直觀的展示沒有將文本數字化, 例如真正的"床"的輸入應該為一個embeding的向量, 而輸出“前”也是一個與輸入一致的長度向量)
image.png
-
-
/read_utils.py 的batch_generator函數
def batch_generator(arr, n_seqs, n_steps): """ 生成訓練用的batch :param arr: :param n_seqs: :param n_steps: :return: """ arr = copy.copy(arr) # 序列 batch_size = n_seqs * n_steps # 一個batch需要的字符數量 n_batches = int(len(arr) / batch_size) # 整個文本可以生成的batch總數 arr = arr[:batch_size * n_batches] # 截取下 以便reshape成array arr = arr.reshape((n_seqs, -1)) # 將batch, reshape成n_seqs行, 每行為一輸入信號序列(序列長度為n_steps) while True: np.random.shuffle(arr) # 打亂文本序列順序 print(arr) for n in range(0, arr.shape[1], n_steps): x = arr[:, n:(n + n_steps)] y = np.zeros_like(x) y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0] # yield x, y
-
- 測試下原來的代碼的結果
arr = np.arange(27)
for x, y in batch_generator(arr, 4, 3): print(x) print(y) break
-
- out-put: 以 6 7 8為例, 給出一個6, 生成文本的長度為3。將6對應的輸出7作為下一個state的輸入,輸出8, 然后依次這么進行下去,y應該為7,8, 9。說明一下的是最后一個輸出為啥為6 ,前面一個鏈接存在解釋。
0-26序列進行生成序列操作,每批訓練batch序列總數為4, 每個寫的長度為3 打亂排序的結果 [[ 6 7 8 9 10 11] [ 0 1 2 3 4 5] [18 19 20 21 22 23] [12 13 14 15 16 17]] x [[ 6 7 8] [ 0 1 2] [18 19 20] [12 13 14]] y [[ 7 8 6] [ 1 2 0] [19 20 18] [13 14 12]]
rnn 模型搭建
為了更好的理解這個過程下面是實際整個rnn的結構展開展示,如有錯誤請指出:
代碼中構建2層的rnn,每個state(方塊)的有兩個一樣的輸出h,得到輸出前有個softmax處理。

image.png
- train.py中main函數調用rnn部分代碼
model = CharRNN(converter.vocab_size, # 分類的數量 num_seqs=FLAGS.num_seqs, # 一個batch可以組成num_seq個信號 num_steps=FLAGS.num_steps, # 一次信號輸入RNN的字符長度,與一層的cell 的數量掛鈎 lstm_size=FLAGS.lstm_size, # 每個cell的節點數量: num_layers=FLAGS.num_layers, # RNN 的層數 learning_rate=FLAGS.learning_rate, # 學習速率 train_keep_prob=FLAGS.train_keep_prob, use_embedding=FLAGS.use_embedding, embedding_size=FLAGS.embedding_size) model.train(g, FLAGS.max_steps, model_path, FLAGS.save_every_n, FLAGS.log_every_n,)
-
重點在於model.py中的CharRNN類的調用
-
- 搭建rnn隱藏層
整個過程的理解在備注帶代碼里面,暫時不用關注類里面,sample函數
- 搭建rnn隱藏層
# coding: utf-8 from __future__ import print_function import tensorflow as tf import numpy as np import time import os def pick_top_n(preds, vocab_size, top_n=5): p = np.squeeze(preds) # 將除了top_n個預測值的位置都置為0 p[np.argsort(p)[:-top_n]] = 0 # 歸一化概率 p = p / np.sum(p) # 隨機選取一個字符 c = np.random.choice(vocab_size, 1, p=p)[0] return c class CharRNN: def __init__(self, num_classes, num_seqs=64, num_steps=50, lstm_size=128, num_layers=2, learning_rate=0.001, grad_clip=5, sampling=False, train_keep_prob=0.5, use_embedding=False, embedding_size=128): if sampling is True: # 用於 預測 num_seqs, num_steps = 1, 1 # 僅僅根據前面一個字符預測后面一個字符 else: num_seqs, num_steps = num_seqs, num_steps self.num_classes = num_classes # 分類結果數量,與字典容量一致包含未登錄字 self.num_seqs = num_seqs self.num_steps = num_steps self.lstm_size = lstm_size self.num_layers = num_layers self.learning_rate = learning_rate self.grad_clip = grad_clip self.train_keep_prob = train_keep_prob self.use_embedding = use_embedding self.embedding_size = embedding_size tf.reset_default_graph() self.build_inputs() self.build_lstm() self.build_loss() self.build_optimizer() self.saver = tf.train.Saver() def build_inputs(self): # 定義下輸入,輸出等,占位 with tf.name_scope('inputs'): # 輸入是一個3維度矩陣,但是這里並不要過多關注每個節點輸入特征的維度,中文字符額embeding或者因為字符的onehot編碼。 # 模型會自動識別和調整,暫時考慮每一個batch被reshape成 num_seqs * num_steps, 每一行為一個序列輸入信號 self.inputs = tf.placeholder(tf.int32, shape=( self.num_seqs, self.num_steps), name='inputs') # N vs N: 輸出與輸入一致 self.targets = tf.placeholder(tf.int32, shape=( self.num_seqs, self.num_steps), name='targets') # N vs N self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') # 對於中文,需要使用embedding層: ??? # 英文字母沒有必要用embedding層: ??? if self.use_embedding is False: # 對字字符進行onehot編號 self.lstm_inputs = tf.one_hot(self.inputs, self.num_classes) else: with tf.device("/cpu:0"): # 嵌入維度層word2vec和RNN連接器;起來同時訓練 作為模型的第一層 # 先進行onehot編碼然后, word2vec 所以額輸入信號維度為num_classes embedding = tf.get_variable('embedding', [self.num_classes, self.embedding_size]) self.lstm_inputs = tf.nn.embedding_lookup(embedding, self.inputs) def build_lstm(self): # 創建單個cell並堆疊多層 def get_a_cell(lstm_size, keep_prob): """ 返回一個cell :param lstm_size: cell的states數量 :param keep_prob: 節點保留率 :return: """ lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size) # state並不是采用普通rnn 而是lstm drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob) # 對每個state的節點數量進行dropout return drop with tf.name_scope('lstm'): # 構建多層 cell = tf.nn.rnn_cell.MultiRNNCell( [get_a_cell(self.lstm_size, self.keep_prob) for _ in range(self.num_layers)] ) # 定義h_0 self.initial_state = cell.zero_state(self.num_seqs, tf.float32) # 通過dynamic_rnn對cell展開時間維度 self.lstm_outputs, self.final_state = tf.nn.dynamic_rnn(cell, self.lstm_inputs, initial_state=self.initial_state) # 通過lstm_outputs得到概率 # 每個batch的輸出為lstm_outputs: num_seqs * num_steps * state_node_size(中文字符嵌入維度或英文的onehot編碼維度) # 將輸出進行拼接 dim=1 # seq out應該為 num_steps * (num_seqs * state_node_size), 即沒每個輸入信號對應state輸出進行拼接。 # 但是在train里面查看發現, dim沒有任何改變 seq_output = tf.concat(self.lstm_outputs, 1) self.seq_output = seq_output # just for output in train method # 將每個batch的每個state拼接成 一個二維的batch_size * state_node_size(lstm_size) 列矩陣 x = tf.reshape(seq_output, [-1, self.lstm_size]) # 構建一個輸出層:softmax with tf.variable_scope('softmax'): # 初始化 輸出的權重, 共享 softmax_w = tf.Variable(tf.truncated_normal([self.lstm_size, self.num_classes], stddev=0.1)) softmax_b = tf.Variable(tf.zeros(self.num_classes)) # 定義輸出:softmax 歸一化 self.logits = tf.matmul(x, softmax_w) + softmax_b self.proba_prediction = tf.nn.softmax(self.logits, name='predictions') def build_loss(self): with tf.name_scope('loss'): # 統一第輸出進行non hot編碼 y_one_hot = tf.one_hot(self.targets, self.num_classes) y_reshaped = tf.reshape(y_one_hot, self.logits.get_shape()) # 計算交叉信息熵 loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=y_reshaped) # 計算平均損失 self.loss = tf.reduce_mean(loss) def build_optimizer(self): # 使用clipping gradients:避免梯度計算迭代過程變化過大導致梯度爆炸現象 tvars = tf.trainable_variables() grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, tvars), self.grad_clip) # ??? train_op = tf.train.AdamOptimizer(self.learning_rate) self.optimizer = train_op.apply_gradients(zip(grads, tvars)) def train(self, batch_generator, max_steps, save_path, save_every_n, log_every_n): self.session = tf.Session() with self.session as sess: sess.run(tf.global_variables_initializer()) # Train network step = 0 new_state = sess.run(self.initial_state) for x, y in batch_generator: step += 1 start = time.time() feed = {self.inputs: x, self.targets: y, self.keep_prob: self.train_keep_prob, self.initial_state: new_state} # 下一輪batch的初始h狀態采用上一輪的final_state batch_loss, new_state, _ , lstm_outputs, seq_output, prp = sess.run([self.loss, self.final_state, self.optimizer, self.lstm_outputs, self.seq_output, self.proba_prediction ], feed_dict=feed) print('lstm outpts: ', lstm_outputs.shape, self.num_seqs) print('lstm outpts: ', seq_output.shape) # ??? 為啥一直沒有改變 print(prp.shape) end = time.time() # control the print lines if step % log_every_n == 0: print('step: {}/{}... '.format(step, max_steps), 'loss: {:.4f}... '.format(batch_loss), '{:.4f} sec/batch'.format((end - start))) if (step % save_every_n == 0): self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step) if step >= max_steps: break self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step) def sample(self, n_samples, prime, vocab_size): samples = [c for c in prime] sess = self.session new_state = sess.run(self.initial_state) preds = np.ones((vocab_size,)) # for prime=[] for c in prime: x = np.zeros((1, 1))