Theory
A Quick Tour of TensorFlow
- TensorFlow 2.0 (beta) was released in June 2019, making TensorFlow significantly easier to use
- TensorFlow is a powerful library for numerical computation
- Its core is very similar to NumPy's, but with GPU support
- It supports distributed computing (across multiple devices and servers)
- It includes a kind of just-in-time (JIT) compiler that lets it optimize computations for speed and memory usage. It works by extracting the computation graph from a Python function, then optimizing it (e.g., by pruning unused nodes), and finally running it efficiently (e.g., by automatically running independent operations in parallel)
- Computation graphs can be exported to a portable format, so you can train a TensorFlow model in one environment (e.g., using Python on Linux) and run it in another (e.g., using Java on an Android device)
- It implements automatic differentiation (autodiff) and provides some excellent optimizers, so you can easily minimize all sorts of loss functions
- At the lowest level, each TensorFlow operation is implemented in highly efficient C++ code. Many operations have multiple implementations, called kernels: each kernel is dedicated to a specific device type, such as CPUs, GPUs, or even TPUs (tensor processing units). As you may know, GPUs can dramatically speed up computations by splitting them into many smaller chunks and running them in parallel across many GPU threads. TPUs are even faster: they are custom ASIC chips built specifically for deep learning operations
Using TensorFlow like NumPy
- TensorFlow's API revolves around tensors, which flow from operation to operation, hence the name TensorFlow. A tensor is very similar to a NumPy ndarray: it is usually a multidimensional array, but it can also hold a scalar
- Tensors and operations
- The @ operator was added in Python 3.5 for matrix multiplication; it is equivalent to calling the tf.matmul() function
- The tf.transpose() function does not do exactly the same thing as NumPy's T attribute: in TensorFlow, a new tensor is created with its own transposed copy of the data, while in NumPy, t.T is just a transposed view of the same data. Similarly, the tf.reduce_sum() operation is named this way because its GPU kernel (i.e., GPU implementation) uses a reduce algorithm that does not guarantee the order in which the elements are added: since 32-bit floats have limited precision, the result may change ever so slightly each time you call this operation. The same is true of tf.reduce_mean() (but of course tf.reduce_max() is deterministic)
- Tensors and NumPy
- Tensors play nice with NumPy: you can create a tensor from a NumPy array and vice versa. You can even apply TensorFlow operations to NumPy arrays and NumPy operations to tensors
- NumPy uses 64-bit precision by default, while TensorFlow uses 32-bit. This is because 32-bit precision is generally more than enough for neural networks, and it runs faster and uses less RAM. So when you create a tensor from a NumPy array, make sure to set dtype=tf.float32, as in the minimal sketch below
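A minimal sketch of the dtype choice (the variable names are illustrative):
import numpy as np
import tensorflow as tf
a = np.array([2., 4., 5.])            # NumPy defaults to float64
t = tf.constant(a, dtype=tf.float32)  # force 32-bit precision
t.dtype  # tf.float32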
- Type conversions
- Type conversions can significantly hurt performance, and they easily go unnoticed when they are done automatically. To avoid this, TensorFlow does not perform any type conversions automatically: it just raises an exception if you execute an operation on tensors with incompatible types. For example, you cannot add a float tensor and an integer tensor, and you cannot even add a 32-bit float and a 64-bit float
- Variables
- We cannot use regular tensors to implement the weights in a neural network, since they need to be tweaked by backpropagation; what we need is a tf.Variable
- In practice you will rarely have to create variables manually, since Keras provides an add_weight() method, and model parameters are generally updated directly by the optimizers, so you will rarely need to update variables manually either
- Other data structures
- Sparse tensors (tf.SparseTensor)
- Efficiently represent tensors containing mostly zeros
- Tensor arrays (tf.TensorArray)
- Lists of tensors. They have a fixed size by default, but can optionally be made dynamic. All the tensors they contain must have the same shape and data type
- Ragged tensors (tf.RaggedTensor)
- Represent static lists of lists of tensors, where every tensor has the same shape and data type
- String tensors
- Regular tensors of type tf.string. These represent byte strings, not Unicode strings, so if you create a string tensor from a Unicode string (a regular Python 3 string such as 'café'), it gets encoded to UTF-8 automatically (e.g., b'caf\xc3\xa9'). Alternatively, you can represent Unicode strings using tensors of type tf.int32, where each item represents a Unicode code point (e.g., [99, 97, 102, 233]). tf.string is atomic, meaning that its length does not appear in the tensor's shape. Once you convert it to a Unicode tensor (i.e., a tensor of type tf.int32 holding Unicode code points), the length appears in the shape
- Sets
- Represented as regular tensors (or sparse tensors). For example, tf.constant([[1, 2], [3, 4]]) represents the two sets {1, 2} and {3, 4}. More generally, each set is represented by a vector in the tensor's last axis.
- Queues
- Store tensors across multiple steps. TensorFlow offers various kinds of queues: basic first-in, first-out queues (FIFOQueue), queues that can prioritize some items (PriorityQueue), queues that shuffle their items (RandomShuffleQueue), and queues that batch items of different shapes by padding (PaddingFIFOQueue); see the sketch right after this list
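A minimal FIFOQueue sketch (queues are not covered in the code section below; the values are illustrative):
q = tf.queue.FIFOQueue(3, [tf.int32, tf.string], shapes=[(), ()])
q.enqueue([10, b'windy'])
q.enqueue([15, b'sunny'])
q.size()     # <tf.Tensor: shape=(), dtype=int32, numpy=2>
q.dequeue()  # returns [10, b'windy'] as a list of two tensors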
Custom Models and Training Algorithms
- Custom loss functions
- Simply create a function that takes the labels and predictions as arguments, and use TensorFlow operations to compute the per-instance loss
- Huber loss: a loss that combines the MSE and the MAE, taking the best of both; it is also known as the smooth mean absolute error loss. Idea: use the MSE when the error is close to 0 and the MAE when the error is large:
\[ J_{\text{Huber}} = \frac{1}{N}\sum^{N}_{i=1}\left[ I_{|y_i-\hat y_i|\le\delta}\,\frac{(y_i-\hat y_i)^2}{2} + I_{|y_i-\hat y_i|>\delta}\left(\delta\,|y_i-\hat y_i|-\tfrac{1}{2}\delta^2\right) \right] \]
- Saving and loading models that contain custom components
- When you load a model containing custom objects, you need to map the names to the objects. Unfortunately, when you save the model, the threshold is not saved, which means you must specify it when loading the model. You can solve this by creating a subclass of the keras.losses.Loss class and implementing its get_config() method
- The Keras API currently only specifies how to define layers, models, callbacks, and regularizers by subclassing. If you build other components (such as losses, metrics, initializers, or constraints) by subclassing, they may not be portable to other Keras implementations
- The get_config() method returns a dictionary mapping each hyperparameter name to its value. It first calls the parent class's get_config() method, then adds the new hyperparameters to this dictionary
- When you save the model, Keras calls the loss instance's get_config() method and saves the config as JSON in the HDF5 file. When you load the model, it calls the from_config() class method on the HuberLoss class: this method is implemented by the base class (Loss); it creates an instance of the class, passing **config to the constructor
- Custom activation functions, initializers, regularizers, and constraints
- If a function has hyperparameters that need to be saved along with the model, you will want to subclass the appropriate class
- You must implement the call() method for losses, layers (including activation functions), and models, or the __call__() method for regularizers, initializers, and constraints
- Custom metrics
- Losses and metrics are conceptually not the same thing: losses (e.g., cross entropy) are used by gradient descent to train a model, so they must be differentiable (at least where they are evaluated) and their gradients should not be 0 everywhere; moreover, it is fine if they are not easily interpretable by humans. In contrast, metrics (e.g., accuracy) are used to evaluate a model: they must be more easily interpretable, and they can be non-differentiable or have 0 gradients everywhere
- Streaming metrics (or stateful metrics): metrics that are updated batch by batch. Some metrics (such as precision) cannot simply be averaged over batches; in that case there is no choice but to implement a streaming metric
- Custom layers
- You may occasionally want to build an architecture that contains an exotic layer for which TensorFlow does not provide a default implementation. In that case you will need to create a custom layer. Or you may simply want to build a very repetitive architecture, containing identical blocks of layers repeated many times, in which case it is convenient to treat each block of layers as a single layer
- Some layers have no weights; if you want to create a custom layer without any weights, the simplest option is to write a function and wrap it in a keras.layers.Lambda layer
- To build a custom stateful layer (i.e., a layer with weights), you need to create a subclass of the keras.layers.Layer class
- Custom models
- We create the layers in the constructor and use them in the call() method. The model can then be used like any other model (you can compile it, fit it, evaluate it, and use it to make predictions)
- Losses and metrics based on model internals
- You may want to define losses based on other parts of your model, such as the weights or activations of its hidden layers. This may be useful for regularization purposes, or to monitor some internal aspect of your model. To define a custom loss based on model internals, compute it from any part of the model you want, then pass the result to the add_loss() method.
- The loss associated with the auxiliary output is called the reconstruction loss; we encourage the model to preserve as much information as possible through the hidden layers, even information that is not directly useful for the regression task itself. In practice, this loss sometimes improves generalization (it is a regularization loss)
- You can add a custom metric based on model internals, computed in any way you want, as long as the result is the output of a metric object
- Computing gradients using autodiff
- A neural network typically has tens of thousands of parameters, so finding the partial derivatives analytically by hand would be an almost impossible task. One solution is to compute an approximation of each partial derivative by measuring how much the function's output changes when you tweak the corresponding parameter. This works well and is easy to implement, but it is only an approximation and, importantly, f() has to be called at least once per parameter, which makes this approach intractable for large neural networks. Instead, we should use autodiff, which TensorFlow makes very simple
- Most often a gradient tape is used to compute the gradients of a single value (usually the loss) with regard to a set of values (usually the model parameters). This is where reverse-mode autodiff shines, as it just needs one forward pass and one reverse pass to get all the gradients at once.
- Custom training loops
- The Wide & Deep paper uses two different optimizers: one for the wide path and the other for the deep path. Since the fit() method only uses one optimizer (the one specified when compiling the model), implementing the paper requires writing your own custom loop
- Unless you really need the extra flexibility, you should prefer the fit() method over implementing your own training loop, especially when working in a team
TensorFlow Functions and Graphs
- Use tf.function() to convert a Python function to a TensorFlow function, or use tf.function as a decorator
- TensorFlow can optimize the computation graph, pruning unused nodes and simplifying expressions. Once the optimized graph is ready, the TF function efficiently executes the operations in the graph, in the appropriate order (and in parallel when it can). As a result, a TF function usually runs much faster than the original Python function, especially if it performs complex computations. Most of the time you don't really need to know much more than that: when you want to boost a Python function, just transform it into a TF function
- When you write a custom loss function, a custom metric, a custom layer, or any other custom function and you use it in a Keras model, Keras automatically converts your function into a TF function; there is no need to use tf.function()
- By default, a TF function generates a new graph for every unique set of input shapes and data types and caches it for subsequent calls. This is how TF functions handle polymorphism (i.e., varying argument types and shapes). However, this is only true for tensor arguments: if you pass Python numerical values to a TF function, a new graph is generated for every distinct value
- If you call a TF function many times with different Python numerical values, many graphs are generated, which slows down your program and eats up a lot of RAM (you must delete the TF function to release it). Python values should be reserved for arguments that will have few unique values, such as hyperparameters like the number of neurons per layer; this allows TensorFlow to better optimize each variant of your model
- AutoGraph and tracing
- How TensorFlow generates graphs
- The first step is called AutoGraph: after analyzing the function's code, AutoGraph outputs an upgraded version of that function in which all the control-flow statements are replaced by the appropriate TensorFlow operations
- Next, TensorFlow calls this "upgraded" function, but instead of passing the arguments, it passes symbolic tensors (tensors without any actual value, only a name, a data type, and a shape). The function runs in graph mode, meaning that each TensorFlow operation adds a node in the graph to represent itself and its output tensors (as opposed to the regular mode, called eager execution, or eager mode). In graph mode, TF operations do not perform any computation
- TF function rules
- If you call any external library, including NumPy or even the standard library, the call will run only during tracing; it will not be part of the graph. Indeed, a TensorFlow graph can only include TensorFlow constructs (tensors, operations, variables, datasets, and so on)
- If you define a TF function f(x) that just returns np.random.rand(), a random number is generated only when the function is traced, so f(tf.constant(2.)) and f(tf.constant(2.)) will return the same random number, but f(tf.constant([2., 3.])) will return a different one. If you replace np.random.rand() with tf.random.uniform([]), then a new random number is generated upon every call, since the operation becomes part of the graph
- If your non-TensorFlow code has side effects (such as logging something or updating a Python counter), you should not expect those side effects to occur every time you call the TF function, as they will only occur when the function is traced
- You can wrap arbitrary Python code in a tf.py_function() operation, but doing so hinders performance, since TensorFlow cannot do any graph optimization on this code. It also reduces portability, as the graph will only run on platforms where Python is available (and where the right libraries are installed); see the sketch right after this list
- You can call other Python functions or TF functions, and they should follow the same rules, since TensorFlow captures their operations in the computation graph. Note that these other functions do not need to be decorated with @tf.function
- If the function creates a TensorFlow variable (or any other stateful TensorFlow object, such as a dataset or a queue), it must do so upon the very first call (and only then), or else you will get an exception. It is usually preferable to create variables outside of the TF function (e.g., in the build() method of a custom layer). If you want to assign a new value to a variable, make sure you call its assign() method rather than using the = operator
- The source code of your Python function should be available to TensorFlow
- TensorFlow only captures for loops that iterate over a tensor or a dataset. So make sure you use for i in tf.range(x) rather than for i in range(x), or else the loop will not be captured in the graph
- As always, for performance reasons, you should prefer a vectorized implementation whenever you can, rather than using loops
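A minimal tf.py_function() sketch (the wrapped NumPy call and names are illustrative, not from the source):
def numpy_log1p(x):
    return np.log1p(x)  # plain Python/NumPy, runs outside the graph
@tf.function
def g(x):
    # embeds the Python call in the graph as a single opaque op
    return tf.py_function(numpy_log1p, inp=[x], Tout=tf.float32)
g(tf.constant(1.))  # float32 tensor, approximately 0.6931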
Code
Imports
import sys
assert sys.version_info >= (3, 5)
import sklearn
assert sklearn.__version__ >= '0.20'
try:
%tensorflow_version 2.x
except Exception as e:
pass
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= '2.4'
import numpy as np
import os
np.random.seed(42)
tf.random.set_seed(42)
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
PROJECT_ROOT_DIR = '.'
CHAPTER_ID = 'deep'
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, 'images', CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)
def save_fig(fig_id, tight_layout=True, fig_extension='png', resolution=300):
path = os.path.join(IMAGES_PATH, fig_id + '.' + fig_extension)
print('Saving figure', fig_id)
if tight_layout:
plt.tight_layout()
plt.savefig(path, format=fig_extension, dpi=resolution)
Tensors and operations
# Tensors
tf.constant([[1., 2., 3.], [4., 5., 6.]])
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
'''
tf.constant(42) # <tf.Tensor: shape=(), dtype=int32, numpy=42>
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
'''
t.shape # TensorShape([2, 3])
t.dtype # tf.float32
# Indexing
t[:, 1:]
'''
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
[5., 6.]], dtype=float32)>
'''
# t[..., 1] is equivalent to t[:, 1]; for a 3-D tensor, t[..., 1] is equivalent to t[:, :, 1]
# tf.newaxis works like np.newaxis: both add a new dimension.
t[..., 1, tf.newaxis]
'''
<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
[5.]], dtype=float32)>
'''
t[..., 1] # <tf.Tensor: shape=(2,), dtype=float32, numpy=array([2., 5.], dtype=float32)>
# Operations
t + 10
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
[14., 15., 16.]], dtype=float32)>
'''
tf.square(t)
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)>
'''
t @ tf.transpose(t)
'''
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
[32., 77.]], dtype=float32)>
'''
# Using keras.backend
from tensorflow import keras
K = keras.backend
K.square(K.transpose(t)) + 10
'''
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
[14., 35.],
[19., 46.]], dtype=float32)>
'''
Converting between tensors and NumPy
a = np.array([2., 4., 5.])
tf.constant(a) # <tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>
t.numpy()
'''
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
'''
np.array(t)
'''
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
'''
tf.square(a) # <tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>
np.square(t)
'''
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)
'''
Type conflicts
try:
tf.constant(2.0) + tf.constant(40)
except tf.errors.InvalidArgumentError as e:
print(e)
'''
cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2]
'''
try:
tf.constant(2.0) + tf.constant(40., dtype=tf.float64)
except tf.errors.InvalidArgumentError as e:
print(e)
'''
cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2]
'''
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32) # <tf.Tensor: shape=(), dtype=float32, numpy=42.0>
String tensors
tf.constant(b'hello world') # <tf.Tensor: shape=(), dtype=string, numpy=b'hello world'>
tf.constant("café") # <tf.Tensor: shape=(), dtype=string, numpy=b'caf\xc3\xa9'>
# Unicode code points (not ASCII: é is 233)
u = tf.constant([ord(c) for c in 'café'])
u # <tf.Tensor: shape=(4,), dtype=int32, numpy=array([ 99, 97, 102, 233])>
b = tf.strings.unicode_encode(u, 'UTF-8')
tf.strings.length(b, unit='UTF8_CHAR') # <tf.Tensor: shape=(), dtype=int32, numpy=4>
tf.strings.unicode_decode(b, 'UTF-8') # <tf.Tensor: shape=(4,), dtype=int32, numpy=array([ 99, 97, 102, 233])>
String arrays
p = tf.constant(['Café', 'Coffee', 'caffé', '咖啡'])
tf.strings.length(p, unit='UTF8_CHAR') # <tf.Tensor: shape=(4,), dtype=int32, numpy=array([4, 6, 5, 2])>
r = tf.strings.unicode_decode(p, 'UTF8')
r # <tf.RaggedTensor [[67, 97, 102, 233], [67, 111, 102, 102, 101, 101], [99, 97, 102, 102, 233], [21654, 21857]]>
Ragged tensors
print(r[1]) # tf.Tensor([ 67 111 102 102 101 101], shape=(6,), dtype=int32)
print(r[1: 3]) # <tf.RaggedTensor [[67, 111, 102, 102, 101, 101], [99, 97, 102, 102, 233]]>
r2 = tf.ragged.constant([[65, 66], [], [67]])
print(tf.concat([r, r2], axis=0))
'''
<tf.RaggedTensor [[67, 97, 102, 233], [67, 111, 102, 102, 101, 101], [99, 97, 102, 102, 233], [21654, 21857], [65, 66], [], [67]]>
'''
r3 = tf.ragged.constant([[68, 69, 70],
[67, 111, 102, 102, 101, 101, 71],
[99, 97, 102, 102, 232],
[21654, 21857, 72, 73]])
print(tf.concat([r, r3], axis=1))
'''
<tf.RaggedTensor [[67, 97, 102, 233, 68, 69, 70], [67, 111, 102, 102, 101, 101, 67, 111, 102, 102, 101, 101, 71], [99, 97, 102, 102, 233, 99, 97, 102, 102, 232], [21654, 21857, 21654, 21857, 72, 73]]>
'''
tf.strings.unicode_encode(r3, "UTF-8")
'''
<tf.Tensor: shape=(4,), dtype=string, numpy=
array([b'DEF', b'CoffeeG', b'caff\xc3\xa8', b'\xe5\x92\x96\xe5\x95\xa1HI'],
dtype=object)>
'''
r.to_tensor()
'''
<tf.Tensor: shape=(4, 6), dtype=int32, numpy=
array([[ 67, 97, 102, 233, 0, 0],
[ 67, 111, 102, 102, 101, 101],
[ 99, 97, 102, 102, 233, 0],
[21654, 21857, 0, 0, 0, 0]])>
'''
Sparse tensors
s = tf.SparseTensor(indices=[[0, 1], [1, 0], [2, 3]], values=[1., 2., 3.], dense_shape=[3, 4])
print(s)
'''
SparseTensor(indices=tf.Tensor(
[[0 1]
[1 0]
[2 3]], shape=(3, 2), dtype=int64), values=tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32), dense_shape=tf.Tensor([3 4], shape=(2,), dtype=int64))
'''
tf.sparse.to_dense(s)
'''
<tf.Tensor: shape=(3, 4), dtype=float32, numpy=
array([[0., 1., 0., 0.],
[2., 0., 0., 0.],
[0., 0., 0., 3.]], dtype=float32)>
'''
s2 = s * 2.0
try:
s3 = s + 1
except TypeError as e:
print(e) # unsupported operand type(s) for +: 'SparseTensor' and 'int'
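# Note: a SparseTensor supports multiplication by a scalar (s * 2.0), but not scalar addition;
# to add two sparse tensors of the same shape, you can use tf.sparse.add(s, s).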
s4 = tf.constant([[10., 20.], [30., 40.], [50., 60.], [70., 80.]])
tf.sparse.sparse_dense_matmul(s, s4)
'''
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[ 30., 40.],
[ 20., 40.],
[210., 240.]], dtype=float32)>
'''
s5 = tf.SparseTensor(indices=[[0, 2], [0, 1]], values=[1., 2.], dense_shape=[3, 4])
print(s5)
'''
SparseTensor(indices=tf.Tensor(
[[0 2]
[0 1]], shape=(2, 2), dtype=int64), values=tf.Tensor([1. 2.], shape=(2,), dtype=float32), dense_shape=tf.Tensor([3 4], shape=(2,), dtype=int64))
'''
try:
tf.sparse.to_dense(s5)
except tf.errors.InvalidArgumentError as e:
print(e)
'''
indices[1] = [0,1] is out of order. Many sparse ops require sorted indices.
Use `tf.sparse.reorder` to create a correctly ordered copy.
[Op:SparseToDense]
'''
s6 = tf.sparse.reorder(s5)
tf.sparse.to_dense(s6)
'''
<tf.Tensor: shape=(3, 4), dtype=float32, numpy=
array([[0., 2., 1., 0.],
[0., 0., 0., 0.],
[0., 0., 0., 0.]], dtype=float32)>
'''
Sets
set1 = tf.constant([[2, 3, 5, 7], [7, 9, 0, 0]])
set2 = tf.constant([[4, 5, 6], [9, 10, 0]])
tf.sparse.to_dense(tf.sets.union(set1, set2))
'''
<tf.Tensor: shape=(2, 6), dtype=int32, numpy=
array([[ 2, 3, 4, 5, 6, 7],
[ 0, 7, 9, 10, 0, 0]])>
'''
tf.sparse.to_dense(tf.sets.difference(set1, set2))
'''
<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[2, 3, 7],
[7, 0, 0]])>
'''
tf.sparse.to_dense(tf.sets.intersection(set1, set2))
'''
<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[5, 0],
[0, 9]])>
'''
Variables
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v.assign(2 * v)
'''
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 4., 6.],
[ 8., 10., 12.]], dtype=float32)>
'''
v[0, 1].assign(42)
'''
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 6.],
[ 8., 10., 12.]], dtype=float32)>
'''
v[:, 2].assign([0., 1.])
'''
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 0.],
[ 8., 10., 1.]], dtype=float32)>
'''
try:
v[1] = [2., 8., 9.]
except TypeError as e:
print(e) # 'ResourceVariable' object does not support item assignment
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
'''
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100., 42., 0.],
[ 8., 10., 200.]], dtype=float32)>
'''
sparse_delta = tf.IndexedSlices(values=[[1., 2., 3.], [4., 5., 6.]], indices=[1, 0])
v.scatter_update(sparse_delta)
'''
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[4., 5., 6.],
[1., 2., 3.]], dtype=float32)>
'''
Tensor arrays
array = tf.TensorArray(dtype=tf.float32, size=3)
array = array.write(0, tf.constant([1., 2.]))
array = array.write(1, tf.constant([2., 10.]))
array = array.write(2, tf.constant([5., 7.]))
array.read(1) # <tf.Tensor: shape=(2,), dtype=float32, numpy=array([ 2., 10.], dtype=float32)>
array.stack()
'''
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 2.],
[0., 0.],
[5., 7.]], dtype=float32)>
'''
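# Note: by default a TensorArray is created with clear_after_read=True, so array.read(1) above
# consumed element 1; that is why the stacked result shows zeros in row 1. A minimal sketch to
# keep elements readable after reading:
# array = tf.TensorArray(dtype=tf.float32, size=3, clear_after_read=False)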
mean, variance = tf.nn.moments(array.stack(), axes=0)
mean # <tf.Tensor: shape=(2,), dtype=float32, numpy=array([2., 3.], dtype=float32)>
variance # <tf.Tensor: shape=(2,), dtype=float32, numpy=array([4.6666665, 8.666667 ], dtype=float32)>
Custom loss functions
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_valid, y_train, y_valid = train_test_split(
X_train_full, y_train_full, random_state=42
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < 1
squared_loss = tf.square(error) / 2
linear_loss = tf.abs(error) - 0.5
return tf.where(is_small_error, squared_loss, linear_loss)
plt.figure(figsize=(8, 3.5))
z = np.linspace(-4, 4, 200)
plt.plot(z, huber_fn(0, z), 'b-', linewidth=2, label='huber($z$)')
plt.plot(z, z**2 / 2, 'b:', linewidth=1, label=r'$\frac{1}{2}z^2$')
plt.plot([-1, -1], [0, huber_fn(0., -1.)], 'r--')
plt.plot([1, 1], [0, huber_fn(0., 1.)], 'r--')
plt.gca().axhline(y=0, color='k')
plt.gca().axvline(x=0, color='k')
plt.axis([-4, 4, 0, 4])
plt.grid(True)
plt.xlabel('$z$')
plt.legend(fontsize=14)
plt.title('Huber loss', fontsize=14)
plt.show()
input_shape = X_train.shape[1:]
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1)
])
model.compile(loss=huber_fn, optimizer='nadam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
Saving/loading models that contain custom objects
model.save('my_model_with_a_custom_loss.h5')
model = keras.models.load_model('my_model_with_a_custom_loss.h5', custom_objects={'huber_fn': huber_fn})
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
def create_huber(threshold=1.0):
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < threshold
squared_loss = tf.square(error) / 2
linear_loss = threshold * tf.abs(error) - threshold**2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
return huber_fn
model.compile(loss=create_huber(2.0), optimizer='nadam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.save('my_model_with_a_custom_loss_threshold_2.h5')
model = keras.models.load_model('my_model_with_a_custom_loss_threshold_2.h5', custom_objects={'huber_fn': create_huber(2.0)})
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
class HuberLoss(keras.losses.Loss):
def __init__(self, threshold=1.0, **kwargs):
self.threshold = threshold
super().__init__(**kwargs)
def call(self, y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < self.threshold
squared_loss = tf.square(error) / 2
linear_loss = self.threshold * tf.abs(error) - self.threshold ** 2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
def get_config(self):
base_config = super().get_config()
return {**base_config, 'threshold': self.threshold}
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1)
])
model.compile(loss=HuberLoss(2.), optimizer='nadam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.save('my_model_with_a_custom_loss.h5')
model = keras.models.load_model('my_model_with_a_custom_loss.h5', custom_objects={'HuberLoss': HuberLoss})
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.loss.threshold # 2.0
Other custom functions
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
def my_softplus(z):
return tf.math.log(tf.exp(z) + 1.0) # tf.nn.softplus(z)
def my_glorot_initializer(shape, dtype=tf.float32):
'''
tf.random.normal(
shape, mean=0.0, stddev=1.0, dtype=tf.dtypes.float32, seed=None, name=None)
'''
stddev = tf.sqrt(2. / (shape[0] + shape[1]))
return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights):
return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights):
return tf.where(weights < 0., tf.zeros_like(weights), weights) # tf.nn.relu(weights)
layers = keras.layers.Dense(1, activation=my_softplus,
kernel_initializer=my_glorot_initializer,
kernel_regularizer=my_l1_regularizer,
kernel_constraint=my_positive_weights)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1, activation=my_softplus,
kernel_initializer=my_glorot_initializer,
kernel_regularizer=my_l1_regularizer,
kernel_constraint=my_positive_weights)
])
model.compile(loss='mse', optimizer='nadam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.save('my_model_with_many_custom_parts.h5')
model = keras.models.load_model('my_model_with_many_custom_parts.h5',
custom_objects={
'my_l1_regularizer': my_l1_regularizer,
'my_positive_weights': my_positive_weights,
'my_glorot_initializer': my_glorot_initializer,
'my_softplus': my_softplus
})
class MyL1Regularizer(keras.regularizers.Regularizer):
def __init__(self, factor):
self.factor = factor
def __call__(self, weights):
return tf.reduce_sum(tf.abs(self.factor * weights))
def get_config(self):
return {'factor': self.factor}
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1, activation=my_softplus,
kernel_regularizer=MyL1Regularizer(0.01),
kernel_constraint=my_positive_weights,
kernel_initializer=my_glorot_initializer)
])
model.compile(loss='mse', optimizer='nadam', metrics=['mae'])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.save('my_model_with_many_custom_parts.h5')
model = keras.models.load_model('my_model_with_many_custom_parts.h5', custom_objects={
'MyL1Regularizer': MyL1Regularizer,
'my_positive_weights': my_positive_weights,
'my_glorot_initializer': my_glorot_initializer,
'my_softplus': my_softplus
})
Custom metrics
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1)
])
model.compile(loss='mse', optimizer='nadam', metrics=[create_huber(2.0)])
model.fit(X_train_scaled, y_train, epochs=2)
model.compile(loss=create_huber(2.0), optimizer='nadam', metrics=[create_huber(2.0)])
# If you use the same function as the loss and as a metric, you may be surprised to see different results.
# This is generally due to floating-point precision errors: even though the mathematical equations are
# equivalent, the operations are not run in the same order, which can lead to tiny differences.
sample_weight = np.random.rand(len(y_train))
history = model.fit(X_train_scaled, y_train, epochs=2, sample_weight=sample_weight)
# If you do the math, you will find that loss = metric * mean(sample_weight) (plus some floating-point precision error)
history.history['loss'][0], history.history['huber_fn'][0] * sample_weight.mean() # (0.11749906837940216, 0.11906625573138947)
Streaming metrics
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # <tf.Tensor: shape=(), dtype=float32, numpy=0.8>
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]) # <tf.Tensor: shape=(), dtype=float32, numpy=0.5>
precision.result() # <tf.Tensor: shape=(), dtype=float32, numpy=0.5>
precision.variables
'''
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
<tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]
'''
precision.reset_states()
# Creating a streaming metric
class HuberMetric(keras.metrics.Metric):
def __init__(self, threshold=1.0, **kwargs):
super().__init__(**kwargs)
self.threshold = threshold
self.huber_fn = create_huber(threshold)
self.total = self.add_weight('total', initializer='zeros')
self.count = self.add_weight('count', initializer='zeros')
def update_state(self, y_true, y_pred, sample_weight=None):
metric = self.huber_fn(y_true, y_pred)
self.total.assign_add(tf.reduce_sum(metric))
self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
def result(self):
return self.total / self.count
def get_config(self):
base_config = super().get_config()
return {**base_config, 'threshold': self.threshold}
m = HuberMetric(2.)
# total = 2 * |10 - 2| - 2²/2 = 14
# count = 1
# result = 14 / 1 = 14
m(tf.constant([[2.]]), tf.constant([[10.]])) # <tf.Tensor: shape=(), dtype=float32, numpy=14.0>
m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]]))
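# errors: |0 - 1| = 1 (small: 1² / 2 = 0.5) and |5 - 9.25| = 4.25 (large: 2 * 4.25 - 2² / 2 = 6.5)
# total = 14 + 0.5 + 6.5 = 21, count = 1 + 2 = 3, result = 21 / 3 = 7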
m.result() # <tf.Tensor: shape=(), dtype=float32, numpy=7.0>
m.variables
'''
[<tf.Variable 'total:0' shape=() dtype=float32, numpy=21.0>,
<tf.Variable 'count:0' shape=() dtype=float32, numpy=3.0>]
'''
m.reset_states()
m.variables
'''
[<tf.Variable 'total:0' shape=() dtype=float32, numpy=0.0>,
<tf.Variable 'count:0' shape=() dtype=float32, numpy=0.0>]
'''
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1),
])
model.compile(loss=create_huber(2.0), optimizer='nadam', metrics=[HuberMetric(2.0)])
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)
model.save('my_model_with_a_custom_loss.h5')
model = keras.models.load_model('my_model_with_a_custom_loss.h5', custom_objects={
'huber_fn': create_huber(2.0),
'HuberMetric': HuberMetric
})
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)
model.metrics[-1].threshold # 2.0
class HuberMetric(keras.metrics.Mean):
def __init__(self, threshold=1.0, name='HuberMetric', dtype=None):
self.threshold = threshold
self.huber_fn = create_huber(threshold)
super().__init__(name=name, dtype=dtype)
def update_state(self, y_true, y_pred, sample_weight=None):
metric = self.huber_fn(y_true, y_pred)
super(HuberMetric, self).update_state(metric, sample_weight)
def get_config(self):
base_config = super().get_config()
return {**base_config, 'threshold': self.threshold}
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal', input_shape=input_shape),
keras.layers.Dense(1),
])
model.compile(loss=keras.losses.Huber(2.0), optimizer='nadam', weighted_metrics=[HuberMetric(2.0)])
sample_weight = np.random.rand(len(y_train))
history = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2, sample_weight=sample_weight)
history.history['loss'][0], history.history['HuberMetric'][0] * sample_weight.mean() # (0.44554394483566284, 0.44554404180100277)
model.save('my_model_with_a_custom_metric_v2.h5')
model = keras.models.load_model('my_model_with_a_custom_metric_v2.h5', custom_objects={
'HuberMetric': HuberMetric
})
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)
model.metrics[-1].threshold # 2.0
Custom layers
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))
exponential_layer([-1., 0., 1.]) # <tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787945, 1. , 2.7182817 ], dtype=float32)>
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='relu', input_shape=input_shape),
keras.layers.Dense(1),
exponential_layer
])
model.compile(loss='mse', optimizer='sgd')
model.fit(X_train_scaled, y_train, epochs=5, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test) # 0.3586341142654419
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, batch_input_shape):
self.kernel = self.add_weight(
name='kernel', shape=[batch_input_shape[-1], self.units],
initializer='glorot_normal')
self.bias = self.add_weight(
name='bias', shape=[self.units], initializer='zeros')
super().build(batch_input_shape) # must be at the end
def call(self, X):
return self.activation(X @ self.kernel + self.bias)
def compute_output_shape(self, batch_input_shape):
return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
def get_config(self):
base_config = super().get_config()
return {**base_config, 'units': self.units, 'activation': keras.activations.serialize(self.activation)}
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
MyDense(30, activation='relu', input_shape=input_shape),
MyDense(1)
])
model.compile(loss='mse', optimizer='nadam')
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test) # 0.5473727583885193
model.save('my_model_with_a_custom_layer.h5')
model = keras.models.load_model('my_model_with_a_custom_layer.h5', custom_objects={
'MyDense': MyDense
})
class MyMultiLayer(keras.layers.Layer):
def call(self, X):
X1, X2 = X
print('X1.shape:', X1.shape, 'X2.shape:', X2.shape)
return X1 + X2, X1 * X2
def compute_output_shape(self, batch_input_shape):
batch_input_shape1, batch_input_shape2 = batch_input_shape
return [batch_input_shape1, batch_input_shape2]
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
outputs1, output2 = MyMultiLayer()((inputs1, inputs2))
'''
X1.shape: (None, 2) X2.shape: (None, 2)
'''
def split_data(data):
columns_count = data.shape[-1]
half = columns_count // 2
return data[:, :half], data[:, half:]
X_train_scaled_A, X_train_scaled_B = split_data(X_train_scaled)
X_valid_scaled_A, X_valid_scaled_B = split_data(X_valid_scaled)
X_test_scaled_A, X_test_scaled_B = split_data(X_test_scaled)
X_train_scaled_A.shape, X_train_scaled_B.shape # ((11610, 4), (11610, 4))
outputs1, output2 = MyMultiLayer()((X_train_scaled_A, X_train_scaled_B))
'''
X1.shape: (11610, 4) X2.shape: (11610, 4)
'''
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
input_A = keras.layers.Input(shape=X_train_scaled_A.shape[-1])
input_B = keras.layers.Input(shape=X_train_scaled_B.shape[-1])
hidden_A, hidden_B = MyMultiLayer()((input_A, input_B))
hidden_A = keras.layers.Dense(30, activation='selu')(hidden_A)
hidden_B = keras.layers.Dense(30, activation='selu')(hidden_B)
concat = keras.layers.Concatenate()((hidden_A, hidden_B))
output = keras.layers.Dense(1)(concat)
model = keras.models.Model(inputs=[input_A, input_B], outputs=[output]) # X1.shape: (None, 4) X2.shape: (None, 4)
model.compile(loss='mse', optimizer='nadam')
model.fit((X_train_scaled_A, X_train_scaled_B), y_train, epochs=2, validation_data=((X_valid_scaled_A, X_valid_scaled_B), y_valid))
class AddGaussianNoise(keras.layers.Layer):
def __init__(self, stddev, **kwargs):
super().__init__(**kwargs)
self.stddev = stddev
def call(self, X, training=None):
if training:
noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
return X + noise
else:
return X
def compute_output_shape(self, batch_input_shape):
return batch_input_shape
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([
AddGaussianNoise(stddev=1.0),
keras.layers.Dense(30, activation='selu'),
keras.layers.Dense(1)
])
model.compile(loss='mse', optimizer='nadam')
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test) # 0.7559615969657898
Custom models
X_new_scaled = X_test_scaled
class ResidualBlock(keras.layers.Layer):
def __init__(self, n_layers, n_neurons, **kwargs):
super().__init__(**kwargs)
self.hidden = [keras.layers.Dense(n_neurons, activation='elu', kernel_initializer='he_normal') for _ in range(n_layers)]
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
return inputs + Z
class ResidualRegressor(keras.models.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(30, activation='elu', kernel_initializer='he_normal')
self.block1 = ResidualBlock(2, 30)
self.block2 = ResidualBlock(2, 30)
self.out = keras.layers.Dense(output_dim)
def call(self, inputs):
Z = self.hidden1(inputs)
for _ in range(1 + 3):
Z = self.block1(Z)
Z = self.block2(Z)
return self.out(Z)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = ResidualRegressor(1)
model.compile(loss='mse', optimizer='nadam')
history = model.fit(X_train_scaled, y_train, epochs=5)
score = model.evaluate(X_test_scaled, y_test)
y_pred = model.predict(X_new_scaled)
model.save('my_custom_model.ckpt')
model = keras.models.load_model('my_custom_model.ckpt')
history = model.fit(X_train_scaled, y_train, epochs=5)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
block1 = ResidualBlock(2, 30)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='elu', kernel_initializer='he_normal'),
block1, block1, block1, block1,
ResidualBlock(2, 30),
keras.layers.Dense(1)
])
model.compile(loss='mse', optimizer='nadam')
history = model.fit(X_train_scaled, y_train, epochs=5)
score = model.evaluate(X_test_scaled, y_test)
y_pred = model.predict(X_new_scaled)
Losses and metrics based on model internals
class ReconstructingRegressor(keras.models.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden = [keras.layers.Dense(30, activation='selu', kernel_initializer='lecun_normal') for _ in range(5)]
self.out = keras.layers.Dense(output_dim)
self.reconstruct = keras.layers.Dense(8)
self.reconstruction_mean = keras.metrics.Mean(name='reconstruction_error')
def call(self, inputs, training=None):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
reconstruction = self.reconstruct(Z)
recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
self.add_loss(0.05 * recon_loss)
if training:
result = self.reconstruction_mean(recon_loss)
self.add_metric(result)
return self.out(Z)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = ReconstructingRegressor(1)
model.compile(loss='mse', optimizer='nadam')
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)
Computing gradients with autodiff
def f(w1, w2):
return 3 * w1 ** 2 + 2 * w1 * w2
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps # 36.000003007075065
(f(w1, w2 + eps) - f(w1, w2)) / eps # 10.000000003174137
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
z = f(w1, w2)
gradient = tape.gradient(z, [w1, w2])
gradient
'''
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
'''
with tf.GradientTape() as tape:
z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
try:
dz_dw2 = tape.gradient(z, w2)
except RuntimeError as e:
print(e) # A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)
with tf.GradientTape(persistent=True) as tape:
z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape
dz_dw1, dz_dw2
'''
(<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>)
'''
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients # [None, None]
with tf.GradientTape() as tape:
tape.watch(c1)
tape.watch(c2)
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients
'''
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
'''
with tf.GradientTape() as tape:
z1 = f(w1, w2 + 2.)
z2 = f(w1, w2 + 5.)
z3 = f(w1, w2 + 7.)
# If you try to compute the gradients of a vector of outputs, TensorFlow computes the gradients of the vector's sum
'''
z1 = 3 * w1 ** 2 + 2 * w1 * (w2 + 2)
z2 = 3 * w1 ** 2 + 2 * w1 * (w2 + 5)
z3 = 3 * w1 ** 2 + 2 * w1 * (w2 + 7)
z1 + z2 + z3 = 9 * w1 ** 2 + 6 * w1 * w2 + 28 * w1
d/dw1 = 18 * w1 + 6 * w2 + 28 = 90 + 18 + 28 = 136
d/dw2 = 6 * w1 = 30
'''
tape.gradient([z1, z2, z3], [w1, w2])
'''
[<tf.Tensor: shape=(), dtype=float32, numpy=136.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=30.0>]
'''
with tf.GradientTape(persistent=True) as tape:
z1 = f(w1, w2 + 2.)
z2 = f(w1, w2 + 5.)
z3 = f(w1, w2 + 7.)
tf.reduce_sum(tf.stack([tape.gradient(z, [w1, w2]) for z in (z1, z2, z3)]), axis=0)
del tape
with tf.GradientTape(persistent=True) as hessian_tape:
with tf.GradientTape() as jacobian_tape:
z = f(w1, w2)
jacobians = jacobian_tape.gradient(z, [w1, w2])
hessians = [hessian_tape.gradient(jacobian, [w1, w2]) for jacobian in jacobians]
del hessian_tape
jacobians
'''
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
'''
hessians
'''
[[<tf.Tensor: shape=(), dtype=float32, numpy=6.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=2.0>],
[<tf.Tensor: shape=(), dtype=float32, numpy=2.0>, None]]
'''
def f(w1, w2):
return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)
with tf.GradientTape() as tape:
z = f(w1, w2)
tape.gradient(z, [w1, w2]) # [<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]
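# d(3 * w1²)/dw1 = 6 * w1 = 30; the stop_gradient() part is treated as a constant during
# backpropagation, so w2 gets no gradient (None)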
x = tf.Variable(100.)
with tf.GradientTape() as tape:
z = my_softplus(x)
tape.gradient(z, [x]) # [<tf.Tensor: shape=(), dtype=float32, numpy=nan>]
tf.math.log(tf.exp(tf.constant(30., dtype=tf.float32)) + 1.) # <tf.Tensor: shape=(), dtype=float32, numpy=30.0>
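# In float32, exp(100.) overflows to inf, so my_softplus(100.) returns inf and its gradient is nan.
# Already at z = 30, exp(z) + 1 == exp(z) within float32 precision, so log(exp(z) + 1.) simply returns z.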
x = tf.Variable([100.])
with tf.GradientTape() as tape:
z = my_softplus(x)
tape.gradient(z, [x]) # [<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]
@tf.custom_gradient
def my_better_softplus(z):
exp = tf.exp(z)
def my_softplus_gradients(grad):
return grad / (1 + 1/ exp)
return tf.math.log(exp + 1), my_softplus_gradients
def my_better_softplus(z):
return tf.where(z > 30., z, tf.math.log(tf.exp(z) + 1.))
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
z = my_better_softplus(x)
z, tape.gradient(z, [x])
'''
(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1000.], dtype=float32)>,
[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>])
'''
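# The tf.where() version still yields a nan gradient for large inputs: autodiff computes the gradients
# of both branches before selecting one, and 0 * nan is nan, so the overflowing log(exp(z) + 1.) branch
# poisons the result. The @tf.custom_gradient version above avoids this by defining the gradient
# analytically as grad / (1 + 1 / exp).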
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='elu', kernel_initializer='he_normal', kernel_regularizer=l2_reg),
keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
def random_batch(X, y, batch_size=32):
idx = np.random.randint(len(X), size=batch_size)
return X[idx], y[idx]
def print_status_bar(iteration, total, loss, metrics=None):
metrics = ' - '.join(['{}: {:.4f}'.format(m.name, m.result()) for m in [loss] + (metrics or [])])
end = '' if iteration < total else '\n'
print('\r{}/{} - '.format(iteration, total) + metrics, end=end)
import time
mean_loss = keras.metrics.Mean(name='loss')
mean_square = keras.metrics.Mean(name='mean_square')
for i in range(1, 50 + 1):
loss = 1 / i
mean_loss(loss)
mean_square(i ** 2)
print_status_bar(i, 50, mean_loss, [mean_square])
time.sleep(0.05)
def progress_bar(iteration, total, size=30):
running = iteration < total
c = '>' if running else '='
p = (size - 1) * iteration // total
fmt = '{{:-{}d}} / {{}} [{{}}]'.format(len(str(total)))
params = [iteration, total, '=' * p + c + '.' * (size - p - 1)]
return fmt.format(*params)
progress_bar(3500, 10000, size=6)
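# ' 3500 / 10000 [=>....]'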
def print_status_bar(iteration, total, loss, metrics=None, size=30):
metrics = ' - '.join(['{}: {:.4f}'.format(m.name, m.result()) for m in [loss] + (metrics or [])])
end = '' if iteration < total else '\n'
print('\r{} - {}'.format(progress_bar(iteration, total), metrics), end=end)
mean_loss = keras.metrics.Mean(name='loss')
mean_square = keras.metrics.Mean(name='mean_square')
for i in range(1, 50 + 1):
loss = 1 / i
mean_loss(loss)
mean_square(i ** 2)
print_status_bar(i, 50, mean_loss, [mean_square])
time.sleep(0.05)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
for epoch in range(1, n_epochs + 1):
print('Epoch {} / {}'.format(epoch, n_epochs))
for step in range(1, n_steps + 1):
X_batch, y_batch = random_batch(X_train_scaled, y_train)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
mean_loss(loss)
for metric in metrics:
metric(y_batch, y_pred)
print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
for metric in [mean_loss] + metrics:
metric.reset_states()
try:
from tqdm.notebook import trange
from collections import OrderedDict
with trange(1, n_epochs + 1, desc='All epochs') as epochs:
for epoch in epochs:
with trange(1, n_steps + 1, desc='Epoch {} / {}'.format(epoch, n_epochs)) as steps:
for step in steps:
X_batch, y_batch = random_batch(X_train_scaled, y_train)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
status = OrderedDict()
mean_loss(loss)
status['loss'] = mean_loss.result().numpy()
for metric in metrics:
metric(y_batch, y_pred)
status[metric.name] = metric.result().numpy()
steps.set_postfix(status)
for metric in [mean_loss] + metrics:
metric.reset_states()
except Exception as e:
print(e)
TensorFlow functions
def cube(x):
return x ** 3
cube(2) # 8
cube(tf.constant(2.)) # <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
tf_cube = tf.function(cube)
tf_cube # <tensorflow.python.eager.def_function.Function at 0x1fa41b8d908>
tf_cube(2) # <tf.Tensor: shape=(), dtype=int32, numpy=8>
tf_cube(tf.constant(2.)) # <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
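# Note: tf_cube(2) (a Python int) and tf_cube(tf.constant(2.)) (a float32 tensor) each trace their own
# graph: a TF function creates one concrete function per unique input signature.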
TF functions and concrete functions
concrete_function = tf_cube.get_concrete_function(tf.constant(2.))
concrete_function.graph # <tensorflow.python.framework.func_graph.FuncGraph at 0x1fa41bb5eb8>
concrete_function(tf.constant(2.)) # <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
concrete_function is tf_cube.get_concrete_function(tf.constant(2.0)) # True
Exploring function definitions and graphs
concrete_function.graph # <tensorflow.python.framework.func_graph.FuncGraph at 0x1fa41bb5eb8>
ops = concrete_function.graph.get_operations()
ops
'''
[<tf.Operation 'x' type=Placeholder>,
<tf.Operation 'pow/y' type=Const>,
<tf.Operation 'pow' type=Pow>,
<tf.Operation 'Identity' type=Identity>]
'''
pow_op = ops[2]
list(pow_op.inputs)
'''
[<tf.Tensor 'x:0' shape=() dtype=float32>,
<tf.Tensor 'pow/y:0' shape=() dtype=float32>]
'''
pow_op.outputs # [<tf.Tensor 'pow:0' shape=() dtype=float32>]
concrete_function.graph.get_operation_by_name('x') # <tf.Operation 'x' type=Placeholder>
concrete_function.graph.get_tensor_by_name('Identity:0') # <tf.Tensor 'Identity:0' shape=() dtype=float32>
concrete_function.function_def.signature
'''
name: "__inference_cube_1067234"
input_arg {
name: "x"
type: DT_FLOAT
}
output_arg {
name: "identity"
type: DT_FLOAT
}
'''
How TF functions trace Python functions to extract computation graphs
@tf.function
def tf_cube(x):
print('print:', x)
return x ** 3
result = tf_cube(tf.constant(2.0)) # print: Tensor("x:0", shape=(), dtype=float32)
result # <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
result = tf_cube(2.)
result = tf_cube(3.)
result = tf_cube(tf.constant([[1, 2]]))
result = tf_cube(tf.constant([[3, 4], [5, 6]]))
result = tf_cube(tf.constant([[7, 8], [9, 10], [11, 12]]))
'''
print: 2.0
print: 3.0
print: Tensor("x:0", shape=(1, 2), dtype=int32)
print: Tensor("x:0", shape=(2, 2), dtype=int32)
WARNING:tensorflow:5 out of the last 5 calls to <function tf_cube at 0x000001A61F4BCD90> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.
print: Tensor("x:0", shape=(3, 2), dtype=int32)
WARNING:tensorflow:6 out of the last 6 calls to <function tf_cube at 0x000001A61F4BCD90> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.
'''
@tf.function(input_signature=[tf.TensorSpec([None, 28, 28], tf.float32)])
def shrink(images):
print('Tracing', images)
return images[:, ::2, ::2]
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
img_batch_1 = tf.random.uniform(shape=[100, 28, 28])
img_batch_2 = tf.random.uniform(shape=[50, 28, 28])
preprocessed_images = shrink(img_batch_1)
preprocessed_images = shrink(img_batch_2)
'''
Tracing Tensor("images:0", shape=(None, 28, 28), dtype=float32)
'''
img_batch_3 = tf.random.uniform(shape=[2, 2, 2])
try:
preprocessed_images = shrink(img_batch_3)
except Exception as e:
print(e)
'''
Python inputs incompatible with input_signature:
inputs: (
tf.Tensor(
[[[0.7413678 0.62854624]
[0.01738465 0.3431449 ]]
[[0.51063764 0.3777541 ]
[0.07321596 0.02137029]]], shape=(2, 2, 2), dtype=float32))
input_signature: (
TensorSpec(shape=(None, 28, 28), dtype=tf.float32, name=None))
'''
Capturing control flow with AutoGraph
@tf.function
def add_10(x):
for i in range(10):
x += 1
return x
add_10(tf.constant(5)) # <tf.Tensor: shape=(), dtype=int32, numpy=15>
add_10.get_concrete_function(tf.constant(5)).graph.get_operations()
'''
[<tf.Operation 'x' type=Placeholder>,
<tf.Operation 'add/y' type=Const>,
<tf.Operation 'add' type=AddV2>,
<tf.Operation 'add_1/y' type=Const>,
<tf.Operation 'add_1' type=AddV2>,
<tf.Operation 'add_2/y' type=Const>,
<tf.Operation 'add_2' type=AddV2>,
<tf.Operation 'add_3/y' type=Const>,
<tf.Operation 'add_3' type=AddV2>,
<tf.Operation 'add_4/y' type=Const>,
<tf.Operation 'add_4' type=AddV2>,
<tf.Operation 'add_5/y' type=Const>,
<tf.Operation 'add_5' type=AddV2>,
<tf.Operation 'add_6/y' type=Const>,
<tf.Operation 'add_6' type=AddV2>,
<tf.Operation 'add_7/y' type=Const>,
<tf.Operation 'add_7' type=AddV2>,
<tf.Operation 'add_8/y' type=Const>,
<tf.Operation 'add_8' type=AddV2>,
<tf.Operation 'add_9/y' type=Const>,
<tf.Operation 'add_9' type=AddV2>,
<tf.Operation 'Identity' type=Identity>]
'''
@tf.function
def add_10(x):
condition = lambda i, x: tf.less(i, 10)
body = lambda i, x: (tf.add(i, 1), tf.add(x, 1))
final_i, final_x = tf.while_loop(condition, body, [tf.constant(0), x])
return final_x
add_10(tf.constant(5)) # <tf.Tensor: shape=(), dtype=int32, numpy=15>
add_10.get_concrete_function(tf.constant(5)).graph.get_operations()
'''
[<tf.Operation 'x' type=Placeholder>,
<tf.Operation 'Const' type=Const>,
<tf.Operation 'while/maximum_iterations' type=Const>,
<tf.Operation 'while/loop_counter' type=Const>,
<tf.Operation 'while' type=StatelessWhile>,
<tf.Operation 'Identity' type=Identity>]
'''
@tf.function
def add_10(x):
for i in tf.range(10):
x += 1
return x
add_10.get_concrete_function(tf.constant(0)).graph.get_operations()
'''
[<tf.Operation 'x' type=Placeholder>,
<tf.Operation 'range/start' type=Const>,
<tf.Operation 'range/limit' type=Const>,
<tf.Operation 'range/delta' type=Const>,
<tf.Operation 'range' type=Range>,
<tf.Operation 'sub' type=Sub>,
<tf.Operation 'floordiv' type=FloorDiv>,
<tf.Operation 'mod' type=FloorMod>,
<tf.Operation 'zeros_like' type=Const>,
<tf.Operation 'NotEqual' type=NotEqual>,
<tf.Operation 'Cast' type=Cast>,
<tf.Operation 'add' type=AddV2>,
<tf.Operation 'zeros_like_1' type=Const>,
<tf.Operation 'Maximum' type=Maximum>,
<tf.Operation 'while/maximum_iterations' type=Const>,
<tf.Operation 'while/loop_counter' type=Const>,
<tf.Operation 'while' type=StatelessWhile>,
<tf.Operation 'Identity' type=Identity>]
'''
Handling variables and other resources in TF functions
counter = tf.Variable(0)
@tf.function
def increment(counter, c=1):
return counter.assign_add(c)
increment(counter)
increment(counter)
'''
<tf.Tensor: shape=(), dtype=int32, numpy=2>
'''
function_def = increment.get_concrete_function(counter).function_def
function_def.signature.input_arg[0]
'''
name: "counter"
type: DT_RESOURCE
'''
counter = tf.Variable(0)
@tf.function
def increment(c=1):
return counter.assign_add(c)
increment()
increment() # <tf.Tensor: shape=(), dtype=int32, numpy=2>
function_def = increment.get_concrete_function().function_def
function_def.signature.input_arg[0]
'''
name: "assignaddvariableop_resource"
type: DT_RESOURCE
'''
class Counter:
def __init__(self):
self.counter = tf.Variable(0)
@tf.function
def increment(self, c=1):
return self.counter.assign_add(c)
c = Counter()
c.increment()
c.increment() # <tf.Tensor: shape=(), dtype=int32, numpy=2>
@tf.function
def add_10(x):
for i in tf.range(10):
x += 1
return x
print(tf.autograph.to_code(add_10.python_function))
'''
def tf__add(x):
with ag__.FunctionScope('add_10', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
do_return = False
retval_ = ag__.UndefinedReturnValue()
def get_state():
return (x,)
def set_state(vars_):
nonlocal x
(x,) = vars_
def loop_body(itr):
nonlocal x
i = itr
x = ag__.ld(x)
x += 1
i = ag__.Undefined('i')
ag__.for_stmt(ag__.converted_call(ag__.ld(tf).range, (10,), None, fscope), None, loop_body, get_state, set_state, ('x',), {'iterate_names': 'i'})
try:
do_return = True
retval_ = ag__.ld(x)
except:
do_return = False
raise
return fscope.ret(retval_, do_return)
'''
def display_tf_code(func):
from IPython.display import display, Markdown
if hasattr(func, 'python_function'):
func = func.python_function
code = tf.autograph.to_code(func)
display(Markdown('```python\n{}\n```'.format(code)))
display_tf_code(add_10)
'''
def tf__add(x):
with ag__.FunctionScope('add_10', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
do_return = False
retval_ = ag__.UndefinedReturnValue()
def get_state():
return (x,)
def set_state(vars_):
nonlocal x
(x,) = vars_
def loop_body(itr):
nonlocal x
i = itr
x = ag__.ld(x)
x += 1
i = ag__.Undefined('i')
ag__.for_stmt(ag__.converted_call(ag__.ld(tf).range, (10,), None, fscope), None, loop_body, get_state, set_state, ('x',), {'iterate_names': 'i'})
try:
do_return = True
retval_ = ag__.ld(x)
except:
do_return = False
raise
return fscope.ret(retval_, do_return)
'''
Using TF functions with Keras
# Custom loss function
def my_mse(y_true, y_pred):
print('Tracing loss my_mse()')
return tf.reduce_mean(tf.square(y_pred - y_true))
# Custom metric function
def my_mae(y_true, y_pred):
print('Tracing metric my_mae()')
return tf.reduce_mean(tf.abs(y_pred - y_true))
# Custom layer
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, input_shape):
self.kernel = self.add_weight(name='kernel', shape=(input_shape[1], self.units),
initializer='uniform', trainable=True)
self.biases = self.add_weight(name='bias', shape=(self.units,), initializer='zeros', trainable=True)
super().build(input_shape)
def call(self, X):
print('Tracing MyDense.call()')
return self.activation(X @ self.kernel + self.biases)
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
# Custom model
class MyModel(keras.models.Model):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.hidden1 = MyDense(30, activation='relu')
self.hidden2 = MyDense(30, activation='relu')
self.output_ = MyDense(1)
def call(self, inputs):
print('Tracing MyModel.call()')
hidden1 = self.hidden1(inputs)
hidden2 = self.hidden2(hidden1)
concat = keras.layers.concatenate([inputs, hidden2])
output = self.output_(concat)
return output
model = MyModel()
model.compile(loss=my_mse, optimizer='nadam', metrics=[my_mae])
model.fit(X_train_scaled, y_train, epochs=2, validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test) # [0.4163525402545929, 0.4639028012752533]
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = MyModel(dynamic=True)
model.compile(loss=my_mse, optimizer='nadam', metrics=[my_mae])
# With dynamic=True the model runs eagerly, so the custom code is called at every iteration (fit and evaluate on tiny datasets to limit the output)
model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0)
model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0)
'''
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
[5.507431983947754, 2.055328845977783]
'''
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = MyModel()
model.compile(loss=my_mse, optimizer='nadam', metrics=[my_mae], run_eagerly=True)
model.fit(X_train_scaled[:64], y_train[:64], epochs=1, validation_data=(X_valid_scaled[:64], y_valid[:64]), verbose=0)
model.evaluate(X_test_scaled[:64], y_test[:64], verbose=0)
'''
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
[5.507431983947754, 2.055328845977783]
'''
Custom optimizers
class MyMomentumOptimizer(keras.optimizers.Optimizer):
def __init__(self, learning_rate=0.001, momentum=0.9, name='MyMomentumOptimizer', **kwargs):
"""Call super().__init__() and use _set_hyper() to store hyperparameters"""
super().__init__(name, **kwargs)
self._set_hyper('learning_rate', kwargs.get('lr', learning_rate))
self._set_hyper('decay', self._initial_decay)
self._set_hyper('momentum', momentum)
def _create_slots(self, var_list):
"""For each model variable, create the optimizer variable associated with it.
TensorFlow calls these optimizer variables "slots".
For momentum optimization, we need one momentum slot per model variable.
"""
for var in var_list:
self.add_slot(var, 'momentum')
@tf.function
def _resource_apply_dense(self, grad, var):
"""Update the slots and perform one optimization step for one model variable
"""
var_dtype = var.dtype.base_dtype
lr_t = self._decayed_lr(var_dtype)
momentum_var = self.get_slot(var, 'momentum')
momentum_hyper = self._get_hyper('momentum', var_dtype)
momentum_var.assign(momentum_var * momentum_hyper - (1. - momentum_hyper) * grad)
var.assign_add(momentum_var * lr_t)
def _resource_apply_sparse(self, grad, var):
raise NotImplementedError
def get_config(self):
base_config = super().get_config()
return {**base_config, 'learning_rate': self._serialize_hyperparameter('learning_rate'),
'decay': self._serialize_hyperparameter('decay'),
'momentum': self._serialize_hyperparameter('momentum')}
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential([keras.layers.Dense(1, input_shape=[8])])
model.compile(loss='mse', optimizer=MyMomentumOptimizer())
model.fit(X_train_scaled, y_train, epochs=5)
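# Note: this optimizer scales the gradient by (1 - momentum), making it an exponential-moving-average
# variant of momentum; the built-in keras.optimizers.SGD(learning_rate=0.001, momentum=0.9) instead uses
# velocity = momentum * velocity - learning_rate * grad, so the two are similar but not identical.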