博客斷更了一周,干啥去了?想做個聊天機器人出來,去看教程了,然后大受打擊,哭着回來補TensorFlow和自然語言處理的基礎了。本來如意算盤打得挺響,作為一個初學者,直接看項目(不是指MINIST手寫數字識別這種),哪里不會補哪里,這樣不僅能學習到TensorFlow和算法知識,還知道如何在具體項目中應用,學完后還能出來一個項目。是不是要為博主的想法雙擊666?圖樣!
現在明白了什么叫基礎不牢地動山搖,明白了什么叫步子太大直接就放棄,明白了我是適合循序漸進的學習,暫時不適合對着項目直接干。
同時也明白了一點,那就是為什么很多TensoFlow教程都用MINIST數據集來展示如何構建各種模型,我之前還很鄙視一MINIST到底,覺得這就是個toy項目。現在明白了,用這個數據集是為了把精力集中在模型搭建和優化上,而不是浪費在數據預處理上。知道數據的輸入格式,明白如何構建模型和優化模型,那么面對新的任務時,只要把數據處理成相同的輸入格式,就能比較快的用TensorFlow和模型來完成任務最關鍵的部分。
所以打算好好學習一下用TensorFlow實現深度學習模型的基礎知識,主要用的這本書:《Hands On Machine Learning with Scikit-Learn and TensorFlow》。這本書真是神書,可以說是學習TensorFlow最好的資料了,一方面這本書把各種深度學習基礎都用代碼實現了,從DNN到CNN、RNN、自編碼器,從參數初始化、選擇優化器、Batch Norm、調整學習率、網絡預訓練等加速技巧到dropout、早停等正則化技巧,代碼豐富,講解詳盡;另一方面作者一直在Github上更新代碼,根據TensorFlow語法的變化更改代碼,我看到前幾天還在更新,感動哭。建議電子書和Github一起看。
好,接下來首先整理如何用TensorFlow構建DNN網絡,實現參數初始化、選擇優化算法、Batch Norm、梯度截斷和學習率衰減這些加速技巧,以及實現dropout、L1范數和L2范數等正則化技巧。
這一篇博客整理如何用TensorFlow構建一個DNN網絡,后面再寫博客整理如何使用加速訓練技巧和正則化技巧來優化模型,這樣做也是為了方便對比。
一、全連接神經網絡的“裸機版”
我們先用TensorFlow搭建一個沒有使用任何加速優化技巧的全連接神經網絡, 可以看作是低配的“裸機版”全連接神經網絡。數據集是經典的MINIST數據集,訓練一個DNN用於分類,有兩個隱藏層(一個有300個神經元,另一個有100個神經元)和一個帶有10個神經元的softmax輸出層,用小批量梯度下降算法(Mini-Batch Gradient Descent)來進行訓練。我們一步步來構建,最后再給出一份完整的代碼。
第一步:指定輸入的維度(每個樣本的特征維度),輸出的維度(類別數),並設置每個隱藏層神經元的數量
MINIST數據集就是手寫數字識別數據集,里面的數字圖片是黑白的,所以通道數是1,那么特征維度就是長和高兩維,為了方便輸入到網絡中,把2維矩陣拉平成1維的向量,就是28 * 28。而標簽是0-9這10個數字,所以輸出是10維。將第2個隱藏層的神經元設為100個,小於第1個隱藏層的神經元數,因為神經網絡中越深的隱藏層,提取到的特征越高級,維度越小。
import tensorflow as tf import numpy as np n_inputs = 28*28 n_hidden1 = 300 n_hidden2 = 100 n_outputs = 10
第二步:切分數據集,打亂順序,並生成小批量樣本
首先獲取數據,推薦用tf.keras.datasets來獲取這個內置的數據集,速度快。訓練集和測試集都可以直接獲取,再把訓練集切分為用於訓練的樣本和用於驗證的樣本(5000個)。
然后用np.random.permutation這個函數打亂訓練樣本的索引列表,再用np.array_split這個函數把索引列表進行切分,用來獲取小批量樣本。
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data() # 一開始是三維數組,(60000, 28, 28) print(X_train.shape,y_train.shape) # 把數據重新組合為二維數組,(60000, 784) X_train = X_train.astype(np.float32).reshape(-1, 28*28) / 255.0 X_test = X_test.astype(np.float32).reshape(-1, 28*28) / 255.0 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32) print(X_train.shape) X_valid, X_train = X_train[:5000],X_train[5000:] y_valid, y_train = y_train[:5000],y_train[5000:] # 打亂數據,並生成batch def shuffle_batch(X, y, batch_size): # permutation不直接在原來的數組上進行操作,而是返回一個新的打亂順序的數組,並不改變原來的數組。 rnd_idx = np.random.permutation(len(X)) n_batches = len(X) // batch_size # 把rnd_idx這個一位數組進行切分 for batch_idx in np.array_split(rnd_idx, n_batches): X_batch, y_batch = X[batch_idx], y[batch_idx] yield X_batch, y_batch
第三步:定義占位符節點
為了使用小批量梯度下降算法,我們需要使用占位符節點,然后在每次迭代時用下一個小批量樣本替換X和y。這些占位符節點在訓練階段才將小批量樣本傳入給TensorFlow,目前是構建圖階段,不執行運算。
注意到X和y的形狀中有None,也就是只定義了一部分。為什么呢?
X是一個2D張量,第一個維度是樣本數,第二個維度是特征。我們知道特征的數量是28*28,但是還不知道batch size是多少,所以第一個維度指定為None,也就是任意大小。
y是一個1D張量,維度是樣本數,同樣指定為None。
X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X") y = tf.placeholder(tf.int32, shape=(None), name="y")
第四步:搭建網絡層
注意最后一層並沒有定義softmax函數來求出概率分布,而是得到通過softmax函數激活之前的輸入值logtis。接下來會說明為什么這里不定義一個softmax激活函數。
當然輸出的概率分布也可以計算出來,用tf.nn.softmax,但是這個值我們並不會用於下面的計算。
with tf.name_scope("dnn"): hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1", activation=tf.nn.relu) hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2", activation=tf.nn.relu) logits = tf.layers.dense(hidden2, n_outputs, name="outputs") y_proba = tf.nn.softmax(logits)
第五步:定義損失函數和優化器
損失函數用交叉熵損失函數。tf.nn.sparse_softmax_cross_entropy_with_logits這個老長老長的函數,是用softmax激活之前的輸入值直接計算交叉熵損失,為什么這么干呢?一方面是為了正確處理像log等於0的極端情況,這是為啥不先用softmax激活,另一方面是與樣本標簽的格式相匹配,我們說了樣本的標簽是1D張量,也就是1或者7這種整數,這是這個函數需要的格式,而不是[0, 1, 0, ..., 0]這種獨熱編碼格式。獨熱編碼格式的標簽需要使用tf.nn.softmax_cross_entropy_with_logits這個函數。
然后定義一個GD優化器。
# 定義損失函數和計算損失
with tf.name_scope("loss"): xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits) loss = tf.reduce_mean(xentropy, name="loss") # 定義優化器 learning_rate = 0.01 with tf.name_scope("train"): optimizer = tf.train.GradientDescentOptimizer(learning_rate) training_op = optimizer.minimize(loss)
第六步:評估模型
使用准確性作為我們的模型評估指標。首先,對於每個樣本,通過檢查logits最大值的索引是否與標簽y相等,來確定神經網絡的預測是否正確。為此,可以使用in_top_k()函數,這會返回一個充滿布爾值的1D張量。
然后我們需要用tf.cast這個函數將這些布爾值轉換為浮點值,然后計算平均值。
# 評估模型,使用准確性作為我們的績效指標 with tf.name_scope("eval"): # logists最大值的索引在0-9之間,恰好就是被預測所屬於的類,因此和y進行對比,相等就是True,否則為False correct = tf.nn.in_top_k(logits, y, 1) accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
第七步:初始化所有變量,並保存模型
init = tf.global_variables_initializer()
saver = tf.train.Saver()
第八步:定義訓練輪次和batch size,訓練模型和保存模型,記錄訓練時間
訓練40輪(epoch),把所有的樣本都輸入進去訓練一次,叫做一輪。然后一個小批量是輸入200個樣本。
在訓練階段可以把小批量樣本傳入模型中開始訓練了。
定義了一個記錄訓練時間的函數,因為訓練時間的長短也是調參的關注點之一。
# 定義好訓練輪次和batch-size n_epochs = 40 batch_size = 200 # 獲取計算所花費的時間
import time
from datetime import timedelta
def get_time_dif(start_time): end_time = time.time() time_dif = end_time - start_time #timedelta是用於對間隔進行規范化輸出,間隔10秒的輸出為:00:00:10 return timedelta(seconds=int(round(time_dif))) with tf.Session() as sess: init.run() start_time = time.time() for epoch in range(n_epochs): for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size): sess.run(training_op, feed_dict={X: X_batch, y: y_batch}) if epoch % 5 == 0: acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch}) acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid}) print(epoch, "Batch accuracy:", acc_batch, "Val accuracy:", acc_val) time_dif = get_time_dif(start_time) print("\nTime usage:", time_dif) save_path = saver.save(sess, "./my_model_final.ckpt")
得到的輸出結果如下。用時22秒,感覺效果賊一般啊,最后一輪的驗證精度為96.3%。后面通過調整batch size,看是否能得到更好的結果。
0 Batch accuracy: 0.835 Val accuracy: 0.8132
5 Batch accuracy: 0.895 Val accuracy: 0.9178
10 Batch accuracy: 0.94 Val accuracy: 0.934
15 Batch accuracy: 0.96 Val accuracy: 0.9414
20 Batch accuracy: 0.955 Val accuracy: 0.948
25 Batch accuracy: 0.94 Val accuracy: 0.9534
30 Batch accuracy: 0.96 Val accuracy: 0.9576
35 Batch accuracy: 0.955 Val accuracy: 0.9608
39 Batch accuracy: 0.96 Val accuracy: 0.963
Time usage: 0:00:22
第九步:調用訓練好的模型進行預測
把保存好的模型恢復,然后對測試集中的20個樣本進行預測,發現全部預測正確。
再評估模型在全部測試集上的正確率,得到正確率為95.76%。
效果確實賊一般。
with tf.Session() as sess: saver.restore(sess, "./my_model_final.ckpt") # or better, use save_path X_test_20 = X_test[:20] # 得到softmax之前的輸出 Z = logits.eval(feed_dict={X: X_test_20}) # 得到每一行最大值的索引 y_pred = np.argmax(Z, axis=1) print("Predicted classes:", y_pred) print("Actual calsses: ", y_test[:20]) # 評估在測試集上的正確率 acc_test = accuracy.eval(feed_dict={X: X_test, y: y_test}) print("\nTest_accuracy:", acc_test)
INFO:tensorflow:Restoring parameters from ./my_model_final.ckpt Predicted classes: [7 2 1 0 4 1 4 9 6 9 0 6 9 0 1 5 9 7 3 4] Actual calsses: [7 2 1 0 4 1 4 9 5 9 0 6 9 0 1 5 9 7 3 4] Test_accuracy: 0.9576
二、對全連接神經網絡進行微調
這里的微調是調整batch size,學習率和迭代輪次。調參還是有點花時間,我就以調整batch size為例,來看看是否可以得到更好的結果。
分別把batch size設置為200,100,50,10,訓練好模型后,計算在測試集上的正確率,分別為:
batch_size = 200 Test_accuracy: 0.9576 batch_size = 100 Test_accuracy: 0.9599 batch_size = 50 Test_accuracy: 0.9781 batch_size = 10 Test_accuracy: 0.9812
發現規律沒有,在這個數據集上,batch size越小,則測試精度越高。batch size為10的時候,測試精度達到了98.12%,還是很不錯的。
然后我想,如果用SGD,每次只輸入一個樣本又會怎么樣呢?測試精度還能不能提高我不知道,可是能確定的一點是,我點擊運行之后,我可以去聽幾首鄧紫棋的歌再回來了。然后我就去跑步了。
我跑步和聽歌回來了,用SGD訓練模型耗時26分30秒,測試精度為98.51%,取得了到目前為止最好的成績。
三、完整代碼整理
import tensorflow as tf import numpy as np import time from datetime import timedelta # 記錄訓練花費的時間 def get_time_dif(start_time): end_time = time.time() time_dif = end_time - start_time #timedelta是用於對間隔進行規范化輸出,間隔10秒的輸出為:00:00:10 return timedelta(seconds=int(round(time_dif))) n_inputs = 28*28 n_hidden1 = 300 n_hidden2 = 100 n_outputs = 10 (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data() # 一開始是三維數組,(60000, 28, 28) print(X_train.shape,y_train.shape) # 把數據重新組合為二維數組,(60000, 784) X_train = X_train.astype(np.float32).reshape(-1, 28*28) / 255.0 X_test = X_test.astype(np.float32).reshape(-1, 28*28) / 255.0 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32) print(X_train.shape) X_valid, X_train = X_train[:5000],X_train[5000:] y_valid, y_train = y_train[:5000],y_train[5000:] # 打亂數據,並生成batch def shuffle_batch(X, y, batch_size): # permutation不直接在原來的數組上進行操作,而是返回一個新的打亂順序的數組,並不改變原來的數組。 rnd_idx = np.random.permutation(len(X)) n_batches = len(X) // batch_size # 把rnd_idx這個一位數組進行切分 for batch_idx in np.array_split(rnd_idx, n_batches): X_batch, y_batch = X[batch_idx], y[batch_idx] yield X_batch, y_batch X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X") y = tf.placeholder(tf.int32, shape=(None), name="y") with tf.name_scope("dnn"): hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1", activation=tf.nn.relu) hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2", activation=tf.nn.relu) logits = tf.layers.dense(hidden2, n_outputs, name="outputs") y_proba = tf.nn.softmax(logits) # 定義損失函數和計算損失 with tf.name_scope("loss"): xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits) loss = tf.reduce_mean(xentropy, name="loss") # 定義優化器 learning_rate = 0.01 with tf.name_scope("train"): optimizer = tf.train.GradientDescentOptimizer(learning_rate) training_op = optimizer.minimize(loss) # 評估模型,使用准確性作為我們的績效指標 with tf.name_scope("eval"): # logists最大值的索引在0-9之間,恰好就是被預測所屬於的類,因此和y進行對比,相等就是True,否則為False correct = tf.nn.in_top_k(logits, y, 1) accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) init = tf.global_variables_initializer() saver = tf.train.Saver() # 定義好訓練輪次和batch-size n_epochs = 40 batch_size = 200 with tf.Session() as sess: init.run() start_time = time.time() for epoch in range(n_epochs): for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size): sess.run(training_op, feed_dict={X: X_batch, y: y_batch}) if epoch % 5 == 0 or epoch == 39: acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch}) acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid}) print(epoch, "Batch accuracy:", acc_batch, "Val accuracy:", acc_val) time_dif = get_time_dif(start_time) print("\nTime usage:", time_dif) save_path = saver.save(sess, "./my_model_final.ckpt")
with tf.Session() as sess: saver.restore(sess, "./my_model_final.ckpt") # or better, use save_path X_test_20 = X_test[:20] # 得到softmax之前的輸出 Z = logits.eval(feed_dict={X: X_test_20}) # 得到每一行最大值的索引 y_pred = np.argmax(Z, axis=1) print("Predicted classes:", y_pred) print("Actual calsses: ", y_test[:20]) # 評估在測試集上的正確率 acc_test = accuracy.eval(feed_dict={X: X_test, y: y_test}) print("\nTest_accuracy:", acc_test)
參考資料:
《Hands On Machine Learning with Scikit-Learn and TensorFlow》