RNN models have a built-in short-term memory, which makes them a natural fit for sequence problems such as natural language; with gating mechanisms added, they can further alleviate the long-term dependency problem and capture long-distance relationships within the input sequence. The model in this post stacks two LSTM or GRU layers, with the structure LSTM(GRU) → dropout → LSTM(GRU) → dropout → fully connected layer → output layer, which is fairly simple. How to build RNN models in TensorFlow (stacked RNN, LSTM, GRU, and bidirectional LSTM) is already explained in the earlier post 《TensorFlow之RNN:堆疊RNN、LSTM、GRU及雙向LSTM》, so it is not repeated here.
Although RNNs are naturally suited to natural language problems, CNN models have been catching up fast recently. Why? This text classification task makes one reason obvious: the RNN model has no advantage at all in speed and is several to more than ten times slower than the CNN model. Each time step's output depends on the previous time step's output, so the computation cannot be parallelized, which makes training slow; this is a fatal weakness. Meanwhile, the ability to capture long-distance dependencies, which RNNs take pride in, is no longer their exclusive trick: a CNN's convolution behaves much like an N-gram and captures local context, and stacking deeper convolutional layers lets it capture longer-range dependencies (a rough sketch of this receptive-field arithmetic follows below). On top of that, the Transformer has burst onto the scene; it can be fully parallelized and, through self-attention, builds a dependency between any two words regardless of their distance, and it looks set to sweep the older architectures aside.
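To make the receptive-field claim concrete, here is a quick back-of-the-envelope sketch (my own illustration, assuming stride-1, non-dilated 1-D convolutions; it is not part of this post's model code): each extra convolutional layer widens the context window by kernel_size - 1 characters.

def receptive_field(kernel_size, num_layers):
    # Receptive field of stacked stride-1, non-dilated 1-D convolutions.
    field = 1
    for _ in range(num_layers):
        field += kernel_size - 1
    return field

print(receptive_field(3, 1))  # 3  -> one layer behaves roughly like a trigram
print(receptive_field(3, 4))  # 9  -> four stacked layers already see 9 characters
print(receptive_field(5, 6))  # 25 -> deeper or wider stacks cover long spans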
Moreover, in terms of prediction accuracy, the CNN model is no less accurate than the RNN model, and in this experiment it even surpasses it.
The TextRNN model is again organized into four modules: 1. data processing; 2. model construction; 3. model training; 4. model prediction.
GitHub repository: https://github.com/DengYangyong/Chinese_Text_Classification/tree/master/Text-Classification-On_RNN
Now let's look at the code.
1. Data processing
The data processing part is identical to the previous CharCNN post. Although we said an RNN can in principle handle sequences of arbitrary length, in this TextRNN model the inputs are still processed into fixed-length sequences.
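As a quick aside, a tiny standalone demo (separate from the project code) of what the padding step below does: by default, kr.preprocessing.sequence.pad_sequences pre-pads short texts with 0, which is exactly the index of '<PAD>' in the vocabulary, and keeps only the last max_length ids of over-long texts.

import tensorflow.contrib.keras as kr

# A short text is padded with zeros at the front.
print(kr.preprocessing.sequence.pad_sequences([[3, 7, 5]], maxlen=6))
# [[0 0 0 3 7 5]]

# An over-long text is truncated from the front, keeping the last maxlen ids.
print(kr.preprocessing.sequence.pad_sequences([[1, 2, 3, 4, 5, 6, 7, 8]], maxlen=6))
# [[3 4 5 6 7 8]]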
# coding: utf-8

import sys
from collections import Counter

import numpy as np
import tensorflow.contrib.keras as kr

if sys.version_info[0] > 2:
    is_py3 = True
else:
    reload(sys)
    sys.setdefaultencoding("utf-8")
    is_py3 = False
# Check the Python version; for example, under Python 3.6.5 sys.version_info prints:
# sys.version_info(major=3, minor=6, micro=5).


def native_word(word, encoding='utf-8'):
    """If a model trained under Python 3 is used under Python 2, this converts the character encoding."""
    if not is_py3:
        return word.encode(encoding)
    else:
        return word


def native_content(content):
    """is_py3 is True under Python 3 and False otherwise; when False, decode the utf-8 bytes to unicode."""
    if not is_py3:
        return content.decode('utf-8')
    else:
        return content


def open_file(filename, mode='r'):
    """Common file operation that works under both Python 2 and Python 3."""
    if is_py3:
        return open(filename, mode, encoding='utf-8', errors='ignore')
    else:
        return open(filename, mode)


def read_file(filename):
    """Read the data file."""
    contents, labels = [], []
    with open_file(filename) as f:
        for line in f:
            try:
                label, content = line.strip().split('\t')
                if content:
                    contents.append(list(native_content(content)))
                    labels.append(native_content(label))
            except:
                pass
    return contents, labels
# line.strip().split('\t') returns a two-element list: ['體育', '黃蜂vs湖人首發:科比帶傷戰保羅 加索爾救贖之戰 新浪體育訊...'].
# Note the list() call: it turns the text into a list whose elements are the individual characters and symbols:
# ['黃', '蜂', 'v', 's', '湖', '人', '首', '發', ':', '科', '比', ...]
# Each element of contents is the character list of one news article: [['黃', '蜂', 'v', 's', '湖', '人', '首', '發', ':', '科', '比', ...], [...], ...]
# labels is ['體育', '體育', ...]


def build_vocab(train_dir, vocab_dir, vocab_size=5000):
    """Build the vocabulary from the training set and store it."""
    data_train, _ = read_file(train_dir)

    all_data = []
    for content in data_train:
        all_data.extend(content)

    counter = Counter(all_data)
    count_pairs = counter.most_common(vocab_size - 1)
    words, _ = list(zip(*count_pairs))
    words = ['<PAD>'] + list(words)
    open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n')


def read_vocab(vocab_dir):
    """Read the vocabulary."""
    with open_file(vocab_dir) as fp:
        words = [native_content(_.strip()) for _ in fp.readlines()]
    word_to_id = dict(zip(words, range(len(words))))
    return words, word_to_id
# readlines() reads all lines and returns them as a list of strings: ['頭\n', '天\n', ...]; strip() removes the "\n".
# words: ['<PAD>', ',', '的', '。', '一', '是', '在', '0', '有', ...]
# word_to_id: {'<PAD>': 0, ',': 1, '的': 2, '。': 3, '一': 4, '是': 5, ...}; each character's value is its index id.


def read_category():
    """Read the list of categories."""
    categories = ['體育', '財經', '房產', '家居', '教育', '科技', '時尚', '時政', '游戲', '娛樂']
    categories = [native_content(x) for x in categories]
    cat_to_id = dict(zip(categories, range(len(categories))))
    return categories, cat_to_id
# cat_to_id is {'體育': 0, '財經': 1, '房產': 2, '家居': 3, ...}; each category's value is its index id.


def to_words(content, words):
    """Convert id-encoded content back to text."""
    return ''.join(words[x] for x in content)


def process_file(filename, word_to_id, cat_to_id, max_length=600):
    """Convert a file to id representation and pad the sequences."""
    contents, labels = read_file(filename)
    # contents looks like [['黃', '蜂', 'v', 's', '湖', '人', ...], [...], ...]; each element is the character list of one article.
    # labels looks like ['體育', '體育', '體育', '體育', '體育', ...]

    data_id, label_id = [], []
    for i in range(len(contents)):
        data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])
        label_id.append(cat_to_id[labels[i]])

    x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length)
    y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id))
    return x_pad, y_pad
# word_to_id is a dict: {'<PAD>': 0, ',': 1, '的': 2, '。': 3, '一': 4, '是': 5, ...}
# For each article's character list, look up each character's index in the vocabulary:
# data_id: [['黃', '蜂', 'v', 's', '湖', '人', ...], [...], ...] becomes [[387, 1197, 2173, 215, 110, 264, ...], [...], ...]
# label_id: ['體育', '體育', '體育', '體育', '體育', ...] becomes [0, 0, 0, 0, 0, ...]
# data_id has 50000 rows, one per news article; each element is the list of character indices for that article.
# data_id looks like [[387, 1197, 2173, 215, 110, 264, ...], [...], ...]
# Because the articles have different lengths, the element lists have different lengths (longer or shorter than 600),
# so they are unified to length 600 using Keras's pad_sequences; x_pad has shape (50000, 600).
# label_id is an integer array like [0, 0, 0, 0, 0, ...]; cat_to_id is a dict like {'體育': 0, '財經': 1, '房產': 2, '家居': 3, ...}.
# to_categorical one-hot encodes the labels; num_classes is the number of classes (10), so y_pad has shape (50000, 10).


def batch_iter(x, y, batch_size=64):
    """Generate batches of data."""
    data_len = len(x)
    num_batch = int((data_len - 1) / batch_size) + 1
    # There are 50000 samples; int() truncates toward zero, so num_batch is 782 here.

    indices = np.random.permutation(np.arange(data_len))
    x_shuffle = x[indices]
    y_shuffle = y[indices]
    # indices is a permutation of 0-49999, e.g. a 50000-element array like [256, 189, 2, ...].
    # It reshuffles the samples and labels row by row: continuing the example, row 256 (counting from 0)
    # moves to row 0, row 189 to row 1, and so on.

    for i in range(num_batch):
        start_id = i * batch_size
        end_id = min((i + 1) * batch_size, data_len)
        yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id]
    # When i=780, end_id = 781*64 = 49984;
    # when i=781, end_id = 50000, because 782*64 = 50048 > 50000, so the last batch takes [49984:50000].
    # yield turns the function into a generator; a for loop keeps producing the next batch.
    # Only 64 samples are produced at a time, which keeps memory usage low and prevents memory overflow.
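Putting the loader together, here is a short usage sketch of the functions above (the same calls that run_rnn.py makes in section 3, shown here in isolation; the paths match the project layout):

from cnews_loader import build_vocab, read_vocab, read_category, process_file, batch_iter

train_dir = 'data/cnews/cnews.train.txt'
vocab_dir = 'data/cnews/cnews.vocab.txt'

build_vocab(train_dir, vocab_dir, vocab_size=5000)    # write the 5000-character vocabulary file
categories, cat_to_id = read_category()               # the 10 category names and their ids
words, word_to_id = read_vocab(vocab_dir)             # character -> id mapping

x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, max_length=600)
print(x_train.shape, y_train.shape)                   # (50000, 600) (50000, 10)

for x_batch, y_batch in batch_iter(x_train, y_train, batch_size=64):
    print(x_batch.shape, y_batch.shape)               # (64, 600) (64, 10), except the last batch
    break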
2. Building the model
The model is fairly simple; the code below follows the structure LSTM(GRU) → dropout → LSTM(GRU) → dropout → fully connected layer → output layer. Two points to note: dropout is applied to the output of each LSTM or GRU layer (via DropoutWrapper), and the hidden state of the top LSTM or GRU layer at the final time step is taken as the input to the fully connected layer.
#!/usr/bin/python
# -*- coding: utf-8 -*-

import tensorflow as tf


class TRNNConfig(object):
    """RNN configuration parameters"""

    embedding_dim = 64        # word embedding dimension
    seq_length = 600          # sequence length
    num_classes = 10          # number of classes
    vocab_size = 5000         # vocabulary size

    num_layers = 2            # number of hidden layers (2 here)
    hidden_dim = 128          # number of hidden units
    rnn = 'gru'               # choose 'lstm' or 'gru'

    dropout_keep_prob = 0.8   # dropout keep probability
    learning_rate = 1e-3      # learning rate

    batch_size = 128          # training batch size
    num_epochs = 10           # total number of epochs

    print_per_batch = 100     # print results every this many batches
    save_per_batch = 10       # write summaries every this many batches


class TextRNN(object):
    """RNN model for text classification"""

    def __init__(self, config):
        self.config = config

        self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x')
        self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y')
        self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')

        self.rnn()

    def rnn(self):
        """RNN model"""

        def lstm_cell():
            return tf.nn.rnn_cell.LSTMCell(self.config.hidden_dim, state_is_tuple=True)

        def gru_cell():
            return tf.nn.rnn_cell.GRUCell(self.config.hidden_dim)

        def dropout():
            if self.config.rnn == 'lstm':
                cell = lstm_cell()
            else:
                cell = gru_cell()
            return tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=self.keep_prob)
            # Add a dropout layer after each RNN cell.

        with tf.device('/gpu:0'):
            embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim])
            embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x)

        with tf.name_scope("rnn"):
            cells = [dropout() for _ in range(self.config.num_layers)]
            rnn_cell = tf.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True)
            # A 2-layer stacked RNN model.

            _outputs, _ = tf.nn.dynamic_rnn(cell=rnn_cell, inputs=embedding_inputs, dtype=tf.float32)
            last = _outputs[:, -1, :]
            # Take the output at the last time step, i.e. the hidden state of the second (top) LSTM/GRU layer
            # at the final time step.

        with tf.name_scope("score"):
            # Fully connected layer, followed by dropout and ReLU activation.
            fc = tf.layers.dense(last, self.config.hidden_dim, name='fc1')
            fc = tf.contrib.layers.dropout(fc, self.keep_prob)
            fc = tf.nn.relu(fc)

            # Classifier
            self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2')
            self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)

        with tf.name_scope("optimize"):
            # Loss: softmax cross entropy
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)
            self.loss = tf.reduce_mean(cross_entropy)
            # Optimizer
            self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)

        with tf.name_scope("accuracy"):
            # Accuracy
            correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)
            self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
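One note on last = _outputs[:, -1, :]: it is safe here because pad_sequences pre-pads, so the final time step of every sequence contains real text rather than <PAD>. If the inputs were post-padded instead, a hedged alternative (my own sketch, not how this post's model is configured) would be to tell dynamic_rnn each sequence's true length and read the top layer's final state:

# Count the non-<PAD> ids per sample (id 0 is '<PAD>') to get the true lengths.
seq_len = tf.reduce_sum(tf.sign(self.input_x), axis=1)
_outputs, states = tf.nn.dynamic_rnn(cell=rnn_cell,
                                     inputs=embedding_inputs,
                                     sequence_length=seq_len,
                                     dtype=tf.float32)
last = states[-1]  # final state of the top GRU layer
# For LSTM cells with state_is_tuple=True, the per-layer state is a (c, h) tuple,
# so the hidden state would be states[-1].h instead.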
3. Model training, validation and testing
This part of the code is essentially the same as for CharCNN. Note that dropout is not applied during validation and testing, and early stopping is used to prevent overfitting.
# coding: utf-8

from __future__ import print_function

import os
import sys
import time
from datetime import timedelta

import numpy as np
import tensorflow as tf
from sklearn import metrics

from rnn_model import TRNNConfig, TextRNN
from cnews_loader import read_vocab, read_category, batch_iter, process_file, build_vocab

base_dir = 'data/cnews'
train_dir = os.path.join(base_dir, 'cnews.train.txt')
test_dir = os.path.join(base_dir, 'cnews.test.txt')
val_dir = os.path.join(base_dir, 'cnews.val.txt')
vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')

save_dir = 'checkpoints/textrnn'
save_path = os.path.join(save_dir, 'best_validation')  # path where the best validation result is saved


def get_time_dif(start_time):
    """Return the elapsed time."""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))


def feed_data(x_batch, y_batch, keep_prob):
    feed_dict = {
        model.input_x: x_batch,
        model.input_y: y_batch,
        model.keep_prob: keep_prob
    }
    return feed_dict


def evaluate(sess, x_, y_):
    """Evaluate accuracy and loss on a given dataset."""
    data_len = len(x_)
    batch_eval = batch_iter(x_, y_, 128)
    total_loss = 0.0
    total_acc = 0.0
    for x_batch, y_batch in batch_eval:
        batch_len = len(x_batch)
        feed_dict = feed_data(x_batch, y_batch, 1.0)  # no dropout during evaluation
        y_pred_class, loss, acc = sess.run([model.y_pred_cls, model.loss, model.acc], feed_dict=feed_dict)
        total_loss += loss * batch_len
        total_acc += acc * batch_len

    return y_pred_class, total_loss / data_len, total_acc / data_len


def train():
    print("Configuring TensorBoard and Saver...")
    tensorboard_dir = 'tensorboard/textrnn'
    if not os.path.exists(tensorboard_dir):
        os.makedirs(tensorboard_dir)

    tf.summary.scalar("loss", model.loss)
    tf.summary.scalar("accuracy", model.acc)
    merged_summary = tf.summary.merge_all()
    writer = tf.summary.FileWriter(tensorboard_dir)

    # Configure Saver
    saver = tf.train.Saver()
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    print("Loading training and validation data...")
    # Load the training and validation sets
    start_time = time.time()
    x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length)
    x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # Create the session
    session = tf.Session()
    session.run(tf.global_variables_initializer())
    writer.add_graph(session.graph)

    print('Training and evaluating...')
    start_time = time.time()
    total_batch = 0
    best_acc_val = 0.0
    last_improved = 0
    require_improvement = 1000  # stop training early if there is no improvement for 1000 batches

    flag = False
    for epoch in range(config.num_epochs):
        print('Epoch:', epoch + 1)
        batch_train = batch_iter(x_train, y_train, config.batch_size)
        for x_batch, y_batch in batch_train:
            feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob)

            if total_batch % config.save_per_batch == 0:
                # Write training summaries to TensorBoard
                s = session.run(merged_summary, feed_dict=feed_dict)
                writer.add_summary(s, total_batch)

            if total_batch % config.print_per_batch == 0:
                # Report performance on the training and validation sets
                feed_dict[model.keep_prob] = 1.0
                loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict)
                y_pred_cls_1, loss_val, acc_val = evaluate(session, x_val, y_val)  # todo

                if acc_val > best_acc_val:
                    # Save the best result so far
                    best_acc_val = acc_val
                    last_improved = total_batch
                    saver.save(sess=session, save_path=save_path)
                    improved_str = '*'
                else:
                    improved_str = ''

                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \
                      + ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str))

            session.run(model.optim, feed_dict=feed_dict)  # run one optimization step
            total_batch += 1

            if total_batch - last_improved > require_improvement:
                # Validation accuracy has not improved for a long time; stop training early
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break


def test():
    print("Loading test data...")
    start_time = time.time()
    x_test, y_test = process_file(test_dir, word_to_id, cat_to_id, config.seq_length)

    session = tf.Session()
    session.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess=session, save_path=save_path)  # restore the saved model

    print('Testing...')
    y_pred, loss_test, acc_test = evaluate(session, x_test, y_test)
    msg = 'Test Loss: {0:>6.2}, Test Acc: {1:>7.2%}'
    print(msg.format(loss_test, acc_test))

    batch_size = 128
    data_len = len(x_test)
    num_batch = int((data_len - 1) / batch_size) + 1

    y_test_cls = np.argmax(y_test, 1)
    y_pred_cls = np.zeros(shape=len(x_test), dtype=np.int32)  # store the predicted classes
    for i in range(num_batch):  # predict batch by batch
        start_id = i * batch_size
        end_id = min((i + 1) * batch_size, data_len)
        feed_dict = {
            model.input_x: x_test[start_id:end_id],
            model.keep_prob: 1.0
        }
        y_pred_cls[start_id:end_id] = session.run(model.y_pred_cls, feed_dict=feed_dict)

    # Evaluation
    print("Precision, Recall and F1-Score...")
    print(metrics.classification_report(y_test_cls, y_pred_cls, target_names=categories))

    # Confusion matrix
    print("Confusion Matrix...")
    cm = metrics.confusion_matrix(y_test_cls, y_pred_cls)
    print(cm)

    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)


if __name__ == '__main__':
    print('Configuring RNN model...')
    config = TRNNConfig()
    if not os.path.exists(vocab_dir):
        # Rebuild the vocabulary if it does not already exist
        build_vocab(train_dir, vocab_dir, config.vocab_size)
    categories, cat_to_id = read_category()
    words, word_to_id = read_vocab(vocab_dir)
    config.vocab_size = len(words)
    model = TextRNN(config)

    option = 'train'
    if option == 'train':
        train()
    else:
        test()
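The __main__ block above hardcodes option = 'train', so switching to testing means editing the file. A small tweak (my own sketch, not part of the original script) reads the mode from the command line instead, so that python run_rnn.py train and python run_rnn.py test both work:

if __name__ == '__main__':
    if len(sys.argv) != 2 or sys.argv[1] not in ['train', 'test']:
        raise ValueError("usage: python run_rnn.py [train / test]")

    print('Configuring RNN model...')
    config = TRNNConfig()
    if not os.path.exists(vocab_dir):
        build_vocab(train_dir, vocab_dir, config.vocab_size)
    categories, cat_to_id = read_category()
    words, word_to_id = read_vocab(vocab_dir)
    config.vocab_size = len(words)
    model = TextRNN(config)

    if sys.argv[1] == 'train':
        train()
    else:
        test()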
With GRU cells, training took 46 minutes 47 seconds; the best validation accuracy was 91.54%, and the test accuracy was 94.67%. The CNN model in the previous post trained in only 3 minutes 21 seconds, so the RNN model really is more than ten times slower.
Iter: 3500, Train Loss: 0.034, Train Acc: 98.44%, Val Loss: 0.35, Val Acc: 91.54%, Time: 0:46:47 *
Testing...
Test Loss: 0.2, Test Acc: 94.67%
Precision, Recall and F1-Score...
precision recall f1-score support
體育 0.99 0.99 0.99 1000
財經 0.93 0.99 0.96 1000
房產 1.00 1.00 1.00 1000
家居 0.95 0.83 0.89 1000
教育 0.88 0.93 0.90 1000
科技 0.95 0.96 0.95 1000
時尚 0.95 0.95 0.95 1000
時政 0.95 0.91 0.93 1000
游戲 0.94 0.96 0.95 1000
娛樂 0.94 0.96 0.95 1000
micro avg 0.95 0.95 0.95 10000
macro avg 0.95 0.95 0.95 10000
weighted avg 0.95 0.95 0.95 10000
Confusion Matrix...
[[990 0 0 0 5 1 0 0 4 0]
[ 0 987 1 0 2 3 0 6 1 0]
[ 0 0 996 2 2 0 0 0 0 0]
[ 0 22 2 834 60 20 25 20 10 7]
[ 1 6 0 6 925 7 5 12 4 34]
[ 0 5 0 8 8 959 2 2 16 0]
[ 0 0 0 13 9 2 948 4 12 12]
[ 0 33 1 15 21 11 1 910 4 4]
[ 1 1 0 2 10 5 11 0 962 8]
[ 4 2 0 1 15 3 5 2 12 956]]
Time usage: 0:00:40
4. Model prediction
A short passage was excerpted from each of two news articles for prediction. The predicted categories are 科技 (technology) and 體育 (sports).
# coding: utf-8

from __future__ import print_function

import os
import tensorflow as tf
import tensorflow.contrib.keras as kr

from rnn_model import TRNNConfig, TextRNN
from cnews_loader import read_category, read_vocab

try:
    bool(type(unicode))
except NameError:
    unicode = str

base_dir = 'data/cnews'
vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')

save_dir = 'checkpoints/textrnn'
save_path = os.path.join(save_dir, 'best_validation')  # path where the best validation result is saved


class RnnModel:
    def __init__(self):
        self.config = TRNNConfig()
        self.categories, self.cat_to_id = read_category()
        self.words, self.word_to_id = read_vocab(vocab_dir)
        self.config.vocab_size = len(self.words)
        self.model = TextRNN(self.config)

        self.session = tf.Session()
        self.session.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        saver.restore(sess=self.session, save_path=save_path)  # restore the saved model

    def predict(self, message):
        content = unicode(message)
        data = [self.word_to_id[x] for x in content if x in self.word_to_id]

        feed_dict = {
            self.model.input_x: kr.preprocessing.sequence.pad_sequences([data], self.config.seq_length),
            self.model.keep_prob: 1.0
        }

        y_pred_cls = self.session.run(self.model.y_pred_cls, feed_dict=feed_dict)
        return self.categories[y_pred_cls[0]]


if __name__ == '__main__':
    rnn_model = RnnModel()
    test_demo = ['三星ST550以全新的拍攝方式超越了以往任何一款數碼相機',
                 '熱火vs騎士前瞻:皇帝回鄉二番戰 東部次席唾手可得新浪體育訊北京時間3月30日7:00']
    for i in test_demo:
        print(rnn_model.predict(i))