TensorFlow Usage Notes (7): BN and Dropout Layers


Reference: an investigation of batch_norm in TensorFlow, and of tf.control_dependencies and tf.GraphKeys.UPDATE_OPS

1. Batch Normalization

For convolutional layers, batch normalization is applied after the convolution and before the activation function. Training phase: if the convolution outputs multiple channels, we normalize each channel's output separately, and each channel gets its own scale and shift parameters, both scalars. Suppose the mini-batch contains m examples and, on a single channel, the convolution output has height p and width q. We then normalize the m×p×q elements of that channel together, standardizing them with a shared mean and variance: the mean and variance computed over those m×p×q elements. Test phase: we simply normalize with the training-set statistics accumulated during training via a moving average.
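In formulas: each element $x$ is standardized with the batch statistics of its channel and then scaled and shifted by the learned parameters,

$$\hat{x} = \frac{x - \mu_{\mathcal{B}}}{\sqrt{\sigma_{\mathcal{B}}^2 + \epsilon}}, \qquad y = \gamma \hat{x} + \beta,$$

where $\mu_{\mathcal{B}}$ and $\sigma_{\mathcal{B}}^2$ are the mean and variance over the $m \times p \times q$ elements of that channel, and $\epsilon$ is a small constant for numerical stability.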

Let's walk through an MXNet version of the code to understand the concrete implementation:

from mxnet import autograd, init, nd
from mxnet.gluon import nn

def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
    # Use autograd to determine whether we are in training or prediction mode
    if not autograd.is_training():
        # In prediction mode, directly use the moving-average mean and variance
        X_hat = (X - moving_mean) / nd.sqrt(moving_var + eps)
    else:
        assert len(X.shape) in (2, 4)
        if len(X.shape) == 2:
            # Fully connected layer: compute mean and variance over the feature axis
            mean = X.mean(axis=0)
            var = ((X - mean) ** 2).mean(axis=0)
        else:
            # 2D convolutional layer: compute mean and variance per channel (axis=1).
            # Keep X's shape so the broadcast operations below work
            mean = X.mean(axis=(0, 2, 3), keepdims=True)
            var = ((X - mean) ** 2).mean(axis=(0, 2, 3), keepdims=True)
        # In training mode, standardize with the current batch mean and variance
        X_hat = (X - mean) / nd.sqrt(var + eps)
        # Update the moving-average mean and variance
        moving_mean = momentum * moving_mean + (1.0 - momentum) * mean
        moving_var = momentum * moving_var + (1.0 - momentum) * var
    Y = gamma * X_hat + beta  # scale and shift
    return Y, moving_mean, moving_var

class BatchNorm(nn.Block):
    def __init__(self, num_features, num_dims, **kwargs):
        super(BatchNorm, self).__init__(**kwargs)
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
        # Scale and shift parameters involved in gradients and updates, initialized to 1 and 0 respectively
        self.gamma = self.params.get('gamma', shape=shape, init=init.One())
        self.beta = self.params.get('beta', shape=shape, init=init.Zero())
        # Variables not involved in gradients or updates, all initialized to 0 in main (CPU) memory
        self.moving_mean = nd.zeros(shape)
        self.moving_var = nd.zeros(shape)

    def forward(self, X):
        # If X is not in main memory, copy moving_mean and moving_var to the GPU memory where X lives
        if self.moving_mean.context != X.context:
            self.moving_mean = self.moving_mean.copyto(X.context)
            self.moving_var = self.moving_var.copyto(X.context)
        # Save the updated moving_mean and moving_var
        Y, self.moving_mean, self.moving_var = batch_norm(
            X, self.gamma.data(), self.beta.data(), self.moving_mean,
            self.moving_var, eps=1e-5, momentum=0.9)
        return Y
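As a quick smoke test (a sketch, assuming the definitions above and a working MXNet install): running the layer inside `autograd.record()` puts it in training mode, so each output channel should come out standardized and the moving averages should move away from zero.

X = nd.random.uniform(shape=(2, 3, 4, 4))   # NCHW input
layer = BatchNorm(num_features=3, num_dims=4)
layer.initialize()
with autograd.record():                     # training mode
    Y = layer(X)
print(Y.mean(axis=(0, 2, 3)))               # per-channel means, close to 0
print(layer.moving_mean)                    # no longer all zeros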

 

2. TensorFlow Implementations

TensorFlow currently offers three ways to implement batch normalization.

2.1 tf.nn.batch_normalization

tf.nn.batch_normalization(
    x,
    mean,
    variance,
    offset,
    scale,
    variance_epsilon,
    name=None
)

This function is the lowest-level implementation: the mean, variance, scale, and offset parameters must be passed in and updated by you, so in practice it has to be wrapped before use. It is generally not recommended for direct use, but it is very helpful for understanding how batch normalization works.
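For instance, a minimal direct call might look like this (a sketch only: the batch statistics are computed on the fly with tf.nn.moments, and no moving averages are maintained, so it is not usable for inference as-is):

import tensorflow as tf

x = tf.random_normal([8, 4])
mean, var = tf.nn.moments(x, axes=[0])   # per-feature batch statistics
beta = tf.zeros([4])                     # offset
gamma = tf.ones([4])                     # scale
y = tf.nn.batch_normalization(x, mean, var, beta, gamma, variance_epsilon=1e-3)

The wrapper below adds the missing pieces: trainable scale/offset variables, population statistics maintained with a moving average, and a tf.cond that switches between batch and population statistics: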

import tensorflow as tf

def batch_norm(x, name_scope, training, epsilon=1e-3, decay=0.99):
    """ Assume nd [batch, N1, N2, ..., Nm, Channel] tensor"""
    with tf.variable_scope(name_scope):
        size = x.get_shape().as_list()[-1]
        scale = tf.get_variable('scale', [size], initializer=tf.constant_initializer(0.1))
        offset = tf.get_variable('offset', [size])

        pop_mean = tf.get_variable('pop_mean', [size], initializer=tf.zeros_initializer(), trainable=False)
        pop_var = tf.get_variable('pop_var', [size], initializer=tf.ones_initializer(), trainable=False)
        batch_mean, batch_var = tf.nn.moments(x, list(range(len(x.get_shape())-1)))
        train_mean_op = tf.assign(pop_mean, pop_mean * decay + batch_mean * (1 - decay))
        train_var_op = tf.assign(pop_var, pop_var * decay + batch_var * (1 - decay))

        def batch_statistics():
            with tf.control_dependencies([train_mean_op, train_var_op]):
                return tf.nn.batch_normalization(x, batch_mean, batch_var, offset, scale, epsilon)
        def population_statistics():
            return tf.nn.batch_normalization(x, pop_mean, pop_var, offset, scale, epsilon)

        return tf.cond(training, batch_statistics, population_statistics)

is_training = tf.placeholder(dtype=tf.bool)
input = tf.ones([1, 2, 2, 3])
output = batch_norm(input, name_scope='batch_norm_nn', training=is_training)

2.2 tf.layers.batch_normalization

tf.layers.batch_normalization(
    inputs,
    axis=-1,
    momentum=0.99,
    epsilon=0.001,
    center=True,
    scale=True,
    beta_initializer=tf.zeros_initializer(),
    gamma_initializer=tf.ones_initializer(),
    moving_mean_initializer=tf.zeros_initializer(),
    moving_variance_initializer=tf.ones_initializer(),
    beta_regularizer=None,
    gamma_regularizer=None,
    beta_constraint=None,
    gamma_constraint=None,
    training=False,
    trainable=True,
    name=None,
    reuse=None,
    renorm=False,
    renorm_clipping=None,
    renorm_momentum=0.99,
    fused=None,
    virtual_batch_size=None,
    adjustment=None
)
"""
Note: when training, the moving_mean and moving_variance need to be updated.
By default the update ops are placed in `tf.GraphKeys.UPDATE_OPS`, so they
need to be executed alongside the `train_op`. Also, be sure to add any
batch_normalization ops before getting the update_ops collection. Otherwise,
update_ops will be empty, and training/inference will not work properly. For
example:

```python
x_norm = tf.compat.v1.layers.batch_normalization(x, training=training)

# ...

update_ops = tf.compat.v1.get_collection(tf.GraphKeys.UPDATE_OPS)
train_op = optimizer.minimize(loss)
train_op = tf.group([train_op, update_ops])
```
"""

Below is a complete MNIST training network:

import tensorflow as tf

# 1. create data
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('../MNIST_data', one_hot=True)

X = tf.placeholder(tf.float32, shape=(None, 784), name='X')
y = tf.placeholder(tf.float32, shape=(None, 10), name='y')  # one-hot labels
is_training = tf.placeholder(tf.bool, None, name='is_training')

# 2. define network
he_init = tf.initializers.he_normal()
xavier_init = tf.initializers.glorot_normal()
with tf.name_scope('dnn'):
    hidden1 = tf.layers.dense(X, 300, kernel_initializer=he_init, name='hidden1')
    hidden1 = tf.layers.batch_normalization(hidden1, training=is_training, momentum=0.9)
    hidden1 = tf.nn.relu(hidden1)
    hidden2 = tf.layers.dense(hidden1, 100, kernel_initializer=he_init, name='hidden2')
    hidden2 = tf.layers.batch_normalization(hidden2, training=is_training, momentum=0.9)
    hidden2 = tf.nn.relu(hidden2)
    logits = tf.layers.dense(hidden2, 10, kernel_initializer=he_init, name='output')
    # prob = tf.layers.dense(hidden2, 10, tf.nn.softmax, name='prob')

# 3. define loss
with tf.name_scope('loss'):
    # tf.losses.sparse_softmax_cross_entropy() label is not one_hot and dtype is int*
    # xentropy = tf.losses.sparse_softmax_cross_entropy(labels=tf.argmax(y, axis=1), logits=logits)
    # tf.nn.sparse_softmax_cross_entropy_with_logits() label is not one_hot and dtype is int*
    # xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=tf.argmax(y, axis=1), logits=logits)
    # loss = tf.reduce_mean(xentropy)
    loss = tf.losses.softmax_cross_entropy(onehot_labels=y, logits=logits) # label is one_hot

# 4. define optimizer
learning_rate = 0.01
with tf.name_scope('train'):
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)  # for batch normalization
    with tf.control_dependencies(update_ops):
        optimizer_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)


with tf.name_scope('eval'):
    correct = tf.nn.in_top_k(logits, tf.argmax(y, axis=1), 1)  # whether the target is among the top-K predictions; targets' dtype must be int*
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

# 5. initialize
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
saver = tf.train.Saver()
# =================
print([v.name for v in tf.trainable_variables()])
print([v.name for v in tf.global_variables()])
# =================
# 5. train & test
n_epochs = 20
n_batches = 50
batch_size = 50

with tf.Session() as sess:
    sess.run(init_op)
    for epoch in range(n_epochs):
        for iteration in range(mnist.train.num_examples // batch_size):
            X_batch, y_batch = mnist.train.next_batch(batch_size)
            sess.run(optimizer_op, feed_dict={X: X_batch, y: y_batch, is_training:True})
        acc_train = accuracy.eval(feed_dict={X: X_batch, y: y_batch, is_training:False}) # accuracy on the last training batch
        acc_test = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False})
        loss_test = loss.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False})
        print(epoch, "Train accuracy:", acc_train, "Test accuracy:", acc_test, "Test loss:", loss_test)
    save_path = saver.save(sess, "./my_model_final.ckpt")

with tf.Session() as sess:
    saver.restore(sess, "./my_model_final.ckpt")
    acc_test = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False})
    loss_test = loss.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels, is_training:False})
    print("Test accuracy:", acc_test, ", Test loss:", loss_test)

2.3 tf.contrib.layers.batch_norm

Since this involves the contrib module, we'll skip it for now.

3. About tf.GraphKeys.UPDATE_OPS

3.1 tf.control_dependencies

The following example shows how it works:

import tensorflow as tf

a_1 = tf.Variable(1)
b_1 = tf.Variable(2)
update_op = tf.assign(a_1, 10)
add = tf.add(a_1, b_1)

a_2 = tf.Variable(1)
b_2 = tf.Variable(2)
update_op2 = tf.assign(a_2, 10)
with tf.control_dependencies([update_op2]):
    add_with_dependencies = tf.add(a_2, b_2)

init = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
with tf.Session() as sess:
    sess.run(init)
    ans_1, ans_2 = sess.run([add, add_with_dependencies])
    print("Add: ", ans_1)
    print("Add_with_dependency: ", ans_2)

"""
可以看到兩組加法進行的對比,正常的計算圖在計算 add 時是不會經過 update_op 操作的,
因此在加法時 a 的值仍為 1,但是采用 tf.control_dependencies 函數,可以控制在進行
add 前先完成 update_op 的操作,因此在加法時 a 的值為 10,因此最后兩種加法的結果不同。
"""

3.2 tf.GraphKeys.UPDATE_OPS

tf.GraphKeys.UPDATE_OPS is a collection built into the TensorFlow computation graph. It holds operations that must be completed before the training op runs, and it is meant to be used together with tf.control_dependencies.
For batch_norm, these are the ops that update the moving mean and variance. The example below shows how tf.layers.batch_normalization handles this.

import tensorflow as tf

is_training = tf.placeholder(dtype=tf.bool, shape=None)
input = tf.ones([1, 2, 2, 3])
output = tf.layers.batch_normalization(input, training=is_training)

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
print(update_ops)
print([v.name for v in tf.trainable_variables()])
print([v.name for v in tf.global_variables()])

"""
[<tf.Operation 'batch_normalization/AssignMovingAvg' type=AssignSub>, <tf.Operation 'batch_normalization/AssignMovingAvg_1' type=AssignSub>]
['batch_normalization/gamma:0', 'batch_normalization/beta:0']
['batch_normalization/gamma:0', 'batch_normalization/beta:0', 'batch_normalization/moving_mean:0', 'batch_normalization/moving_variance:0']
"""

The output shows exactly the two ops inside batch_normalization that update the moving mean and variance; they must complete before train_op runs.
These ops are added to tf.GraphKeys.UPDATE_OPS automatically by TensorFlow's internal implementation: tf.contrib.layers.batch_norm exposes an updates_collections parameter whose default is tf.GraphKeys.UPDATE_OPS, while tf.layers.batch_normalization places the two update ops into that collection directly.

If you omit tf.control_dependencies, then during training (training=True) each batch computes only its own mean and variance and passes them to tf.nn.batch_normalization for normalization. Since the mean and variance update ops are not on the dependency path of anything in the graph, they never execute, so moving_mean and moving_variance keep their initial values throughout training. At test time (training=False), those stale values are then used as the mean and variance for normalization, which breaks inference. With tf.control_dependencies, the update ops are forced to run before every training step, so moving_mean and moving_variance are continually refreshed and can safely be used at test time.
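To see this concretely, here is a small sketch (TF 1.x; looking the variable up by the substring 'moving_mean' is an assumption about the default variable naming): moving_mean only changes when the update ops are actually run.

import tensorflow as tf

x = tf.random_normal([4, 3]) + 5.0                  # batch mean far from 0
y = tf.layers.batch_normalization(x, training=True, momentum=0.9)
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
moving_mean = [v for v in tf.global_variables() if 'moving_mean' in v.name][0]

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(y)                       # forward pass alone: no update happens
    print(sess.run(moving_mean))      # still the initial zeros
    sess.run([y] + update_ops)        # forward pass plus the update ops
    print(sess.run(moving_mean))      # moved toward the batch mean (roughly 0.5)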

One more thing: notice that the list of trainable variables is shorter than the list of all global variables. This is because the moving averages are non-trainable variables. If you want to reuse a pretrained neural network, you must not forget these non-trainable variables.
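For example (a sketch; tf.train.Saver() with no arguments already saves all global variables, so the pitfall only arises when you build the Saver from trainable variables alone):

# A Saver built only from trainable variables would silently drop the BN
# moving statistics; include the non-trainable ones explicitly instead.
bn_stats = [v for v in tf.global_variables() if 'moving_' in v.name]
saver = tf.train.Saver(var_list=tf.trainable_variables() + bn_stats)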

4. Dropout

When we apply dropout with probability $p$ to a hidden layer, each hidden unit $h_i$ is zeroed out with probability $p$, and with probability $1-p$ it is divided by $1-p$ (stretched). This leaves the expected value of the hidden layer unchanged, while the random dropping prevents the output layer's computation from over-relying on any single hidden unit.

from mxnet import nd

def dropout(X, drop_prob):
    assert 0 <= drop_prob <= 1
    keep_prob = 1 - drop_prob
    # In this case all elements are dropped
    if keep_prob == 0:
        return X.zeros_like()
    mask = nd.random.uniform(0, 1, X.shape) < keep_prob
    return mask * X / keep_prob
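A quick sanity check of the inverted-dropout scaling (a sketch; averaged over many elements, the expectation is preserved):

X = nd.ones((1000, 1000))
print(dropout(X, 0.5).mean().asscalar())   # close to 1.0: expectation preserved
print(dropout(X, 0.0).mean().asscalar())   # exactly 1.0: nothing dropped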

tf.nn.dropout

tf.nn.dropout(
    x,
    keep_prob=None,
    noise_shape=None,
    seed=None,
    name=None,
    rate=None
)
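Note that in later TF 1.x releases, `rate` (the drop probability) supersedes the deprecated `keep_prob`; the two are related by keep_prob = 1 - rate. A minimal usage sketch:

import tensorflow as tf

x = tf.ones([2, 4])
dropped = tf.nn.dropout(x, rate=0.5)   # each element zeroed w.p. 0.5,
                                       # survivors scaled by 1 / (1 - 0.5) = 2
with tf.Session() as sess:
    print(sess.run(dropped))           # a mix of 0.0 and 2.0 entries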

Below is an example comparing training with and without Dropout:

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

print(tf.__version__)

# Hyper parameters
tf.set_random_seed(1)
np.random.seed(1)
N_SAMPLES = 20  # number of sample points
N_HIDDEN = 300  # number of hidden-layer units
LR = 0.01  # initial learning rate

# 1. create data
x_train = np.linspace(-1, 1, N_SAMPLES)[:, np.newaxis]
y_train = x_train + 0.3 * np.random.randn(N_SAMPLES)[:, np.newaxis]

x_test = x_train.copy()
y_test = x_test + 0.3 * np.random.randn(N_SAMPLES)[:, np.newaxis]

# show data
plt.scatter(x_train, y_train, c='magenta', s=50, alpha=0.5, label='train')
plt.scatter(x_test, y_test, c='cyan', s=50, alpha=0.5, label='test')
plt.legend(loc='upper left')
plt.show()

# tf placeholder
tf_x = tf.placeholder(tf.float32, [None, 1])
tf_y = tf.placeholder(tf.float32, [None, 1])
is_training = tf.placeholder(tf.bool, None)  # to control dropout

# 2. construct network
o1 = tf.layers.dense(tf_x, N_HIDDEN, tf.nn.relu)
o2 = tf.layers.dense(o1, N_HIDDEN, tf.nn.relu)
o_predict = tf.layers.dense(o2, 1)
# ==========
d1 = tf.layers.dense(tf_x, N_HIDDEN, tf.nn.relu)
d1 = tf.layers.dropout(d1, rate=0.5, training=is_training)  # drop out 50%
d2 = tf.layers.dense(d1, N_HIDDEN, tf.nn.relu)
d2 = tf.layers.dropout(d2, rate=0.5, training=is_training)  # drop out 50%
d_predict = tf.layers.dense(d2, 1)
# ==========

# 3. define loss
o_loss = tf.losses.mean_squared_error(tf_y, o_predict)
d_loss = tf.losses.mean_squared_error(tf_y, d_predict)

# 4. define optimizer
o_optimizer = tf.train.AdamOptimizer(LR).minimize(o_loss)
d_optimizer = tf.train.AdamOptimizer(LR).minimize(d_loss)

# 5. initialize
init_op = tf.global_variables_initializer()

plt.ion()  # something about plotting

# 6. train
with tf.Session() as sess:
    sess.run(init_op)
    for step in range(500):
        sess.run([o_optimizer, d_optimizer],
                 {tf_x: x_train, tf_y: y_train, is_training: True})  # train, set is_training=True
        
        if step % 10 == 0:
            # plotting
            plt.cla()
            plt.scatter(x_train, y_train, c='magenta', s=50, alpha=0.3, label='train')
            plt.scatter(x_test, y_test, c='cyan', s=50, alpha=0.3, label='test')
            o_loss_, d_loss_, o_predict_, d_predict_ = sess.run(
                [o_loss, d_loss, o_predict, d_predict],
                feed_dict={tf_x: x_test, tf_y: y_test, is_training: False}
            )
            plt.plot(x_test, o_predict_, 'r-', lw=3, label='overfitting')
            plt.plot(x_test, d_predict_, 'b--', lw=3, label='dropout(50%)')
            plt.text(0, -1.2, 'overfitting loss=%.4f' % o_loss_, fontdict={'size': 20, 'color': 'red'})
            plt.text(0, -1.5, 'dropout loss=%.4f' % d_loss_, fontdict={'size': 20, 'color': 'blue'})
            plt.legend(loc='upper left')
            plt.ylim((-2.5, 2.5))
            plt.pause(0.1)

plt.ioff()
plt.show()

Dropout keeps the network from depending too heavily on a handful of nodes: every node is pushed to do useful work on its own, and the network as a whole becomes less sensitive to small perturbations of the input. The result is a more robust network that generalizes better.

 

