參考:tensorflow中的batch_norm以及tf.control_dependencies和tf.GraphKeys.UPDATE_OPS的探究
1. Batch Normalization
對卷積層來說,批量歸一化發生在卷積計算之后、應用激活函數之前。訓練階段:如果卷積計算輸出多個通道,我們需要對這些通道的輸出分別做批量歸一化,且每個通道都擁有獨立的拉伸和偏移參數,並均為標量。假設小批量中有 m 個樣本。在單個通道上,假設卷積計算輸出的高和寬分別為p和q。我們需要對該通道中m×p×q個元素同時做批量歸一化。對這些元素做標准化計算時,我們使用相同的均值和方差,即該通道中m×p×q個元素的均值和方差。測試階段:就直接用訓練階段利用 moving average 方式獲得的整個訓練集的 均值和方差來歸一化。
以一個 mxnet 版本的代碼來理解具體實現吧:

def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum): # 通過autograd來判斷當前模式是訓練模式還是預測模式 if not autograd.is_training(): # 如果是在預測模式下,直接使用傳入的移動平均所得的均值和方差 X_hat = (X - moving_mean) / nd.sqrt(moving_var + eps) else: assert len(X.shape) in (2, 4) if len(X.shape) == 2: # 使用全連接層的情況,計算特征維上的均值和方差 mean = X.mean(axis=0) var = ((X - mean) ** 2).mean(axis=0) else: # 使用二維卷積層的情況,計算通道維上(axis=1)的均值和方差。這里我們需要保持 # X的形狀以便后面可以做廣播運算 mean = X.mean(axis=(0, 2, 3), keepdims=True) var = ((X - mean) ** 2).mean(axis=(0, 2, 3), keepdims=True) # 訓練模式下用當前的均值和方差做標准化 X_hat = (X - mean) / nd.sqrt(var + eps) # 更新移動平均的均值和方差 moving_mean = momentum * moving_mean + (1.0 - momentum) * mean moving_var = momentum * moving_var + (1.0 - momentum) * var Y = gamma * X_hat + beta # 拉伸和偏移 return Y, moving_mean, moving_var class BatchNorm(nn.Block): def __init__(self, num_features, num_dims, **kwargs): super(BatchNorm, self).__init__(**kwargs) if num_dims == 2: shape = (1, num_features) else: shape = (1, num_features, 1, 1) # 參與求梯度和迭代的拉伸和偏移參數,分別初始化成1和0 self.gamma = self.params.get('gamma', shape=shape, init=init.One()) self.beta = self.params.get('beta', shape=shape, init=init.Zero()) # 不參與求梯度和迭代的變量,全在內存上初始化成0 self.moving_mean = nd.zeros(shape) self.moving_var = nd.zeros(shape) def forward(self, X): # 如果X不在內存上,將moving_mean和moving_var復制到X所在顯存上 if self.moving_mean.context != X.context: self.moving_mean = self.moving_mean.copyto(X.context) self.moving_var = self.moving_var.copyto(X.context) # 保存更新過的moving_mean和moving_var Y, self.moving_mean, self.moving_var = batch_norm( X, self.gamma.data(), self.beta.data(), self.moving_mean, self.moving_var, eps=1e-5, momentum=0.9) return Y
2. TensorFlow 實現
tensorflow中關於batch_norm現在有三種實現方式。
2.1 tf.nn.batch_normalization
tf.nn.batch_normalization( x, mean, variance, offset, scale, variance_epsilon, name=None )
該函數是一種最底層的實現方法,在使用時mean、variance、scale、offset等參數需要自己傳遞並更新,因此實際使用時還需自己對該函數進行封裝,一般不建議使用,但是對了解batch_norm的原理很有幫助。

import tensorflow as tf def batch_norm(x, name_scope, training, epsilon=1e-3, decay=0.99): """ Assume nd [batch, N1, N2, ..., Nm, Channel] tensor""" with tf.variable_scope(name_scope): size = x.get_shape().as_list()[-1] scale = tf.get_variable('scale', [size], initializer=tf.constant_initializer(0.1)) offset = tf.get_variable('offset', [size]) pop_mean = tf.get_variable('pop_mean', [size], initializer=tf.zeros_initializer(), trainable=False) pop_var = tf.get_variable('pop_var', [size], initializer=tf.ones_initializer(), trainable=False) batch_mean, batch_var = tf.nn.moments(x, list(range(len(x.get_shape())-1))) train_mean_op = tf.assign(pop_mean, pop_mean * decay + batch_mean * (1 - decay)) train_var_op = tf.assign(pop_var, pop_var * decay + batch_var * (1 - decay)) def batch_statistics(): with tf.control_dependencies([train_mean_op, train_var_op]): return tf.nn.batch_normalization(x, batch_mean, batch_var, offset, scale, epsilon) def population_statistics(): return tf.nn.batch_normalization(x, pop_mean, pop_var, offset, scale, epsilon) return tf.cond(training, batch_statistics, population_statistics) is_traing = tf.placeholder(dtype=tf.bool) input = tf.ones([1, 2, 2, 3]) output = batch_norm(input, name_scope='batch_norm_nn', training=is_traing)
2.2 tf.layers.batch_normalization
tf.layers.batch_normalization( inputs, axis=-1, momentum=0.99, epsilon=0.001, center=True, scale=True, beta_initializer=tf.zeros_initializer(), gamma_initializer=tf.ones_initializer(), moving_mean_initializer=tf.zeros_initializer(), moving_variance_initializer=tf.ones_initializer(), beta_regularizer=None, gamma_regularizer=None, beta_constraint=None, gamma_constraint=None, training=False, trainable=True, name=None, reuse=None, renorm=False, renorm_clipping=None, renorm_momentum=0.99, fused=None, virtual_batch_size=None, adjustment=None ) """ Note: when training, the moving_mean and moving_variance need to be updated. By default the update ops are placed in `tf.GraphKeys.UPDATE_OPS`, so they need to be executed alongside the `train_op`. Also, be sure to add any batch_normalization ops before getting the update_ops collection. Otherwise, update_ops will be empty, and training/inference will not work properly. For example: ```python x_norm = tf.compat.v1.layers.batch_normalization(x, training=training) # ... update_ops = tf.compat.v1.get_collection(tf.GraphKeys.UPDATE_OPS) train_op = optimizer.minimize(loss) train_op = tf.group([train_op, update_ops]) ``` """
以下是個完整的 MNIST 訓練網絡:

import tensorflow as tf # 1. create data from tensorflow.examples.tutorials.mnist import input_data mnist = input_data.read_data_sets('../MNIST_data', one_hot=True) X = tf.placeholder(tf.float32, shape=(None, 784), name='X') y = tf.placeholder(tf.int32, shape=(None), name='y') is_training = tf.placeholder(tf.bool, None, name='is_training') # 2. define network he_init = tf.initializers.he_normal() xavier_init = tf.initializers.glorot_normal() with tf.name_scope('dnn'): hidden1 = tf.layers.dense(X, 300, kernel_initializer=he_init, name='hidden1') hidden1 = tf.layers.batch_normalization(hidden1, momentum=0.9) hidden1 = tf.nn.relu(hidden1) hidden2 = tf.layers.dense(hidden1, 100, kernel_initializer=he_init, name='hidden2') hidden2 = tf.layers.batch_normalization(hidden2, training=is_training, momentum=0.9) hidden2 = tf.nn.relu(hidden2) logits = tf.layers.dense(hidden2, 10, kernel_initializer=he_init, name='output') # prob = tf.layers.dense(hidden2, 10, tf.nn.softmax, name='prob') # 3. define loss with tf.name_scope('loss'): # tf.losses.sparse_softmax_cross_entropy() label is not one_hot and dtype is int* # xentropy = tf.losses.sparse_softmax_cross_entropy(labels=tf.argmax(y, axis=1), logits=logits) # tf.nn.sparse_softmax_cross_entropy_with_logits() label is not one_hot and dtype is int* # xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=tf.argmax(y, axis=1), logits=logits) # loss = tf.reduce_mean(xentropy) loss = tf.losses.softmax_cross_entropy(onehot_labels=y, logits=logits) # label is one_hot # 4. define optimizer learning_rate = 0.01 with tf.name_scope('train'): update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS) # for batch normalization with tf.control_dependencies(update_ops): optimizer_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss) with tf.name_scope('eval'): correct = tf.nn.in_top_k(logits, tf.argmax(y, axis=1), 1) # 目標是否在前K個預測中, label's dtype is int* accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) # 5. initialize init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()) saver = tf.train.Saver() # ================= print([v.name for v in tf.trainable_variables()]) print([v.name for v in tf.global_variables()]) # ================= # 5. train & test n_epochs = 20 n_batches = 50 batch_size = 50 with tf.Session() as sess: sess.run(init_op) for epoch in range(n_epochs): for iteration in range(mnist.train.num_examples // batch_size): X_batch, y_batch = mnist.train.next_batch(batch_size) sess.run(optimizer_op, feed_dict={X: X_batch, y: y_batch, is_training:True}) acc_train = accuracy.eval(feed_dict={X: X_batch, y: y_batch, is_training:False}) # 最后一個 batch 的 accuracy acc_test = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False}) loss_test = loss.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False}) print(epoch, "Train accuracy:", acc_train, "Test accuracy:", acc_test, "Test loss:", loss_test) save_path = saver.save(sess, "./my_model_final.ckpt") with tf.Session() as sess: sess.run(init_op) saver.restore(sess, "./my_model_final.ckpt") acc_test = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False}) loss_test = loss.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False}) print("Test accuracy:", acc_test, ", Test loss:", loss_test)
2.3 tf.contrib.layers.batch_norm
涉及到 contribe 部分就先 pass 吧
3. 關於tf.GraphKeys.UPDATA_OPS
3.1 tf.control_dependencies
按照下面這個例子理解下:
import tensorflow as tf a_1 = tf.Variable(1) b_1 = tf.Variable(2) update_op = tf.assign(a_1, 10) add = tf.add(a_1, b_1) a_2 = tf.Variable(1) b_2 = tf.Variable(2) update_op2 = tf.assign(a_2, 10) with tf.control_dependencies([update_op2]): add_with_dependencies = tf.add(a_2, b_2) init = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()) with tf.Session() as sess: sess.run(init) ans_1, ans_2 = sess.run([add, add_with_dependencies]) print("Add: ", ans_1) print("Add_with_dependency: ", ans_2) """ 可以看到兩組加法進行的對比,正常的計算圖在計算 add 時是不會經過 update_op 操作的, 因此在加法時 a 的值仍為 1,但是采用 tf.control_dependencies 函數,可以控制在進行 add 前先完成 update_op 的操作,因此在加法時 a 的值為 10,因此最后兩種加法的結果不同。 """
3.2 tf.GraphKeys.UPDATE_OPS
關於 tf.GraphKeys.UPDATE_OPS,這是一個 tensorflow 的計算圖中內置的一個集合,其中會保存一些需要在訓練操作之前完成的操作,並配合 tf.control_dependencies 函數使用。
關於在 batch_norm 中,即為更新 mean 和 variance 的操作。通過下面一個例子可以看到 tf.layers.batch_normalization 中是如何實現的。
import tensorflow as tf is_traing = tf.placeholder(dtype=tf.bool, shape=None) input = tf.ones([1, 2, 2, 3]) output = tf.layers.batch_normalization(input, training=is_traing) update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS) print(update_ops) print([v.name for v in tf.trainable_variables()]) print([v.name for v in tf.global_variables()]) """ [<tf.Operation 'batch_normalization/AssignMovingAvg' type=AssignSub>, <tf.Operation 'batch_normalization/AssignMovingAvg_1' type=AssignSub>] ['batch_normalization/gamma:0', 'batch_normalization/beta:0'] ['batch_normalization/gamma:0', 'batch_normalization/beta:0', 'batch_normalization/moving_mean:0', 'batch_normalization/moving_variance:0'] """
可以看到輸出的即為兩個 batch_normalization 中更新 mean 和 variance 的操作,需要保證它們在 train_op 前完成。
這兩個操作是在 tensorflow 的內部實現中自動被加入 tf.GraphKeys.UPDATE_OPS 這個集合的,在 tf.contrib.layers.batch_norm 的參數中可以看到有一項 updates_collections 的默認值即為 tf.GraphKeys.UPDATE_OPS,而在 tf.layers.batch_normalization 中則是直接將兩個更新操作放入了上述集合。
如果在使用時不添加 tf.control_dependencies 函數,即在訓練時(training=True)每批次時只會計算當批次的 mean 和 var,並傳遞給 tf.nn.batch_normalization 進行歸一化,由於 mean_update 和 variance_update 在計算圖中並不在上述操作的依賴路徑上,因為並不會主動完成,也就是說,在訓練時 mean_update 和 variance_update 並不會被使用到,其值一直是初始值。因此在測試階段(training=False)使用這兩個作為 mean 和 variance 並進行歸一化操作,這樣就會尷尬了。而如果使用 tf.control_dependencies 函數,會在訓練階段每次訓練操作執行前被動地去執行 mean_update 和 variance_update,因此 moving_mean 和 moving_variance 會被不斷更新,在測試時使用該參數也就不會有問題了。
One more thing: notice that the list of trainable variables is shorter than the list of all global variables. This is because the moving averages are non-trainable variables. If you want to reuse a pretrained neural network (see below), you must not forget these non-trainable variables.
4. Dropout
以上圖為例,對隱含層做概率為 $p$ 的Dropout操作時,有 $p$ 的概率 $h_i$ 會被清零,而又 $1-p$ 的概率 $h_i$ 會除以 $1-p$ 做拉伸。目的是在不改變隱含層的期望值的前提下,用隨機丟棄的方式使得輸出層的計算無法過度依賴於隱含層的任節點。
def dropout(X, drop_prob): assert 0 <= drop_prob <= 1 keep_prob = 1 - drop_prob # 這種情況下把全部元素都丟棄 if keep_prob == 0: return X.zeros_like() mask = nd.random.uniform(0, 1, X.shape) < keep_prob return mask * X / keep_prob
tf.nn.dropout
tf.nn.dropout( x, keep_prob=None, noise_shape=None, seed=None, name=None, rate=None )
以下是一個是否使用 Dropout 訓練的實例:

import tensorflow as tf import numpy as np import matplotlib.pyplot as plt print(tf.__version__) # Hyper parameters tf.set_random_seed(1) np.random.seed(1) N_SAMPLES = 20 # 樣本點數 N_HIDDEN = 300 # 隱含層節點數 LR = 0.01 # 初始學習率 # 1. create data x_train = np.linspace(-1, 1, N_SAMPLES)[:, np.newaxis] y_train = x_train + 0.3 * np.random.randn(N_SAMPLES)[:, np.newaxis] x_test = x_train.copy() y_test = x_test + 0.3 * np.random.randn(N_SAMPLES)[:, np.newaxis] # show data plt.scatter(x_train, y_train, c='magenta', s=50, alpha=0.5, label='train') plt.scatter(x_test, y_test, c='cyan', s=50, alpha=0.5, label='test') plt.legend(loc='upper left') plt.show() # tf placeholder tf_x = tf.placeholder(tf.float32, [None, 1]) tf_y = tf.placeholder(tf.float32, [None, 1]) is_training = tf.placeholder(tf.bool, None) # to control dropout # 2. construct network o1 = tf.layers.dense(tf_x, N_HIDDEN, tf.nn.relu) o2 = tf.layers.dense(o1, N_HIDDEN, tf.nn.relu) o_predict = tf.layers.dense(o2, 1) # ========== d1 = tf.layers.dense(tf_x, N_HIDDEN, tf.nn.relu) d1 = tf.layers.dropout(d1, rate=0.5, training=is_training) # drop out 50% d2 = tf.layers.dense(d1, N_HIDDEN, tf.nn.relu) d2 = tf.layers.dropout(d2, rate=0.5, training=is_training) # drop out 50% d_predict = tf.layers.dense(d2, 1) # ========== # 3. define loss o_loss = tf.losses.mean_squared_error(tf_y, o_predict) d_loss = tf.losses.mean_squared_error(tf_y, d_predict) # 4. define optimizer o_optimizer = tf.train.AdamOptimizer(LR).minimize(o_loss) d_optimizer = tf.train.AdamOptimizer(LR).minimize(d_loss) # 5. initialize init_op = tf.global_variables_initializer() plt.ion() # something about plotting # 6. train with tf.Session() as sess: sess.run(init_op) for step in range(500): sess.run([o_optimizer, d_optimizer], {tf_x: x_train, tf_y: y_train, is_training: True}) # train, set is_training=True if step % 10 == 0: # ploting plt.cla() plt.scatter(x_train, y_train, c='magenta', s=50, alpha=0.3, label='train') plt.scatter(x_test, y_test, c='cyan', s=50, alpha=0.3, label='test') o_loss_, d_loss_, o_predict_, d_predict_ = sess.run( [o_loss, d_loss, o_predict, d_predict], feed_dict={tf_x: x_test, tf_y: y_test, is_training: False} ) plt.plot(x_test, o_predict_, 'r-', lw=3, label='overfitting') plt.plot(x_test, d_predict_, 'b--', lw=3, label='dropout(50%)') plt.text(0, -1.2, 'overfitting loss=%.4f' % o_loss_, fontdict={'size': 20, 'color': 'red'}) plt.text(0, -1.5, 'dropout loss=%.4f' % d_loss_, fontdict={'size': 20, 'color': 'blue'}) plt.legend(loc='upper left') plt.ylim((-2.5, 2.5)) plt.pause(0.1) plt.ioff() plt.show()
Dropout 的使用使得網絡不過分依賴於少數節點,每個節點都盡可能地發揮自己的作用,他們最終對輸入的微小變化不那么敏感。最后,你會得到一個更健壯的網絡,它能更好地泛化。