Tensorflow 2.x入門教程


前言

  至於為什么寫這個教程,首先是為了自己學習做個記錄,其次是因為Tensorflow的API寫的很好,但是他的教程寫的太亂了,不適合新手學習。tensorflow 1 和tensorflow 2 有相似之處但是不兼容,tensorflow 2將keras融合了。TensorFlow™ 是一個采用 數據流圖(data flow graphs),用於數值計算的開源軟件庫。圖中得節點(Nodes)表示數學操作,圖中的線(edges)則表示在節點間相互聯系的多維數據數組,即張量(tensor)。它靈活的架構讓你可以在多種平台上展開計算,例如台式計算機中的一個或多個CPU(或GPU),服務器,移動設備等等。

TensorFlow的主要優點:

  • 靈活性:支持底層數值計算,C++自定義操作符
  • 可移植性:從服務器到PC到手機,從CPU到GPU到TPU
  • 分布式計算:分布式並行計算,可指定操作符對應計算設備

層次結構

  TensorFlow的層次結構從低到高可以分成如下五層:硬件層,內核層,低階API,中階API,高階API。

  • 第一層:硬件層,TensorFlow支持CPU、GPU或TPU加入計算資源池。
  • 第二層:內核層,為C++實現的內核,kernel可以跨平台分布運行。
  • 第三層:低階API,由Python實現的操作符,提供了封裝C++內核的低級API指令。主要包括各種張量操作算子、計算圖、自動微分。如tf.Variable,tf.constant,tf.function,tf.GradientTape,tf.nn.softmax... 
  • 第四層:中階API,由Python實現的模型組件,對低級API進行了函數封裝。主要包括數據管道(tf.data)、特征列(tf.feature_column)、激活函數(tf.nn)、模型層(tf.keras.layers)、損失函數(tf.keras.losses)、評估函數(tf.keras.metrics)、優化器(tf.keras.optimizers)、回調函數(tf.keras.callbacks) 等等。
  • 第五層:高階API,由Python實現的模型成品。主要為tf.keras.models提供的模型的類接口,主要包括:模型的構建(Sequential、functional API、Model子類化)、型的訓練(內置fit方法、內置train_on_batch方法、自定義訓練循環、單GPU訓練模型、多GPU訓練模型、TPU訓練模型)、模型的部署(tensorflow serving部署模型、使用spark(scala)調用tensorflow模型)。 

概述

創建張量

tf.constant(value, dtype=tf.float32)    # 常數
tf.range(start, limit=None, delta=1)    # 生成一個范圍內間隔為delta的 張量
tf.linspace(start, stop, num) # 在一個間隔內生成均勻間隔的值
tf.zeros()  # 創建全0張量
tf.ones()  # 創建全1張量
tf.zeros_like(input)  # 創建和input一樣大小的張量
tf.fill(dims, value)    # 創建shape為dim,全為value的張量

tf.random.uniform([5], minval=0, maxval=10)     # 均勻分布隨機
tf.random.normal([3, 3], mean=0.0, stddev=1.0)  # 正態分布隨機

tf.Variable(initial_value)  # 變量

tf.Variable:

  • name:變量的名字,默認情況下,會自動獲得唯一的變量名
  • trainable:設置為 False 可以關閉梯度。例如,訓練計步器就是一個不需要梯度的變量

tf.rank(a):求矩陣的秩

變量的設備位置

為了提高性能,TensorFlow 會嘗試將張量和變量放在與其 dtype 兼容的最快設備上。這意味着如果有 GPU,那么大部分變量都會放置在 GPU 上,不過,我們可以重寫變量的位置。

with tf.device('CPU:0'):
  # Create some tensors
  a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
  b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
  c = tf.matmul(a, b)

print(c)

使用assign重新分配張量

a.assign([5, 6])  # a = [5, 6]
a.assign_add([2, 3])  # a = a+[2,3]
a.assign_sub([7, 9])  # a=a-[7,9]

維度變換

維度變換相關函數主要有 tf.reshape,tf.squeeze,tf.expand_dims,tf.transpose。

  • tf.reshape():改變張量的形狀
  • tf.squeeze():減少維度為1的維度
  • tf.expand_dims(input, axis):增加維度
  • tf.transpose(a, perm=None):交換維度

tf.reshape可以改變張量的形狀,但是其本質上不會改變張量元素的存儲順序,所以,該操作實際上非常迅速,並且是可逆的。

合並分隔

和numpy類似,可以用tf.concattf.stack方法對多個張量進行合並,可以用tf.split方法把一個張量分割成多個張量。

tf.concat和tf.stack有略微的區別,tf.concat是連接,不會增加維度,而tf.stack是堆疊,會增加維度。

a = tf.constant([[1.0, 2.0], [3.0, 4.0]])  # (2,2)
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])  # (2,2)

c = tf.concat([a, b], axis=0)  # (4, 2)
c = tf.stack([a, b], axis=0)  # (2, 2, 2)
d = tf.split(c, 2, axis=0)  # [(1, 2, 2),(1, 2, 2)]

Tensor與Array的轉換

c = np.array(b)     # tensor 轉 np
c = b.numpy()       # tensor 轉 np
tf.convert_to_tensor(c)     # np 轉 tensor

數學運算

tf.add(a,b)        # 加法 a+b
tf.multiply(a,b)       # 逐元素乘法a*b
tf.matmul(a,b)     # 矩陣乘法a@b

類型轉換

tensorflow支持的模型有:tf.float16、tf.float64、tf.int8、tf.int16、tf.int32...

a = tf.constant([2.2, 3.3, 4.4], dtype=tf.float64)
b = tf.cast(a, dtype=tf.float16)    # 類型轉換

計算圖

  有三種計算圖的構建方式:靜態計算圖動態計算圖,以及Autograph。在TensorFlow1.0時代,采用的是靜態計算圖,需要先使用TensorFlow的各種算子創建計算圖,然后再開啟一個會話Session,顯式執行計算圖。而在TensorFlow2.0時代,采用的是動態計算圖,即每使用一個算子后,該算子會被動態加入到隱含的默認計算圖中立即執行得到結果,而無需開啟Session。使用動態計算圖(Eager Excution)的好處是方便調試程序,它會讓TensorFlow代碼的表現和Python原生代碼的表現一樣,寫起來就像寫numpy一樣,各種日志打印,控制流全部都是可以使用的。使用動態計算圖的缺點是運行效率相對會低一些。因為使用動態圖會有許多次Python進程和TensorFlow的C++進程之間的通信。而靜態計算圖構建完成之后幾乎全部在TensorFlow內核上使用C++代碼執行,效率更高。此外靜態圖會對計算步驟進行一定的優化,剪去和結果無關的計算步驟。

  如果需要在TensorFlow2.0中使用靜態圖,可以使用@tf.function裝飾器將普通Python函數轉換成對應的TensorFlow計算圖構建代碼。運行該函數就相當於在TensorFlow1.0中用Session執行代碼。使用tf.function構建靜態圖的方式叫做 Autograph。當然Autograph機制能夠轉換的代碼並不是沒有任何約束的,有一些編碼規范需要遵循,否則可能會轉換失敗或者不符合預期。

  1. 被@tf.function修飾的函數應盡可能使用TensorFlow中的函數而不是Python中的其他函數。例如使用tf.print而不是print,使用tf.range而不是range,使用tf.constant(True)而不是True.
  2. 避免在@tf.function修飾的函數內部定義tf.Variable
  3. 被@tf.function修飾的函數不可修改該函數外部的列表或字典等數據結構變量

  計算圖由節點(nodes)和線(edges)組成。節點表示操作符Operator,或者稱之為算子,線表示計算間的依賴。實線表示有數據傳遞依賴,傳遞的數據即張量。虛線通常可以表示控制依賴,即執行先后順序。

import tensorflow as tf

# 使用autograph構建靜態圖
@tf.function
def strjoin(x,y):
    z =  tf.strings.join([x,y],separator = " ")
    tf.print(z)
    return z

result = strjoin(tf.constant("hello"),tf.constant("world"))

print(result)

您可以像這樣測量靜態圖和動態圖性能差異:

x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32)

def power(x, y):
  result = tf.eye(10, dtype=tf.dtypes.int32)
  for _ in range(y):
    result = tf.matmul(x, result)
  return result

print("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000))        # 2.56378621799

# 將python函數轉換為圖形
power_as_graph = tf.function(power)
print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000))        # 0.683253670
View Code

我們還可以再函數前使用裝飾器 @tf.function 調用tf.function,同時也可以使用 tf.config.run_functions_eagerly(True) 關閉Function創建和運行圖形的能力。

  前面在介紹Autograph的編碼規范時提到構建Autograph時應該避免在@tf.function修飾的函數內部定義tf.Variable。但是如果在函數外部定義tf.Variable的話,又會顯得這個函數有外部變量依賴,封裝不夠完美。一種簡單的思路是定義一個類,並將相關的tf.Variable創建放在類的初始化方法中。而將函數的邏輯放在其他方法中。

class DemoModule(tf.Module):
    def __init__(self, init_value=tf.constant(0.0), name=None):
        super(DemoModule, self).__init__(name=name)
        with self.name_scope:  # 相當於with tf.name_scope("demo_module")
            self.x = tf.Variable(init_value, dtype=tf.float32, trainable=True)

    @tf.function
    def addprint(self, a):
        with self.name_scope:
            self.x.assign_add(a)
            tf.print(self.x)
            return self.x
View Code

自動微分

  自動微分用於訓練神經網絡的反向傳播非常有用,TensorFlow 會記住在前向傳遞過程中哪些運算以何種順序發生。隨后,在后向傳遞期間,以相反的順序遍歷此運算列表來計算梯度。

Tensorflow一般使用tf.GradientTape來記錄正向運算過程,然后反向傳播自動計算梯度值。

$$f(x)=ax^2+bx+c$$

x = tf.Variable(0.0,name = "x",dtype = tf.float32)
a = tf.constant(1.0)
b = tf.constant(-2.0)
c = tf.constant(1.0)

with tf.GradientTape() as tape:
    y = a*tf.pow(x,2) + b*x + c
    
dy_dx = tape.gradient(y,x)
print(dy_dx)    # tf.Tensor(-2.0, shape=(), dtype=float32)

對常量張量也可以求導,只不過需要增加watch

with tf.GradientTape() as tape:
    tape.watch([a,b,c])
    y = a*tf.pow(x,2) + b*x + c
    
dy_dx,dy_da,dy_db,dy_dc = tape.gradient(y,[x,a,b,c])
print(dy_da)    # tf.Tensor(0.0, shape=(), dtype=float32)
print(dy_dc)    # tf.Tensor(1.0, shape=(), dtype=float32)
View Code

可以求二階導數

with tf.GradientTape() as tape2:
    with tf.GradientTape() as tape1:   
        y = a*tf.pow(x,2) + b*x + c
    dy_dx = tape1.gradient(y,x)   
dy2_dx2 = tape2.gradient(dy_dx,x)

print(dy2_dx2)    # tf.Tensor(2.0, shape=(), dtype=float32)
View Code

利用梯度和優化器求最小值

# 求f(x) = a*x**2 + b*x + c的最小值
# 使用optimizer.apply_gradients

x = tf.Variable(0.0,name = "x",dtype = tf.float32)
a = tf.constant(1.0)
b = tf.constant(-2.0)
c = tf.constant(1.0)

optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for _ in range(1000):
    with tf.GradientTape() as tape:
        y = a*tf.pow(x,2) + b*x + c
    dy_dx = tape.gradient(y,x)  # 計算梯度
    optimizer.apply_gradients(grads_and_vars=[(dy_dx,x)])  # 根據梯度更新變量
    
tf.print("y =",y,"; x =",x)
View Code

如果不想被計算梯度:

with tf.GradientTape(watch_accessed_variables=False) as tape:
    pass

使用TensorFlow實現神經網絡模型的一般流程包括:

  1. 准備數據
  2. 定義模型
  3. 訓練模型
  4. 評估模型
  5. 推理模型
  6. 保存模型

數據輸入

  tensorflow支持 Numpy 數組、Pandas DataFrame、Python 生成器、csv文件、文本文件、文件路徑、TFrecords文件等方式構建數據管道。如果您的數據很小並且適合內存,我們建議您使用tf.data.Dataset.from_tensor_slices()從Numpy array構建數據管道

  • Dataset:如果您有大型數據集並且需要進行分布式訓練
  • Sequence:如果您有大型數據集並且需要執行大量在 TensorFlow 中無法完成的自定義 Python 端處理(例如,如果您依賴外部庫進行數據加載或預處理)
  • 通過tfrecords文件方式構建數據管道較為復雜,需要對樣本構建tf.Example后壓縮成字符串寫到tfrecoreds文件,讀取后再解析成tf.Example。但tfrecoreds文件的優點是壓縮后文件較小,便於網絡傳播,加載速度較快。

Numpy構建數據

官方推薦使用 tf.data.Dataset.from_tensors() 或 tf.data.Dataset.from_tensor_slices() 創建數據集,Dataset支持一類特殊的操作Trainformation(打亂、生成epoch...等操作)

  • data.map(function):將轉換函數映射到數據集每一個元素
  • data.batch(batch_size):構建batch
  • data.shuffle(buffer_size):隨機打亂輸入數據,從該緩沖區中隨機采樣元素
  • data.repeat():repeat的功能就是將整個序列重復多次,一般不帶參數
  • data.prefetch(tf.data.experimental.AUTOTUNE)  :預先取 數據
  • data.take():采樣,從開始位置取前幾個元素
  • ...
features = np.arange(0, 100, dtype=np.int32)    # # (100,)
labels = np.zeros(100, dtype=np.int32)  # (100,)

data = tf.data.Dataset.from_tensor_slices((features, labels))   # 創建數據集
data = data.repeat()    # 無限期地補充數據
data = data.shuffle(buffer_size=100)    # 打亂數據
data = data.batch(batch_size=4)     # 批量數據
data = data.prefetch(buffer_size=1)     # 預取批處理(預加載批處理,消耗更快)

for batch_x, batch_y in data.take(5):
    print(batch_x.shape, batch_y.shape)     # (4,) (4,)
    break

注意:如果你打算多次調用,你可以使用迭代器的方式:

ite_data = iter(data)
for i in range(5):
batch_x, batch_y = next(ite_data)
print(batch_x, batch_y)

for i in range(5):
batch_x, batch_y = next(ite_data)
print(batch_x, batch_y)
View Code

提升管道性能

  訓練深度學習模型常常會非常耗時。模型訓練的耗時主要來自於兩個部分,一部分來自數據准備,另一部分來自參數迭代。參數迭代過程的耗時通常依賴於GPU來提升。而數據准備過程的耗時則可以通過構建高效的數據管道進行提升。

以下是一些構建高效數據管道的建議。

  1. 使用 prefetch 方法讓數據准備和參數迭代兩個過程相互並行。
  2. 使用 interleave 方法可以讓數據讀取過程多進程執行,並將不同來源數據夾在一起。
  3. 使用 map 時設置num_parallel_calls 讓數據轉換過程多進行執行。
  4. 使用 cache 方法讓數據在第一個epoch后緩存到內存中,僅限於數據集不大情形。
  5. 使用 map轉換時,先batch,然后采用向量化的轉換方法對每個batch進行轉換。

生成器構建數據

def generate_features():
    # 函數生成一個隨機字符串
    def random_string(length):
        return ''.join(random.choice(string.ascii_letters) for m in range(length))
    # 返回一個隨機字符串、一個隨機向量和一個隨機整數
    yield random_string(4), np.random.uniform(size=4), random.randint(0, 10)


data = tf.data.Dataset.from_generator(generate_features, output_types=(tf.string, tf.float32, tf.int32))
data = data.repeat()        # 無限期地補充數據
data = data.shuffle(buffer_size=100)    # 打亂數據
data = data.batch(batch_size=4)     # 批量數據(將記錄聚合在一起)
data = data.prefetch(buffer_size=1)     # 預取批量(預加載批量以便更快的消耗)

# Display data.
for batch_str, batch_vector, batch_int in data.take(5):
    # (4,) (4, 4) (4,)
    print(batch_str.shape, batch_vector.shape, batch_int.shape)

keras.utils.Sequence

特別是,keras.utils.Sequence該類提供了一個簡單的接口來構建 Python 數據生成器,該生成器可以感知多處理並且可以洗牌。

Sequence必須實現兩種方法:

  • __getitem__:返回一個batch數據
  • __len__:整型,返回batch的數量
import tensorflow as tf
from keras.utils.data_utils import Sequence

class SequenceDataset(Sequence):
    def __init__(self, batch_size):
        self.input_data = tf.random.normal((640, 8192, 1))
        self.label_data = tf.random.normal((640, 8192, 1))
        self.batch_size = batch_size

    def __len__(self):
        return int(tf.math.ceil(len(self.input_data) / float(self.batch_size)))

    # 每次輸出一個batch
    def __getitem__(self, idx):
        batch_x = self.input_data[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.label_data[idx * self.batch_size:(idx + 1) * self.batch_size]
        return batch_x, batch_y


sequence = SequenceDataset(batch_size=64)

for batch_idx, (x, y) in enumerate(sequence):
    print(batch_idx, x.shape, y.shape)  # tf.float32
    # 0 (64, 8192, 1) (64, 8192, 1)
    break

搭建模型

  深度學習模型一般由各種模型層組合而成,如果這些內置模型層不能夠滿足需求,我們也可以通過編寫tf.keras.Lambda匿名模型層或繼承tf.keras.layers.Layer基類構建自定義的模型層。其中tf.keras.Lambda匿名模型層只適用於構造沒有學習參數的模型層。

搭建模型有以下3種方式構建模型:

  1. Sequential順序模型:用於簡單的層堆棧, 其中每一層恰好有一個輸入張量一個輸出張量
  2. 函數式API模型:多輸入多輸出,或者模型需要共享權重,或者模型具有殘差連接等非順序結構,
  3. 繼承Model基類自定義模型:如果無特定必要,盡可能避免使用Model子類化的方式構建模型,這種方式提供了極大的靈活性,但也有更大的概率出錯

順序建模

model = keras.Sequential([
    layers.Dense(2, activation="relu", name="layer1"),
    layers.Dense(3, activation="relu", name="layer2"),
    layers.Dense(4, name="layer3"),
])

還可以通過add方法創建順序模型

model = keras.Sequential()
model.add(layers.Dense(2, activation="relu"))
model.add(layers.Dense(3, activation="relu"))
model.add(layers.Dense(4))

因為模型不知道輸入shape,所以起初模型沒有權重,因此我們需要告知模型輸入shape

# 在第一層添加Input
model.add(keras.Input(shape=(4,)))
model.add(layers.Dense(2, activation="relu"))

# 或者在第一層添加input_shape
model.add(layers.Dense(2, activation="relu", input_shape=(4,)))

順序模型可以配合add 和 model.summary() 在模型的任何位置查看該層的輸入輸出。

函數式API建模

  函數式API搭建模型比Sequential更加靈活,可以處理具有非線性拓撲、共享層甚至多個輸入或輸出的模型。

inputs = keras.Input(shape=(784,))
x = layers.Dense(64, activation="relu")(inputs)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10)(x)
model = keras.Model(inputs=inputs, outputs=outputs, name="mnist_model")
model.summary()     # 查看模型摘要

還可以將模型繪制為圖形

keras.utils.plot_model(model, "my_first_model_with_shape_info.png", show_shapes=True)

補充:函數式模型是可以嵌套的

自定義建模

  在 TensorFlow 中,模型類的繼承關系為: 

  tf.keras.Model (模型) tf.keras.layers.Layer (層) tf.Module (模塊)

  通常使用Layer類來定義內部計算塊,並使用Model類定義外部模型(訓練的對象)。

繼承 tf.Module 栗子 (主要用於創建模塊):

import tensorflow as tf

# 我們繼承tf.Module 搭建一個Dense層
class CustomDense(tf.Module):
    def __init__(self, in_features, out_features, **kwargs):
        super().__init__(**kwargs)
        self.weight = tf.Variable(tf.random.normal([in_features, out_features]), name='weight')
        self.bias = tf.Variable(tf.zeros([out_features]), name='bias')

    def __call__(self, x):
        x = tf.matmul(x, self.weight) + self.bias
        x = tf.nn.relu(x)
        return x


model = CustomDense(in_features=5, out_features=10)
x = tf.random.normal(shape=(64, 5))
outputs = model(x)
print("CustomDense outputs", outputs.shape)  # (64, 10)
View Code

繼承 tf.keras.layers.Layer 栗子 (主要用於創建):

# 使用CustomDense 搭建一個模型
class CustomLayer(tf.keras.layers.Layer):
    # 添加 **kwargs來支持基本Keras層參數
    def __init__(self, in_features, out_features, **kwargs):
        super().__init__(**kwargs)
        # 這將很快轉移到 build ;見下代碼
        self.weight = tf.Variable(tf.random.normal([in_features, out_features]), name='weight')
        self.bias = tf.Variable(tf.zeros([out_features]), name='bias')

    # Keras 層有自己的 __call__,他然后調用 call()
    def call(self, x):
        x = tf.matmul(x, self.weight) + self.bias
        x = tf.nn.relu(x)
        return x

model = CustomLayer(in_features=5, out_features=10)
x = tf.random.normal(shape=(64, 5))
outputs = model(x)
print("CustomLayer outputs", outputs.shape)  # (64, 10)
View Code
class FlexibleDense(tf.keras.layers.Layer):
    # 添加 **kwargs來支持基本Keras層參數
    def __init__(self, out_features, **kwargs):
        super().__init__(**kwargs)
        self.out_features = out_features

    # 創建層的狀態(權重)
    # 在call前會先調用 build,使得權重的形狀固定下來,這樣我們就不用操心輸入的shape
    def build(self, input_shape):
        self.weight = tf.Variable(tf.random.normal([input_shape[-1], self.out_features]), name='weight')
        self.bias = tf.Variable(tf.zeros([self.out_features]), name='bias')

    def call(self, inputs):
        return tf.matmul(inputs, self.weight) + self.bias


model = FlexibleDense(out_features=10)
x = tf.random.normal(shape=(64, 5))
outputs = model(x)
print("FlexibleDense outputs", outputs.shape)  # (64, 10)
layer use build

繼承 tf.keras.Model 栗子 (主要用於創建模型):

class CustomModel(tf.keras.Model):
    def __init__(self, in_features=5, out_features=10, **kwargs):
        super().__init__(**kwargs)
        self.dense_1 = CustomLayer(in_features, out_features)
        self.dense_2 = CustomLayer(out_features, out_features)

    def call(self, x):
        x = self.dense_1(x)
        x = self.dense_2(x)
        return x


model = CustomLayer(in_features=5, out_features=10)
x = tf.random.normal(shape=(64, 5))
outputs = model(x)
print("CustomModel outputs", outputs.shape)  # (64, 10)
View Code

keras層具有許多額外的功能:

  • 可選損失函數
  • 對度量函數的支持
  • 對可選 training 參數的內置支持,用於區分訓練和推斷用途
  • get_config 和 from_config 方法,允許您准確存儲配置以在 Python 中克隆模型

keras.Model類具有keras.Layer類相同的API,但有以下區別:

  • Model類 提供了內置的訓練  model.fit() 、評估 model.evaluate()  和預測  model.predict()  API
  • Model類 可以通過 model.layers 屬性公開內層的列表。
  • Model類 提供了保存和序列化 API  save()、save_weights() ...

  Layer 類對應於“層”,Model 類對應於“模型,如果您想知道“我應該使用Layer類還是Model類?”,請問自己:我需要調用fit()它嗎?我需要save() 嗎?如果是這樣,請與Model。如果不是,請使用Layer。

我們來吃個栗子,用 layers.Layer 搭建層(Layer 之間可以相互調用),用keras.Model搭建模型(Model 之間可以相互調用)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""
    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""
    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""
    def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name="autoencoder", **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)  # 添加損失
        return reconstructed
View Code

我們可以搭建一個自定義訓練模塊,來訓練上面的模型

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
View Code

由於上面的模型是 keras.Model 的子類,它具有內置循環方法(model.fit)

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
View Code

當然自定義的 keras.Model 還可以和函數式API 混搭訓練

original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
outputs = layers.Dense(original_dim, activation="sigmoid")(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")

# Add KL divergence regularization loss.
kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Train.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
函數式API訓練模型
View Code

補充知識:自定義層

如果自定義模型層沒有需要被訓練的參數,一般推薦使用Lamda層實現。

mypower = layers.Lambda(lambda x:tf.math.pow(x,2))
mypower(tf.range(5))

如果自定義模型層有需要被訓練的參數,則可以通過繼承Layer基類實現。Layer的子類化一般需要重新實現初始化方法,Build方法和Call方法。如果built = False,調用__call__時會先調用build方法, 再調用call方法。

損失函數

  一般來說,損失值 由 損失函數正則化項 組成(Objective = Loss + Regularization)。對於keras模型,正則化項一般在各層中指定,例如使用Dense的 kernel_regularizer 和 bias_regularizer 等參數指定權重使用 L1 或者 L2 正則化項,此外還可以用kernel_constraint 和 bias_constraint 等參數約束權重的取值范圍,這也是一種正則化手段。

損失函數在 model.compile 時候指定。

  • 對於回歸模型,通常使用的損失函數是平方損失函數 mean_squared_error,簡寫為 mse,類實現形式為 MeanSquaredError 和 MSE
  • 對於二分類模型,通常使用的是二元交叉熵損失函數 binary_crossentropy。
  • 對於多分類模型,如果label是one-hot編碼的,則使用交叉熵損失函數 categorical_crossentropy。如果label是序號編碼的,則需要使用稀疏類別交叉熵損失函數 sparse_categorical_crossentropy。
  • 如果有需要,也可以自定義損失函數,自定義損失函數需要接收兩個張量y_true,y_pred作為輸入參數,並輸出一個標量作為損失函數值。

常見的Loss可以參看Tensorflow的官網:tf.keras.losses

自定義損失

keras提供了兩種自定義損失:

函數實現:輸入 y_true 和 y_pred,返回 損失值

def custom_mean_squared_error(y_true, y_pred):
    return tf.math.reduce_mean(tf.square(y_true - y_pred))
model.compile(optimizer=..., loss=custom_mean_squared_error)

類實現:繼承 tf.keras.losses.Loss ,在init中初始化參數,在call中重寫計算邏輯。

class CustomMSE(keras.losses.Loss):
    def __init__(self, regularization_factor=0.1, name="custom_mse"):
        super().__init__(name=name)
        self.regularization_factor = regularization_factor

    def call(self, y_true, y_pred):
        mse = tf.math.reduce_mean(tf.square(y_true - y_pred))
        reg = tf.math.reduce_mean(tf.square(0.5 - y_pred))
        return mse + reg * self.regularization_factor

model.compile(optimizer=..., loss=CustomMSE())

度量函數

  人們通常會通過度量函數來從另一個方面 評估模型的好壞,可以在 model.compile 時,通過列表形式指定單/多個評估指標。

  • model.compile 時,可以
  • 也可以自定義評估指標。自定義評估指標需要接收兩個張量y_true,y_pred作為輸入參數,並輸出一個標量作為評估值。
  • 也可以繼承 tf.keras.metrics.Metric 自定義度量方法,update_state方法,result方法實現評估指標的計算邏輯,從而得到評估指標的類的實現形式。

常見的Metric可以參看Tensorflow的官網:tf.keras.metrics

自定義度量

  繼承 tf.keras.metrics.Metric,並且實現4個類方法:

  • __init__(self):創建狀態變量
  • update_state(self, y_true, y_pred, sample_weight=None):根據 y_true 和 y_pred 來更新狀態變量
  • result(self):使用狀態變量來計算 最終結果
  • reset_state(self):重新初始化狀態變量
class SnrCustomMetrics(metrics.Metric):
    def __init__(self, name="snr_metrics", **kwargs):
        # 創建狀態變量
        super(SnrCustomMetrics, self).__init__(name=name, **kwargs)
        self.metrics_value = self.add_weight(name="snr_value", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """使用目標 y_true 和模型預測 y_pred 來更新狀態變量
        :param y_true: target
        :param y_pred: 預測值
        :param sample_weight: 權重
        """
        labels_pow = tf.pow(y_true, 2)
        noise_pow = tf.pow(y_pred - y_true, 2)
        snr_value = 10 * tf.math.log(tf.reduce_sum(labels_pow) / tf.reduce_sum(noise_pow)) / tf.math.log(10.)

        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, "float32")
            snr_value = tf.multiply(snr_value, sample_weight)
        self.metrics_value.assign_add(snr_value)

    # 使用狀態變量來計算最終結果
    def result(self):
        return self.metrics_value

    # metrics 的狀態將在每個epoch開始時重置。
    def reset_state(self):
        self.metrics_value.assign(0.0)

model.compile(optimizer=..., loss=..., metrics=[SnrCustomMetrics()])
View Code

優化器

  機器學習界有一群煉丹師,他們每天的日常是:拿來葯材(數據),架起八卦爐(模型),點着六味真火(優化算法),就搖着蒲扇等着丹葯出爐了。不過,當過廚子的都知道,同樣的食材,同樣的菜譜,但火候不一樣了,這出來的口味可是千差萬別。火小了夾生,火大了易糊,火不勻則半生半糊。機器學習也是一樣,模型優化算法的選擇直接關系到最終模型的性能。有時候效果不好,未必是特征的問題或者模型設計的問題,很可能就是優化算法的問題。

  深度學習優化算法大概經歷了 SGD -> SGDM -> NAG ->Adagrad -> Adadelta(RMSprop) -> Adam -> Nadam 這樣的發展歷程。model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)

  • SGD:默認參數為純SGD, 設置momentum參數不為0實際上變成SGDM, 考慮了一階動量, 設置 nesterov為True后變成NAG,即 Nesterov Acceleration Gradient,在計算梯度時計算的是向前走一步所在位置的梯度
  • Adagrad:考慮了二階動量,對於不同的參數有不同的學習率,即自適應學習率。缺點是學習率單調下降,可能后期學習速率過慢乃至提前停止學習
  • RMSprop:考慮了二階動量,對於不同的參數有不同的學習率,即自適應學習率,對Adagrad進行了優化,通過指數平滑只考慮一定窗口內的二階動量
  • Adadelta:考慮了二階動量,與RMSprop類似,但是更加復雜一些,自適應性更強
  • Adam:同時考慮了一階動量和二階動量,可以看成RMSprop上進一步考慮了Momentum
  • Nadam:在Adam基礎上進一步考慮了 Nesterov Acceleration

  對於一般新手煉丹師,優化器直接使用Adam,並使用其默認參數就OK了。一些愛寫論文的煉丹師由於追求評估指標效果,可能會偏愛前期使用Adam優化器快速下降,后期使用SGD並精調優化器參數得到更好的結果。此外目前也有一些前沿的優化算法,據稱效果比Adam更好,例如LazyAdam,Look-ahead,RAdam,Ranger等。

  初始化優化器時會創建一個變量optimier.iterations用於記錄迭代的次數。因此優化器和tf.Variable一樣,一般在@tf.function外創建。

優化器主要使用apply_gradients方法傳入變量和對應梯度從而來對給定變量進行迭代,

optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)

with tf.GradientTape() as tape:
    ...
grads = tape.gradient(loss, model.trainable_weights)  # 根據損失 求梯度
optimizer.apply_gradients(zip(grads, model.trainable_weights))  # 根據梯度 優化模型

或者直接使用minimize方法對目標函數進行迭代優化。

@tf.function
def train(epoch=1000):
    for _ in tf.range(epoch):
        optimizer.minimize(loss, model.trainable_weights)
    tf.print("epoch = ", optimizer.iterations)
    return loss

當然,更常見的使用是在編譯時將優化器傳入model.fit

model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)

回調函數

  tf.keras的回調函數實際上是一個類,一般是在model.fit時作為參數指定,一般收集一些日志信息,改變學習率等超參數,提前終止訓練過程等等

大部分時候,keras.callbacks子模塊中定義的回調函數類已經足夠使用了,如果有特定的需要,我們也可以通過對keras.callbacks.Callbacks實施子類化構造自定義的回調函數

  • BaseLogger: 收集每個epoch上metrics平均值,對stateful_metrics參數中的帶中間狀態的指標直接拿最終值無需對各個batch平均,指標均值結果將添加到logs變量中。該回調函數被所有模型默認添加,且是第一個被添加的
  • History: 將BaseLogger計算的各個epoch的metrics結果記錄到history這個dict變量中,並作為model.fit的返回值。該回調函數被所有模型默認添加,在BaseLogger之后被添加
  • EarlyStopping: 當被監控指標在設定的若干個epoch后沒有提升,則提前終止訓練
  • TensorBoard: 為Tensorboard可視化保存日志信息。支持評估指標,計算圖,模型參數等的可視化
  • ModelCheckpoint: 在每個epoch后保存模型
  • ReduceLROnPlateau:如果監控指標在設定的若干個epoch后沒有提升,則以一定的因子減少學習率
  • TerminateOnNaN:如果遇到loss為NaN,提前終止訓練
  • LearningRateScheduler:學習率控制器。給定學習率lr和epoch的函數關系,根據該函數關系在每個epoch前調整學習率
  • CSVLogger:將每個epoch后的logs結果記錄到CSV文件中
  • ProgbarLogger:將每個epoch后的logs結果打印到標准輸出流中

EarlyStopping

當監控的指標停止改進時停止訓練

tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=0, verbose=0, mode='auto', baseline=None, restore_best_weights=False)
  • monitor:要監控的 對象
  • min_delta:被監測對象的最小變化 小於 min_delta 的絕對變化,將被視為沒有改進
  • patience:如果經過num個epoch,監控對象沒有改善,則停止訓練
  • verbose:詳細模式。1打印過程信息,0不打印
  • mode:{"auto", "min", "max"}之一。
    • min,當監控的數量停止減少時,培訓停止
    • max,當監控的數量停止增加時,它將停止
    • auto,方向會從被監控對象的名稱中自動推斷出來
  • restore_best_weights:是否從監測量的最佳值的epoch恢復模型權重。如果為 False,則使用在訓練的最后一步獲得的模型權重

ModelCheckpoint

以某種頻率保存 Keras 模型或模型權重

tf.keras.callbacks.ModelCheckpoint(
    filepath, monitor='val_loss', verbose=0, save_best_only=False,
    save_weights_only=False, mode='auto', save_freq='epoch',
    options=None, initial_value_threshold=None, **kwargs
)
  • filepath:保存模型文件的路徑
  • monitor:監控指標
  • save_best_only:是否僅保存最佳模型
  • save_weights_only:是否僅保存模型權重
  • save_freq:epoch 或整數

Tensorboard

  Tensorboard有助於追蹤模型訓練過程的Scalars、Graphs、Distributions等等

  • Scalars:顯示損失和指標在每個時期如何變化。 還可以使用它來跟蹤訓練速度,學習率和其他標量值。
  • Graphs:可幫助您可視化模型。
  • DistributionsHistograms 顯示張量隨時間的分布。 可以 可視化權重和偏差並驗證它們是否以預期的方式變化
tf.keras.callbacks.TensorBoard(
    log_dir='logs', histogram_freq=0, write_graph=True,
    write_images=False, write_steps_per_second=False, update_freq='epoch',
    profile_batch=0, embeddings_freq=0, embeddings_metadata=None, **kwargs
)
  • log_dir:TensorBoard的日志路徑
  • histogram_freq:計算模型層的激活和權重直方圖的頻率(以時期為單位)。如果設置為 0,則不會計算直方圖。
  • write_graph:是否在 TensorBoard 中可視化graph。
  • write_steps_per_second:是否將每step的訓練步數記錄到 Tensorboard 中。這支持epoch和batch頻率記錄。
  • update_freq:'batch'或'epoch'或整數。使用 時'batch',在每批之后將損失和指標寫入 TensorBoard。

在model.fit中使用

  當使用Model.fit() 函數進行訓練時, 添加 tf.keras.callback.TensorBoard 回調函數可確保創建和存儲日志

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./event_file")
model.fit(... 
          callbacks=[tensorboard_callback])

在自定義訓練框架中使用

summary_writer = tf.summary.create_file_writer("./event_file")
with summary_writer.as_default():
    tf.summary.scalar('train/loss', train_loss, step=epoch)
    tf.summary.scalar('train/accuracy', train_accuracy, step=epoch)
    tf.summary.scalar('val/loss', val_loss, step=epoch)
    tf.summary.scalar('val/accuracy', val_accuracy, step=epoch)

啟動Tensorboard,在當前文件夾中,cmd運行:

tensorboard --logdir "./"

訓練模型

訓練模型通常有3種方法,

  • 內置fit方法:支持對numpy array,tf.data.Dataset以及 Python generator數據進行訓練,並且可以通過設置回調函數實現對訓練過程的復雜控制邏輯。
  • 內置train_on_batch方法
  • 自定義訓練循環

model.fit

model.fit(
    x=None, y=None, batch_size=None, epochs=1, verbose='auto',
    callbacks=None, validation_split=0.0, validation_data=None, shuffle=True,
    initial_epoch=0, steps_per_epoch=None)

參數:

  • x:輸入數據,可以是:
    • numpy數組、數組列表(如果模型有多個輸入)
    • Tensorflow張量或張量列表(如果模型有多個輸入)
    • 如果模型具有命名輸入,則將輸入名稱映射到相應的數組/張量的字典。
    • tf.data數據集,應該返回一個(inputs, targets)或 的元組(inputs, targets, sample_weights)
    • 生成器或keras.utils.Sequence返回(inputs, targets) 或(inputs, targets, sample_weights)。
  • y:目標數據。與輸入數據一樣x,它可以是 Numpy 數組或 TensorFlow 張量。它應該是一致的x(你不能有 Numpy 輸入和張量目標,或者相反)。如果x是數據集、生成器或keras.utils.Sequence實例,y則不應指定(因為目標將從 獲取x)。
  • batch_size:每次梯度更新的樣本數。如果未指定,將默認為 32。如果您的數據是數據集、生成器或實例的形式(因為它們生成批次),請不要指定。
  • epochs:訓練模型的周期數
  • verbose:'auto'、0、1 或 2。詳細模式。0 =靜默,1 = 進度條,2 = 每個 epoch 一行。'auto' 在大多數情況下默認為 1
  • callbacks:訓練期間調用的回調列表。tf.keras.callbacks
  • validation_split:在 0 和 1 之間浮動。從訓練數據集中分離一部分數據用於驗證,並將在每個 epoch 結束時評估該數據的損失和指標
  • validation_data:驗證數據集
  • shuffle:布爾值(是否在每個 epoch 之前對訓練數據進行洗牌)
  • initial_epoch:整數。開始訓練的epoch(對於恢復之前的訓練運行很有用)。

返回:history,調用 history.history 可以查看訓練期間損失值和度量值的記錄

train_on_batch

  該內置方法相比較fit方法更加靈活,可以不通過回調函數而直接在batch層次上更加精細地控制訓練的過程。

for epoch in tf.range(1, epoches + 1):
    for x, y in ds_train:
        train_result = model.train_on_batch(x, y)

    for x, y in ds_valid:
        valid_result = model.test_on_batch(x, y, reset_metrics=False)

自定義訓練循環

 自定義訓練循環無需編譯模型,直接利用優化器根據損失函數反向傳播迭代參數,擁有最高的靈活性。

訓練循環包括按順序重復執行三個任務:

  • 給模型輸入batch數據以生成輸出
  • 通過將輸出與標簽進行比較來計算損失
  • 使用GradientTape計算梯度
  • 使用這些梯度優化變量
optimizer = keras.optimizers.SGD(learning_rate=1e-3)  # 實例化一個優化器
loss_fn = keras.losses.BinaryCrossentropy()  # 實例化損失函數

train_loss = keras.metrics.Mean(name='train_loss')
valid_loss = keras.metrics.Mean(name='valid_loss')

train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')
valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')


@tf.function
def train_step(features, labels):
    with tf.GradientTape() as tape:
        logits = model(features, training=True)
        loss_value = loss_fn(labels, logits)
        # loss_value += sum(model.losses)   # 添加額外的損失
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    train_loss.update_state(loss_value)
    train_metric.update_state(labels, logits)


@tf.function
def valid_step(features, labels):
    val_logits = model(features, training=False)

    loss_value = loss_fn(labels, val_logits)
    valid_loss.update_state(loss_value)
    valid_metric.update_state(labels, val_logits)


epochs = 2
for epoch in range(epochs):
    start_time = time.time()
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)
    # 在每個epoch結束時運行驗證循環
    for x_batch_val, y_batch_val in val_dataset:
        valid_step(x_batch_val, y_batch_val)

    if epoch % 5 == 0:
        print('Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'.format(epoch, train_loss.result(),
                                                                                    train_metric.result(),
                                                                                    valid_loss.result(),
                                                                                    valid_metric.result()))
    train_loss.reset_states()
    valid_loss.reset_states()
    train_metric.reset_states()
    valid_metric.reset_states()

    print("運行時間: %.2fs" % (time.time() - start_time))

評估模型

通過自定義訓練循環訓練的模型沒有經過編譯,無法直接使用model.evaluate(ds_valid)方法

model.evaluate(x = x_test,y = y_test)

推理模型

可以使用以下方法:

model.predict(ds_test)

model(x_test)

model.call(x_test)

model.predict_on_batch(x_test)

推薦優先使用model.predict(ds_test)方法,既可以對Dataset,也可以對Tensor使用。

保存和加載模型

Keras模型由多個組件組成:

  • 模型架構,指定模型包含的層及其連接方式
  • 權重值
  • 優化器及其狀態(如果有)(這使您可以在離開的地方重新開始訓練)
  • 模型的編譯信息(損失和度量,通過編譯模型或通過調用 add_loss()  或 add_metric()  來定義)

您可以通過 Keras API 將這些片段一次性保存到磁盤,或僅選擇性地保存其中某一部分:

  • 所有內容以 TensorFlow SavedModel 格式(或較早的 Keras H5 格式)保存
  • 保存架構/配置,通常保存為 JSON 文件
  • 保存權重值。通常在訓練模型時使用

保存和加載整個模型

低級API   tf.saved_model 

  • 保存模型: tf.saved_model.save(model, path_to_dir) 
  • 加載模型: model = tf.saved_model.load(path_to_dir) 

高級API  tf.keras.Model (如果我們的模型不包含自定義子類 keras 模型,則非常推薦)

  • 保存模型:
    •  model.save(SaveModel_path) 
    •  tf.keras.models.save_model(model, SaveModel_path) 
  • 加載模型:
    •  model = tf.keras.models.load_model(SaveModel_path)  
SaveModel_path = 'path/to/location'

model.save(SaveModel_path)  
# 或者 
tf.keras.models.save_model(model, SaveModel_path)  
# 加載模型
model = tf.keras.models.load_model(SaveModel_path)

SavedModel 格式會存儲類名稱、調用函數、損失和權重(如果已實現,還包括配置)。調用函數會定義模型/層的計算圖。

如果我們 將 save_format='h5' 傳遞給 save(),則模型將以H5的格式保存

  •  Keras H5 格式(較舊的):包含模型架構、權重值和compile()信息。它是SavedModel的輕量級替代品。
    • 通過model.add_loss()和model.add_metric()添加的外部損失和度量 不會被保存
    • 自定義對象(如自定義層)的計算圖不包含在保存的h5文件中。在加載時,Keras需要訪問這些對象的Python類/函數來重構模型。

保存模型架構

模型的配置(或架構)指定模型包含的層,以及這些層的連接方式*。如果您有模型的配置,則可以使用權重的新初始化狀態創建模型,而無需編譯信息。

*請注意,這僅適用於使用函數式或序列式 API 定義的模型,不適用於子類化模型。

get_config() 和 from_config()

調用 config = model.get_config() 將返回一個包含模型配置的 Python 字典。然后可以通過 Sequential.from_config(config) (針對 Sequential 模型)或 Model.from_config(config) (針對函數式 API 模型)重建同一模型。

# -----------------層示例
layer = keras.layers.Dense(3, activation="relu")
layer_config = layer.get_config()
new_layer = keras.layers.Dense.from_config(layer_config)

# -----------------序列模型示例
model = keras.Sequential([keras.Input((32,)), keras.layers.Dense(1)])
config = model.get_config()
new_model = keras.Sequential.from_config(config)

# -----------------函數式模式示例
inputs = keras.Input((32,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
config = model.get_config()
new_model = keras.Model.from_config(config)
View Code

to_json() 和 tf.keras.models.model_from_json()

這與 get_config / from_config 類似,不同之處在於它會將模型轉換成 JSON 字符串,之后該字符串可以在沒有原始模型類的情況下進行加載。它還特定於Model,不適用於Layer。

json_str = model.to_json()  # 保存模型結構
model_json = tf.keras.models.model_from_json(json_str)  # 恢復模型結構

自定義對象

模型和層

子類化模型和層的架構在 __init__ 和 call 方法中進行定義。無法將其序列化為與 JSON 兼容的配置。

為了保存/加載帶有自定義層的模型或子類化模型,您應該重寫 get_config 和 from_config(可選)方法。此外,您還應該注冊自定義對象,以便 Keras 能夠感知它。

自定義函數

自定義函數(如激活損失或初始化)不需要 get_config 方法。只需將函數名稱注冊為自定義對象,就足以進行加載。

保存模型權重

您可以選擇僅保存和加載模型的權重。這可能對以下情況有用:

  • 您只需使用模型進行推斷:在這種情況下,您無需重新開始訓練,因此不需要編譯信息或優化器狀態。
  • 您正在進行遷移學習:在這種情況下,您需要重用先驗模型的狀態來訓練新模型,因此不需要先驗模型的編譯信息。
  • 如果我們的模型全是又自定義Model或Layer,則推薦用這個

在內存中保存和加載模型權重

  •  tf.keras.layers.Layer.get_weights() :返回 Numpy 數組列表。
  •  tf.keras.layers.Layer.set_weights() :將模型權重設置為 weights 參數中的值。
def create_layer():
    layer = keras.layers.Dense(64, activation="relu", name="dense_2")
    layer.build((None, 784))
    return layer


layer_1 = create_layer()
layer_2 = create_layer()

# Copy weights from layer 1 to layer 2
layer_2.set_weights(layer_1.get_weights())

在磁盤中保存和加載模型權重

 model.save_weights:將權重保存到磁盤,我們可以以save_format="tf" 或 save_format="h5"格式保存

# 保存模型權重
model.save_weights(...)

# 恢復模型結構
model_json = models.model_from_json(json_str)
model_json.compile(...)

# 加載權重
model_json.load_weights(...)

更多詳情請參見:保存並加載Keras模型

GPU訓練

指定GPU訓練

深度學習的訓練過程常常非常耗時,一個模型訓練幾個小時是家常便飯,訓練幾天也是常有的事情,有時候甚至要訓練幾十天。

訓練過程的耗時主要來自於兩個部分,一部分來自數據准備,另一部分來自參數更新。

  數據准備過程可以使用更多進程處理數據來縮減時間。參數更新時間可以應用GPU或者Google的TPU來進行加速。

  當存在可用的GPU時,如果不特意指定device,tensorflow會自動優先選擇使用GPU來創建張量和執行張量計算。但如果是在公司或者學校實驗室的服務器環境,存在多個GPU和多個使用者時,為了不讓單個同學的任務占用全部GPU資源導致其他同學無法使用(tensorflow默認獲取全部GPU的全部內存資源權限,但實際上只使用一個GPU的部分資源),我們通常會在開頭增加以下幾行代碼以控制每個任務使用的GPU編號和顯存大小,以便其他同學也能夠同時訓練模型。

gpus = tf.config.list_physical_devices("GPU")
tf.print(gpus)
# [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
#  PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'),
#  PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'),
#  PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')]

if gpus:
    gpu0 = gpus[0]  # 如果有多個GPU,僅使用第0個GPU
    tf.config.experimental.set_memory_growth(gpu0, True)  # 設置GPU顯存用量按需使用
    # 或者也可以設置GPU顯存為固定使用量(例如:4G)
    # tf.config.experimental.set_virtual_device_configuration(gpu0,
    #    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)])
    tf.config.set_visible_devices([gpu0], "GPU")

單機多卡訓練

  TensorFlow 在 tf.distribute.MirroredStrategy  中為我們提供了單機多卡訓練策略,使用這種策略時,我們只需實例化一個 MirroredStrategy 策略,並將模型構建的代碼放入 strategy.scope() 的上下文環境中:

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # 模型構建代碼

可以在參數中指定設備,如:

# 指定只使用第 0、1 號 GPU 參與分布式策略。
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

MirroredStrategy 的步驟如下:

  • 訓練開始前,該策略在所有 N 個計算設備上均各復制一份完整的模型;
  • 每次訓練傳入一個batch的數據時,將數據分成 N 份,分別傳入 N 個計算設備(即數據並行);
  • N 個計算設備使用本地變量(鏡像變量)分別計算自己所獲得的部分數據的梯度;
  • 使用分布式計算的 All-reduce 操作,在計算設備間高效交換梯度數據並進行求和,使得最終每個設備都有了所有設備的梯度之和;
  • 使用梯度求和的結果更新本地變量(鏡像變量);
  • 當所有設備均更新本地變量后,進行下一輪訓練(即該並行策略是同步的)。

默認情況下,TensorFlow 中的 MirroredStrategy 策略使用 NVIDIA NCCL 進行 All-reduce 操作。

tf.distribute.Strategy和model.fit

tf.distribute.Strategy被集成到tf.keras,tf.keras是用於構建和訓練模型的高級 API。您可以使用 Model.fit 來無縫分布式訓練模型

TensorFlow 分布策略支持所有類型的 Keras 模型——Sequential、Functional和subclassed。以下是您需要在代碼中更改的內容:

  • 創建實例 tf.distribute.Strategy。
  • 在strategy.scope中創建 Keras 模型、優化器和度量
num_epochs = 5
batch_size_per_replica = 64 # 每個顯卡上的batch數
learning_rate = 0.001

strategy = tf.distribute.MirroredStrategy()
print('Number of devices: %d' % strategy.num_replicas_in_sync)  # 輸出設備數量
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync     # 總batch_size

dataset = ...

with strategy.scope():
    model = tf.keras.applications.MobileNetV2(weights=None, classes=2)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )

model.fit(dataset, epochs=num_epochs)

分布式訓練匯總

import os
import tensorflow as tf
import tensorflow_datasets as tfds

strategy = tf.distribute.MirroredStrategy()
print('設備數量: {}'.format(strategy.num_replicas_in_sync))

epochs = 12
batch_size_per_replica = 64
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync


# 數據集
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255

    return image, label


datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']
train_dataset = mnist_train.map(scale).cache().shuffle(10000).batch(batch_size)
eval_dataset = mnist_test.map(scale).batch(batch_size)

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])

checkpoint_dir = './training_checkpoints'  # 定義用於存儲檢查點的檢查點目錄
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")  # 定義檢查點文件的名稱


# 定義一個函數來衰減學習速率。
def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5


# 定義一個回調函數,用於在每個epoch的末尾打印學習速率
class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        print('\nepoch的 {} 學習率是 {}'.format(epoch + 1, model.optimizer.lr.numpy()))


# 把所有的回調放在一起
callbacks = [tf.keras.callbacks.TensorBoard(log_dir='./logs'),
             tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                                save_weights_only=True),
             tf.keras.callbacks.LearningRateScheduler(decay),
             PrintLR()]

# 訓練和評估
model.fit(train_dataset, epochs=epochs, callbacks=callbacks)
View Code

保存為SavedModel格式模型,您可以使用或不使用Strategy.scope.

path = 'saved_model/'
model.save(path, save_format='tf')

# 加載模型,不Strategy.scope
unreplicated_model = tf.keras.models.load_model(path)
unreplicated_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))



# 加載模型Strategy.scope
with strategy.scope():
    replicated_model = tf.keras.models.load_model(path)
    replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                             optimizer=tf.keras.optimizers.Adam(),
                             metrics=['accuracy'])

    eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
    print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
View Code

tf.distribute.Strategy和自定義訓練循環

1、在mirrored_strategy.scope() 內創建模型和優化器

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

2、調用 tf.distribute.Strategy.experimental_distribute_dataset 創建分布式數據集

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

3、使用 tf.nn.compute_average_loss 計算損失。tf.nn.compute_average_loss對每個樣本損失求和並將總和除以global_batch_size。

def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

4、將train_step放入 tf.distribute.Strategy.run 中,並傳入之前創建的數據集

5、使用 tf.distribute.Strategy.reduce 來聚合 tf.distribute.Strategy.run。tf.distribute.Strategy.run返回每個GPU結果。您還可以tf.distribute.Strategy.experimental_local_results獲取結果值列表。

def train_step(inputs):
    features, labels = inputs

    with tf.GradientTape() as tape:
        predictions = model(features, training=True)
        loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss


@tf.function
def distributed_train_step(dist_inputs):
    per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
    return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

6、迭代dist_dataset並循環運行訓練:

for dist_inputs in dist_dataset:
    print(distributed_train_step(dist_inputs))

分布式訓練匯總

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import os


strategy = tf.distribute.MirroredStrategy()     # 實例化MirroredStrategy
print('設備數量: {}'.format(strategy.num_replicas_in_sync))

epochs = 10
batch_size_per_replica = 64  # 每個GPU上得batch數
global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync  # 總batch數

# 數據集
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]

# 獲取[0,1]范圍內的圖像
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

buffer_size = len(train_images)

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch(
    global_batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)    # 分布式數據集
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)      # 分布式數據集

# 創建模型
def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    return model


# 創建一個檢查點目錄來存儲檢查點
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")


with strategy.scope():
    # 創建訓練損失
    # 將reduction設置為“none”,這樣我們可以在之后進行reduction,並除以全局batch size
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True,
        reduction=tf.keras.losses.Reduction.NONE)

    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

    test_loss = tf.keras.metrics.Mean(name='test_loss')     # 創建測試損失
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')  # 訓練精度
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')  # 測試精度

    # 模型、優化器和檢查點必須創建在 'strategy.scope' 下。
    model = create_model()
    optimizer = tf.keras.optimizers.Adam()
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)


# 訓練step
def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss


# 測試step
def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)


# 分布式訓練
@tf.function
def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)


# 分布式測試
@tf.function
def distributed_test_step(dataset_inputs):
    return strategy.run(test_step, args=(dataset_inputs,))


for epoch in range(epochs):
    # 訓練循環
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
        total_loss += distributed_train_step(x)
        num_batches += 1
    train_loss = total_loss / num_batches

    # 測試循環
    for x in test_dist_dataset:
        distributed_test_step(x)

    if epoch % 10 == 0:
        checkpoint.save(checkpoint_prefix)

    print("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}".format(epoch + 1, train_loss,
                                                                                      train_accuracy.result() * 100,
                                                                                      test_loss.result(),
                                                                                      test_accuracy.result() * 100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()
View Code

tf.distribute.Strategy 可以再沒有strategy的情況下恢復最新的檢查點並測試

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()


@tf.function
def eval_step(images, labels):
    predictions = new_model(images, training=False)
    eval_accuracy(labels, predictions)


checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
    eval_step(images, labels)

print('在沒有strategy的情況下恢復保存的模型后的准確性: {}'.format(eval_accuracy.result() * 100))
View Code

匯總

keras訓練匯總

自定義訓練匯總

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import time

# 超參數
lr = 1e-3   # 學習率
batch_size = 64
epochs = 2

# 數據准備
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# 保留1萬個樣品用於驗證
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# 准備訓練數據集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# 准備驗證數據集
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

# 搭建模型
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

optimizer = keras.optimizers.SGD(learning_rate=lr)    # 實例化一個優化器
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)  # 實例化損失函數

train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')
valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # loss_value += sum(model.losses)   # 添加額外的損失
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_metric.update_state(y, logits)
    return loss_value

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    valid_metric.update_state(y, val_logits)


for epoch in range(epochs):
    start_time = time.time()
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        if step % 200 == 0:
            print("訓練損失( %d: %.4f" % (step, float(loss_value)))

    train_acc = train_metric.result()
    print("訓練精度: %.4f" % (float(train_acc),))
    train_metric.reset_states()  # 在每個epoch結束時重置訓練指標

    # 在每個epoch結束時運行驗證循環
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = valid_metric.result()
    valid_metric.reset_states()
    print("驗證精度: %.4f" % (float(val_acc),))
    print("運行時間: %.2fs" % (time.time() - start_time))
View Code

 

參考

【電子書】簡單粗暴 TensorFlow 2

【知乎】最全Tensorflow2.0 入門教程持續更新

【和鯨社區】30天吃掉那只TensorFlow2.0 | Github

【書籍】TensorFlow 2深度學習開源書 | PDF下載 提取碼:juqs

【bilibili】tensorflow2.0入門與實戰 2019年最通俗易懂的課程

【bilibili】神經網絡與深度學習——TensorFlow2.0實戰【中文課程】

【github】TensorFlow-Examples 

【github】TensorFlow-2.x-Tutorials 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM