前言
至於為什么寫這個教程,首先是為了自己學習做個記錄,其次是因為Tensorflow的API寫的很好,但是他的教程寫的太亂了,不適合新手學習。tensorflow 1 和tensorflow 2 有相似之處但是不兼容,tensorflow 2將keras融合了。TensorFlow™ 是一個采用 數據流圖(data flow graphs),用於數值計算的開源軟件庫。圖中得節點(Nodes)表示數學操作,圖中的線(edges)則表示在節點間相互聯系的多維數據數組,即張量(tensor)。它靈活的架構讓你可以在多種平台上展開計算,例如台式計算機中的一個或多個CPU(或GPU),服務器,移動設備等等。
TensorFlow的主要優點:
- 靈活性:支持底層數值計算,C++自定義操作符
- 可移植性:從服務器到PC到手機,從CPU到GPU到TPU
- 分布式計算:分布式並行計算,可指定操作符對應計算設備
層次結構
TensorFlow的層次結構從低到高可以分成如下五層:硬件層,內核層,低階API,中階API,高階API。
- 第一層:硬件層,TensorFlow支持CPU、GPU或TPU加入計算資源池。
- 第二層:內核層,為C++實現的內核,kernel可以跨平台分布運行。
- 第三層:低階API,由Python實現的操作符,提供了封裝C++內核的低級API指令。主要包括各種張量操作算子、計算圖、自動微分。如tf.Variable,tf.constant,tf.function,tf.GradientTape,tf.nn.softmax...
- 第四層:中階API,由Python實現的模型組件,對低級API進行了函數封裝。主要包括數據管道(tf.data)、特征列(tf.feature_column)、激活函數(tf.nn)、模型層(tf.keras.layers)、損失函數(tf.keras.losses)、評估函數(tf.keras.metrics)、優化器(tf.keras.optimizers)、回調函數(tf.keras.callbacks) 等等。
- 第五層:高階API,由Python實現的模型成品。主要為tf.keras.models提供的模型的類接口,主要包括:模型的構建(Sequential、functional API、Model子類化)、型的訓練(內置fit方法、內置train_on_batch方法、自定義訓練循環、單GPU訓練模型、多GPU訓練模型、TPU訓練模型)、模型的部署(tensorflow serving部署模型、使用spark(scala)調用tensorflow模型)。
概述
創建張量
tf.constant(value, dtype=tf.float32) # 常數 tf.range(start, limit=None, delta=1) # 生成一個范圍內間隔為delta的 張量 tf.linspace(start, stop, num) # 在一個間隔內生成均勻間隔的值 tf.zeros() # 創建全0張量 tf.ones() # 創建全1張量 tf.zeros_like(input) # 創建和input一樣大小的張量 tf.fill(dims, value) # 創建shape為dim,全為value的張量 tf.random.uniform([5], minval=0, maxval=10) # 均勻分布隨機 tf.random.normal([3, 3], mean=0.0, stddev=1.0) # 正態分布隨機 tf.Variable(initial_value) # 變量
tf.Variable:
- name:變量的名字,默認情況下,會自動獲得唯一的變量名
- trainable:設置為 False 可以關閉梯度。例如,訓練計步器就是一個不需要梯度的變量
tf.rank(a):求矩陣的秩
變量的設備位置
為了提高性能,TensorFlow 會嘗試將張量和變量放在與其 dtype 兼容的最快設備上。這意味着如果有 GPU,那么大部分變量都會放置在 GPU 上,不過,我們可以重寫變量的位置。
with tf.device('CPU:0'): # Create some tensors a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b) print(c)
使用assign重新分配張量
a.assign([5, 6]) # a = [5, 6] a.assign_add([2, 3]) # a = a+[2,3] a.assign_sub([7, 9]) # a=a-[7,9]
維度變換
維度變換相關函數主要有 tf.reshape,tf.squeeze,tf.expand_dims,tf.transpose。
- tf.reshape():改變張量的形狀
- tf.squeeze():減少維度為1的維度
- tf.expand_dims(input, axis):增加維度
- tf.transpose(a, perm=None):交換維度
tf.reshape可以改變張量的形狀,但是其本質上不會改變張量元素的存儲順序,所以,該操作實際上非常迅速,並且是可逆的。
合並分隔
和numpy類似,可以用tf.concat和tf.stack方法對多個張量進行合並,可以用tf.split方法把一個張量分割成多個張量。
tf.concat和tf.stack有略微的區別,tf.concat是連接,不會增加維度,而tf.stack是堆疊,會增加維度。
a = tf.constant([[1.0, 2.0], [3.0, 4.0]]) # (2,2) b = tf.constant([[5.0, 6.0], [7.0, 8.0]]) # (2,2) c = tf.concat([a, b], axis=0) # (4, 2) c = tf.stack([a, b], axis=0) # (2, 2, 2) d = tf.split(c, 2, axis=0) # [(1, 2, 2),(1, 2, 2)]
Tensor與Array的轉換
c = np.array(b) # tensor 轉 np c = b.numpy() # tensor 轉 np tf.convert_to_tensor(c) # np 轉 tensor
數學運算
tf.add(a,b) # 加法 a+b tf.multiply(a,b) # 逐元素乘法a*b tf.matmul(a,b) # 矩陣乘法a@b
類型轉換
tensorflow支持的模型有:tf.float16、tf.float64、tf.int8、tf.int16、tf.int32...
a = tf.constant([2.2, 3.3, 4.4], dtype=tf.float64) b = tf.cast(a, dtype=tf.float16) # 類型轉換
計算圖
有三種計算圖的構建方式:靜態計算圖,動態計算圖,以及Autograph。在TensorFlow1.0時代,采用的是靜態計算圖,需要先使用TensorFlow的各種算子創建計算圖,然后再開啟一個會話Session,顯式執行計算圖。而在TensorFlow2.0時代,采用的是動態計算圖,即每使用一個算子后,該算子會被動態加入到隱含的默認計算圖中立即執行得到結果,而無需開啟Session。使用動態計算圖(Eager Excution)的好處是方便調試程序,它會讓TensorFlow代碼的表現和Python原生代碼的表現一樣,寫起來就像寫numpy一樣,各種日志打印,控制流全部都是可以使用的。使用動態計算圖的缺點是運行效率相對會低一些。因為使用動態圖會有許多次Python進程和TensorFlow的C++進程之間的通信。而靜態計算圖構建完成之后幾乎全部在TensorFlow內核上使用C++代碼執行,效率更高。此外靜態圖會對計算步驟進行一定的優化,剪去和結果無關的計算步驟。
如果需要在TensorFlow2.0中使用靜態圖,可以使用@tf.function裝飾器將普通Python函數轉換成對應的TensorFlow計算圖構建代碼。運行該函數就相當於在TensorFlow1.0中用Session執行代碼。使用tf.function構建靜態圖的方式叫做 Autograph。當然Autograph機制能夠轉換的代碼並不是沒有任何約束的,有一些編碼規范需要遵循,否則可能會轉換失敗或者不符合預期。
- 被@tf.function修飾的函數應盡可能使用TensorFlow中的函數而不是Python中的其他函數。例如使用tf.print而不是print,使用tf.range而不是range,使用tf.constant(True)而不是True.
- 避免在@tf.function修飾的函數內部定義tf.Variable
- 被@tf.function修飾的函數不可修改該函數外部的列表或字典等數據結構變量
計算圖由節點(nodes)和線(edges)組成。節點表示操作符Operator,或者稱之為算子,線表示計算間的依賴。實線表示有數據傳遞依賴,傳遞的數據即張量。虛線通常可以表示控制依賴,即執行先后順序。
import tensorflow as tf # 使用autograph構建靜態圖 @tf.function def strjoin(x,y): z = tf.strings.join([x,y],separator = " ") tf.print(z) return z result = strjoin(tf.constant("hello"),tf.constant("world")) print(result)
您可以像這樣測量靜態圖和動態圖性能差異:

x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32) def power(x, y): result = tf.eye(10, dtype=tf.dtypes.int32) for _ in range(y): result = tf.matmul(x, result) return result print("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000)) # 2.56378621799 # 將python函數轉換為圖形 power_as_graph = tf.function(power) print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000)) # 0.683253670
我們還可以再函數前使用裝飾器 @tf.function 調用tf.function,同時也可以使用 tf.config.run_functions_eagerly(True) 關閉Function創建和運行圖形的能力。
前面在介紹Autograph的編碼規范時提到構建Autograph時應該避免在@tf.function修飾的函數內部定義tf.Variable。但是如果在函數外部定義tf.Variable的話,又會顯得這個函數有外部變量依賴,封裝不夠完美。一種簡單的思路是定義一個類,並將相關的tf.Variable創建放在類的初始化方法中。而將函數的邏輯放在其他方法中。

class DemoModule(tf.Module): def __init__(self, init_value=tf.constant(0.0), name=None): super(DemoModule, self).__init__(name=name) with self.name_scope: # 相當於with tf.name_scope("demo_module") self.x = tf.Variable(init_value, dtype=tf.float32, trainable=True) @tf.function def addprint(self, a): with self.name_scope: self.x.assign_add(a) tf.print(self.x) return self.x
自動微分
自動微分用於訓練神經網絡的反向傳播非常有用,TensorFlow 會記住在前向傳遞過程中哪些運算以何種順序發生。隨后,在后向傳遞期間,以相反的順序遍歷此運算列表來計算梯度。
Tensorflow一般使用tf.GradientTape來記錄正向運算過程,然后反向傳播自動計算梯度值。
$$f(x)=ax^2+bx+c$$
x = tf.Variable(0.0,name = "x",dtype = tf.float32) a = tf.constant(1.0) b = tf.constant(-2.0) c = tf.constant(1.0) with tf.GradientTape() as tape: y = a*tf.pow(x,2) + b*x + c dy_dx = tape.gradient(y,x) print(dy_dx) # tf.Tensor(-2.0, shape=(), dtype=float32)
對常量張量也可以求導,只不過需要增加watch

with tf.GradientTape() as tape: tape.watch([a,b,c]) y = a*tf.pow(x,2) + b*x + c dy_dx,dy_da,dy_db,dy_dc = tape.gradient(y,[x,a,b,c]) print(dy_da) # tf.Tensor(0.0, shape=(), dtype=float32) print(dy_dc) # tf.Tensor(1.0, shape=(), dtype=float32)
可以求二階導數

with tf.GradientTape() as tape2: with tf.GradientTape() as tape1: y = a*tf.pow(x,2) + b*x + c dy_dx = tape1.gradient(y,x) dy2_dx2 = tape2.gradient(dy_dx,x) print(dy2_dx2) # tf.Tensor(2.0, shape=(), dtype=float32)
利用梯度和優化器求最小值

# 求f(x) = a*x**2 + b*x + c的最小值 # 使用optimizer.apply_gradients x = tf.Variable(0.0,name = "x",dtype = tf.float32) a = tf.constant(1.0) b = tf.constant(-2.0) c = tf.constant(1.0) optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) for _ in range(1000): with tf.GradientTape() as tape: y = a*tf.pow(x,2) + b*x + c dy_dx = tape.gradient(y,x) # 計算梯度 optimizer.apply_gradients(grads_and_vars=[(dy_dx,x)]) # 根據梯度更新變量 tf.print("y =",y,"; x =",x)
如果不想被計算梯度:
with tf.GradientTape(watch_accessed_variables=False) as tape: pass
使用TensorFlow實現神經網絡模型的一般流程包括:
- 准備數據
- 定義模型
- 訓練模型
- 評估模型
- 推理模型
- 保存模型
數據輸入
tensorflow支持 Numpy 數組、Pandas DataFrame、Python 生成器、csv文件、文本文件、文件路徑、TFrecords文件等方式構建數據管道。如果您的數據很小並且適合內存,我們建議您使用tf.data.Dataset.from_tensor_slices()從Numpy array構建數據管道
- Dataset:如果您有大型數據集並且需要進行分布式訓練
- Sequence:如果您有大型數據集並且需要執行大量在 TensorFlow 中無法完成的自定義 Python 端處理(例如,如果您依賴外部庫進行數據加載或預處理)
- 通過tfrecords文件方式構建數據管道較為復雜,需要對樣本構建tf.Example后壓縮成字符串寫到tfrecoreds文件,讀取后再解析成tf.Example。但tfrecoreds文件的優點是壓縮后文件較小,便於網絡傳播,加載速度較快。
Numpy構建數據
官方推薦使用 tf.data.Dataset.from_tensors() 或 tf.data.Dataset.from_tensor_slices() 創建數據集,Dataset支持一類特殊的操作Trainformation(打亂、生成epoch...等操作)
- data.map(function):將轉換函數映射到數據集每一個元素
- data.batch(batch_size):構建batch
- data.shuffle(buffer_size):隨機打亂輸入數據,從該緩沖區中隨機采樣元素
- data.repeat():repeat的功能就是將整個序列重復多次,一般不帶參數
- data.prefetch(tf.data.experimental.AUTOTUNE) :預先取 數據
- data.take():采樣,從開始位置取前幾個元素
- ...
features = np.arange(0, 100, dtype=np.int32) # # (100,) labels = np.zeros(100, dtype=np.int32) # (100,) data = tf.data.Dataset.from_tensor_slices((features, labels)) # 創建數據集 data = data.repeat() # 無限期地補充數據 data = data.shuffle(buffer_size=100) # 打亂數據 data = data.batch(batch_size=4) # 批量數據 data = data.prefetch(buffer_size=1) # 預取批處理(預加載批處理,消耗更快) for batch_x, batch_y in data.take(5): print(batch_x.shape, batch_y.shape) # (4,) (4,) break
注意:如果你打算多次調用,你可以使用迭代器的方式:

ite_data = iter(data) for i in range(5): batch_x, batch_y = next(ite_data) print(batch_x, batch_y) for i in range(5): batch_x, batch_y = next(ite_data) print(batch_x, batch_y)
提升管道性能
訓練深度學習模型常常會非常耗時。模型訓練的耗時主要來自於兩個部分,一部分來自數據准備,另一部分來自參數迭代。參數迭代過程的耗時通常依賴於GPU來提升。而數據准備過程的耗時則可以通過構建高效的數據管道進行提升。
以下是一些構建高效數據管道的建議。
- 使用 prefetch 方法讓數據准備和參數迭代兩個過程相互並行。
- 使用 interleave 方法可以讓數據讀取過程多進程執行,並將不同來源數據夾在一起。
- 使用 map 時設置num_parallel_calls 讓數據轉換過程多進行執行。
- 使用 cache 方法讓數據在第一個epoch后緩存到內存中,僅限於數據集不大情形。
- 使用 map轉換時,先batch,然后采用向量化的轉換方法對每個batch進行轉換。
生成器構建數據
def generate_features(): # 函數生成一個隨機字符串 def random_string(length): return ''.join(random.choice(string.ascii_letters) for m in range(length)) # 返回一個隨機字符串、一個隨機向量和一個隨機整數 yield random_string(4), np.random.uniform(size=4), random.randint(0, 10) data = tf.data.Dataset.from_generator(generate_features, output_types=(tf.string, tf.float32, tf.int32)) data = data.repeat() # 無限期地補充數據 data = data.shuffle(buffer_size=100) # 打亂數據 data = data.batch(batch_size=4) # 批量數據(將記錄聚合在一起) data = data.prefetch(buffer_size=1) # 預取批量(預加載批量以便更快的消耗) # Display data. for batch_str, batch_vector, batch_int in data.take(5): # (4,) (4, 4) (4,) print(batch_str.shape, batch_vector.shape, batch_int.shape)
keras.utils.Sequence
特別是,keras.utils.Sequence該類提供了一個簡單的接口來構建 Python 數據生成器,該生成器可以感知多處理並且可以洗牌。
Sequence必須實現兩種方法:
- __getitem__:返回一個batch數據
- __len__:整型,返回batch的數量
import tensorflow as tf from keras.utils.data_utils import Sequence class SequenceDataset(Sequence): def __init__(self, batch_size): self.input_data = tf.random.normal((640, 8192, 1)) self.label_data = tf.random.normal((640, 8192, 1)) self.batch_size = batch_size def __len__(self): return int(tf.math.ceil(len(self.input_data) / float(self.batch_size))) # 每次輸出一個batch def __getitem__(self, idx): batch_x = self.input_data[idx * self.batch_size:(idx + 1) * self.batch_size] batch_y = self.label_data[idx * self.batch_size:(idx + 1) * self.batch_size] return batch_x, batch_y sequence = SequenceDataset(batch_size=64) for batch_idx, (x, y) in enumerate(sequence): print(batch_idx, x.shape, y.shape) # tf.float32 # 0 (64, 8192, 1) (64, 8192, 1) break
搭建模型
深度學習模型一般由各種模型層組合而成,如果這些內置模型層不能夠滿足需求,我們也可以通過編寫tf.keras.Lambda匿名模型層或繼承tf.keras.layers.Layer基類構建自定義的模型層。其中tf.keras.Lambda匿名模型層只適用於構造沒有學習參數的模型層。
搭建模型有以下3種方式構建模型:
- Sequential順序模型:用於簡單的層堆棧, 其中每一層恰好有一個輸入張量和一個輸出張量
- 函數式API模型:多輸入多輸出,或者模型需要共享權重,或者模型具有殘差連接等非順序結構,
- 繼承Model基類自定義模型:如果無特定必要,盡可能避免使用Model子類化的方式構建模型,這種方式提供了極大的靈活性,但也有更大的概率出錯
順序建模
model = keras.Sequential([ layers.Dense(2, activation="relu", name="layer1"), layers.Dense(3, activation="relu", name="layer2"), layers.Dense(4, name="layer3"), ])
還可以通過add方法創建順序模型
model = keras.Sequential() model.add(layers.Dense(2, activation="relu")) model.add(layers.Dense(3, activation="relu")) model.add(layers.Dense(4))
因為模型不知道輸入shape,所以起初模型沒有權重,因此我們需要告知模型輸入shape
# 在第一層添加Input model.add(keras.Input(shape=(4,))) model.add(layers.Dense(2, activation="relu")) # 或者在第一層添加input_shape model.add(layers.Dense(2, activation="relu", input_shape=(4,)))
順序模型可以配合add 和 model.summary() 在模型的任何位置查看該層的輸入輸出。
函數式API建模
函數式API搭建模型比Sequential更加靈活,可以處理具有非線性拓撲、共享層甚至多個輸入或輸出的模型。
inputs = keras.Input(shape=(784,)) x = layers.Dense(64, activation="relu")(inputs) x = layers.Dense(64, activation="relu")(x) outputs = layers.Dense(10)(x) model = keras.Model(inputs=inputs, outputs=outputs, name="mnist_model") model.summary() # 查看模型摘要
還可以將模型繪制為圖形
keras.utils.plot_model(model, "my_first_model_with_shape_info.png", show_shapes=True)
補充:函數式模型是可以嵌套的
自定義建模
在 TensorFlow 中,模型類的繼承關系為:
tf.keras.Model (模型) > tf.keras.layers.Layer (層) > tf.Module (模塊)。
通常使用Layer類來定義內部計算塊,並使用Model類定義外部模型(訓練的對象)。
繼承 tf.Module 栗子 (主要用於創建模塊):

import tensorflow as tf # 我們繼承tf.Module 搭建一個Dense層 class CustomDense(tf.Module): def __init__(self, in_features, out_features, **kwargs): super().__init__(**kwargs) self.weight = tf.Variable(tf.random.normal([in_features, out_features]), name='weight') self.bias = tf.Variable(tf.zeros([out_features]), name='bias') def __call__(self, x): x = tf.matmul(x, self.weight) + self.bias x = tf.nn.relu(x) return x model = CustomDense(in_features=5, out_features=10) x = tf.random.normal(shape=(64, 5)) outputs = model(x) print("CustomDense outputs", outputs.shape) # (64, 10)
繼承 tf.keras.layers.Layer 栗子 (主要用於創建層):

# 使用CustomDense 搭建一個模型 class CustomLayer(tf.keras.layers.Layer): # 添加 **kwargs來支持基本Keras層參數 def __init__(self, in_features, out_features, **kwargs): super().__init__(**kwargs) # 這將很快轉移到 build ;見下代碼 self.weight = tf.Variable(tf.random.normal([in_features, out_features]), name='weight') self.bias = tf.Variable(tf.zeros([out_features]), name='bias') # Keras 層有自己的 __call__,他然后調用 call() def call(self, x): x = tf.matmul(x, self.weight) + self.bias x = tf.nn.relu(x) return x model = CustomLayer(in_features=5, out_features=10) x = tf.random.normal(shape=(64, 5)) outputs = model(x) print("CustomLayer outputs", outputs.shape) # (64, 10)

class FlexibleDense(tf.keras.layers.Layer): # 添加 **kwargs來支持基本Keras層參數 def __init__(self, out_features, **kwargs): super().__init__(**kwargs) self.out_features = out_features # 創建層的狀態(權重) # 在call前會先調用 build,使得權重的形狀固定下來,這樣我們就不用操心輸入的shape def build(self, input_shape): self.weight = tf.Variable(tf.random.normal([input_shape[-1], self.out_features]), name='weight') self.bias = tf.Variable(tf.zeros([self.out_features]), name='bias') def call(self, inputs): return tf.matmul(inputs, self.weight) + self.bias model = FlexibleDense(out_features=10) x = tf.random.normal(shape=(64, 5)) outputs = model(x) print("FlexibleDense outputs", outputs.shape) # (64, 10)
繼承 tf.keras.Model 栗子 (主要用於創建模型):

class CustomModel(tf.keras.Model): def __init__(self, in_features=5, out_features=10, **kwargs): super().__init__(**kwargs) self.dense_1 = CustomLayer(in_features, out_features) self.dense_2 = CustomLayer(out_features, out_features) def call(self, x): x = self.dense_1(x) x = self.dense_2(x) return x model = CustomLayer(in_features=5, out_features=10) x = tf.random.normal(shape=(64, 5)) outputs = model(x) print("CustomModel outputs", outputs.shape) # (64, 10)
keras層具有許多額外的功能:
- 可選損失函數
- 對度量函數的支持
- 對可選 training 參數的內置支持,用於區分訓練和推斷用途
- get_config 和 from_config 方法,允許您准確存儲配置以在 Python 中克隆模型
keras.Model類具有keras.Layer類相同的API,但有以下區別:
- Model類 提供了內置的訓練 model.fit() 、評估 model.evaluate() 和預測 model.predict() API
- Model類 可以通過 model.layers 屬性公開內層的列表。
- Model類 提供了保存和序列化 API save()、save_weights() ...
Layer 類對應於“層”,Model 類對應於“模型”,如果您想知道“我應該使用Layer類還是Model類?”,請問自己:我需要調用fit()它嗎?我需要save() 嗎?如果是這樣,請與Model。如果不是,請使用Layer。
我們來吃個栗子,用 layers.Layer 搭建層(Layer 之間可以相互調用),用keras.Model搭建模型(Model 之間可以相互調用):

import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers class Sampling(layers.Layer): """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit.""" def call(self, inputs): z_mean, z_log_var = inputs batch = tf.shape(z_mean)[0] dim = tf.shape(z_mean)[1] epsilon = tf.keras.backend.random_normal(shape=(batch, dim)) return z_mean + tf.exp(0.5 * z_log_var) * epsilon class Encoder(layers.Layer): """Maps MNIST digits to a triplet (z_mean, z_log_var, z).""" def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs): super(Encoder, self).__init__(name=name, **kwargs) self.dense_proj = layers.Dense(intermediate_dim, activation="relu") self.dense_mean = layers.Dense(latent_dim) self.dense_log_var = layers.Dense(latent_dim) self.sampling = Sampling() def call(self, inputs): x = self.dense_proj(inputs) z_mean = self.dense_mean(x) z_log_var = self.dense_log_var(x) z = self.sampling((z_mean, z_log_var)) return z_mean, z_log_var, z class Decoder(layers.Layer): """Converts z, the encoded digit vector, back into a readable digit.""" def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs): super(Decoder, self).__init__(name=name, **kwargs) self.dense_proj = layers.Dense(intermediate_dim, activation="relu") self.dense_output = layers.Dense(original_dim, activation="sigmoid") def call(self, inputs): x = self.dense_proj(inputs) return self.dense_output(x) class VariationalAutoEncoder(keras.Model): """Combines the encoder and decoder into an end-to-end model for training.""" def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name="autoencoder", **kwargs): super(VariationalAutoEncoder, self).__init__(name=name, **kwargs) self.original_dim = original_dim self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim) self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim) def call(self, inputs): z_mean, z_log_var, z = self.encoder(inputs) reconstructed = self.decoder(z) # Add KL divergence regularization loss. kl_loss = -0.5 * tf.reduce_mean( z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1 ) self.add_loss(kl_loss) # 添加損失 return reconstructed
我們可以搭建一個自定義訓練模塊,來訓練上面的模型

original_dim = 784 vae = VariationalAutoEncoder(original_dim, 64, 32) optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) mse_loss_fn = tf.keras.losses.MeanSquaredError() loss_metric = tf.keras.metrics.Mean() (x_train, _), _ = tf.keras.datasets.mnist.load_data() x_train = x_train.reshape(60000, 784).astype("float32") / 255 train_dataset = tf.data.Dataset.from_tensor_slices(x_train) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64) epochs = 2 # Iterate over epochs. for epoch in range(epochs): print("Start of epoch %d" % (epoch,)) # Iterate over the batches of the dataset. for step, x_batch_train in enumerate(train_dataset): with tf.GradientTape() as tape: reconstructed = vae(x_batch_train) # Compute reconstruction loss loss = mse_loss_fn(x_batch_train, reconstructed) loss += sum(vae.losses) # Add KLD regularization loss grads = tape.gradient(loss, vae.trainable_weights) optimizer.apply_gradients(zip(grads, vae.trainable_weights)) loss_metric(loss) if step % 100 == 0: print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
由於上面的模型是 keras.Model 的子類,它具有內置循環方法(model.fit)

vae = VariationalAutoEncoder(784, 64, 32) optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError()) vae.fit(x_train, x_train, epochs=2, batch_size=64)
當然自定義的 keras.Model 還可以和函數式API 混搭訓練

original_dim = 784 intermediate_dim = 64 latent_dim = 32 # Define encoder model. original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input") x = layers.Dense(intermediate_dim, activation="relu")(original_inputs) z_mean = layers.Dense(latent_dim, name="z_mean")(x) z_log_var = layers.Dense(latent_dim, name="z_log_var")(x) z = Sampling()((z_mean, z_log_var)) encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder") # Define decoder model. latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling") x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs) outputs = layers.Dense(original_dim, activation="sigmoid")(x) decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder") # Define VAE model. outputs = decoder(z) vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae") # Add KL divergence regularization loss. kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1) vae.add_loss(kl_loss) # Train. optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError()) vae.fit(x_train, x_train, epochs=3, batch_size=64) 函數式API訓練模型
補充知識:自定義層
如果自定義模型層沒有需要被訓練的參數,一般推薦使用Lamda層實現。
mypower = layers.Lambda(lambda x:tf.math.pow(x,2)) mypower(tf.range(5))
如果自定義模型層有需要被訓練的參數,則可以通過繼承Layer基類實現。Layer的子類化一般需要重新實現初始化方法,Build方法和Call方法。如果built = False,調用__call__時會先調用build方法, 再調用call方法。
損失函數
一般來說,損失值 由 損失函數 和 正則化項 組成(Objective = Loss + Regularization)。對於keras模型,正則化項一般在各層中指定,例如使用Dense的 kernel_regularizer 和 bias_regularizer 等參數指定權重使用 L1 或者 L2 正則化項,此外還可以用kernel_constraint 和 bias_constraint 等參數約束權重的取值范圍,這也是一種正則化手段。
損失函數在 model.compile 時候指定。
- 對於回歸模型,通常使用的損失函數是平方損失函數 mean_squared_error,簡寫為 mse,類實現形式為 MeanSquaredError 和 MSE
- 對於二分類模型,通常使用的是二元交叉熵損失函數 binary_crossentropy。
- 對於多分類模型,如果label是one-hot編碼的,則使用交叉熵損失函數 categorical_crossentropy。如果label是序號編碼的,則需要使用稀疏類別交叉熵損失函數 sparse_categorical_crossentropy。
- 如果有需要,也可以自定義損失函數,自定義損失函數需要接收兩個張量y_true,y_pred作為輸入參數,並輸出一個標量作為損失函數值。
常見的Loss可以參看Tensorflow的官網:tf.keras.losses
自定義損失
keras提供了兩種自定義損失:
函數實現:輸入 y_true 和 y_pred,返回 損失值
def custom_mean_squared_error(y_true, y_pred): return tf.math.reduce_mean(tf.square(y_true - y_pred)) model.compile(optimizer=..., loss=custom_mean_squared_error)
類實現:繼承 tf.keras.losses.Loss ,在init中初始化參數,在call中重寫計算邏輯。
class CustomMSE(keras.losses.Loss): def __init__(self, regularization_factor=0.1, name="custom_mse"): super().__init__(name=name) self.regularization_factor = regularization_factor def call(self, y_true, y_pred): mse = tf.math.reduce_mean(tf.square(y_true - y_pred)) reg = tf.math.reduce_mean(tf.square(0.5 - y_pred)) return mse + reg * self.regularization_factor model.compile(optimizer=..., loss=CustomMSE())
度量函數
人們通常會通過度量函數來從另一個方面 評估模型的好壞,可以在 model.compile 時,通過列表形式指定單/多個評估指標。
- model.compile 時,可以
- 也可以自定義評估指標。自定義評估指標需要接收兩個張量y_true,y_pred作為輸入參數,並輸出一個標量作為評估值。
- 也可以繼承 tf.keras.metrics.Metric 自定義度量方法,update_state方法,result方法實現評估指標的計算邏輯,從而得到評估指標的類的實現形式。
常見的Metric可以參看Tensorflow的官網:tf.keras.metrics
自定義度量
繼承 tf.keras.metrics.Metric,並且實現4個類方法:
- __init__(self):創建狀態變量
- update_state(self, y_true, y_pred, sample_weight=None):根據 y_true 和 y_pred 來更新狀態變量
- result(self):使用狀態變量來計算 最終結果
- reset_state(self):重新初始化狀態變量

class SnrCustomMetrics(metrics.Metric): def __init__(self, name="snr_metrics", **kwargs): # 創建狀態變量 super(SnrCustomMetrics, self).__init__(name=name, **kwargs) self.metrics_value = self.add_weight(name="snr_value", initializer="zeros") def update_state(self, y_true, y_pred, sample_weight=None): """使用目標 y_true 和模型預測 y_pred 來更新狀態變量 :param y_true: target :param y_pred: 預測值 :param sample_weight: 權重 """ labels_pow = tf.pow(y_true, 2) noise_pow = tf.pow(y_pred - y_true, 2) snr_value = 10 * tf.math.log(tf.reduce_sum(labels_pow) / tf.reduce_sum(noise_pow)) / tf.math.log(10.) if sample_weight is not None: sample_weight = tf.cast(sample_weight, "float32") snr_value = tf.multiply(snr_value, sample_weight) self.metrics_value.assign_add(snr_value) # 使用狀態變量來計算最終結果 def result(self): return self.metrics_value # metrics 的狀態將在每個epoch開始時重置。 def reset_state(self): self.metrics_value.assign(0.0) model.compile(optimizer=..., loss=..., metrics=[SnrCustomMetrics()])
優化器
機器學習界有一群煉丹師,他們每天的日常是:拿來葯材(數據),架起八卦爐(模型),點着六味真火(優化算法),就搖着蒲扇等着丹葯出爐了。不過,當過廚子的都知道,同樣的食材,同樣的菜譜,但火候不一樣了,這出來的口味可是千差萬別。火小了夾生,火大了易糊,火不勻則半生半糊。機器學習也是一樣,模型優化算法的選擇直接關系到最終模型的性能。有時候效果不好,未必是特征的問題或者模型設計的問題,很可能就是優化算法的問題。
深度學習優化算法大概經歷了 SGD -> SGDM -> NAG ->Adagrad -> Adadelta(RMSprop) -> Adam -> Nadam 這樣的發展歷程。model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)
- SGD:默認參數為純SGD, 設置momentum參數不為0實際上變成SGDM, 考慮了一階動量, 設置 nesterov為True后變成NAG,即 Nesterov Acceleration Gradient,在計算梯度時計算的是向前走一步所在位置的梯度
- Adagrad:考慮了二階動量,對於不同的參數有不同的學習率,即自適應學習率。缺點是學習率單調下降,可能后期學習速率過慢乃至提前停止學習
- RMSprop:考慮了二階動量,對於不同的參數有不同的學習率,即自適應學習率,對Adagrad進行了優化,通過指數平滑只考慮一定窗口內的二階動量
- Adadelta:考慮了二階動量,與RMSprop類似,但是更加復雜一些,自適應性更強
- Adam:同時考慮了一階動量和二階動量,可以看成RMSprop上進一步考慮了Momentum
- Nadam:在Adam基礎上進一步考慮了 Nesterov Acceleration
對於一般新手煉丹師,優化器直接使用Adam,並使用其默認參數就OK了。一些愛寫論文的煉丹師由於追求評估指標效果,可能會偏愛前期使用Adam優化器快速下降,后期使用SGD並精調優化器參數得到更好的結果。此外目前也有一些前沿的優化算法,據稱效果比Adam更好,例如LazyAdam,Look-ahead,RAdam,Ranger等。
初始化優化器時會創建一個變量optimier.iterations用於記錄迭代的次數。因此優化器和tf.Variable一樣,一般在@tf.function外創建。
優化器主要使用apply_gradients方法傳入變量和對應梯度從而來對給定變量進行迭代,
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) with tf.GradientTape() as tape: ... grads = tape.gradient(loss, model.trainable_weights) # 根據損失 求梯度 optimizer.apply_gradients(zip(grads, model.trainable_weights)) # 根據梯度 優化模型
或者直接使用minimize方法對目標函數進行迭代優化。
@tf.function def train(epoch=1000): for _ in tf.range(epoch): optimizer.minimize(loss, model.trainable_weights) tf.print("epoch = ", optimizer.iterations) return loss
當然,更常見的使用是在編譯時將優化器傳入model.fit
model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)
回調函數
tf.keras的回調函數實際上是一個類,一般是在model.fit時作為參數指定,一般收集一些日志信息,改變學習率等超參數,提前終止訓練過程等等
大部分時候,keras.callbacks子模塊中定義的回調函數類已經足夠使用了,如果有特定的需要,我們也可以通過對keras.callbacks.Callbacks實施子類化構造自定義的回調函數
- BaseLogger: 收集每個epoch上metrics平均值,對stateful_metrics參數中的帶中間狀態的指標直接拿最終值無需對各個batch平均,指標均值結果將添加到logs變量中。該回調函數被所有模型默認添加,且是第一個被添加的
- History: 將BaseLogger計算的各個epoch的metrics結果記錄到history這個dict變量中,並作為model.fit的返回值。該回調函數被所有模型默認添加,在BaseLogger之后被添加
- EarlyStopping: 當被監控指標在設定的若干個epoch后沒有提升,則提前終止訓練
- TensorBoard: 為Tensorboard可視化保存日志信息。支持評估指標,計算圖,模型參數等的可視化
- ModelCheckpoint: 在每個epoch后保存模型
- ReduceLROnPlateau:如果監控指標在設定的若干個epoch后沒有提升,則以一定的因子減少學習率
- TerminateOnNaN:如果遇到loss為NaN,提前終止訓練
- LearningRateScheduler:學習率控制器。給定學習率lr和epoch的函數關系,根據該函數關系在每個epoch前調整學習率
- CSVLogger:將每個epoch后的logs結果記錄到CSV文件中
- ProgbarLogger:將每個epoch后的logs結果打印到標准輸出流中
EarlyStopping
當監控的指標停止改進時停止訓練
tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=0, verbose=0, mode='auto', baseline=None, restore_best_weights=False)
- monitor:要監控的 對象
- min_delta:被監測對象的最小變化 小於 min_delta 的絕對變化,將被視為沒有改進
- patience:如果經過num個epoch,監控對象沒有改善,則停止訓練
- verbose:詳細模式。1打印過程信息,0不打印
- mode:{"auto", "min", "max"}之一。
- min,當監控的數量停止減少時,培訓停止
- max,當監控的數量停止增加時,它將停止
- auto,方向會從被監控對象的名稱中自動推斷出來
- restore_best_weights:是否從監測量的最佳值的epoch恢復模型權重。如果為 False,則使用在訓練的最后一步獲得的模型權重
ModelCheckpoint
以某種頻率保存 Keras 模型或模型權重
tf.keras.callbacks.ModelCheckpoint( filepath, monitor='val_loss', verbose=0, save_best_only=False, save_weights_only=False, mode='auto', save_freq='epoch', options=None, initial_value_threshold=None, **kwargs )
- filepath:保存模型文件的路徑
- monitor:監控指標
- save_best_only:是否僅保存最佳模型
- save_weights_only:是否僅保存模型權重
- save_freq:epoch 或整數
Tensorboard
Tensorboard有助於追蹤模型訓練過程的Scalars、Graphs、Distributions等等
- Scalars:顯示損失和指標在每個時期如何變化。 還可以使用它來跟蹤訓練速度,學習率和其他標量值。
- Graphs:可幫助您可視化模型。
- Distributions 和 Histograms 顯示張量隨時間的分布。 可以 可視化權重和偏差並驗證它們是否以預期的方式變化
tf.keras.callbacks.TensorBoard( log_dir='logs', histogram_freq=0, write_graph=True, write_images=False, write_steps_per_second=False, update_freq='epoch', profile_batch=0, embeddings_freq=0, embeddings_metadata=None, **kwargs )
- log_dir:TensorBoard的日志路徑
- histogram_freq:計算模型層的激活和權重直方圖的頻率(以時期為單位)。如果設置為 0,則不會計算直方圖。
- write_graph:是否在 TensorBoard 中可視化graph。
- write_steps_per_second:是否將每step的訓練步數記錄到 Tensorboard 中。這支持epoch和batch頻率記錄。
- update_freq:'batch'或'epoch'或整數。使用 時'batch',在每批之后將損失和指標寫入 TensorBoard。
在model.fit中使用
當使用Model.fit() 函數進行訓練時, 添加 tf.keras.callback.TensorBoard 回調函數可確保創建和存儲日志
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./event_file") model.fit(... callbacks=[tensorboard_callback])
在自定義訓練框架中使用
summary_writer = tf.summary.create_file_writer("./event_file") with summary_writer.as_default(): tf.summary.scalar('train/loss', train_loss, step=epoch) tf.summary.scalar('train/accuracy', train_accuracy, step=epoch) tf.summary.scalar('val/loss', val_loss, step=epoch) tf.summary.scalar('val/accuracy', val_accuracy, step=epoch)
啟動Tensorboard,在當前文件夾中,cmd運行:
tensorboard --logdir "./"
訓練模型
訓練模型通常有3種方法,
- 內置fit方法:支持對numpy array,tf.data.Dataset以及 Python generator數據進行訓練,並且可以通過設置回調函數實現對訓練過程的復雜控制邏輯。
- 內置train_on_batch方法
- 自定義訓練循環
model.fit
model.fit( x=None, y=None, batch_size=None, epochs=1, verbose='auto', callbacks=None, validation_split=0.0, validation_data=None, shuffle=True, initial_epoch=0, steps_per_epoch=None)
參數:
- x:輸入數據,可以是:
- numpy數組、數組列表(如果模型有多個輸入)
- Tensorflow張量或張量列表(如果模型有多個輸入)
- 如果模型具有命名輸入,則將輸入名稱映射到相應的數組/張量的字典。
- tf.data數據集,應該返回一個(inputs, targets)或 的元組(inputs, targets, sample_weights)
- 生成器或keras.utils.Sequence返回(inputs, targets) 或(inputs, targets, sample_weights)。
- y:目標數據。與輸入數據一樣
x
,它可以是 Numpy 數組或 TensorFlow 張量。它應該是一致的x
(你不能有 Numpy 輸入和張量目標,或者相反)。如果x是數據集、生成器或
keras.utils.Sequence
實例,y
則不應指定(因為目標將從 獲取x
)。 - batch_size:每次梯度更新的樣本數。如果未指定,將默認為 32。如果您的數據是數據集、生成器或實例的形式(因為它們生成批次),請不要指定。
- epochs:訓練模型的周期數
- verbose:'auto'、0、1 或 2。詳細模式。0 =靜默,1 = 進度條,2 = 每個 epoch 一行。'auto' 在大多數情況下默認為 1
- callbacks:訓練期間調用的回調列表。見
tf.keras.callbacks
。 - validation_split:在 0 和 1 之間浮動。從訓練數據集中分離一部分數據用於驗證,並將在每個 epoch 結束時評估該數據的損失和指標
- validation_data:驗證數據集
- shuffle:布爾值(是否在每個 epoch 之前對訓練數據進行洗牌)
- initial_epoch:整數。開始訓練的epoch(對於恢復之前的訓練運行很有用)。
返回:history,調用 history.history 可以查看訓練期間損失值和度量值的記錄
train_on_batch
該內置方法相比較fit方法更加靈活,可以不通過回調函數而直接在batch層次上更加精細地控制訓練的過程。
for epoch in tf.range(1, epoches + 1): for x, y in ds_train: train_result = model.train_on_batch(x, y) for x, y in ds_valid: valid_result = model.test_on_batch(x, y, reset_metrics=False)
自定義訓練循環
自定義訓練循環無需編譯模型,直接利用優化器根據損失函數反向傳播迭代參數,擁有最高的靈活性。
訓練循環包括按順序重復執行三個任務:
- 給模型輸入batch數據以生成輸出
- 通過將輸出與標簽進行比較來計算損失
- 使用GradientTape計算梯度
- 使用這些梯度優化變量
optimizer = keras.optimizers.SGD(learning_rate=1e-3) # 實例化一個優化器 loss_fn = keras.losses.BinaryCrossentropy() # 實例化損失函數 train_loss = keras.metrics.Mean(name='train_loss') valid_loss = keras.metrics.Mean(name='valid_loss') train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy') valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy') @tf.function def train_step(features, labels): with tf.GradientTape() as tape: logits = model(features, training=True) loss_value = loss_fn(labels, logits) # loss_value += sum(model.losses) # 添加額外的損失 grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) train_loss.update_state(loss_value) train_metric.update_state(labels, logits) @tf.function def valid_step(features, labels): val_logits = model(features, training=False) loss_value = loss_fn(labels, val_logits) valid_loss.update_state(loss_value) valid_metric.update_state(labels, val_logits) epochs = 2 for epoch in range(epochs): start_time = time.time() for step, (x_batch_train, y_batch_train) in enumerate(train_dataset): loss_value = train_step(x_batch_train, y_batch_train) # 在每個epoch結束時運行驗證循環 for x_batch_val, y_batch_val in val_dataset: valid_step(x_batch_val, y_batch_val) if epoch % 5 == 0: print('Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'.format(epoch, train_loss.result(), train_metric.result(), valid_loss.result(), valid_metric.result())) train_loss.reset_states() valid_loss.reset_states() train_metric.reset_states() valid_metric.reset_states() print("運行時間: %.2fs" % (time.time() - start_time))
評估模型
通過自定義訓練循環訓練的模型沒有經過編譯,無法直接使用model.evaluate(ds_valid)方法
model.evaluate(x = x_test,y = y_test)
推理模型
可以使用以下方法:
model.predict(ds_test)
model(x_test)
model.call(x_test)
model.predict_on_batch(x_test)
推薦優先使用model.predict(ds_test)方法,既可以對Dataset,也可以對Tensor使用。
保存和加載模型
Keras模型由多個組件組成:
- 模型架構,指定模型包含的層及其連接方式
- 權重值
- 優化器及其狀態(如果有)(這使您可以在離開的地方重新開始訓練)
- 模型的編譯信息(損失和度量,通過編譯模型或通過調用 add_loss() 或 add_metric() 來定義)
您可以通過 Keras API 將這些片段一次性保存到磁盤,或僅選擇性地保存其中某一部分:
- 將所有內容以 TensorFlow SavedModel 格式(或較早的 Keras H5 格式)保存
- 僅保存架構/配置,通常保存為 JSON 文件
- 僅保存權重值。通常在訓練模型時使用
保存和加載整個模型
低級API tf.saved_model
- 保存模型: tf.saved_model.save(model, path_to_dir)
- 加載模型: model = tf.saved_model.load(path_to_dir)
高級API tf.keras.Model (如果我們的模型不包含自定義子類 keras 模型,則非常推薦)
- 保存模型:
- model.save(SaveModel_path)
- tf.keras.models.save_model(model, SaveModel_path)
- 加載模型:
- model = tf.keras.models.load_model(SaveModel_path)
SaveModel_path = 'path/to/location' model.save(SaveModel_path) # 或者 tf.keras.models.save_model(model, SaveModel_path) # 加載模型 model = tf.keras.models.load_model(SaveModel_path)
SavedModel 格式會存儲類名稱、調用函數、損失和權重(如果已實現,還包括配置)。調用函數會定義模型/層的計算圖。
如果我們 將 save_format='h5' 傳遞給 save(),則模型將以H5的格式保存
- Keras H5 格式(較舊的):包含模型架構、權重值和compile()信息。它是SavedModel的輕量級替代品。
- 通過model.add_loss()和model.add_metric()添加的外部損失和度量 不會被保存
- 自定義對象(如自定義層)的計算圖不包含在保存的h5文件中。在加載時,Keras需要訪問這些對象的Python類/函數來重構模型。
保存模型架構
模型的配置(或架構)指定模型包含的層,以及這些層的連接方式*。如果您有模型的配置,則可以使用權重的新初始化狀態創建模型,而無需編譯信息。
*請注意,這僅適用於使用函數式或序列式 API 定義的模型,不適用於子類化模型。
get_config() 和 from_config()
調用 config = model.get_config() 將返回一個包含模型配置的 Python 字典。然后可以通過 Sequential.from_config(config) (針對 Sequential 模型)或 Model.from_config(config) (針對函數式 API 模型)重建同一模型。

# -----------------層示例 layer = keras.layers.Dense(3, activation="relu") layer_config = layer.get_config() new_layer = keras.layers.Dense.from_config(layer_config) # -----------------序列模型示例 model = keras.Sequential([keras.Input((32,)), keras.layers.Dense(1)]) config = model.get_config() new_model = keras.Sequential.from_config(config) # -----------------函數式模式示例 inputs = keras.Input((32,)) outputs = keras.layers.Dense(1)(inputs) model = keras.Model(inputs, outputs) config = model.get_config() new_model = keras.Model.from_config(config)
to_json() 和 tf.keras.models.model_from_json()
這與 get_config / from_config 類似,不同之處在於它會將模型轉換成 JSON 字符串,之后該字符串可以在沒有原始模型類的情況下進行加載。它還特定於Model,不適用於Layer。
json_str = model.to_json() # 保存模型結構 model_json = tf.keras.models.model_from_json(json_str) # 恢復模型結構
自定義對象
模型和層
子類化模型和層的架構在 __init__ 和 call 方法中進行定義。無法將其序列化為與 JSON 兼容的配置。
為了保存/加載帶有自定義層的模型或子類化模型,您應該重寫 get_config 和 from_config(可選)方法。此外,您還應該注冊自定義對象,以便 Keras 能夠感知它。
自定義函數
自定義函數(如激活損失或初始化)不需要 get_config 方法。只需將函數名稱注冊為自定義對象,就足以進行加載。
保存模型權重
您可以選擇僅保存和加載模型的權重。這可能對以下情況有用:
- 您只需使用模型進行推斷:在這種情況下,您無需重新開始訓練,因此不需要編譯信息或優化器狀態。
- 您正在進行遷移學習:在這種情況下,您需要重用先驗模型的狀態來訓練新模型,因此不需要先驗模型的編譯信息。
- 如果我們的模型全是又自定義Model或Layer,則推薦用這個
在內存中保存和加載模型權重
- tf.keras.layers.Layer.get_weights() :返回 Numpy 數組列表。
- tf.keras.layers.Layer.set_weights() :將模型權重設置為 weights 參數中的值。
def create_layer(): layer = keras.layers.Dense(64, activation="relu", name="dense_2") layer.build((None, 784)) return layer layer_1 = create_layer() layer_2 = create_layer() # Copy weights from layer 1 to layer 2 layer_2.set_weights(layer_1.get_weights())
在磁盤中保存和加載模型權重
model.save_weights:將權重保存到磁盤,我們可以以save_format="tf" 或 save_format="h5"格式保存
# 保存模型權重 model.save_weights(...) # 恢復模型結構 model_json = models.model_from_json(json_str) model_json.compile(...) # 加載權重 model_json.load_weights(...)
更多詳情請參見:保存並加載Keras模型
GPU訓練
指定GPU訓練
深度學習的訓練過程常常非常耗時,一個模型訓練幾個小時是家常便飯,訓練幾天也是常有的事情,有時候甚至要訓練幾十天。
訓練過程的耗時主要來自於兩個部分,一部分來自數據准備,另一部分來自參數更新。
數據准備過程可以使用更多進程處理數據來縮減時間。參數更新時間可以應用GPU或者Google的TPU來進行加速。
當存在可用的GPU時,如果不特意指定device,tensorflow會自動優先選擇使用GPU來創建張量和執行張量計算。但如果是在公司或者學校實驗室的服務器環境,存在多個GPU和多個使用者時,為了不讓單個同學的任務占用全部GPU資源導致其他同學無法使用(tensorflow默認獲取全部GPU的全部內存資源權限,但實際上只使用一個GPU的部分資源),我們通常會在開頭增加以下幾行代碼以控制每個任務使用的GPU編號和顯存大小,以便其他同學也能夠同時訓練模型。
gpus = tf.config.list_physical_devices("GPU") tf.print(gpus) # [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), # PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'), # PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'), # PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')] if gpus: gpu0 = gpus[0] # 如果有多個GPU,僅使用第0個GPU tf.config.experimental.set_memory_growth(gpu0, True) # 設置GPU顯存用量按需使用 # 或者也可以設置GPU顯存為固定使用量(例如:4G) # tf.config.experimental.set_virtual_device_configuration(gpu0, # [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)]) tf.config.set_visible_devices([gpu0], "GPU")
單機多卡訓練
TensorFlow 在 tf.distribute.MirroredStrategy 中為我們提供了單機多卡訓練策略,使用這種策略時,我們只需實例化一個 MirroredStrategy 策略,並將模型構建的代碼放入 strategy.scope() 的上下文環境中:
strategy = tf.distribute.MirroredStrategy() with strategy.scope(): # 模型構建代碼
可以在參數中指定設備,如:
# 指定只使用第 0、1 號 GPU 參與分布式策略。 strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
MirroredStrategy 的步驟如下:
- 訓練開始前,該策略在所有 N 個計算設備上均各復制一份完整的模型;
- 每次訓練傳入一個batch的數據時,將數據分成 N 份,分別傳入 N 個計算設備(即數據並行);
- N 個計算設備使用本地變量(鏡像變量)分別計算自己所獲得的部分數據的梯度;
- 使用分布式計算的 All-reduce 操作,在計算設備間高效交換梯度數據並進行求和,使得最終每個設備都有了所有設備的梯度之和;
- 使用梯度求和的結果更新本地變量(鏡像變量);
- 當所有設備均更新本地變量后,進行下一輪訓練(即該並行策略是同步的)。
默認情況下,TensorFlow 中的 MirroredStrategy 策略使用 NVIDIA NCCL 進行 All-reduce 操作。
tf.distribute.Strategy和model.fit
tf.distribute.Strategy被集成到tf.keras,tf.keras是用於構建和訓練模型的高級 API。您可以使用 Model.fit 來無縫分布式訓練模型
TensorFlow 分布策略支持所有類型的 Keras 模型——Sequential、Functional和subclassed。以下是您需要在代碼中更改的內容:
- 創建實例 tf.distribute.Strategy。
- 在strategy.scope中創建 Keras 模型、優化器和度量
num_epochs = 5 batch_size_per_replica = 64 # 每個顯卡上的batch數 learning_rate = 0.001 strategy = tf.distribute.MirroredStrategy() print('Number of devices: %d' % strategy.num_replicas_in_sync) # 輸出設備數量 batch_size = batch_size_per_replica * strategy.num_replicas_in_sync # 總batch_size dataset = ... with strategy.scope(): model = tf.keras.applications.MobileNetV2(weights=None, classes=2) model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss=tf.keras.losses.sparse_categorical_crossentropy, metrics=[tf.keras.metrics.sparse_categorical_accuracy] ) model.fit(dataset, epochs=num_epochs)
分布式訓練匯總

import os import tensorflow as tf import tensorflow_datasets as tfds strategy = tf.distribute.MirroredStrategy() print('設備數量: {}'.format(strategy.num_replicas_in_sync)) epochs = 12 batch_size_per_replica = 64 batch_size = batch_size_per_replica * strategy.num_replicas_in_sync # 數據集 def scale(image, label): image = tf.cast(image, tf.float32) image /= 255 return image, label datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True) mnist_train, mnist_test = datasets['train'], datasets['test'] train_dataset = mnist_train.map(scale).cache().shuffle(10000).batch(batch_size) eval_dataset = mnist_test.map(scale).batch(batch_size) with strategy.scope(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10) ]) model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy']) checkpoint_dir = './training_checkpoints' # 定義用於存儲檢查點的檢查點目錄 checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") # 定義檢查點文件的名稱 # 定義一個函數來衰減學習速率。 def decay(epoch): if epoch < 3: return 1e-3 elif epoch >= 3 and epoch < 7: return 1e-4 else: return 1e-5 # 定義一個回調函數,用於在每個epoch的末尾打印學習速率 class PrintLR(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): print('\nepoch的 {} 學習率是 {}'.format(epoch + 1, model.optimizer.lr.numpy())) # 把所有的回調放在一起 callbacks = [tf.keras.callbacks.TensorBoard(log_dir='./logs'), tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True), tf.keras.callbacks.LearningRateScheduler(decay), PrintLR()] # 訓練和評估 model.fit(train_dataset, epochs=epochs, callbacks=callbacks)
保存為SavedModel格式模型,您可以使用或不使用Strategy.scope.

path = 'saved_model/' model.save(path, save_format='tf') # 加載模型,不Strategy.scope unreplicated_model = tf.keras.models.load_model(path) unreplicated_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy']) eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset) print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc)) # 加載模型Strategy.scope with strategy.scope(): replicated_model = tf.keras.models.load_model(path) replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy']) eval_loss, eval_acc = replicated_model.evaluate(eval_dataset) print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
tf.distribute.Strategy和自定義訓練循環
1、在mirrored_strategy.scope() 內創建模型和優化器
with mirrored_strategy.scope(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))]) optimizer = tf.keras.optimizers.SGD()
2、調用 tf.distribute.Strategy.experimental_distribute_dataset 創建分布式數據集
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
3、使用 tf.nn.compute_average_loss 計算損失。tf.nn.compute_average_loss對每個樣本損失求和並將總和除以global_batch_size。
def compute_loss(labels, predictions): per_example_loss = loss_object(labels, predictions) return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
4、將train_step放入 tf.distribute.Strategy.run 中,並傳入之前創建的數據集
5、使用 tf.distribute.Strategy.reduce 來聚合 tf.distribute.Strategy.run。tf.distribute.Strategy.run返回每個GPU結果。您還可以tf.distribute.Strategy.experimental_local_results獲取結果值列表。
def train_step(inputs): features, labels = inputs with tf.GradientTape() as tape: predictions = model(features, training=True) loss = compute_loss(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss @tf.function def distributed_train_step(dist_inputs): per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,)) return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
6、迭代dist_dataset並循環運行訓練:
for dist_inputs in dist_dataset: print(distributed_train_step(dist_inputs))
分布式訓練匯總

import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import numpy as np import os strategy = tf.distribute.MirroredStrategy() # 實例化MirroredStrategy print('設備數量: {}'.format(strategy.num_replicas_in_sync)) epochs = 10 batch_size_per_replica = 64 # 每個GPU上得batch數 global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync # 總batch數 # 數據集 fashion_mnist = tf.keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() train_images = train_images[..., None] test_images = test_images[..., None] # 獲取[0,1]范圍內的圖像 train_images = train_images / np.float32(255) test_images = test_images / np.float32(255) buffer_size = len(train_images) train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch( global_batch_size) test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size) train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset) # 分布式數據集 test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset) # 分布式數據集 # 創建模型 def create_model(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Conv2D(64, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10) ]) return model # 創建一個檢查點目錄來存儲檢查點 checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt") with strategy.scope(): # 創建訓練損失 # 將reduction設置為“none”,這樣我們可以在之后進行reduction,並除以全局batch size loss_object = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction=tf.keras.losses.Reduction.NONE) def compute_loss(labels, predictions): per_example_loss = loss_object(labels, predictions) return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size) test_loss = tf.keras.metrics.Mean(name='test_loss') # 創建測試損失 train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # 訓練精度 test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy') # 測試精度 # 模型、優化器和檢查點必須創建在 'strategy.scope' 下。 model = create_model() optimizer = tf.keras.optimizers.Adam() checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model) # 訓練step def train_step(inputs): images, labels = inputs with tf.GradientTape() as tape: predictions = model(images, training=True) loss = compute_loss(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_accuracy.update_state(labels, predictions) return loss # 測試step def test_step(inputs): images, labels = inputs predictions = model(images, training=False) t_loss = loss_object(labels, predictions) test_loss.update_state(t_loss) test_accuracy.update_state(labels, predictions) # 分布式訓練 @tf.function def distributed_train_step(dataset_inputs): per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) # 分布式測試 @tf.function def distributed_test_step(dataset_inputs): return strategy.run(test_step, args=(dataset_inputs,)) for epoch in range(epochs): # 訓練循環 total_loss = 0.0 num_batches = 0 for x in train_dist_dataset: total_loss += distributed_train_step(x) num_batches += 1 train_loss = total_loss / num_batches # 測試循環 for x in test_dist_dataset: distributed_test_step(x) if epoch % 10 == 0: checkpoint.save(checkpoint_prefix) print("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}".format(epoch + 1, train_loss, train_accuracy.result() * 100, test_loss.result(), test_accuracy.result() * 100)) test_loss.reset_states() train_accuracy.reset_states() test_accuracy.reset_states()
tf.distribute.Strategy 可以再沒有strategy的情況下恢復最新的檢查點並測試

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size) eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy') new_model = create_model() new_optimizer = tf.keras.optimizers.Adam() @tf.function def eval_step(images, labels): predictions = new_model(images, training=False) eval_accuracy(labels, predictions) checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model) checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) for images, labels in test_dataset: eval_step(images, labels) print('在沒有strategy的情況下恢復保存的模型后的准確性: {}'.format(eval_accuracy.result() * 100))
匯總
keras訓練匯總
自定義訓練匯總

import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import numpy as np import time # 超參數 lr = 1e-3 # 學習率 batch_size = 64 epochs = 2 # 數據准備 (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data() x_train = np.reshape(x_train, (-1, 784)) x_test = np.reshape(x_test, (-1, 784)) # 保留1萬個樣品用於驗證 x_val = x_train[-10000:] y_val = y_train[-10000:] x_train = x_train[:-10000] y_train = y_train[:-10000] # 准備訓練數據集 train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size) # 准備驗證數據集 val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val)) val_dataset = val_dataset.batch(batch_size) # 搭建模型 inputs = keras.Input(shape=(784,), name="digits") x1 = layers.Dense(64, activation="relu")(inputs) x2 = layers.Dense(64, activation="relu")(x1) outputs = layers.Dense(10, name="predictions")(x2) model = keras.Model(inputs=inputs, outputs=outputs) optimizer = keras.optimizers.SGD(learning_rate=lr) # 實例化一個優化器 loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True) # 實例化損失函數 train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy') valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy') @tf.function def train_step(x, y): with tf.GradientTape() as tape: logits = model(x, training=True) loss_value = loss_fn(y, logits) # loss_value += sum(model.losses) # 添加額外的損失 grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) train_metric.update_state(y, logits) return loss_value @tf.function def test_step(x, y): val_logits = model(x, training=False) valid_metric.update_state(y, val_logits) for epoch in range(epochs): start_time = time.time() for step, (x_batch_train, y_batch_train) in enumerate(train_dataset): loss_value = train_step(x_batch_train, y_batch_train) if step % 200 == 0: print("訓練損失( %d: %.4f" % (step, float(loss_value))) train_acc = train_metric.result() print("訓練精度: %.4f" % (float(train_acc),)) train_metric.reset_states() # 在每個epoch結束時重置訓練指標 # 在每個epoch結束時運行驗證循環 for x_batch_val, y_batch_val in val_dataset: test_step(x_batch_val, y_batch_val) val_acc = valid_metric.result() valid_metric.reset_states() print("驗證精度: %.4f" % (float(val_acc),)) print("運行時間: %.2fs" % (time.time() - start_time))
參考
【電子書】簡單粗暴 TensorFlow 2
【和鯨社區】30天吃掉那只TensorFlow2.0 | Github
【書籍】TensorFlow 2深度學習開源書 | PDF下載 提取碼:juqs
【bilibili】tensorflow2.0入門與實戰 2019年最通俗易懂的課程
【bilibili】神經網絡與深度學習——TensorFlow2.0實戰【中文課程】
【github】TensorFlow-Examples
【github】TensorFlow-2.x-Tutorials