1、導入依賴包,初始化一些常量
import collections import numpy as np import tensorflow as tf TRAIN_DATA = "./data/ptb.train.txt" # 訓練數據路徑 TEST_DATA = "./data/ptb.test.txt" # 測試數據路徑 EVAL_DATA = "./data/ptb.valid.txt" # 驗證數據路徑 HIDDEN_SIZE = 300 # 隱藏層中cell的個數 NUM_LAYERS = 2 # 深度循環神經網絡中LSTM結構的層數 VOCAB_SIZE = 10000 # 詞典規模 TRAIN_BATCH_SIZE = 20 # 訓練數據batch的大小 TRAIN_NUM_STEP = 35 # 訓練數據的截斷長度,也可以看作是序列的長度 EVAL_BATCH_SIZE = 1 EVAL_NUM_STEP = 35 NUM_EPOCH = 30 # 使用訓練數據的輪數 LSTM_KEEP_PROB = 0.9 # LSTM節點不被dropout的概率 EMBEDDING_KEEP_PROB = 0.9 # 詞向量不被dropout的概率 MAX_GRAD_NORM = 5 # 用於控制梯度膨脹的梯度大小上限 SHARE_EMB_AND_SOFTMAX = True # 在softmax層和詞向量層之間共享參數
2、處理數據集
def read_data(file_path): """ 讀取文件數據,將文本中的詞轉換成詞空間中對應的索引 :param file_path: 文件路徑 :return: 由數值取代后的文本詞列表 """ # 采用TensorFlow中的讀取文件的方法去讀取文件 with tf.gfile.GFile(file_path, "r") as f: # 將文本讀取出來,並且進行分詞,將換行符替換成<eos>,eos的意思就是end of sentence word_list = f.read().replace("\n", "<eos>").split() # 對分詞后的列表進行統計,統計每個單詞出現的數量, 返回的數據類型Counter({'jiang': 2, 'zhang': 1}) counter = collections.Counter(word_list) # 對每個詞按照詞頻排序,對於詞頻相同的按詞本身進行排序,返回的數據類型[('jiang', 2), ('zhang', 1)] count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0])) # 取出單詞元組,返回的數據類型words=('jiang', 'zhang') words, _ = list(zip(*count_pairs)) # 將上面排序后的詞(無重復的詞空間)標記索引數值,返回的數據類型{'jiang': 0, 'zhang': 1} word_to_id = dict(zip(words, range(len(words)))) # 將文本中所有的單詞用索引數值取代,組成新的列表 id_list = [word_to_id[word] for word in word_list if word in word_to_id] return id_list def make_batches(id_list, batch_size, num_steps): """ 將原始的數據集轉換成mini-batch進行訓練 :param id_list: 原始的數值文本列表 :param batch_size: :param num_steps: 一個樣本的序列長度 :return: 整個樣本轉換后的batchs數據 """ # 計算總的batch數量。每個batch包含的單詞數量是batch_size * num_steps,batch_size為一個batch中樣本的數量, # num_steps為一個樣本的序列長度,因此num_batchs表示整個訓練文本能分成的batch的數量 num_batches = (len(id_list) - 1) // (batch_size * num_steps) # 根據上面分配好的num_batchs, batch_size, num_steps參數來構建數據集,先取出能整除的序列長度 data = np.array(id_list[: num_batches * batch_size * num_steps]) # reshape取出來的序列(一維數組)成二維數組,因為是序列數據,所以行為batch_size,列為num_batchs * num_steps,之后訓練在橫向上分割 data = np.reshape(data, [batch_size, num_batches * num_steps]) # 將數據在axis=1的軸上分割,分割的數量就是num_batchs data_batches = np.split(data, num_batches, axis=1) # 因為是根據前面的詞預測后面的詞,因此輸出的值要往后移一位,其余操作和上面的一致 label = np.array(id_list[1: num_batches * batch_size * num_steps + 1]) label = np.reshape(label, [batch_size, num_batches * num_steps]) label_batches = np.split(label, num_batches, axis=1) return list(zip(data_batches, label_batches))
3、構建模型
主要是定義各種變量或者對象,有些變量是經過計算得到的
class PTBModel(object): def __init__(self, is_training, batch_size, num_steps): # 記錄使用的batch大小和截斷長度(也就是一個樣本的序列長度) self.batch_size = batch_size self.num_steps = num_steps # 定義每一步的輸入和預期輸出。兩者的維度都是[batch_size, num_steps],batch_size時間上就是指一個batch中樣本的數量 self.input_data = tf.placeholder(tf.int32, [batch_size, num_steps]) self.targets = tf.placeholder(tf.int32, [batch_size, num_steps]) # 定義dropout的值,訓練時取0.9,否則取1.0,表示不做dropout dropout_keep_prob = LSTM_KEEP_PROB if is_training else 1.0 # 定義lstm cell的結構,dropout相當於裝飾器直接包裹在lstm_cell上,此時的cell是垂直方向的,所以for循環中的值是NUM_LAYERS lstm_cells = [tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), output_keep_prob=dropout_keep_prob) for i in range(NUM_LAYERS)] # 組合成多層循環 cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cells) # 初始化最初的狀態值,即全為0的向量。這個量只在每個epoch初始化第一個batch時使用 # #返回[batch_size, 2*len(cells)],或者[batch_size, s],至於為什么是2 * ,這是因為初始值有兩個h0和C0 self.initial_state = cell.zero_state(batch_size, tf.float32) # 定義單詞的詞向量矩陣,這里用get_variable,之后在測試時就可以實現參數共享了 # VOCAB_SIZE是指詞向量空間中詞的個數(在這里起始是len(words)的長度,也等於10000),HIDDEN_SIZE是值詞嵌入之后的詞向量長度 embedding = tf.get_variable("embedding", [VOCAB_SIZE, HIDDEN_SIZE]) # 將輸入的單詞轉化為詞向量,相當於將每個序列中的單詞按照索引(之前轉化為了數值在這里就很方便的),直接將序列中的每個詞在已經訓練好的 # 詞空間中尋找對應的向量,此空間應該是二維的,輸出的結果應該是三維的,也就是batch_size * num_steps * HIDDEN_SIZE inputs = tf.nn.embedding_lookup(embedding, self.input_data) # 只在訓練時使用dropout來訓練詞向量 if is_training: inputs = tf.nn.dropout(inputs, EMBEDDING_KEEP_PROB) # 定義輸出列表。在這里先將不同時刻LSTM結構的輸出收集起來,再一起提供給softmax層,在這里是實現時間序上的cell輸出 outputs = [] state = self.initial_state with tf.variable_scope("RNN"): for time_step in range(num_steps): if time_step > 0: # 實現在同一個variable_scope下共享參數 tf.get_variable_scope().reuse_variables() # 從這里就可以看出之前embedding輸出的是三維,我們根據時間序取出詞來進行訓練, # cell_output shape=(batch_size, HIDDEN_SIZE) cell_output, state = cell(inputs[:, time_step, :], state) outputs.append(cell_output) # 把輸出隊列展開成[batch_size, num_steps * hidden_size],然后再reshape成[batch_size * num_steps, hidden_size] output = tf.reshape(tf.concat(outputs, 1), [-1, HIDDEN_SIZE]) # softmax層:將RNN在每個位置的輸出轉化為各個單詞的logits # weight的shape是[HIDDEN_SIZE, VOCAB_SIZE] if SHARE_EMB_AND_SOFTMAX: weight = tf.transpose(embedding) else: weight = tf.get_variable("weight", [HIDDEN_SIZE, VOCAB_SIZE]) bias = tf.get_variable("bias", [VOCAB_SIZE]) # 算出最終輸出的logits,用於之后的交叉熵和softmax計算 logits = tf.matmul(output, weight) + bias # 定義交叉熵損失函數和平均損失函數,返回的loss是和labels、logits相同的shape loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=tf.reshape(self.targets, [-1]), logits=logits) # 求平均損失 self.cost = tf.reduce_sum(loss) / batch_size # 該狀態用來存儲訓練完一個batch之后的狀態,在訓練下一個batch時,會將該狀態作為初始狀態,這個在run_epoch函數中控制的 self.final_state = state # 只在訓練模型時定義反向傳播操作 if not is_training: return # 拿到所有的訓練參數,用於之后的梯度下降更新參數 trainable_variables = tf.trainable_variables() # 梯度截斷控制梯度大小,是為了避免梯度彌散和梯度爆炸,將梯度控制在某一范圍內 grads, _ = tf.clip_by_global_norm( tf.gradients(self.cost, trainable_variables), MAX_GRAD_NORM ) optimizer = tf.train.GradientDescentOptimizer(learning_rate=1.0) self.train_op = optimizer.apply_gradients(zip(grads, trainable_variables))
4、創建run_epoch函數,用來控制模型的訓練
def run_epoch(session, model, batches, train_op, output_log, step): """ 訓練模型和模型預測。使用給定的模型model在數據data上運行train_op並返回在全部數據上的perplexity值。 :param session: :param model: :param batches: :param train_op: :param output_log: 判斷是訓練過程還是其他過程 :param step: :return: """ # 計算平均perplexity的輔助變量,perplexity表示在模型生成一句話時下一個詞有perplexity個合理的選擇,認為perplexity小於100都是比較好的 # 結果,該值越小,說明模型越好,也可以認為模型預測的精確度越高 total_costs = 0.0 # 存儲損失值 iters = 0 state = session.run(model.initial_state) # 訓練一個epoch for x, y in batches: # 可以在run函數中通過字典的形式給模型輸入數據,也可以直接讀取出模型的值,session.run()方法通過驅動三個operation來驅動整個圖 # 其中train_op是只是用來驅動訓練過程的 cost, state, _ = session.run([model.cost, model.final_state, model.train_op], feed_dict={model.input_data: x, model.targets: y, model.initial_state: state} ) total_costs += cost iters += model.num_steps if output_log and step % 100 == 0: print("After {} steps, perplexity id {}".format(step, np.exp(total_costs / iters))) step += 1 # pplx是語言模型perplexity指標 pplx = np.exp(total_costs / iters) return step, pplx
5、定義main函數
def main(): with tf.Graph().as_default(): # 定義初始化函數, 用於決定之后的variable_scope中的變量的初始值取值范圍 initializer = tf.random_uniform_initializer(-0.05, 0.05) # 定義訓練用的循環神經網絡模型 with tf.name_scope("train"): train_batches = make_batches(read_data(TRAIN_DATA), TRAIN_BATCH_SIZE, TRAIN_NUM_STEP)
# 利用variable_scope()和get_variable()來實現變量共享 with tf.variable_scope("language_model", reuse=None, initializer=initializer): train_model = PTBModel(True, TRAIN_BATCH_SIZE, TRAIN_NUM_STEP) # 定義測試用的循環神經網絡模型,它與train_model共享參數,但沒有dropout,可以通過is_training來控制 with tf.name_scope('test'): eval_batches = make_batches(read_data(EVAL_DATA), EVAL_BATCH_SIZE, EVAL_NUM_STEP) test_batches = make_batches(read_data(TEST_DATA), EVAL_BATCH_SIZE, EVAL_NUM_STEP) # 設置同樣的名稱language_model來實現共享變量,變量共享和name_scope無關 with tf.variable_scope("language_model", reuse=True, initializer=initializer): eval_model = PTBModel(False, EVAL_BATCH_SIZE, EVAL_NUM_STEP) # 訓練模型 with tf.Session() as session: tf.global_variables_initializer().run() step = 0 for i in range(NUM_EPOCH): print("In iteration: {}".format(i + 1)) step, train_pplx = run_epoch(session, train_model, train_batches, train_model.train_op, True, step) print("Epoch: {} Train Perplexity: {}".format(i+1, train_pplx)) _, eval_pplx = run_epoch(session, eval_model, eval_batches, tf.no_op(), False, 0) print("Epoch: {} Eval Perplexity: {}".format(i+1, eval_pplx)) _, test_pplx = run_epoch(session, eval_model, test_batches, tf.no_op(), False, 0) print("Epoch: {} Test Perplexity: {}".format(i+1, test_pplx))