PaddlePaddle Transformer Encoder Source Code Walkthrough


[ GitHub source code link ]

This article is based on PaddlePaddle 1.7 and walks through the dygraph (dynamic graph) implementation of the Transformer encoder.

 

Each encoder sub-layer of the Transformer (bert_base stacks 12 such encoder sub-layers) consists of 2 smaller sub-layers:

  • Multi-Head Attention
  • Feed Forward

(The decoder additionally contains a Masked Multi-Head Attention sub-layer.)

 

The following classes are defined:

PrePostProcessLayer: adds residual connections, layer normalization, and dropout
PositionwiseFeedForwardLayer: fully connected position-wise feed-forward network
MultiHeadAttentionLayer: multi-head attention layer
EncoderSubLayer: a single encoder sub-layer
EncoderLayer: the Transformer encoder (the stack of sub-layers)

 

In PaddlePaddle dygraph mode, a network layer is implemented by subclassing paddle.fluid.dygraph.Layer: the __init__ method defines the layer (its parameters and sub-layers), and forward implements the computation of the forward pass.
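
For readers new to dygraph, here is a minimal, self-contained sketch of such a layer (illustrative only, not part of the analyzed source; TinyLayer and its sizes are made up):

# A minimal custom dygraph layer: parameters/sub-layers are declared in
# __init__, and the forward computation lives in forward.
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import Layer, Linear

class TinyLayer(Layer):
    def __init__(self, in_dim, out_dim):
        super(TinyLayer, self).__init__()
        self._fc = Linear(input_dim=in_dim, output_dim=out_dim, act="relu")

    def forward(self, x):
        return self._fc(x)

with fluid.dygraph.guard():
    x = fluid.dygraph.to_variable(np.ones((2, 8), dtype="float32"))
    print(TinyLayer(8, 4)(x).shape)  # [2, 4]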

[ More PaddlePaddle dygraph tutorials ]

 

The concrete implementation follows; the code is explained in the comments.

Required imports

"dygraph transformer layers"

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph import Embedding, LayerNorm, Linear, Layer 

 

PrePostProcessLayer

Supported modes: { a: residual connection, n: layer normalization, d: dropout }

Residual connection

This corresponds to the Add & Norm block in the architecture diagram. After each module's computation, the value before the computation is added to the value after it, forming a residual connection; the residual path lets gradients take a shortcut straight back to the earliest layers.

Residual connection formula:

y = f(x) + x

where x is the module input; in practice this is simply an element-wise addition across the skip connection.
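
A quick sketch (illustrative, not from the analyzed source; the Linear layer merely stands in for any sub-layer f):

# Residual (skip) connection in dygraph: the sub-layer output f(x) is added
# element-wise to its input x, so the shapes of f(x) and x must match.
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import Linear

with fluid.dygraph.guard():
    x = fluid.dygraph.to_variable(np.random.rand(2, 768).astype("float32"))
    f = Linear(input_dim=768, output_dim=768)  # stand-in for any sub-layer
    y = f(x) + x  # y = f(x) + x
    print(y.shape)  # [2, 768]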

Layer normalization

LayerNorm normalizes the hidden layer, i.e. it normalizes the inputs of all neurons in a given layer (along the channel direction), which speeds up training:

 

Layer normalization formula (using the notation below):

μ = (1/H) Σᵢ xᵢ
σ = sqrt( (1/H) Σᵢ (xᵢ − μ)² + ϵ )
y = g ⊙ (x − μ) / σ + b

x : the vector of inputs to the neurons of this layer

H : the number of hidden units in the layer

ϵ : a small value added to the variance to avoid division by zero

g : trainable gain (scale) parameter

b : trainable bias parameter

[ Corresponding PaddlePaddle API documentation ]
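
A short numpy sketch of the formula above (illustrative only; eps plays the role of ϵ, g and b are the gain and bias):

# Numpy sketch of layer normalization over the H hidden units of one layer.
import numpy as np

def layer_norm(x, g, b, eps=1e-12):
    mu = x.mean()                                  # mean over the H units
    sigma = np.sqrt(((x - mu) ** 2).mean() + eps)  # std. dev. with eps added
    return g * (x - mu) / sigma + b                # scale and shift

x = np.random.rand(768).astype("float32")
y = layer_norm(x, g=np.ones(768, dtype="float32"), b=np.zeros(768, dtype="float32"))
print(round(float(y.mean()), 4), round(float(y.std()), 4))  # roughly 0.0 and 1.0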

 

dropout

Dropout keeps or drops each element of x independently. It is a regularization technique that reduces overfitting by breaking up co-adaptation between neurons during training: given a drop probability, the dropout operator randomly sets some neuron outputs to 0 with that probability and leaves the others unchanged.

The dropout op can be removed from the Program (e.g. at inference time), which improves execution efficiency.
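
A quick dygraph sketch of the dropout op (illustrative only; with the default dropout_implementation, surviving elements keep their value during training):

# fluid.layers.dropout zeroes each element independently with dropout_prob.
import numpy as np
import paddle.fluid as fluid

with fluid.dygraph.guard():
    x = fluid.dygraph.to_variable(np.ones((2, 6), dtype="float32"))
    y = fluid.layers.dropout(x, dropout_prob=0.5, is_test=False)
    print(y.numpy())  # roughly half of the entries are 0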

class PrePostProcessLayer(Layer):
    """
    PrePostProcessLayer
    """

    def __init__(self, process_cmd, d_model, dropout_rate, name):
        super(PrePostProcessLayer, self).__init__()
        self.process_cmd = process_cmd  # processing modes: any combination of "a", "n", "d"
        self.functors = []  # the processing functions / sub-layers
        self.exec_order = ""
        # add a processing step for each requested mode
        for cmd in self.process_cmd:
            if cmd == "a":  # add residual connection
                self.functors.append(lambda x, y: x + y if y is not None else x)
                self.exec_order += "a"
            elif cmd == "n":  # add layer normalization
                self.functors.append(
                    self.add_sublayer(
                        # sub-layer name
                        "layer_norm_%d" % len(
                            self.sublayers(include_sublayers=False)),
                        LayerNorm(
                            normalized_shape=d_model,  # shape to normalize; a single integer normalizes the last dimension, whose size must equal this value
                            param_attr=fluid.ParamAttr(  # scale (weight) parameter
                                name=name + "_layer_norm_scale",
                                # constant initializer: fills the parameter with the given value
                                initializer=fluid.initializer.Constant(1.)),
                            bias_attr=fluid.ParamAttr(  # bias parameter
                                name=name + "_layer_norm_bias",
                                initializer=fluid.initializer.Constant(0.)))))
                self.exec_order += "n"
            elif cmd == "d":  # add dropout
                if dropout_rate:
                    self.functors.append(lambda x: fluid.layers.dropout(
                        x, dropout_prob=dropout_rate, is_test=False))
                    self.exec_order += "d"

    def forward(self, x, residual=None):
        # apply the processing steps in the recorded order
        for i, cmd in enumerate(self.exec_order):
            if cmd == "a":
                x = self.functors[i](x, residual)
            else:
                x = self.functors[i](x)
        return x
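
A minimal usage sketch (not from the original article; the shapes, dropout rate, and names are made up). Pre-processing with "n" applies layer normalization only; post-processing with "da" applies dropout and then adds the residual:

# Run a tensor through a pre-process ("n") and a post-process ("da") wrapper.
import numpy as np
import paddle.fluid as fluid

with fluid.dygraph.guard():
    x = fluid.dygraph.to_variable(np.random.rand(2, 4, 768).astype("float32"))
    pre = PrePostProcessLayer("n", 768, 0.1, name="demo_pre")
    post = PrePostProcessLayer("da", 768, 0.1, name="demo_post")
    normed = pre(x)                 # layer normalization only
    out = post(normed, residual=x)  # dropout, then add the residual
    print(out.shape)                # [2, 4, 768]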

 

PositionwiseFeedForwardLayer

In BERT, hidden_act (the activation function) is gelu.

class PositionwiseFeedForwardLayer(Layer):
    """
    PositionwiseFeedForwardLayer
    """

    def __init__(self,
                 hidden_act,  # activation function
                 d_inner_hid,  # dimensionality of the inner (hidden) layer
                 d_model,  # dimensionality of the final output
                 dropout_rate,
                 param_initializer=None,
                 name=""):
        super(PositionwiseFeedForwardLayer, self).__init__()

        # two fully connected (fc) layers
        self._i2h = Linear(
            input_dim=d_model,
            output_dim=d_inner_hid,
            param_attr=fluid.ParamAttr(
                name=name + '_fc_0.w_0', initializer=param_initializer),
            bias_attr=name + '_fc_0.b_0',
            act=hidden_act)

        self._h2o = Linear(
            input_dim=d_inner_hid,
            output_dim=d_model,
            param_attr=fluid.ParamAttr(
                name=name + '_fc_1.w_0', initializer=param_initializer),
            bias_attr=name + '_fc_1.b_0')

        self._dropout_rate = dropout_rate
    def forward(self, x):
        """
        forward
        :param x:
        :return:
        """
        hidden = self._i2h(x)
        # apply dropout to the inner activations
        if self._dropout_rate:
            hidden = fluid.layers.dropout(
                hidden,
                dropout_prob=self._dropout_rate,
                dropout_implementation="upscale_in_train",
                is_test=False)
        out = self._h2o(hidden)
        return out
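
A shape-check sketch (assumed bert_base sizes and random input: 768 -> 3072 -> 768; the name is made up):

# The FFN expands to d_inner_hid with the activation, then projects back to d_model.
import numpy as np
import paddle.fluid as fluid

with fluid.dygraph.guard():
    x = fluid.dygraph.to_variable(np.random.rand(2, 5, 768).astype("float32"))
    ffn = PositionwiseFeedForwardLayer(
        hidden_act="gelu", d_inner_hid=3072, d_model=768,
        dropout_rate=0.1, name="demo_ffn")
    print(ffn(x).shape)  # [2, 5, 768]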

 

 

MultiHeadAttentionLayer

 

The relevant dimensions:

 

self._emb_size = config['hidden_size']   # 768

d_key=self._emb_size // self._n_head,

d_value=self._emb_size // self._n_head,

d_model=self._emb_size,

d_inner_hid=self._emb_size * 4
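
For bert_base this works out to: hidden_size = 768 and n_head = 12, so each head gets d_key = d_value = 768 // 12 = 64, while d_model = 768 and d_inner_hid = 768 * 4 = 3072.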

class MultiHeadAttentionLayer(Layer):
    """
    MultiHeadAttentionLayer
    """

    def __init__(self,
                 d_key,
                 d_value,
                 d_model,
                 n_head=1,
                 dropout_rate=0.,
                 cache=None,
                 gather_idx=None,
                 static_kv=False,
                 param_initializer=None,
                 name=""):
        super(MultiHeadAttentionLayer, self).__init__()
        self._n_head = n_head
        self._d_key = d_key
        self._d_value = d_value
        self._d_model = d_model
        self._dropout_rate = dropout_rate

        self._q_fc = Linear(
            input_dim=d_model,
            output_dim=d_key * n_head,
            param_attr=fluid.ParamAttr(
                name=name + '_query_fc.w_0', initializer=param_initializer),
            bias_attr=name + '_query_fc.b_0')

        self._k_fc = Linear(
            input_dim=d_model,
            output_dim=d_key * n_head,
            param_attr=fluid.ParamAttr(
                name=name + '_key_fc.w_0', initializer=param_initializer),
            bias_attr=name + '_key_fc.b_0')

        self._v_fc = Linear(
            input_dim=d_model,
            output_dim=d_value * n_head,
            param_attr=fluid.ParamAttr(
                name=name + '_value_fc.w_0', initializer=param_initializer),
            bias_attr=name + '_value_fc.b_0')

        self._proj_fc = Linear(
            input_dim=d_value * n_head,
            output_dim=d_model,
            param_attr=fluid.ParamAttr(
                name=name + '_output_fc.w_0', initializer=param_initializer),
            bias_attr=name + '_output_fc.b_0')

    def forward(self, queries, keys, values, attn_bias):
        """
        forward
        :param queries:
        :param keys:
        :param values:
        :param attn_bias:
        :return:
        """
        # compute q, k, v
        keys = queries if keys is None else keys
        values = keys if values is None else values
        # obtain the q, k, v matrices via the linear projections
        q = self._q_fc(queries)
        k = self._k_fc(keys)
        v = self._v_fc(values)

        # split heads

        q_hidden_size = q.shape[-1]
        reshaped_q = fluid.layers.reshape(
            x=q,
            shape=[0, 0, self._n_head, q_hidden_size // self._n_head],
            inplace=False)
        transpose_q = fluid.layers.transpose(x=reshaped_q, perm=[0, 2, 1, 3])

        k_hidden_size = k.shape[-1]
        reshaped_k = fluid.layers.reshape(
            x=k,
            shape=[0, 0, self._n_head, k_hidden_size // self._n_head],
            inplace=False)
        transpose_k = fluid.layers.transpose(x=reshaped_k, perm=[0, 2, 1, 3])

        v_hidden_size = v.shape[-1]
        reshaped_v = fluid.layers.reshape(
            x=v,
            shape=[0, 0, self._n_head, v_hidden_size // self._n_head],
            inplace=False)
        transpose_v = fluid.layers.transpose(x=reshaped_v, perm=[0, 2, 1, 3])

        # scaled dot-product attention
        scaled_q = fluid.layers.scale(x=transpose_q, scale=self._d_key**-0.5)
        product = fluid.layers.matmul(
            x=scaled_q,
            y=transpose_k,
            transpose_y=True)
        if attn_bias is not None:
            product += attn_bias
        weights = fluid.layers.softmax(product)
        if self._dropout_rate:
            weights_droped = fluid.layers.dropout(
                weights,
                dropout_prob=self._dropout_rate,
                dropout_implementation="upscale_in_train",
                is_test=False)
            out = fluid.layers.matmul(weights_droped, transpose_v)
        else:
            out = fluid.layers.matmul(weights, transpose_v)

        # combine heads
        if len(out.shape) != 4:
            raise ValueError("Input(x) should be a 4-D Tensor.")
        trans_x = fluid.layers.transpose(out, perm=[0, 2, 1, 3])
        final_out = fluid.layers.reshape(
            x=trans_x,
            shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]],
            inplace=False)

        # fc to output
        proj_out = self._proj_fc(final_out)
        return proj_out
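
A shape-check sketch for self-attention (assumed bert_base sizes, random input, no attention bias; the name is made up):

# With keys/values passed as None, the layer attends over the queries themselves.
import numpy as np
import paddle.fluid as fluid

with fluid.dygraph.guard():
    batch, seq_len, hidden, n_head = 2, 5, 768, 12
    x = fluid.dygraph.to_variable(
        np.random.rand(batch, seq_len, hidden).astype("float32"))
    attn = MultiHeadAttentionLayer(
        d_key=hidden // n_head,
        d_value=hidden // n_head,
        d_model=hidden,
        n_head=n_head,
        dropout_rate=0.1,
        name="demo_att")
    out = attn(x, None, None, None)  # queries only: self-attention, no bias
    print(out.shape)  # [2, 5, 768]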

 

EncoderSubLayer

class EncoderSubLayer(Layer):
    """
    EncoderSubLayer
    """

    def __init__(self,
                 hidden_act,
                 n_head,
                 d_key,
                 d_value,
                 d_model,
                 d_inner_hid,
                 prepostprocess_dropout,
                 attention_dropout,
                 relu_dropout,
                 preprocess_cmd="n",
                 postprocess_cmd="da",
                 param_initializer=None,
                 name=""):

        super(EncoderSubLayer, self).__init__()
        self.name = name
        self._preprocess_cmd = preprocess_cmd
        self._postprocess_cmd = postprocess_cmd
        self._prepostprocess_dropout = prepostprocess_dropout
        # pre-processing applied before attention (layer normalization)
        self._preprocess_layer = PrePostProcessLayer(
            self._preprocess_cmd,
            d_model,
            prepostprocess_dropout,
            name=name + "_pre_att")
        # multi-head attention
        self._multihead_attention_layer = MultiHeadAttentionLayer(
            d_key,
            d_value,
            d_model,
            n_head,
            attention_dropout,
            None,
            None,
            False,
            param_initializer,
            name=name + "_multi_head_att")

        self._postprocess_layer = PrePostProcessLayer(
            self._postprocess_cmd,
            d_model,
            self._prepostprocess_dropout,
            name=name + "_post_att")
        self._preprocess_layer2 = PrePostProcessLayer(
            self._preprocess_cmd,
            d_model,
            self._prepostprocess_dropout,
            name=name + "_pre_ffn")

        self._positionwise_feed_forward = PositionwiseFeedForwardLayer(
            hidden_act,
            d_inner_hid,
            d_model,
            relu_dropout,
            param_initializer,
            name=name + "_ffn")

        self._postprocess_layer2 = PrePostProcessLayer(
            self._postprocess_cmd,
            d_model,
            self._prepostprocess_dropout,
            name=name + "_post_ffn")

    def forward(self, enc_input, attn_bias):
        """
        forward
        :param enc_input: encoder input
        :param attn_bias: attention bias (mask)
        :return: the result of encoding the input with one encoder sub-layer
        """
        # pre-process before multi-head attention
        pre_process_multihead = self._preprocess_layer(enc_input)
        # feed the pre-processed result into the multi-head attention layer
        attn_output = self._multihead_attention_layer(pre_process_multihead,
                                                      None, None, attn_bias)
        # post-process after attention (dropout + residual connection with enc_input)
        attn_output = self._postprocess_layer(attn_output, enc_input)
        # pre-process before the FFN layer
        pre_process2_output = self._preprocess_layer2(attn_output)
        # run the position-wise feed-forward network
        ffd_output = self._positionwise_feed_forward(pre_process2_output)
        # post-process (dropout + residual connection with attn_output) and return
        return self._postprocess_layer2(ffd_output, attn_output)
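
With preprocess_cmd="n" and postprocess_cmd="da", a single sub-layer therefore computes (pre-norm style, writing LN for layer normalization and Dropout for the pre/post-process dropout):

attn_output = enc_input + Dropout(MultiHeadAttention(LN(enc_input)))
enc_output = attn_output + Dropout(FFN(LN(attn_output)))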

 

EncoderLayer

class EncoderLayer(Layer):
    """
    encoder
    """

    def __init__(self,
                 hidden_act,
                 n_layer,  # number of encoder sub-layers (encoder depth)
                 n_head,  # number of attention heads
                 d_key,
                 d_value,
                 d_model,
                 d_inner_hid,
                 prepostprocess_dropout,  # dropout prob. of the pre/post-process layers
                 attention_dropout,  # dropout prob. inside attention
                 relu_dropout,  # dropout prob. after the FFN activation
                 preprocess_cmd="n",  # pre-processing: layer normalization
                 postprocess_cmd="da",  # post-processing: dropout + residual connection
                 param_initializer=None,
                 name=""):

        super(EncoderLayer, self).__init__()
        self._preprocess_cmd = preprocess_cmd
        self._encoder_sublayers = list()
        self._prepostprocess_dropout = prepostprocess_dropout
        self._n_layer = n_layer
        self._hidden_act = hidden_act
        # final processing layer applied after all sub-layers (here: layer normalization);
        # its width must match the model width d_model
        self._preprocess_layer = PrePostProcessLayer(
            self._preprocess_cmd, d_model, self._prepostprocess_dropout,
            "post_encoder")
        # define n_layer encoder sub-layers (12 in bert_base)
        for i in range(n_layer):
            self._encoder_sublayers.append(
                # register each sub-layer with add_sublayer
                self.add_sublayer(
                    'esl_%d' % i,
                    EncoderSubLayer(
                        hidden_act,
                        n_head,
                        d_key,
                        d_value,
                        d_model,
                        d_inner_hid,
                        prepostprocess_dropout,
                        attention_dropout,
                        relu_dropout,
                        preprocess_cmd,
                        postprocess_cmd,
                        param_initializer,
                        name=name + '_layer_' + str(i))))

    def forward(self, enc_input, attn_bias):
        """
        forward
        :param enc_input: model input
        :param attn_bias: attention bias (e.g. padding mask); whether to supply it depends on the use case
        :return: the encoded result
        """
        # iterate over the encoder sub-layers, e.g. 12 (self._n_layer) for bert_base
        for i in range(self._n_layer):
            # run one sub-layer on enc_input and attn_bias
            enc_output = self._encoder_sublayers[i](enc_input, attn_bias)
            # the output of this sub-layer becomes the input of the next one
            enc_input = enc_output
        # return the final, post-processed (layer-normalized) output
        return self._preprocess_layer(enc_output)
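
Finally, a minimal end-to-end sketch (assumed bert_base hyper-parameters, random input, attn_bias omitted; names are made up):

# Build the 12-layer encoder and run one forward pass in dygraph.
import numpy as np
import paddle.fluid as fluid

with fluid.dygraph.guard():
    emb_size, n_head, n_layer = 768, 12, 12
    src = fluid.dygraph.to_variable(
        np.random.rand(2, 5, emb_size).astype("float32"))
    encoder = EncoderLayer(
        hidden_act="gelu",
        n_layer=n_layer,
        n_head=n_head,
        d_key=emb_size // n_head,
        d_value=emb_size // n_head,
        d_model=emb_size,
        d_inner_hid=emb_size * 4,
        prepostprocess_dropout=0.1,
        attention_dropout=0.1,
        relu_dropout=0.1,
        name="encoder")
    enc_out = encoder(src, None)
    print(enc_out.shape)  # [2, 5, 768]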

 

 

