來源:https://github.com/jiangxinyang227/NLP-Project/text_classifier
import tensorflow as tf
from .base import BaseModel
class BiLstmAttenModel(BaseModel):
    """Text classifier: word embedding -> stacked Bi-LSTM -> attention pooling -> dense layer.

    Configuration keys read by this class: ``"embedding_size"`` (only when no
    pretrained vectors are supplied), ``"hidden_sizes"`` (one Bi-LSTM layer is
    created per entry) and ``"num_classes"``. Further keys are read by the
    loss / optimizer helpers inherited from ``BaseModel``.
    """

    def __init__(self, config, vocab_size, word_vectors):
        """
        Build the whole graph and create the checkpoint saver.

        :param config: model configuration (dict-like, see class docstring)
        :param vocab_size: vocabulary size, used when ``word_vectors`` is None
        :param word_vectors: pretrained embedding matrix, or None to train one from scratch
        """
        super(BiLstmAttenModel, self).__init__(config=config, vocab_size=vocab_size, word_vectors=word_vectors)

        # Build the model graph.
        self.build_model()

        # Create the saver used to write checkpoints.
        self.init_saver()

    def build_model(self):
        """
        Construct the graph and populate ``self.logits``, ``self.predictions``,
        ``self.loss``, ``self.train_op`` and ``self.summary_op``.

        NOTE: ``self.l2_loss`` is accumulated for the output layer, but this
        method sets ``self.loss = self.cal_loss()`` without adding it.
        """
        # Word-embedding layer.
        with tf.name_scope("embedding"):
            # Initialise the embedding matrix from the pretrained vectors when they are given,
            # otherwise create a trainable Xavier-initialised matrix.
            if self.word_vectors is not None:
                embedding_w = tf.Variable(tf.cast(self.word_vectors, dtype=tf.float32, name="word2vec"),
                                          name="embedding_w")
            else:
                embedding_w = tf.get_variable("embedding_w",
                                              shape=[self.vocab_size, self.config["embedding_size"]],
                                              initializer=tf.contrib.layers.xavier_initializer())
            # Map token ids to vectors: [batch_size, sequence_length, embedding_size].
            embedded_words = tf.nn.embedding_lookup(embedding_w, self.inputs)

        # Stacked bidirectional LSTM: one layer per entry of config["hidden_sizes"].
        with tf.name_scope("Bi-LSTM"):
            for idx, hidden_size in enumerate(self.config["hidden_sizes"]):
                with tf.name_scope("Bi-LSTM" + str(idx)):
                    # Forward LSTM cell with dropout on its outputs.
                    lstm_fw_cell = tf.nn.rnn_cell.DropoutWrapper(
                        tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
                        output_keep_prob=self.keep_prob)
                    # Backward LSTM cell with dropout on its outputs.
                    lstm_bw_cell = tf.nn.rnn_cell.DropoutWrapper(
                        tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
                        output_keep_prob=self.keep_prob)

                    # No sequence lengths are passed, so the full (padded) sequence is processed.
                    # `outputs` is a tuple (output_fw, output_bw), each of shape
                    # [batch_size, max_time, hidden_size]; the final states are not needed.
                    outputs, _ = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell,
                                                                 embedded_words, dtype=tf.float32,
                                                                 scope="bi-lstm" + str(idx))

                    # Concatenate forward and backward outputs: [batch_size, time_step, hidden_size * 2].
                    # The result is the input of the next layer.
                    embedded_words = tf.concat(outputs, 2)

        # Split the last layer's output back into its forward and backward halves.
        outputs = tf.split(embedded_words, 2, -1)

        # Following the Bi-LSTM + Attention paper, the two directions are summed element-wise.
        with tf.name_scope("Attention"):
            H = outputs[0] + outputs[1]

            # Attention-pooled sentence representation: [batch_size, hidden_size].
            output = self._attention(H)
            output_size = self.config["hidden_sizes"][-1]

        # Fully connected output layer.
        with tf.name_scope("output"):
            output_w = tf.get_variable(
                "output_w",
                shape=[output_size, self.config["num_classes"]],
                initializer=tf.contrib.layers.xavier_initializer())
            output_b = tf.Variable(tf.constant(0.1, shape=[self.config["num_classes"]]), name="output_b")
            self.l2_loss += tf.nn.l2_loss(output_w)
            self.l2_loss += tf.nn.l2_loss(output_b)
            self.logits = tf.nn.xw_plus_b(output, output_w, output_b, name="logits")
            self.predictions = self.get_predictions()

        self.loss = self.cal_loss()
        self.train_op, self.summary_op = self.get_train_op()

    def _attention(self, H):
        """
        Reduce the Bi-LSTM outputs to one sentence vector with an attention mechanism.

        The attention weights are stored in ``self.alpha``.

        :param H: summed forward/backward outputs of the last Bi-LSTM layer,
                  [batch_size, time_step, hidden_size]
        :return: sentence representation after tanh and dropout, [batch_size, hidden_size]
        """
        # Number of units of the last LSTM layer.
        hidden_size = self.config["hidden_sizes"][-1]
        # Use the run-time number of time steps instead of a configured constant so that
        # any padded length accepted by the [None, None] input placeholder works.
        time_steps = tf.shape(H)[1]

        # Trainable attention weight vector.
        W = tf.Variable(tf.random_normal([hidden_size], stddev=0.1))

        # Non-linear transform of the Bi-LSTM outputs.
        M = tf.tanh(H)

        # Score every time step: flatten M to [batch_size * time_step, hidden_size] and
        # multiply by W as a column vector, giving one scalar per time step.
        newM = tf.matmul(tf.reshape(M, [-1, hidden_size]), tf.reshape(W, [-1, 1]))

        # Restore the batch layout: [batch_size, time_step].
        restoreM = tf.reshape(newM, [-1, time_steps])

        # Normalise the scores over the time axis: [batch_size, time_step].
        self.alpha = tf.nn.softmax(restoreM)

        # Weighted sum of H with weights alpha, done as a batched matrix product:
        # [batch_size, hidden_size, time_step] x [batch_size, time_step, 1].
        r = tf.matmul(tf.transpose(H, [0, 2, 1]), tf.expand_dims(self.alpha, -1))

        # Drop only the trailing singleton axis. Squeezing every size-1 axis would also
        # remove the batch axis for a batch of one sample and break the dense layer.
        sequeezeR = tf.squeeze(r, axis=[2])

        sentenceRepren = tf.tanh(sequeezeR)

        # Dropout on the attention output.
        output = tf.nn.dropout(sentenceRepren, self.keep_prob)

        return output
base.py
import tensorflow as tf
import numpy as np
class BaseModel(object):
    """Base class for text classifiers.

    Owns the input placeholders and provides loss, optimizer, prediction,
    checkpoint-saver and train / eval / infer helpers. Subclasses implement
    ``build_model`` and are expected to set ``self.logits`` before calling
    ``get_predictions`` / ``cal_loss``.
    """

    def __init__(self, config, vocab_size=None, word_vectors=None):
        """
        Create the placeholders and default-initialise the model attributes.

        :param config: model configuration parameters (dict-like)
        :param vocab_size: needed to initialise the embedding when no word vectors are given
        :param word_vectors: pretrained word vectors; at least one of ``word_vectors`` and
                             ``vocab_size`` is expected to be non-None (not validated here)
        """
        self.config = config
        self.vocab_size = vocab_size
        self.word_vectors = word_vectors

        self.inputs = tf.placeholder(tf.int32, [None, None], name="inputs")  # input token ids
        self.labels = tf.placeholder(tf.float32, [None], name="labels")  # labels
        self.keep_prob = tf.placeholder(tf.float32, name="keep_prob")  # dropout keep probability

        self.l2_loss = tf.constant(0.0)  # accumulated L2 penalty
        self.loss = 0.0  # loss
        self.train_op = None  # training op
        self.summary_op = None
        self.logits = None  # output of the last layer
        self.predictions = None  # predicted labels
        self.saver = None  # object used to save ckpt checkpoints

    def cal_loss(self):
        """
        Compute the mean loss for binary (``num_classes == 1``) or
        multi-class (``num_classes > 1``) classification.

        :return: scalar loss tensor
        """
        with tf.name_scope("loss"):
            losses = 0.0
            if self.config["num_classes"] == 1:
                # Binary case: logits have one column, so labels become a column as well.
                losses = tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits,
                                                                 labels=tf.reshape(self.labels, [-1, 1]))
            elif self.config["num_classes"] > 1:
                # The sparse softmax loss needs integer class ids. Cast into a local tensor so
                # that self.labels stays the placeholder that train() / eval() feed.
                int_labels = tf.cast(self.labels, dtype=tf.int32)
                losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits,
                                                                        labels=int_labels)
            loss = tf.reduce_mean(losses)
            return loss

    def get_optimizer(self):
        """
        Create the optimizer selected by ``config["optimization"]``.

        :return: a ``tf.train`` optimizer using ``config["learning_rate"]``
        :raises ValueError: if the configured name is not "adam", "rmsprop" or "sgd"
        """
        name = self.config["optimization"]
        learning_rate = self.config["learning_rate"]

        if name == "adam":
            return tf.train.AdamOptimizer(learning_rate)
        if name == "rmsprop":
            return tf.train.RMSPropOptimizer(learning_rate)
        if name == "sgd":
            return tf.train.GradientDescentOptimizer(learning_rate)

        # Fail here with a clear message instead of returning None and crashing later
        # when the training op is built.
        raise ValueError("Unsupported optimization %r; expected one of 'adam', 'rmsprop', 'sgd'" % (name,))

    def get_train_op(self):
        """
        Build the training op and the merged summary op.

        Gradients of ``self.loss`` with respect to all trainable variables are clipped by
        global norm to ``config["max_grad_norm"]`` before being applied.

        :return: tuple ``(train_op, summary_op)``
        """
        # Create the optimizer.
        optimizer = self.get_optimizer()

        trainable_params = tf.trainable_variables()
        gradients = tf.gradients(self.loss, trainable_params)
        # Clip the gradients by their global norm.
        clip_gradients, _ = tf.clip_by_global_norm(gradients, self.config["max_grad_norm"])
        train_op = optimizer.apply_gradients(zip(clip_gradients, trainable_params))

        tf.summary.scalar("loss", self.loss)
        summary_op = tf.summary.merge_all()

        return train_op, summary_op

    def get_predictions(self):
        """
        Derive predicted labels from ``self.logits``.

        :return: for ``num_classes == 1`` an int32 tensor that is 1 where the logit is >= 0;
                 for ``num_classes > 1`` the argmax over the last axis;
                 None for any other ``num_classes``
        """
        predictions = None
        if self.config["num_classes"] == 1:
            predictions = tf.cast(tf.greater_equal(self.logits, 0.0), tf.int32, name="predictions")
        elif self.config["num_classes"] > 1:
            predictions = tf.argmax(self.logits, axis=-1, name="predictions")
        return predictions

    def build_model(self):
        """
        Build the model graph; must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def init_saver(self):
        """
        Create the saver over all global variables and store it in ``self.saver``.

        :return: None
        """
        self.saver = tf.train.Saver(tf.global_variables())

    def train(self, sess, batch, dropout_prob):
        """
        Run one training step.

        :param sess: TensorFlow session
        :param batch: batch data, a mapping with inputs under "x" and labels under "y"
        :param dropout_prob: value fed to the ``keep_prob`` placeholder, i.e. it is used
                             as the keep probability
        :return: tuple ``(summary, loss, predictions)``
        """
        feed_dict = {self.inputs: batch["x"],
                     self.labels: batch["y"],
                     self.keep_prob: dropout_prob}

        # Run the training op together with the summary, loss and predictions.
        _, summary, loss, predictions = sess.run([self.train_op, self.summary_op, self.loss, self.predictions],
                                                 feed_dict=feed_dict)
        return summary, loss, predictions

    def eval(self, sess, batch):
        """
        Evaluate the model on one batch with dropout disabled (``keep_prob`` = 1.0).

        :param sess: TensorFlow session
        :param batch: batch data, a mapping with inputs under "x" and labels under "y"
        :return: tuple ``(summary, loss, predictions)``
        """
        feed_dict = {self.inputs: batch["x"],
                     self.labels: batch["y"],
                     self.keep_prob: 1.0}

        summary, loss, predictions = sess.run([self.summary_op, self.loss, self.predictions], feed_dict=feed_dict)
        return summary, loss, predictions

    def infer(self, sess, inputs):
        """
        Predict on new data with dropout disabled.

        :param sess: TensorFlow session
        :param inputs: input data; it is wrapped in one extra leading axis before being fed
        :return: prediction result
        """
        feed_dict = {self.inputs: np.array([inputs]),
                     self.keep_prob: 1.0}

        predict = sess.run(self.predictions, feed_dict=feed_dict)

        return predict