案例學習--理解語言的 Transformer 模型


本教程訓練了一個 Transformer 模型 用於將葡萄牙語翻譯成英語。這是一個高級示例,假定您具備文本生成(text generation)注意力機制(attention) 的知識。

Transformer 模型的核心思想是自注意力機制(self-attention)——能注意輸入序列的不同位置以計算該序列的表示的能力。Transformer 創建了多層自注意力層(self-attetion layers)組成的堆棧,下文的按比縮放的點積注意力(Scaled dot product attention)多頭注意力(Multi-head attention)部分對此進行了說明。

一個 transformer 模型用自注意力層而非 RNNsCNNs 來處理變長的輸入。這種通用架構有一系列的優勢:

  • 它不對數據間的時間/空間關系做任何假設。這是處理一組對象(objects)的理想選擇(例如,星際爭霸單位(StarCraft units))。
  • 層輸出可以並行計算,而非像 RNN 這樣的序列計算。
  • 遠距離項可以影響彼此的輸出,而無需經過許多 RNN 步驟或卷積層(例如,參見場景記憶 Transformer(Scene Memory Transformer)
  • 它能學習長距離的依賴。在許多序列任務中,這是一項挑戰。

該架構的缺點是:

  • 對於時間序列,一個單位時間的輸出是從整個歷史記錄計算的,而非僅從輸入和當前的隱含狀態計算得到。這可能效率較低。
  • 如果輸入確實有時間/空間的關系,像文本,則必須加入一些位置編碼,否則模型將有效地看到一堆單詞。

在此 notebook 中訓練完模型后,您將能輸入葡萄牙語句子,得到其英文翻譯。

Attention heatmap
import tensorflow_datasets as tfds
import tensorflow as tf

import time
import numpy as np
import matplotlib.pyplot as plt

設置輸入流水線(input pipeline)

使用 TFDS 來導入 葡萄牙語-英語翻譯數據集,該數據集來自於 TED 演講開放翻譯項目.

該數據集包含來約 50000 條訓練樣本,1100 條驗證樣本,以及 2000 條測試樣本。

examples, metadata = tfds.load('ted_hrlr_translate/pt_to_en', with_info=True,
                               as_supervised=True)
train_examples, val_examples = examples['train'], examples['validation']
for example in train_examples.take(2):
    print(example)
(<tf.Tensor: shape=(), dtype=string, numpy=b'e quando melhoramos a procura , tiramos a \xc3\xbanica vantagem da impress\xc3\xa3o , que \xc3\xa9 a serendipidade .'>, <tf.Tensor: shape=(), dtype=string, numpy=b'and when you improve searchability , you actually take away the one advantage of print , which is serendipity .'>)
(<tf.Tensor: shape=(), dtype=string, numpy=b'mas e se estes fatores fossem ativos ?'>, <tf.Tensor: shape=(), dtype=string, numpy=b'but what if it were active ?'>)

從訓練數據集創建自定義子詞分詞器(subwords tokenizer)。

tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    (en.numpy() for pt, en in train_examples), target_vocab_size=2**13)

tokenizer_pt = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    (pt.numpy() for pt, en in train_examples), target_vocab_size=2**13)

一個簡單句子的 token 示例

sample_string = 'Transformer is awesome.'

tokenized_string = tokenizer_en.encode(sample_string)
print ('Tokenized string is {}'.format(tokenized_string))

original_string = tokenizer_en.decode(tokenized_string)
print ('The original string: {}'.format(original_string))

assert original_string == sample_string
Tokenized string is [7915, 1248, 7946, 7194, 13, 2799, 7877]
The original string: Transformer is awesome.

如果單詞不在詞典中,則分詞器(tokenizer)通過將單詞分解為子詞來對字符串進行編碼。

for ts in tokenized_string:
    print ('{} ----> {}'.format(ts, tokenizer_en.decode([ts])))
7915 ----> T
1248 ----> ran
7946 ----> s
7194 ----> former 
13 ----> is 
2799 ----> awesome
7877 ----> .
BUFFER_SIZE = 20000
BATCH_SIZE = 64

給每一個樣本增加開始和結束標記(token)添加到輸入和目標。

def encode(lang1, lang2):
    lang1 = [tokenizer_pt.vocab_size] + tokenizer_pt.encode(
            lang1.numpy()) + [tokenizer_pt.vocab_size+1]

    lang2 = [tokenizer_en.vocab_size] + tokenizer_en.encode(
            lang2.numpy()) + [tokenizer_en.vocab_size+1]
  
    return lang1, lang2

Note:為了使本示例較小且相對較快,刪除長度大於40個標記的樣本。

MAX_LENGTH = 40
def filter_max_length(x, y, max_length=MAX_LENGTH):
    return tf.logical_and(tf.size(x) <= max_length,
                            tf.size(y) <= max_length)

.map() 內部的操作以圖模式(graph mode)運行,.map() 接收一個不具有 numpy 屬性的圖張量(graph tensor)。該分詞器(tokenizer)需要將一個字符串或 Unicode 符號,編碼成整數。因此,您需要在 tf.py_function 內部運行編碼過程,tf.py_function 接收一個 eager 張量,該 eager 張量有一個包含字符串值的 numpy 屬性。

https://www.tensorflow.org/api_docs/python/tf/py_function

def tf_encode(pt, en):
    result_pt, result_en = tf.py_function(encode, [pt, en], [tf.int64, tf.int64], 'encode-op')
    result_pt.set_shape([None])
    result_en.set_shape([None])
    return result_pt, result_en
train_dataset = train_examples.map(tf_encode)
train_dataset = train_dataset.filter(filter_max_length)
# 將數據集緩存到內存中以加快讀取速度。
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(BUFFER_SIZE).padded_batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)


val_dataset = val_examples.map(tf_encode)
val_dataset = val_dataset.filter(filter_max_length).padded_batch(BATCH_SIZE)
pt_batch, en_batch = next(iter(val_dataset))
pt_batch, en_batch
(<tf.Tensor: shape=(64, 38), dtype=int64, numpy=
 array([[8214,  342, 3032, ...,    0,    0,    0],
        [8214,   95,  198, ...,    0,    0,    0],
        [8214, 4479, 7990, ...,    0,    0,    0],
        ...,
        [8214,  584,   12, ...,    0,    0,    0],
        [8214,   59, 1548, ...,    0,    0,    0],
        [8214,  118,   34, ...,    0,    0,    0]])>,
 <tf.Tensor: shape=(64, 40), dtype=int64, numpy=
 array([[8087,   98,   25, ...,    0,    0,    0],
        [8087,   12,   20, ...,    0,    0,    0],
        [8087,   12, 5453, ...,    0,    0,    0],
        ...,
        [8087,   18, 2059, ...,    0,    0,    0],
        [8087,   16, 1436, ...,    0,    0,    0],
        [8087,   15,   57, ...,    0,    0,    0]])>)

位置編碼(Positional encoding)

因為該模型並不包括任何的循環(recurrence)或卷積,所以模型添加了位置編碼,為模型提供一些關於單詞在句子中相對位置的信息。

位置編碼向量被加到嵌入(embedding)向量中。嵌入表示一個 d 維空間的標記,在 d 維空間中有着相似含義的標記會離彼此更近。但是,嵌入並沒有對在一句話中的詞的相對位置進行編碼。因此,當加上位置編碼后,詞將基於它們含義的相似度以及它們在句子中的位置,在 d 維空間中離彼此更近。

參看 位置編碼 的 notebook 了解更多信息。計算位置編碼的公式如下:

\[\Large{PE_{(pos, 2i)} = sin(pos / 10000^{2i / d_{model}})} \]

\[\Large{PE_{(pos, 2i+1)} = cos(pos / 10000^{2i / d_{model}})} \]

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
    return pos * angle_rates
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                          np.arange(d_model)[np.newaxis, :],
                          d_model)

    # 將 sin 應用於數組中的偶數索引(indices);2i
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

    # 將 cos 應用於數組中的奇數索引;2i+1
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)
pos_encoding = positional_encoding(50, 512)
print (pos_encoding.shape)

plt.pcolormesh(pos_encoding[0], cmap='RdBu')
plt.xlabel('Dimension')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()
(1, 50, 512)

image

遮擋(Masking)

遮擋一批序列中所有的填充標記(pad tokens)。0 不 mask 掉, 0 就不mask.

def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
  
    # 添加額外的維度來將填充加到
    # 注意力對數(logits)。
    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)
x = tf.constant([[7, 6, 0, 0, 1], [1, 2, 3, 0, 0], [0, 0, 0, 4, 5]])
create_padding_mask(x)
<tf.Tensor: shape=(3, 1, 1, 5), dtype=float32, numpy=
array([[[[0., 0., 1., 1., 0.]]],


       [[[0., 0., 0., 1., 1.]]],


       [[[1., 1., 1., 0., 0.]]]], dtype=float32)>

前瞻遮擋(look-ahead mask)用於遮擋一個序列中的后續標記(future tokens)。換句話說,該 mask 表明了不應該使用的條目。

這意味着要預測第三個詞,將僅使用第一個和第二個詞。與此類似,預測第四個詞,僅使用第一個,第二個和第三個詞,依此類推。

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)
x = tf.random.uniform((1, 3))
temp = create_look_ahead_mask(x.shape[1])
temp
<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[0., 1., 1.],
       [0., 0., 1.],
       [0., 0., 0.]], dtype=float32)>

按比縮放的點積注意力(Scaled dot product attention)

scaled_dot_product_attention

Transformer 使用的注意力函數有三個輸入:Q(請求(query))、K(主鍵(key))、V(數值(value))。用於計算注意力權重的等式為:

\[\Large{Attention(Q, K, V) = softmax_k(\frac{QK^T}{\sqrt{d_k}}) V} \]

點積注意力被縮小了深度的平方根倍。這樣做是因為對於較大的深度值,點積的大小會增大,從而推動 softmax 函數往僅有很小的梯度的方向靠攏,導致了一種很硬的(hard)softmax。

例如,假設 QK 的均值為0,方差為1。它們的矩陣乘積將有均值為0,方差為 dk。因此,dk 的平方根被用於縮放(而非其他數值),因為,QK 的矩陣乘積的均值本應該為 0,方差本應該為1,這樣會獲得一個更平緩的 softmax。

遮擋(mask)與 -1e9(接近於負無窮)相乘。這樣做是因為遮擋與縮放的 Q 和 K 的矩陣乘積相加,並在 softmax 之前立即應用。目標是將這些單元歸零,因為 softmax 的較大負數輸入在輸出中接近於零。

def scaled_dot_product_attention(q, k, v, mask):
    """計算注意力權重。
    q, k, v 必須具有匹配的前置維度。
    k, v 必須有匹配的倒數第二個維度,例如:seq_len_k = seq_len_v。
    雖然 mask 根據其類型(填充或前瞻)有不同的形狀,
    但是 mask 必須能進行廣播轉換以便求和。

    參數:
    q: 請求的形狀 == (..., seq_len_q, depth)
    k: 主鍵的形狀 == (..., seq_len_k, depth)
    v: 數值的形狀 == (..., seq_len_v, depth_v)
    mask: Float 張量,其形狀能轉換成
          (..., seq_len_q, seq_len_k)。默認為None。

    返回值:
    輸出,注意力權重
    """

    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # 縮放 matmul_qk
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # 將 mask 加入到縮放的張量上。
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)  

    # softmax 在最后一個軸(seq_len_k)上歸一化,因此分數
    # 相加等於1。
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights

當 softmax 在 K 上進行歸一化后,它的值決定了分配到 Q 的重要程度。

輸出表示注意力權重和 V(數值)向量的乘積。這確保了要關注的詞保持原樣,而無關的詞將被清除掉。

def print_out(q, k, v):
    temp_out, temp_attn = scaled_dot_product_attention(
      q, k, v, None)
    print ('Attention weights are:')
    print (temp_attn)
    print ('Output is:')
    print (temp_out)
np.set_printoptions(suppress=True)

temp_k = tf.constant([[10,0,0],
                      [0,10,0],
                      [0,0,10],
                      [0,0,10]], dtype=tf.float32)  # (4, 3)

temp_v = tf.constant([[   1,0],
                      [  10,0],
                      [ 100,5],
                      [1000,6]], dtype=tf.float32)  # (4, 2)

# 這條 `請求(query)符合第二個`主鍵(key)`,
# 因此返回了第二個`數值(value)`。
temp_q = tf.constant([[0, 10, 0]], dtype=tf.float32)  # (1, 3)
print_out(temp_q, temp_k, temp_v)
Attention weights are:
tf.Tensor([[0. 1. 0. 0.]], shape=(1, 4), dtype=float32)
Output is:
tf.Tensor([[10.  0.]], shape=(1, 2), dtype=float32)
# 這條請求符合重復出現的主鍵(第三第四個),
# 因此,對所有的相關數值取了平均。
temp_q = tf.constant([[0, 0, 10]], dtype=tf.float32)  # (1, 3)
print_out(temp_q, temp_k, temp_v)
Attention weights are:
tf.Tensor([[0.  0.  0.5 0.5]], shape=(1, 4), dtype=float32)
Output is:
tf.Tensor([[550.    5.5]], shape=(1, 2), dtype=float32)
# 這條請求符合第一和第二條主鍵,
# 因此,對它們的數值去了平均。
temp_q = tf.constant([[10, 10, 0]], dtype=tf.float32)  # (1, 3)
print_out(temp_q, temp_k, temp_v)
Attention weights are:
tf.Tensor([[0.5 0.5 0.  0. ]], shape=(1, 4), dtype=float32)
Output is:
tf.Tensor([[5.5 0. ]], shape=(1, 2), dtype=float32)

將所有請求一起傳遞。(batch 計算)

temp_q = tf.constant([[0, 0, 10], [0, 10, 0], [10, 10, 0]], dtype=tf.float32)  # (3, 3)
print_out(temp_q, temp_k, temp_v)
Attention weights are:
tf.Tensor(
[[0.  0.  0.5 0.5]
 [0.  1.  0.  0. ]
 [0.5 0.5 0.  0. ]], shape=(3, 4), dtype=float32)
Output is:
tf.Tensor(
[[550.    5.5]
 [ 10.    0. ]
 [  5.5   0. ]], shape=(3, 2), dtype=float32)

多頭注意力(Multi-head attention)

multi-head attention

多頭注意力由四部分組成:

  • 線性層並分拆成多頭。
  • 按比縮放的點積注意力。
  • 多頭及聯。
  • 最后一層線性層。

每個多頭注意力塊有三個輸入:Q(請求)、K(主鍵)、V(數值)。這些輸入經過線性(Dense)層,並分拆成多頭。

將上面定義的 scaled_dot_product_attention 函數應用於每個頭(進行了廣播(broadcasted)以提高效率)。注意力這步必須使用一個恰當的 mask。然后將每個頭的注意力輸出連接起來(用tf.transposetf.reshape),並放入最后的 Dense 層。

Q、K、和 V 被拆分到了多個頭,而非單個的注意力頭,因為多頭允許模型共同注意來自不同表示空間的不同位置的信息。在分拆后,每個頭部的維度減少,因此總的計算成本與有着全部維度的單個注意力頭相同。

class MultiHeadAttention(tf.keras.layers.Layer):
    """
    1 線性層並分拆成多頭。
    2 按比縮放的點積注意力。
    3 多頭及聯。
    4 最后一層線性層。
    """
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """分拆最后一個維度到 (num_heads, dimension).
        轉置結果使得形狀為 (batch_size, num_heads, seq_len, depth)
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)

        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention, 
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights

創建一個 MultiHeadAttention 層進行嘗試。在序列中的每個位置 yMultiHeadAttention 在序列中的所有其他位置運行所有8個注意力頭,在每個位置y,返回一個新的同樣長度的向量。

temp_mha = MultiHeadAttention(d_model=512, num_heads=8)
y = tf.random.uniform((1, 60, 512))  # (batch_size, encoder_sequence, d_model)
out, attn = temp_mha(y, k=y, q=y, mask=None)
out.shape, attn.shape
(TensorShape([1, 60, 512]), TensorShape([1, 8, 60, 60]))

點式前饋網絡(Point wise feed forward network)

點式前饋網絡由兩層全聯接層組成,兩層之間有一個 ReLU 激活函數。

def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
      tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    ])
sample_ffn = point_wise_feed_forward_network(512, 2048)
sample_ffn(tf.random.uniform((64, 50, 512))).shape
TensorShape([64, 50, 512])

編碼與解碼(Encoder and decoder)

transformer

Transformer 模型與標准的具有注意力機制的序列到序列模型(sequence to sequence with attention model),遵循相同的一般模式。

  • 輸入語句經過 N 個編碼器層,為序列中的每個詞/標記生成一個輸出。
  • 解碼器關注編碼器的輸出以及它自身的輸入(自注意力)來預測下一個詞。

編碼器層(Encoder layer)

每個編碼器層包括以下子層:

  1. 多頭注意力(有填充遮擋)
  2. 點式前饋網絡(Point wise feed forward networks)。

每個子層在其周圍有一個殘差連接,然后進行層歸一化。殘差連接有助於避免深度網絡中的梯度消失問題。

每個子層的輸出是 LayerNorm(x + Sublayer(x))。歸一化是在 d_model(最后一個)維度完成的。Transformer 中有 N 個編碼器層。

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):

        attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        # 殘差
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        # 殘差
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)

        return out2
sample_encoder_layer = EncoderLayer(512, 8, 2048)

sample_encoder_layer_output = sample_encoder_layer(
    tf.random.uniform((64, 43, 512)), False, None)

sample_encoder_layer_output.shape  # (batch_size, input_seq_len, d_model)
TensorShape([64, 43, 512])

解碼器層(Decoder layer)

每個解碼器層包括以下子層:

  1. 遮擋的多頭注意力(前瞻遮擋和填充遮擋)
  2. 多頭注意力(用填充遮擋)。V(數值)和 K(主鍵)接收編碼器輸出作為輸入。Q(請求)接收遮擋的多頭注意力子層的輸出
  3. 點式前饋網絡

每個子層在其周圍有一個殘差連接,然后進行層歸一化。每個子層的輸出是 LayerNorm(x + Sublayer(x))。歸一化是在 d_model(最后一個)維度完成的。

Transformer 中共有 N 個解碼器層。

當 Q 接收到解碼器的第一個注意力塊的輸出,並且 K 接收到編碼器的輸出時,注意力權重表示根據編碼器的輸出賦予解碼器輸入的重要性。換一種說法,解碼器通過查看編碼器輸出和對其自身輸出的自注意力,預測下一個詞。參看按比縮放的點積注意力部分的演示。

transformer
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)


    def call(self, x, enc_output, training, 
           look_ahead_mask, padding_mask):
        # enc_output.shape == (batch_size, input_seq_len, d_model)

        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # (batch_size, target_seq_len, d_model)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # 編碼器和解碼器之間的attention
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)  # (batch_size, target_seq_len, d_model)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)

        ffn_output = self.ffn(out2)  # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)

        return out3, attn_weights_block1, attn_weights_block2
sample_decoder_layer = DecoderLayer(512, 8, 2048)

sample_decoder_layer_output, _, _ = sample_decoder_layer(
    tf.random.uniform((64, 50, 512)), sample_encoder_layer_output, 
    False, None, None)

sample_decoder_layer_output.shape  # (batch_size, target_seq_len, d_model)
TensorShape([64, 50, 512])

編碼器(Encoder)

編碼器 包括:

  1. 輸入嵌入(Input Embedding)
  2. 位置編碼(Positional Encoding)
  3. N 個編碼器層(encoder layers)

輸入經過嵌入(embedding)后,該嵌入與位置編碼相加。該加法結果的輸出是編碼器層的輸入。編碼器的輸出是解碼器的輸入。

class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
               maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, 
                                                self.d_model)


        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) 
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):

        seq_len = tf.shape(x)[1]

        # 將嵌入和位置編碼相加。
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        # The reason we increase the embedding values before the addition 
        # is to make the positional encoding relatively smaller. 
        # This means the original meaning in the embedding vector won’t be lost when we add them together.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)
sample_encoder = Encoder(num_layers=2, d_model=512, num_heads=8, 
                         dff=2048, input_vocab_size=8500,
                         maximum_position_encoding=10000)

sample_encoder_output = sample_encoder(tf.random.uniform((64, 62)), 
                                       training=False, mask=None)

print (sample_encoder_output.shape)  # (batch_size, input_seq_len, d_model)
(64, 62, 512)

解碼器(Decoder)

解碼器包括:

  1. 輸出嵌入(Output Embedding)
  2. 位置編碼(Positional Encoding)
  3. N 個解碼器層(decoder layers)

目標(target)經過一個嵌入后,該嵌入和位置編碼相加。該加法結果是解碼器層的輸入。解碼器的輸出是最后的線性層的輸入。

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
               maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) 
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, 
           look_ahead_mask, padding_mask):

        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                 look_ahead_mask, padding_mask)

            attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        # x.shape == (batch_size, target_seq_len, d_model)
        return x, attention_weights
sample_decoder = Decoder(num_layers=2, d_model=512, num_heads=8, 
                         dff=2048, target_vocab_size=8000,
                         maximum_position_encoding=5000)

output, attn = sample_decoder(tf.random.uniform((64, 26)), 
                              enc_output=sample_encoder_output, 
                              training=False, look_ahead_mask=None, 
                              padding_mask=None)

output.shape, attn['decoder_layer2_block2'].shape
(TensorShape([64, 26, 512]), TensorShape([64, 8, 26, 62]))

創建 Transformer

Transformer 包括編碼器,解碼器和最后的線性層。解碼器的輸出是線性層的輸入,返回線性層的輸出。
transformer

class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, 
               target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, dff, 
                               input_vocab_size, pe_input, rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, dff, 
                               target_vocab_size, pe_target, rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask, 
           look_ahead_mask, dec_padding_mask):

        enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)

        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

        return final_output, attention_weights
sample_transformer = Transformer(
    num_layers=2, d_model=512, num_heads=8, dff=2048, 
    input_vocab_size=8500, target_vocab_size=8000, 
    pe_input=10000, pe_target=6000)

temp_input = tf.random.uniform((64, 62))
temp_target = tf.random.uniform((64, 26))

fn_out, _ = sample_transformer(temp_input, temp_target, training=False, 
                               enc_padding_mask=None, 
                               look_ahead_mask=None,
                               dec_padding_mask=None)

fn_out.shape  # (batch_size, tar_seq_len, target_vocab_size)
TensorShape([64, 26, 8000])

配置超參數(hyperparameters)

為了讓本示例小且相對較快,已經減小了num_layers、 d_model 和 dff 的值。

Transformer 的基礎模型使用的數值為:num_layers=6d_model = 512dff = 2048。關於所有其他版本的 Transformer,請查閱論文

Note:通過改變以下數值,您可以獲得在許多任務上達到最先進水平的模型。

num_layers = 4
d_model = 128
dff = 512
num_heads = 8

input_vocab_size = tokenizer_pt.vocab_size + 2
target_vocab_size = tokenizer_en.vocab_size + 2
dropout_rate = 0.1

優化器(Optimizer)

根據論文中的公式,將 Adam 優化器與自定義的學習速率調度程序(scheduler)配合使用。

\[\Large{lrate = d_{model}^{-0.5} * min(step{\_}num^{-0.5}, step{\_}num * warmup{\_}steps^{-1.5})} \]

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = d_model
        self.d_model = tf.cast(self.d_model, tf.float32)

        self.warmup_steps = warmup_steps

    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, 
                                     epsilon=1e-9)
temp_learning_rate_schedule = CustomSchedule(d_model)

plt.plot(temp_learning_rate_schedule(tf.range(40000, dtype=tf.float32)))
plt.ylabel("Learning Rate")
plt.xlabel("Train Step")
Text(0.5, 0, 'Train Step')

image

損失函數與指標(Loss and metrics)

由於目標序列是填充(padded)過的,因此在計算損失函數時,應用填充遮擋非常重要。

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

mask in mask = tf.math.logical_not(tf.math.equal(real, 0)) is taking care of the PADDING.

So, in your batch you would have sentences of different length and you do 0 padding to make all of them of equal length

(think about I have an apple v/s It's a good day to play football in the sun)

But, it doesn't make sense to include the 0 padded section in the loss calculation - hence, it's first looking into indices where you have a 0 and using multiplication later on to make their loss contribution 0.

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
    name='train_accuracy')

訓練與檢查點(Training and checkpointing)

transformer = Transformer(num_layers, d_model, num_heads, dff,
                          input_vocab_size, target_vocab_size, 
                          pe_input=input_vocab_size, 
                          pe_target=target_vocab_size,
                          rate=dropout_rate)
def create_masks(inp, tar):
    # 編碼器填充遮擋
    enc_padding_mask = create_padding_mask(inp)

    # 在解碼器的第二個注意力模塊使用。
    # 該填充遮擋用於遮擋編碼器的輸出。
    dec_padding_mask = create_padding_mask(inp)

    # 在解碼器的第一個注意力模塊使用。
    # 用於填充(pad)和遮擋(mask)解碼器獲取到的輸入的后續標記(future tokens)。
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    # 把該掩蓋的都給掩蓋掉
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, combined_mask, dec_padding_mask

創建檢查點的路徑和檢查點管理器(manager)。這將用於在每 n 個周期(epochs)保存檢查點。

checkpoint_path = "./checkpoints/train"

ckpt = tf.train.Checkpoint(transformer=transformer,
                           optimizer=optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# 如果檢查點存在,則恢復最新的檢查點。
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

目標(target)被分成了 tar_inp 和 tar_real。tar_inp 作為輸入傳遞到解碼器。tar_real 是位移了 1 的同一個輸入:在 tar_inp 中的每個位置,tar_real 包含了應該被預測到的下一個標記(token)。

例如,sentence = "SOS A lion in the jungle is sleeping EOS"

tar_inp = "SOS A lion in the jungle is sleeping"

tar_real = "A lion in the jungle is sleeping EOS"

Transformer 是一個自回歸(auto-regressive)模型:它一次作一個部分的預測,然后使用到目前為止的自身的輸出來決定下一步要做什么。

在訓練過程中,本示例使用了 teacher-forcing 的方法(就像文本生成教程中一樣)。無論模型在當前時間步驟下預測出什么,teacher-forcing 方法都會將真實的輸出傳遞到下一個時間步驟上。

當 transformer 預測每個詞時,自注意力(self-attention)功能使它能夠查看輸入序列中前面的單詞,從而更好地預測下一個單詞。

為了防止模型在期望的輸出上達到峰值,模型使用了前瞻遮擋(look-ahead mask)。

EPOCHS = 20
# 該 @tf.function 將追蹤-編譯 train_step 到 TF 圖中,以便更快地
# 執行。該函數專用於參數張量的精確形狀。為了避免由於可變序列長度或可變
# 批次大小(最后一批次較小)導致的再追蹤,使用 input_signature 指定
# 更多的通用形狀。

train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
]

@tf.function(input_signature=train_step_signature)
def train_step(inp, tar):
  tar_inp = tar[:, :-1]
  tar_real = tar[:, 1:]
  
  enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
  
  with tf.GradientTape() as tape:
    predictions, _ = transformer(inp, tar_inp, 
                                 True, 
                                 enc_padding_mask, 
                                 combined_mask, 
                                 dec_padding_mask)
    loss = loss_function(tar_real, predictions)

  gradients = tape.gradient(loss, transformer.trainable_variables)    
  optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
  
  train_loss(loss)
  train_accuracy(tar_real, predictions)

葡萄牙語作為輸入語言,英語為目標語言。

for epoch in range(EPOCHS):
    start = time.time()

    train_loss.reset_states()
    train_accuracy.reset_states()
  
    # inp -> portuguese, tar -> english
    for (batch, (inp, tar)) in enumerate(train_dataset):
        train_step(inp, tar)

        if batch % 50 == 0:
            print ('Epoch {} Batch {} Loss {:.4f} Accuracy {:.4f}'.format(
                  epoch + 1, batch, train_loss.result(), train_accuracy.result()))
      
    if (epoch + 1) % 5 == 0:
        ckpt_save_path = ckpt_manager.save()
        print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                             ckpt_save_path))
    
    print ('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1, 
                                                train_loss.result(), 
                                                train_accuracy.result()))

    print ('Time taken for 1 epoch: {} secs\n'.format(time.time() - start))

評估(Evaluate)

以下步驟用於評估:

  • 用葡萄牙語分詞器(tokenizer_pt)編碼輸入語句。此外,添加開始和結束標記,這樣輸入就與模型訓練的內容相同。這是編碼器輸入。
  • 解碼器輸入為 start token == tokenizer_en.vocab_size
  • 計算填充遮擋和前瞻遮擋。
  • 解碼器通過查看編碼器輸出和它自身的輸出(自注意力)給出預測。
  • 選擇最后一個詞並計算它的 argmax。
  • 將預測的詞連接到解碼器輸入,然后傳遞給解碼器。
  • 在這種方法中,解碼器根據它預測的之前的詞預測下一個。

Note:這里使用的模型具有較小的能力以保持相對較快,因此預測可能不太正確。要復現論文中的結果,請使用全部數據集,並通過修改上述超參數來使用基礎 transformer 模型或者 transformer XL。

def evaluate(inp_sentence):
    start_token = [tokenizer_pt.vocab_size]
    end_token = [tokenizer_pt.vocab_size + 1]

    # 輸入語句是葡萄牙語,增加開始和結束標記
    inp_sentence = start_token + tokenizer_pt.encode(inp_sentence) + end_token
    encoder_input = tf.expand_dims(inp_sentence, 0)

    # 因為目標是英語,輸入 transformer 的第一個詞應該是
    # 英語的開始標記。
    decoder_input = [tokenizer_en.vocab_size]
    output = tf.expand_dims(decoder_input, 0)
    
    for i in range(MAX_LENGTH):
        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(
            encoder_input, output)

        # predictions.shape == (batch_size, seq_len, vocab_size)
        predictions, attention_weights = transformer(encoder_input, 
                                                     output,
                                                     False,
                                                     enc_padding_mask,
                                                     combined_mask,
                                                     dec_padding_mask)

        # 從 seq_len 維度選擇最后一個詞
        predictions = predictions[: ,-1:, :]  # (batch_size, 1, vocab_size)

        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        # 如果 predicted_id 等於結束標記,就返回結果
        if predicted_id == tokenizer_en.vocab_size+1:
            return tf.squeeze(output, axis=0), attention_weights

        # 連接 predicted_id 與輸出,作為解碼器的輸入傳遞到解碼器。
        output = tf.concat([output, predicted_id], axis=-1)

    return tf.squeeze(output, axis=0), attention_weights
def plot_attention_weights(attention, sentence, result, layer):
  fig = plt.figure(figsize=(16, 8))
  
  sentence = tokenizer_pt.encode(sentence)
  
  attention = tf.squeeze(attention[layer], axis=0)
  
  for head in range(attention.shape[0]):
    ax = fig.add_subplot(2, 4, head+1)
    
    # 畫出注意力權重
    ax.matshow(attention[head][:-1, :], cmap='viridis')

    fontdict = {'fontsize': 10}
    
    ax.set_xticks(range(len(sentence)+2))
    ax.set_yticks(range(len(result)))
    
    ax.set_ylim(len(result)-1.5, -0.5)
        
    ax.set_xticklabels(
        ['<start>']+[tokenizer_pt.decode([i]) for i in sentence]+['<end>'], 
        fontdict=fontdict, rotation=90)
    
    ax.set_yticklabels([tokenizer_en.decode([i]) for i in result 
                        if i < tokenizer_en.vocab_size], 
                       fontdict=fontdict)
    
    ax.set_xlabel('Head {}'.format(head+1))
  
  plt.tight_layout()
  plt.show()
def translate(sentence, plot=''):
  result, attention_weights = evaluate(sentence)
  
  predicted_sentence = tokenizer_en.decode([i for i in result 
                                            if i < tokenizer_en.vocab_size])  

  print('Input: {}'.format(sentence))
  print('Predicted translation: {}'.format(predicted_sentence))
  
  if plot:
    plot_attention_weights(attention_weights, sentence, result, plot)
translate("este é um problema que temos que resolver.")
print ("Real translation: this is a problem we have to solve .")
Input: este é um problema que temos que resolver.
Predicted translation: this is a problem that we have to solve the united states is that we have to solve the world .
Real translation: this is a problem we have to solve .
translate("os meus vizinhos ouviram sobre esta ideia.")
print ("Real translation: and my neighboring homes heard about this idea .")
Input: os meus vizinhos ouviram sobre esta ideia.
Predicted translation: my neighbors heard about this idea .
Real translation: and my neighboring homes heard about this idea .
translate("vou então muito rapidamente partilhar convosco algumas histórias de algumas coisas mágicas que aconteceram.")
print ("Real translation: so i 'll just share with you some stories very quickly of some magical things that have happened .")
Input: vou então muito rapidamente partilhar convosco algumas histórias de algumas coisas mágicas que aconteceram.
Predicted translation: so i 'm going to share with you a couple of exciting stories of some magical things that happened .
Real translation: so i 'll just share with you some stories very quickly of some magical things that have happened .

您可以為 plot 參數傳遞不同的層和解碼器的注意力模塊。

translate("este é o primeiro livro que eu fiz.", plot='decoder_layer4_block2')
print ("Real translation: this is the first book i've ever done.")
Input: este é o primeiro livro que eu fiz.
Predicted translation: this is the first book that i made .

image
Real translation: this is the first book i've ever done.

總結

在本教程中,您已經學習了位置編碼,多頭注意力,遮擋的重要性以及如何創建一個 transformer。
嘗試使用一個不同的數據集來訓練 transformer。您可也可以通過修改上述的超參數來創建基礎 transformer 或者 transformer XL。您也可以使用這里定義的層來創建 BERT 並訓練最先進的模型。此外,您可以實現 beam search 得到更好的預測。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM