三分鍾快速上手TensorFlow 2.0 (上)——前置基礎、模型建立與可視化


本文學習筆記參照來源:https://tf.wiki/zh/basic/basic.html

學習筆記類似提綱,具體細節參照上文鏈接

 

一些前置的基礎

隨機數      tf.random uniform(shape())

兩個元素零向量  tf.zeros(shape=(2))

2x2常量      tf.constant([1,2],[3,4])

查看形狀、類型、值     A.shape A.dtype A.numpy()

矩陣相加         tf.add(A,B)

矩陣相乘         tf.matmul(A,B)

 自動求導機制        tf.GradientTape()

import tensorflow as tf

x = tf.Variable(initial_value=3.)
with tf.GradientTape() as tape:     # 在 tf.GradientTape() 的上下文內,所有計算步驟都會被記錄以用於求導
    y = tf.square(x)
y_grad = tape.gradient(y, x)        # 計算y關於x的導數
print([y, y_grad])
單元函數求偏導
X = tf.constant([[1., 2.], [3., 4.]])
y = tf.constant([[1.], [2.]])
w = tf.Variable(initial_value=[[1.], [2.]])
b = tf.Variable(initial_value=1.)
with tf.GradientTape() as tape:
    L = 0.5 * tf.reduce_sum(tf.square(tf.matmul(X, w) + b - y))
w_grad, b_grad = tape.gradient(L, [w, b])        # 計算L(w, b)關於w, b的偏導數
print([L.numpy(), w_grad.numpy(), b_grad.numpy()])
多元函數求偏導

 

事例:線性回歸(梯度下降)

import numpy as np

X_raw = np.array([2013, 2014, 2015, 2016, 2017], dtype=np.float32)
y_raw = np.array([12000, 14000, 15000, 16500, 17500], dtype=np.float32)

X = (X_raw - X_raw.min()) / (X_raw.max() - X_raw.min())
y = (y_raw - y_raw.min()) / (y_raw.max() - y_raw.min())
基本的歸一化操作
對於多元函數 f(x) 求局部極小值,梯度下降 的過程如下:

初始化自變量為 x_0 , k=0
迭代進行下列步驟直到滿足收斂條件:

求函數 f(x) 關於自變量的梯度 \nabla f(x_k)
更新自變量: x_{k+1} = x_{k} - \gamma \nabla f(x_k) 。這里 \gamma 是學習率(也就是梯度下降一次邁出的 “步子” 大小)
k \leftarrow k+1
接下來,我們考慮如何使用程序來實現梯度下降方法,求得線性回歸的解 \min_{a, b} L(a, b) = \sum_{i=1}^n(ax_i + b - y_i)^2 。
梯度下降知識點
a, b = 0, 0

num_epoch = 10000
learning_rate = 1e-3
for e in range(num_epoch):
    # 手動計算損失函數關於自變量(模型參數)的梯度
    y_pred = a * X + b
    grad_a, grad_b = (y_pred - y).dot(X), (y_pred - y).sum()

    # 更新參數
    a, b = a - learning_rate * grad_a, b - learning_rate * grad_b

print(a, b)


# np.dot() 是求內積, np.sum() 是求和
# 手工求損失函數關於參數 a 和 b 的偏導數
NumPy 下的線性回歸
#使用 tape.gradient(ys, xs) 自動計算梯度;
#使用 optimizer.apply_gradients(grads_and_vars) 自動更新模型參數。

X = tf.constant(X)
y = tf.constant(y)

a = tf.Variable(initial_value=0.)
b = tf.Variable(initial_value=0.)
variables = [a, b]

num_epoch = 10000
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
for e in range(num_epoch):
    # 使用tf.GradientTape()記錄損失函數的梯度信息
    with tf.GradientTape() as tape:
        y_pred = a * X + b
        loss = 0.5 * tf.reduce_sum(tf.square(y_pred - y))
    # TensorFlow自動計算損失函數關於自變量(模型參數)的梯度
    grads = tape.gradient(loss, variables)
    # TensorFlow自動根據梯度更新參數
    optimizer.apply_gradients(grads_and_vars=zip(grads, variables))

print(a, b)
TensorFlow 下的線性回歸

 

TensorFlow 模型建立與訓練 

模型的構建: tf.keras.Model 和 tf.keras.layers

模型的損失函數: tf.keras.losses

模型的優化器: tf.keras.optimizer

模型的評估: tf.keras.metrics

 

Keras 模型以類的形式呈現

通過繼承 tf.keras.Model 這個 Python 類來定義自己的模型。

需要重寫 __init__() (構造函數,初始化)和 call(input)(模型調用)兩個方法

同時也可以根據需要增加自定義的方法。

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此處添加初始化代碼(包含 call 方法中會用到的層),例如
        # layer1 = tf.keras.layers.BuiltInLayer(...)
        # layer2 = MyCustomLayer(...)

    def call(self, input):
        # 此處添加模型調用的代碼(處理輸入並返回輸出),例如
        # x = layer1(input)
        # output = layer2(x)
        return output

    # 還可以添加自定義的方法
繼承 tf.keras.Model

 

 

import tensorflow as tf

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])


class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )

    def call(self, input):
        output = self.dense(input)
        return output


# 以下代碼結構與前節類似
model = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(X)      # 調用模型 y_pred = model(X) 而不是顯式寫出 y_pred = a * X + b
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)    # 使用 model.variables 這一屬性直接獲得模型中的所有變量
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)
線性模型 y_pred = a * X + b


全連接層 是 Keras 中最基礎和常用的層之一

對輸入矩陣 A 進行 f(AW + b) 的線性變換 + 激活函數操作。

 

這里的等號其實是經過了一個激活函數后得到最后的結果

 

主要參數如下:

  • units :輸出張量的維度;

  • activation :激活函數,對應於 f(AW + b) 中的 f ,默認為無激活函數( a(x) x )。常用的激活函數包括 tf.nn.relu 、 tf.nn.tanh 和 tf.nn.sigmoid

  • use_bias :是否加入偏置向量 bias ,即 f(AW + b) 中的 b。默認為 True ;

  • kernel_initializer 、 bias_initializer :權重矩陣 kernel 和偏置向量 bias 兩個變量的初始化器。默認為 tf.glorot_uniform_initializer 1 。設置為 tf.zeros_initializer 表示將兩個變量均初始化為全 0;

該層包含權重矩陣 kernel [input_dim, units] 和偏置向量 bias [units] 2 兩個可訓練變量,對應於 f(AW + b) 中的 W 和 b

 

基礎示例:多層感知機MLP

數據獲取及預處理: tf.keras.datasets

tf.keras.datasets
這里讀入的是灰度圖片,色彩通道數為 1(彩色 RGB 圖像色彩通道數為 3),所以我們使用  np.expand_dims()函數為圖像數據手動在最后添加一維通道。

模型的構建: tf.keras.Model 和 tf.keras.layers

class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()    # Flatten層將除第一維(batch_size)以外的維度展平
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):         # [batch_size, 28, 28, 1]
        x = self.flatten(inputs)    # [batch_size, 784]
        x = self.dense1(x)          # [batch_size, 100]
        x = self.dense2(x)          # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output
tf.keras.Model 和 tf.keras.layers

輸出 “輸入圖片分別屬於 0 到 9 的概率”,也就是一個 10 維的離散概率分布

這個 10 維向量至少滿足兩個條件:

  • 該向量中的每個元素均在 [0, 1] 之間;

  • 該向量的所有元素之和為 1。

softmax 函數能夠凸顯原始向量中最大的值,並抑制遠低於最大值的其他分量

模型的訓練: tf.keras.losses 和 tf.keras.optimizer

num_epochs = 5
batch_size = 50
learning_rate = 0.001
定義一些模型超參數
model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
實例化模型和數據讀取類,並實例化一個 tf.keras.optimizer 的優化器
num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
迭代
    • 從 DataLoader 中隨機取一批訓練數據;

    • 將這批數據送入模型,計算出模型的預測值;

    • 將模型預測值與真實值進行比較,計算損失函數(loss)。這里使用 tf.keras.losses中的交叉熵函數作為損失函數;

    • 計算損失函數關於模型變量的導數;

    • 將求出的導數值傳入優化器,使用優化器的 apply_gradients 方法更新模型參數以最小化損失函數。

交叉熵作為損失函數,在分類問題中被廣泛應用。其離散形式為 H(y, \hat{y}) = -\sum_{i=1}^{n}y_i \log(\hat{y_i}) ,其中 y 為真實概率分布, \hat{y} 為預測概率分布, n 為分類任務的類別個數。預測概率分布與真實分布越接近,則交叉熵的值越小,反之則越大。

在 tf.keras 中,有兩個交叉熵相關的損失函數 tf.keras.losses.categorical_crossentropy 和 tf.keras.losses.sparse_categorical_crossentropy 。其中 sparse 的含義是,真實的標簽值 y_true 可以直接傳入 int 類型的標簽類別。具體而言:

loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred) 

loss = tf.keras.losses.categorical_crossentropy( y_true=tf.one_hot(y, depth=tf.shape(y_pred)[-1]), y_pred=y_pred ) 

的結果相同。

模型的評估: tf.keras.metrics

sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())
tf.keras.metrics
每次通過 update_state() 方法向評估器輸入兩個參數: y_pred 和 y_true ,即模型預測出的結果和真實結果。

 

 

 

 

卷積神經網絡(CNN)

包含一個或多個卷積層(Convolutional Layer)、池化層(Pooling Layer)和全連接層(Fully-connected Layer)

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,             # 卷積層神經元(卷積核)數目
            kernel_size=[5, 5],     # 感受野大小
            padding='same',         # padding策略(vaild 或 same)
            activation=tf.nn.relu   # 激活函數
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)                  # [batch_size, 28, 28, 32]
        x = self.pool1(x)                       # [batch_size, 14, 14, 32]
        x = self.conv2(x)                       # [batch_size, 14, 14, 64]
        x = self.pool2(x)                       # [batch_size, 7, 7, 64]
        x = self.flatten(x)                     # [batch_size, 7 * 7 * 64]
        x = self.dense1(x)                      # [batch_size, 1024]
        x = self.dense2(x)                      # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output
使用 Keras 實現卷積神經網絡

將模型的訓練中實例化模型的model MLP() 更換成 model CNN()即可

import tensorflow as tf
import tensorflow_datasets as tfds

num_batches = 1000
batch_size = 50
learning_rate = 0.001

dataset = tfds.load("tf_flowers", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(lambda img, label: (tf.image.resize(img, [224, 224]) / 255.0, label)).shuffle(1024).batch(32)
model = tf.keras.applications.MobileNetV2(weights=None, classes=5)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for images, labels in dataset:
    with tf.GradientTape() as tape:
        labels_pred = model(images)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=labels_pred)
        loss = tf.reduce_mean(loss)
        print("loss %f" % loss.numpy())
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
使用 Keras 中預定義的經典卷積神經網絡結構
model = tf.keras.applications.MobileNetV2() 實例化一個 MobileNetV2 網絡結構

共通的常用參數如下:

  • input_shape :輸入張量的形狀(不含第一維的 Batch),大多默認為 224 × 224 × 3。一般而言,模型對輸入張量的大小有下限,長和寬至少為 32 × 32 或 75 × 75 ;

  • include_top :在網絡的最后是否包含全連接層,默認為 True ;

  • weights :預訓練權值,默認為 'imagenet' ,即為當前模型載入在 ImageNet 數據集上預訓練的權值。如需隨機初始化變量可設為 None ;

  • classes :分類數,默認為 1000。修改該參數需要 include_top 參數為 True 且 weights 參數為 None 。

 卷積示意圖

一個單通道的 7×7 圖像在通過一個感受野為 3×3 ,參數為 10 個的卷積層神經元后,得到 5×5 的矩陣

在 tf.keras.layers.Conv2D 中 padding 參數設為 same 時,會將周圍缺少的部分使用 0 補齊,使得輸出的矩陣大小和輸入一致。

通過 tf.keras.layers.Conv2D 的 strides 參數即可設置步長(默認為 1)。比如,在上面的例子中,如果我們將步長設定為 2,輸出的卷積結果即會是一個 3×3 的矩陣。

 

池化層(Pooling Layer)的理解則簡單得多,其可以理解為對圖像進行降采樣的過程,對於每一次滑動窗口中的所有值,輸出其中的最大值(MaxPooling)、均值或其他方法產生的值。

例如,對於一個三通道的 16×16 圖像(即一個 16*16*3 的張量),經過感受野為 2×2,滑動步長為 2 的池化層,則得到一個 8*8*3 的張量。

 

 

 

循環神經網絡(RNN)

適宜於處理序列數據的神經網絡,被廣泛用於語言模型、文本生成、機器翻譯等

來看一下尼采風格文本的自動生成

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
            origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [batch_size, seq_length], [num_batch]
讀取文本,並以字符為單位進行編碼
字符種類數為 num_chars ,則每種字符賦予一個 0 到 num_chars 1 之間的唯一整數編號 i
class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)
        if from_logits:
            return logits
        else:
            return tf.nn.softmax(logits)
模型的實現
在 __init__ 方法中我們實例化一個常用的 LSTMCell 單元,以及一個線性變換用的全連接層,我們首先對序列進行 “One Hot” 操作,即將序列中的每個字符的編碼 i 均變換為一個 num_char 維向量,其第 i 位為 1,其余均為 0。變換后的序列張量形狀為 [seq_length, num_chars] 。然后,我們初始化 RNN 單元的狀態,存入變量 state 中。接下來,將序列從頭到尾依次送入 RNN 單元,即在 t 時刻,將上一個時刻 t-1 的 RNN 單元狀態 state 和序列的第 t 個元素 inputs[t, :] 送入 RNN 單元,得到當前時刻的輸出 output 和 RNN 單元狀態。取 RNN 單元最后一次的輸出,通過全連接層變換到 num_chars 維,即作為模型的輸出。

 

 

num_batches = 1000
seq_length = 40
batch_size = 50
learning_rate = 1e-3
定義一些模型超參數
data_loader = DataLoader()
    model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(seq_length, batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
模型訓練
    • 從 DataLoader 中隨機取一批訓練數據;

    • 將這批數據送入模型,計算出模型的預測值;

    • 將模型預測值與真實值進行比較,計算損失函數(loss);

    • 計算損失函數關於模型變量的導數;

    • 使用優化器更新模型參數以最小化損失函數。

 def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)
        prob = tf.nn.softmax(logits / temperature).numpy()
        return np.array([np.random.choice(self.num_chars, p=prob[i, :])
                         for i in range(batch_size.numpy())])

X_, _ = data_loader.get_batch(seq_length, 1)
    for diversity in [0.2, 0.5, 1.0, 1.2]:
        X = X_
        print("diversity %f:" % diversity)
        for t in range(400):
            y_pred = model.predict(X, diversity)
            print(data_loader.indices_char[y_pred[0]], end='', flush=True)
            X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
        print("\n")
連續預測得到生成文本
使用 tf.argmax() 函數,將對應概率最大的值作為預測值對於文本生成而言,這樣的預測方式過於絕對,會使得生成的文本失去豐富性。
使用 np.random.choice() 函數按照生成的概率分布取樣。這樣,即使是對應概率較小的字符,也有機會被取樣到。
同時,加入一個 temperature 參數控制分布的形狀,參數值越大則分布越平緩(最大值和最小值的差值越小),生成文本的豐富度越高;參數值越小則分布越陡峭,生成文本的豐富度越低。

 

 

 

 

 

 

深度強化學習(DRL)

強調如何基於環境而行動,以取得最大化的預期利益。

使用深度強化學習玩 CartPole(倒立擺)游戲

import gym

env = gym.make('CartPole-v1')       # 實例化一個游戲環境,參數為游戲名稱
state = env.reset()                 # 初始化環境,獲得初始狀態
while True:
    env.render()                    # 對當前幀進行渲染,繪圖到屏幕
    action = model.predict(state)   # 假設我們有一個訓練好的模型,能夠通過當前狀態預測出這時應該進行的動作
    next_state, reward, done, info = env.step(action)   # 讓環境執行動作,獲得執行完動作的下一個狀態,動作的獎勵,游戲是否已結束以及額外信息
    if done:                        # 如果游戲結束則退出循環
        break
Gym 的基本調用方法
import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque

num_episodes = 500              # 游戲訓練的總episode數量
num_exploration_episodes = 100  # 探索過程所占的episode數量
max_len_episode = 1000          # 每個episode的最大回合數
batch_size = 32                 # 批次大小
learning_rate = 1e-3            # 學習率
gamma = 1.                      # 折扣因子
initial_epsilon = 1.            # 探索起始時的探索率
final_epsilon = 0.01            # 探索終止時的探索率
定義一些模型超參數
class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)
tf.keras.Model 建立一個 Q 函數網絡(Q-network)
使用較簡單的多層全連接神經網絡進行擬合。該網絡輸入當前狀態,輸出各個動作下的 Q-value(CartPole 下為 2 維,即向左和向右推動小車)。
if __name__ == '__main__':
    env = gym.make('CartPole-v1')       # 實例化一個游戲環境,參數為游戲名稱
    model = QNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    replay_buffer = deque(maxlen=10000) # 使用一個 deque 作為 Q Learning 的經驗回放池
    epsilon = initial_epsilon
    for episode_id in range(num_episodes):
        state = env.reset()             # 初始化環境,獲得初始狀態
        epsilon = max(                  # 計算當前探索率
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        for t in range(max_len_episode):
            env.render()                                # 對當前幀進行渲染,繪圖到屏幕
            if random.random() < epsilon:               # epsilon-greedy 探索策略,以 epsilon 的概率選擇隨機動作
                action = env.action_space.sample()      # 選擇隨機動作(探索)
            else:
                action = model.predict(np.expand_dims(state, axis=0)).numpy()   # 選擇模型計算出的 Q Value 最大的動作
                action = action[0]

            # 讓環境執行動作,獲得執行完動作的下一個狀態,動作的獎勵,游戲是否已結束以及額外信息
            next_state, reward, done, info = env.step(action)
            # 如果游戲Game Over,給予大的負獎勵
            reward = -10. if done else reward
            # 將(state, action, reward, next_state)的四元組(外加 done 標簽表示是否結束)放入經驗回放池
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            # 更新當前 state
            state = next_state

            if done:                                    # 游戲結束則退出本輪循環,進行下一個 episode
                print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
                break

            if len(replay_buffer) >= batch_size:
                # 從經驗回放池中隨機取一個批次的四元組,並分別轉換為 NumPy 數組
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                batch_action = np.array(batch_action, dtype=np.int32)

                q_value = model(batch_next_state)
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)  # 計算 y 值
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(  # 最小化 y 和 Q-value 的距離
                        y_true=y,
                        y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))       # 計算梯度並更新參數
主程序中實現 Q Learning 算法
將上面的 QNetwork 更換為 CNN 網絡,並對狀態做一些修改,即可用於玩一些簡單的視頻游戲

 

 以上示例均使用了 Keras 的 Subclassing API 建立模型,即對 tf.keras.Model 類進行擴展以定義自己的新模型,同時手工編寫了訓練和評估模型的流程。

 

Keras Pipeline *

只需要建立一個結構相對簡單和典型的神經網絡(比如上文中的 MLP 和 CNN),並使用常規的手段進行訓練。這時,Keras 也給我們提供了另一套更為簡單高效的內置方法來建立、訓練和評估模型。

model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(100, activation=tf.nn.relu),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Softmax()
        ])
Keras Sequential/Functional API 模式建立模型
通過向 tf.keras.models.Sequential() 提供一個層的列表,就能快速地建立一個 tf.keras.Model 模型並返回
 
 
這種層疊結構並不能表示任意的神經網絡結構,Functional API,幫助我們建立更為復雜的模型
inputs = tf.keras.Input(shape=(28, 28, 1))
        x = tf.keras.layers.Flatten()(inputs)
        x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
        x = tf.keras.layers.Dense(units=10)(x)
        outputs = tf.keras.layers.Softmax()(x)
        model = tf.keras.Model(inputs=inputs, outputs=outputs)
建立更為復雜的模型
使用方法是將層作為可調用的對象並返回張量(這點與之前章節的使用方法一致),並將輸入向量和輸出向量提供給 tf.keras.Model 的 inputs 和 outputs 參數
model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )
模型建立完成后,通過 tf.keras.Model 的 compile 方法配置訓練過程

接受 3 個重要的參數:

  • oplimizer :優化器,可從 tf.keras.optimizers 中選擇;

  • loss :損失函數,可從 tf.keras.losses 中選擇;

  • metrics :評估指標,可從 tf.keras.metrics 中選擇

model.fit(data_loader.train_data, data_loader.train_label, epochs=num_epochs, batch_size=batch_size)
使用 tf.keras.Model 的 fit 方法訓練模型

接受 5 個重要的參數:

  • x :訓練數據;

  • y :目標數據(數據標簽);

  • epochs :將訓練數據迭代多少遍;

  • batch_size :批次的大小;

  • validation_data :驗證數據,可用於在訓練過程中監控模型的性能。

print(model.evaluate(data_loader.test_data, data_loader.test_label))
使用 tf.keras.Model.evaluate 評估訓練效果
提供測試數據及標簽即可

 

如果現有的這些層無法滿足我的要求,我需要定義自己的層怎么辦?

事實上,不僅可以繼承 tf.keras.Model 編寫自己的模型類,也可以繼承 tf.keras.layers.Layer 編寫自己的層。

class MyLayer(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        # 初始化代碼

    def build(self, input_shape):     # input_shape 是一個 TensorShape 類型對象,提供輸入的形狀
        # 在第一次使用該層的時候調用該部分代碼,在這里創建變量可以使得變量的形狀自適應輸入的形狀
        # 而不需要使用者額外指定變量形狀。
        # 如果已經可以完全確定變量的形狀,也可以在__init__部分創建變量
        self.variable_0 = self.add_weight(...)
        self.variable_1 = self.add_weight(...)

    def call(self, inputs):
        # 模型調用的代碼(處理輸入並返回輸出)
        return output
自定義層
需要繼承 tf.keras.layers.Layer 類,並重寫 __init__ 、 build 和 call 三個方法
 
事例
class LinearLayer(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units

    def build(self, input_shape):     # 這里 input_shape 是第一次運行call()時參數inputs的形狀
        self.w = self.add_variable(name='w',
            shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
        self.b = self.add_variable(name='b',
            shape=[self.units], initializer=tf.zeros_initializer())

    def call(self, inputs):
        y_pred = tf.matmul(inputs, self.w) + self.b
        return y_pred
自己實現一個前文提到的的全連接層( tf.keras.layers.Dense )
在 build 方法中創建兩個變量,並在 call 方法中使用創建的變量進行運算
class LinearModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.layer = LinearLayer(units=1)

    def call(self, inputs):
        output = self.layer(inputs)
        return output
定義模型的時候,我們便可以如同 Keras 中的其他層一樣,調用我們自定義的層 LinearLayer

 

class MeanSquaredError(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_pred - y_true))
自定義損失函數
需要繼承 tf.keras.losses.Loss 類,重寫 call 方法即可,輸入真實值 y_true 和模型預測值 y_pred ,輸出模型預測值和真實值之間通過自定義的損失函數計算出的損失值。上面的示例為均方差損失函數

 

class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
    def __init__(self):
        super().__init__()
        self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
        self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
        self.total.assign_add(tf.shape(y_true)[0])
        self.count.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.count / self.total
自定義評估指標
需要繼承 tf.keras.metrics.Metric 類,並重寫 __init__ 、 update_state 和 result 三個方法。上面的示例對前面用到的 SparseCategoricalAccuracy評估指標類做了一個簡單的重實現:

 

下一篇:

三分鍾快速上手TensorFlow 2.0 (中)——常用模塊和模型的部署

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM