雙向LSTM+Attention模型的tensorflow實現


來源:https://github.com/jiangxinyang227/NLP-Project/text_classifier

import tensorflow as tf
from .base import BaseModel


class BiLstmAttenModel(BaseModel):
    """Bi-LSTM text classifier with an attention pooling layer (TensorFlow 1.x graph API).

    The graph is: embedding lookup -> one bidirectional LSTM layer per entry of
    ``config["hidden_sizes"]`` -> attention over time steps -> dense output layer.

    Config keys read by this class: ``"embedding_size"`` (only when no pretrained
    vectors are given), ``"hidden_sizes"``, ``"sequence_length"`` and ``"num_classes"``.
    """

    def __init__(self, config, vocab_size, word_vectors):
        """Create placeholders via the base class, then build the graph and the saver.

        :param config: model configuration, indexed by string keys
        :param vocab_size: vocabulary size, used when ``word_vectors`` is None
        :param word_vectors: pretrained embedding matrix, or None
        """
        super(BiLstmAttenModel, self).__init__(config=config, vocab_size=vocab_size, word_vectors=word_vectors)
        # Build the model graph.
        self.build_model()
        # Create the saver used to checkpoint the model.
        self.init_saver()

    def build_model(self):
        """Build the full graph and set logits, predictions, loss, train_op and summary_op."""
        # Embedding layer.
        with tf.name_scope("embedding"):
            # Initialise the embedding matrix from the pretrained vectors when available.
            if self.word_vectors is not None:
                embedding_w = tf.Variable(tf.cast(self.word_vectors, dtype=tf.float32, name="word2vec"), name="embedding_w")
            else:
                embedding_w = tf.get_variable("embedding_w", shape=[self.vocab_size, self.config["embedding_size"]],initializer=tf.contrib.layers.xavier_initializer())
            # Map token ids to vectors: [batch_size, sequence_length, embedding_size].
            embedded_words = tf.nn.embedding_lookup(embedding_w, self.inputs)
            # Stack one bidirectional LSTM layer per entry in config["hidden_sizes"].
            with tf.name_scope("Bi-LSTM"):
                for idx, hidden_size in enumerate(self.config["hidden_sizes"]):
                    with tf.name_scope("Bi-LSTM" + str(idx)):
                        # Forward LSTM cell with dropout on its outputs.
                        lstm_fw_cell = tf.nn.rnn_cell.DropoutWrapper(
                            tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
                            output_keep_prob=self.keep_prob)
                        # Backward LSTM cell with dropout on its outputs.
                        lstm_bw_cell = tf.nn.rnn_cell.DropoutWrapper(
                            tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
                            output_keep_prob=self.keep_prob)

                        # No sequence lengths are passed, so the RNN runs over the full time axis.
                        # outputs is a pair (output_fw, output_bw), each
                        # [batch_size, max_time, hidden_size]; current_state is the pair of
                        # final states (state_fw, state_bw) and is not used further.
                        outputs, current_state = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell,
                                                                                 embedded_words, dtype=tf.float32,
                                                                                 scope="bi-lstm" + str(idx))
                        # Concatenate both directions: [batch_size, time_step, hidden_size * 2].
                        # The result is the input of the next layer.
                        embedded_words = tf.concat(outputs, 2)

        # Split the last layer's output back into its forward and backward halves.
        outputs = tf.split(embedded_words, 2, -1)
        # Following the Bi-LSTM + Attention paper, the two directions are summed.
        with tf.name_scope("Attention"):
            H = outputs[0] + outputs[1]
            # Sentence representation produced by the attention layer.
            output = self._attention(H)
            output_size = self.config["hidden_sizes"][-1]

        # Fully connected output layer.
        with tf.name_scope("output"):
            output_w = tf.get_variable(
                "output_w",
                shape=[output_size, self.config["num_classes"]],
                initializer=tf.contrib.layers.xavier_initializer())

            output_b = tf.Variable(tf.constant(0.1, shape=[self.config["num_classes"]]), name="output_b")
            # Accumulate the L2 norms of the output layer parameters on self.l2_loss.
            self.l2_loss += tf.nn.l2_loss(output_w)
            self.l2_loss += tf.nn.l2_loss(output_b)
            self.logits = tf.nn.xw_plus_b(output, output_w, output_b, name="logits")
            self.predictions = self.get_predictions()

        self.loss = self.cal_loss()
        self.train_op, self.summary_op = self.get_train_op()

    def _attention(self, H):
        """Pool the Bi-LSTM outputs into one sentence vector with attention.

        :param H: summed forward/backward outputs, [batch_size, time_step, hidden_size],
            where time_step must equal ``config["sequence_length"]`` because of the
            reshapes below
        :return: sentence representation after tanh and dropout, [batch_size, hidden_size]
        """
        # Number of units in the last LSTM layer.
        hidden_size = self.config["hidden_sizes"][-1]

        # Trainable attention weight vector.
        W = tf.Variable(tf.random_normal([hidden_size], stddev=0.1))

        # Non-linear transform of the Bi-LSTM outputs.
        M = tf.tanh(H)

        # Flatten M to [batch_size * time_step, hidden_size] and multiply by W as a column,
        # so every time step is scored by a single number: [batch_size * time_step, 1].
        newM = tf.matmul(tf.reshape(M, [-1, hidden_size]), tf.reshape(W, [-1, 1]))

        # Restore the scores to [batch_size, time_step].
        restoreM = tf.reshape(newM, [-1, self.config["sequence_length"]])

        # Normalise the scores over time steps: [batch_size, time_step].
        self.alpha = tf.nn.softmax(restoreM)

        # Weighted sum of H with alpha as one batched matmul: [batch_size, hidden_size, 1].
        r = tf.matmul(tf.transpose(H, [0, 2, 1]), tf.reshape(self.alpha, [-1, self.config["sequence_length"], 1]))

        # Drop only the trailing size-1 axis. An unqualified squeeze would also drop the
        # batch axis when batch_size == 1 (e.g. single-sample inference), handing a
        # rank-1 tensor to the output layer.
        sequeezeR = tf.squeeze(r, axis=[2])

        sentenceRepren = tf.tanh(sequeezeR)

        # Apply dropout to the attention output.
        output = tf.nn.dropout(sentenceRepren, self.keep_prob)

        return output

base.py

import tensorflow as tf
import numpy as np

class BaseModel(object):
    """Base class for text classifiers built on the TensorFlow 1.x graph API.

    Holds the shared placeholders and provides loss, optimizer, prediction,
    saver, and train / eval / infer helpers. Subclasses implement ``build_model``.
    """

    def __init__(self, config, vocab_size=None, word_vectors=None):
        """Store the configuration and create the input placeholders.

        :param config: model configuration, indexed by string keys
        :param vocab_size: vocabulary size, needed to initialise the embedding
            when no pretrained vectors are supplied
        :param word_vectors: pretrained word vectors; per the original note, one of
            ``word_vectors`` and ``vocab_size`` must not be None
        """
        self.config = config
        self.vocab_size = vocab_size
        self.word_vectors = word_vectors
        self.inputs = tf.placeholder(tf.int32, [None, None], name="inputs")  # input token ids
        self.labels = tf.placeholder(tf.float32, [None], name="labels")  # labels
        self.keep_prob = tf.placeholder(tf.float32, name="keep_prob")  # dropout keep probability

        self.l2_loss = tf.constant(0.0)  # L2 accumulator; not added to the loss by cal_loss
        self.loss = 0.0  # loss
        self.train_op = None  # training op
        self.summary_op = None
        self.logits = None  # output of the last layer
        self.predictions = None  # predicted labels
        self.saver = None  # saver used to write ckpt files

    def cal_loss(self):
        """Compute the mean cross-entropy loss for binary or multi-class setups.

        Uses sigmoid cross-entropy when ``config["num_classes"] == 1`` and sparse
        softmax cross-entropy when it is greater than 1.

        :return: scalar loss tensor
        """
        with tf.name_scope("loss"):
            losses = 0.0
            if self.config["num_classes"] == 1:
                losses = tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits,
                                                                 labels=tf.reshape(self.labels, [-1, 1]))
            elif self.config["num_classes"] > 1:
                # NOTE: this rebinds self.labels to the int32 cast tensor, so feed_dicts
                # built afterwards key on that tensor instead of the placeholder.
                # Kept as-is to preserve existing behaviour.
                self.labels = tf.cast(self.labels, dtype=tf.int32)
                losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits,
                                                                        labels=self.labels)
            loss = tf.reduce_mean(losses)
            return loss

    def get_optimizer(self):
        """Create the optimizer selected by ``config["optimization"]``.

        :return: an Adam, RMSProp or gradient-descent optimizer using
            ``config["learning_rate"]``
        :raises ValueError: if the configured name is not "adam", "rmsprop" or "sgd"
        """
        name = self.config["optimization"]
        if name == "adam":
            optimizer_cls = tf.train.AdamOptimizer
        elif name == "rmsprop":
            optimizer_cls = tf.train.RMSPropOptimizer
        elif name == "sgd":
            optimizer_cls = tf.train.GradientDescentOptimizer
        else:
            # Fail here with a clear message rather than returning None and
            # crashing later on optimizer.apply_gradients.
            raise ValueError(
                "Unsupported optimization %r; expected one of 'adam', 'rmsprop', 'sgd'" % (name,))
        return optimizer_cls(self.config["learning_rate"])

    def get_train_op(self):
        """Build the training op with global-norm gradient clipping.

        :return: tuple ``(train_op, summary_op)``
        """
        # Create the optimizer.
        optimizer = self.get_optimizer()

        trainable_params = tf.trainable_variables()
        gradients = tf.gradients(self.loss, trainable_params)
        # Clip gradients by their global norm.
        clip_gradients, _ = tf.clip_by_global_norm(gradients, self.config["max_grad_norm"])
        train_op = optimizer.apply_gradients(zip(clip_gradients, trainable_params))

        tf.summary.scalar("loss", self.loss)
        summary_op = tf.summary.merge_all()

        return train_op, summary_op

    def get_predictions(self):
        """Derive predicted labels from ``self.logits``.

        :return: int32 tensor of ``logits >= 0`` when ``num_classes == 1``, the argmax
            over the last axis when ``num_classes > 1``, otherwise None
        """
        predictions = None
        if self.config["num_classes"] == 1:
            predictions = tf.cast(tf.greater_equal(self.logits, 0.0), tf.int32, name="predictions")
        elif self.config["num_classes"] > 1:
            predictions = tf.argmax(self.logits, axis=-1, name="predictions")
        return predictions

    def build_model(self):
        """Build the model graph; must be implemented by subclasses.

        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def init_saver(self):
        """Create the saver over all global variables."""
        self.saver = tf.train.Saver(tf.global_variables())

    def train(self, sess, batch, dropout_prob):
        """Run one training step.

        :param sess: TensorFlow session
        :param batch: mapping with inputs under "x" and labels under "y"
        :param dropout_prob: value fed to the ``keep_prob`` placeholder
        :return: tuple ``(summary, loss, predictions)``
        """

        feed_dict = {self.inputs: batch["x"],
                     self.labels: batch["y"],
                     self.keep_prob: dropout_prob}

        # Run the optimisation step together with the reporting tensors.
        _, summary, loss, predictions = sess.run([self.train_op, self.summary_op, self.loss, self.predictions],
                                                 feed_dict=feed_dict)
        return summary, loss, predictions

    def eval(self, sess, batch):
        """Evaluate one batch without updating parameters (keep_prob fed as 1.0).

        :param sess: TensorFlow session
        :param batch: mapping with inputs under "x" and labels under "y"
        :return: tuple ``(summary, loss, predictions)``
        """
        feed_dict = {self.inputs: batch["x"],
                     self.labels: batch["y"],
                     self.keep_prob: 1.0}

        summary, loss, predictions = sess.run([self.summary_op, self.loss, self.predictions], feed_dict=feed_dict)
        return summary, loss, predictions

    def infer(self, sess, inputs):
        """Predict for new data.

        :param sess: TensorFlow session
        :param inputs: input data; it is wrapped in an extra leading axis before
            being fed, so a single token-id sequence becomes a batch of one
        :return: predictions
        """
        feed_dict = {self.inputs: np.array([inputs]),
                     self.keep_prob: 1.0}

        predict = sess.run(self.predictions, feed_dict=feed_dict)

        return predict


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM