本文基於tensorflow-v1.15分支,簡單分析下TensorFlow中的優化器。
optimizer = tf.train.GradientDescentOptimizer(learning_rate=.05)
train_op = optimizer.minimize(loss)
當我們調用optimizer.minimize()
時,其內部會調用兩個方法compute_gradients()
和apply_gradients()
,分別用來計算梯度和使用梯度更新權重,其核心邏輯如下所示。
def minimize(self, loss, global_step=None, var_list=None,
gate_gradients=GATE_OP, aggregation_method=None,
colocate_gradients_with_ops=False, name=None,
grad_loss=None):
grads_and_vars = self.compute_gradients(
loss, var_list=var_list, gate_gradients=gate_gradients,
aggregation_method=aggregation_method,
colocate_gradients_with_ops=colocate_gradients_with_ops,
grad_loss=grad_loss)
vars_with_grad = [v for g, v in grads_and_vars if g is not None]
return self.apply_gradients(grads_and_vars, global_step=global_step, name=name)
如果我們想在模型更新前對梯度搞一些自定義的操作,TensorFlow中推薦的方式是
- 通過
compute_gradients
計算梯度 - 對梯度進行一些自定義操作
- 通過
apply_gradients
將處理后的梯度更新到模型權重
在optimizer.minimize
的第一階段,我們首先通過compute_gradients
計算出梯度。在compute_gradients函數中,TensorFlow使用了兩種計算梯度的方式,分別是針對靜態圖的tf.gradients
接口和針對動態圖的tf.GradientTape
接口,這兩個接口內部分別使用了符號微分和自動微分的方式來計算梯度。下面是compute_gradients的核心執行邏輯,代碼中省略了部分異常判斷的語句。可以看到,如果傳入的loss是一個可調用對象,那么就會調用backprop.GradientTape
相關的接口去求解梯度;否則,就會調用gradients.gradients
接口去求解梯度。
from tensorflow.python.eager import backprop
from tensorflow.python.ops import gradients
def compute_gradients(self, loss, var_list=None,
gate_gradients=GATE_OP,
aggregation_method=None,
colocate_gradients_with_ops=False,
grad_loss=None):
if callable(loss):
with backprop.GradientTape() as tape:
if var_list is not None:
tape.watch(var_list)
loss_value = loss()
loss_value = self._scale_loss(loss_value)
if var_list is None:
var_list = tape.watched_variables()
with ops.control_dependencies([loss_value]):
grads = tape.gradient(loss_value, var_list, grad_loss)
return list(zip(grads, var_list))
# Non-callable/Tensor loss case
# Scale loss if using a "mean" loss reduction and multiple replicas.
loss = self._scale_loss(loss)
if var_list is None:
var_list = (
variables.trainable_variables() +
ops.get_collection(ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))
else:
var_list = nest.flatten(var_list)
var_list += ops.get_collection(ops.GraphKeys._STREAMING_MODEL_PORTS)
processors = [_get_processor(v) for v in var_list]
var_refs = [p.target() for p in processors]
grads = gradients.gradients(
loss, var_refs, grad_ys=grad_loss,
gate_gradients=(gate_gradients == Optimizer.GATE_OP),
aggregation_method=aggregation_method,
colocate_gradients_with_ops=colocate_gradients_with_ops)
if gate_gradients == Optimizer.GATE_GRAPH:
grads = control_flow_ops.tuple(grads)
grads_and_vars = list(zip(grads, var_list))
return grads_and_vars
一般來說,loss是一個tensor,因此我們主要關注上述代碼的第16-29行。在第16-19行,我們獲取需要求解梯度的變量列表。如果沒有指定var_list,那么compute_gradient函數會默認獲取所有的TRAINABLE_VARIABLES
和TRAINABLE_RESOURCE_VARIABLES
。第20行貌似啥也沒做,因為在源代碼中2找不到名為_STRAMING_MODEL_PROTS
的變量集合。注意到第23行調用gradients.gradients函數計算梯度,這個函數實現在python/ops/gradient_impl.py文件中,其內部調用了_GradientsHelper
來實現真正的計算。因為_GradientsHelper
這個函數特別長,而且它和gradients函數的參數相同,所以我們這里先介紹幾個重要形參的含義。
@tf_export(v1=["gradients"])
def gradients(ys,
xs,
grad_ys=None,
name="gradients",
colocate_gradients_with_ops=False,
gate_gradients=False,
aggregation_method=None,
stop_gradients=None,
unconnected_gradients=UnconnectedGradients.NONE):
with ops.get_default_graph()._mutation_lock():
return gradients_util._GradientsHelper(
ys, xs, grad_ys, name, colocate_gradients_with_ops,
gate_gradients, aggregation_method, stop_gradients,
unconnected_gradients)
ys和xs參數均接收單個tensor或tensor列表,分別對應\(\frac{\partial Y}{\partial X}\)中的\(Y\)和\(X\)。
grad_ys參數接收單個tensor或tensor列表,它的維度必須和ys的維度相同。grad_ys為ys中的每個tensor提供初始值,如果grad_ys為None,那么ys中每個tensor的初始值就被設置為1。
aggregation_method表示梯度聚合的方式,TensorFlow支持的所有聚合方式均定義於tf.AggregationMethods
類中,包括ADD_N
、DEFAULT
、EXPERIMENTAL_N
以及EXPERIMENTAL_ACCUMULATE_N
等方法。
stop_gradients參數接收單個tensor或tensor列表,這些tensor不參與反向傳播梯度的計算。注意,tensorflow提供了另一個接口tf.stop_gradients,也可以完成相同的工作。二者的區別在於tf.stop_gradient作用於計算圖構建時,而tf.gradients的stop_gradients參數作用於計算圖的運行時。
_GradientHelper是構建反向計算圖並求解梯度的關鍵方法,需要仔細閱讀。這里暫時給出一個簡略的分析。
這個方法會維護兩個重要變量:
- 一個隊列queue,隊列里存放計算圖里所有出度為0的Op
- 一個字典grads,字典的鍵是Op本身,值是該Op每個輸出端收到的梯度列表
反向傳播求梯度時,每從隊列中彈出一個Op,都會把它輸出變量的梯度加起來(對應全微分定理)得到out_grads,然后獲取對應的梯度計算函數grad_fn。Op本身和out_grads會傳遞給grad_fn做參數,求出輸入的梯度。每當一個Op的梯度被求出來,就會更新所有未經處理的Op的出度和queue。當queue為空時,就表示整個反向計算圖處理完畢。
if grad_fn:
in_grads = _MaybeCompile(grad_scope, op, func_call,
lambda: grad_fn(op, *out_grads))
else:
in_grads = _MaybeCompile(grad_scope, op, func_call,
lambda: _SymGrad(op, out_grads, xs))
grad_fn是梯度計算函數,它用來計算給定Op的梯度。在TensorFlow里,每個Op都會定義一個對應的梯度計算函數。例如,下面是平方函數(tf.square)的梯度:
@ops.RegisterGradient("Square")
def _SquareGrad(op, grad):
x = op.inputs[0]
# Added control dependencies to prevent 2*x from being computed too early.
with ops.control_dependencies([grad]):
x = math_ops.conj(x)
y = constant_op.constant(2.0, dtype=x.dtype)
return math_ops.multiply(grad, math_ops.multiply(x, y))
apply_gradients是optimizer.minimize的第二階段,它將梯度更新應用到變量上。根據所使用的學習算法的不同,apply_gradients內部會調用不同的Optimizer實現。下面的代碼展示了apply_gradients的核心執行邏輯。
converted_grads_and_vars = []
for g, v in grads_and_vars:
if g is not None:
g = ops.convert_to_tensor_or_indexed_slices(g)
p = _get_processor(v)
converted_grads_and_vars.append((g, v, p))
converted_grads_and_vars = tuple(converted_grads_and_vars)
var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
with ops.init_scope():
self._create_slots(var_list)
update_ops = []
with ops.name_scope(name, self._name) as name:
self._prepare()
for grad, var, processor in converted_grads_and_vars:
if grad is None:
continue
else:
scope_name = var.op.name
with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
update_ops.append(processor.update_op(self, grad))
if global_step is None:
apply_updates = self._finish(update_ops, name)
else:
with ops.control_dependencies([self._finish(update_ops, "update")]):
with ops.colocate_with(global_step):
if isinstance(
global_step, resource_variable_ops.BaseResourceVariable):
# TODO(apassos): the implicit read in assign_add is slow; consider
# making it less so.
apply_updates = resource_variable_ops.assign_add_variable_op(
global_step.handle,
ops.convert_to_tensor(1, dtype=global_step.dtype),
name=name)
else:
apply_updates = state_ops.assign_add(global_step, 1, name=name)
if not context.executing_eagerly():
if isinstance(apply_updates, ops.Tensor):
apply_updates = apply_updates.op
train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
if apply_updates not in train_op:
train_op.append(apply_updates)
return apply_updates
在1-6行,程序將每個非None的梯度轉化成tensor(稠密)或indexedslices(稀疏),根據每個變量存儲類型的不同,我們獲取到不同的processor(第5行),最終將一個三元組(g, v, p)保存到列表converted_grads_and_vars中,以備后用。這里主要解釋下第5行,對於所有可優化的變量OptimizableVariable,根據其類型的不同,我們需要調用不同update_op。
在程序的8-10行,我們首先獲取到非None的grad對應的var,然后對相應的var創建slots。_create_slots方法需要Optimzier的子類自己去實現,它的作用是創建學習算法所需要的中間變量。以momentum sgd為例,它的更新公式為:
accumulation = momentum * accumulation + gradient
variable -= learning_rate * accumulation
可以看到,它在更新變量的時候需要用到一個中間變量accumulation。因此,我們需要為每個變量創建一個slot,用來保存這個中間變量,以便在下次進行權重更新時繼續使用它:
def _create_slots(self, var_list):
for v in var_list:
self._zeros_slot(v, "momentum", self._name)
在程序的12-20行,我們為每個var添加對應的update_op。第14行調用了_prepare方法,它是用來初始化一些必要的變量(例如學習率、動量),為應用梯度做准備。還是以momentum sgd為例子,它的_prepare函數實現如下:
def _prepare(self):
learning_rate = self._learning_rate
if callable(learning_rate):
learning_rate = learning_rate()
self._learning_rate_tensor = ops.convert_to_tensor(learning_rate,
name="learning_rate")
momentum = self._momentum
if callable(momentum):
momentum = momentum()
self._momentum_tensor = ops.convert_to_tensor(momentum, name="momentum")
可以看到,在這個函數中,它將學習率和動量都轉化成了tensor(為什么要轉成tensor?)。准備工作完成后,我們就可以給每個非None的grad和var添加相應的update_op。注意第18行,我們使用ops.colocate_with(var)把var對應的update_op放置到var所在的設備上。最后,我們調用_finish函數以完成所有的更新。一般來說_finish函數不需要重寫,唯一的例外是Adam算法,它在實現時重寫了_finish算法。
前面提到,對於所有的可優化的變量,根據其類型的不同,我們會調用不同的update_op。一般來說,不同的Optimizer需要實現的update_op主要包括四種:_apply_dense、_resource_apply_dense、_apply_sparse和_resource_apply_sparse,其中前兩種對應稠密更新,后兩種對應稀疏更新。以_resource開頭的方法是針對variable handle,而不帶_resource的update_op則是針對variable的。這里我們還是以momentum sgd為例,介紹一下它的_apply_dense的實現。下面是對應的代碼,可以看到它首先獲取了中間變量mom,然后直接調用了training_ops中的apply_momentum方法。
def _apply_dense(self, grad, var):
mom = self.get_slot(var, "momentum")
return training_ops.apply_momentum(
var, mom,
math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype),
grad,
math_ops.cast(self._momentum_tensor, var.dtype.base_dtype),
use_locking=self._use_locking,
use_nesterov=self._use_nesterov).op
apply_momentum方法是由bazel構建生成的代碼,它會調用op_def_lib中的_apply_op_helper函數,將一個名為ApplyMomentum的Op添加到計算圖中:
_, _, _op = _op_def_lib._apply_op_helper(
"ApplyMomentum", var=var, accum=accum, lr=lr, grad=grad,
momentum=momentum, use_locking=use_locking,
use_nesterov=use_nesterov, name=name)
根據gen_training_ops.py中的注釋,我們可以找到ApplyMomemtum這個Op的注冊信息:
REGISTER_OP("ApplyMomentum")
.Input("var: Ref(T)")
.Input("accum: Ref(T)")
.Input("lr: T")
.Input("grad: T")
.Input("momentum: T")
.Output("out: Ref(T)")
.Attr("T: numbertype")
.Attr("use_locking: bool = false")
.Attr("use_nesterov: bool = false")
.SetShapeFn([](InferenceContext* c) {
return ApplyMomentumShapeFn(c, false /* not sparse */);
});
最終,我們可以在kernel目錄下找到ApplyMomentum這個Op的實現。針對不同的設備,ApplyMomentum有不同的特化實現。
template <typename T>
struct ApplyMomentum<CPUDevice, T> {
void operator()(const CPUDevice& d, typename TTypes<T>::Flat var,
typename TTypes<T>::Flat accum,
typename TTypes<T>::ConstScalar lr,
typename TTypes<T>::ConstFlat grad,
typename TTypes<T>::ConstScalar momentum, bool use_nesterov) {
accum.device(d) = accum * momentum() + grad;
if (use_nesterov) {
var.device(d) -= grad * lr() + accum * momentum() * lr();
} else {
var.device(d) -= accum * lr();
}
}
};