[阿里DIN] 從論文源碼學習 之 embedding層如何自動更新
0x00 摘要
Deep Interest Network(DIN)是阿里媽媽精准定向檢索及基礎算法團隊在2017年6月提出的。其針對電子商務領域(e-commerce industry)的CTR預估,重點在於充分利用/挖掘用戶歷史行為數據中的信息。
本系列文章解讀論文以及源碼,順便梳理一些深度學習相關概念和TensorFlow的實現。
本文通過DIN源碼 https://github.com/mouna99/dien 分析,來深入展開看看embedding層如何自動更新。
0x01 DIN源碼
1.1 問題
在上文中,我們分析了embedding層的作用,但是留了一個問題尚未解答:
- 如何更新mid_embeddings_var這樣的embedding層?
即DIN代碼中,如下變量怎么更新:
self.uid_embeddings_var = tf.get_variable("uid_embedding_var", [n_uid, EMBEDDING_DIM])
self.mid_embeddings_var = tf.get_variable("mid_embedding_var", [n_mid, EMBEDDING_DIM])
self.cat_embeddings_var = tf.get_variable("cat_embedding_var", [n_cat, EMBEDDING_DIM])
因為在DIN中,只有這一處初始化 embeddings 的地方,沒有找到迭代更新的代碼,這會給初學者帶來一些困擾。
1.2 答案
先簡要說一下答案,embedding層通過 optimizer 進行更新(自動求導),通過 session.run 進行調用更新。
一般意義的 embedding 大多是神經網絡倒數第二層的參數權重,只具有整體意義和相對意義,不具備局部意義和絕對含義,這與 embedding 的產生過程有關,
任何 embedding 一開始都是一個隨機數,然后隨着優化算法,不斷迭代更新,最后網絡收斂停止迭代的時候,網絡各個層的參數就相對固化,得到隱層權重表(此時就相當於得到了我們想要的 embedding),然后在通過查表可以單獨查看每個元素的 embedding。
DIN中對應代碼如下:
# 優化更新(自動求導)
self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss)
......
# 通過 session.run 進行調用更新
def train(self, sess, inps):
if self.use_negsampling:
loss, accuracy, aux_loss, _ = sess.run([self.loss, self.accuracy, self.aux_loss, self.optimizer], feed_dict={
self.uid_batch_ph: inps[0],
self.mid_batch_ph: inps[1],
self.cat_batch_ph: inps[2],
self.mid_his_batch_ph: inps[3],
self.cat_his_batch_ph: inps[4],
self.mask: inps[5],
self.target_ph: inps[6],
self.seq_len_ph: inps[7],
self.lr: inps[8],
self.noclk_mid_batch_ph: inps[9],
self.noclk_cat_batch_ph: inps[10],
})
return loss, accuracy, aux_loss
else:
loss, accuracy, _ = sess.run([self.loss, self.accuracy, self.optimizer], feed_dict={
self.uid_batch_ph: inps[0],
self.mid_batch_ph: inps[1],
self.cat_batch_ph: inps[2],
self.mid_his_batch_ph: inps[3],
self.cat_his_batch_ph: inps[4],
self.mask: inps[5],
self.target_ph: inps[6],
self.seq_len_ph: inps[7],
self.lr: inps[8],
})
return loss, accuracy, 0
這涉及的部分很多,我們需要一一闡釋。
0x02 原理
大多數機器學習(深度學習)任務就是最小化損失,在損失函數定義好的情況下,使用一種優化器進行求解最小損失。
而為了讓loss下降,深度學習框架常見的優化方式一般采用的是梯度下降(Gradient Descent)算法,這要求對loss公式上的每個op都需要求偏導,然后使用鏈式法則結合起來。
2.1 隨機梯度下降SGD
給定一個可微函數,理論上可以用解析法找到它的最小值:函數的最小值是導數為 0 的點,因此你只需找到所有導數為 0 的點,然后計算函數在其中哪個點具有最小值。
將這一方法應用於神經網絡,就是用解析法求出最小損失函數對應的所有權重值。可以通過對方程 gradient(f)(W) = 0 求解 W 來實現這一方法。
即使用基於梯度的優化方式進行求解,基於當前在隨機數據批量上的損失,一點一點地對參數進行調節。由於處理的是一個可微函數,你可以計算出它的梯度,然后沿着梯度的反方向更新權重,損失每次都會變小一點。
- 抽取訓練樣本 x 和對應目標y 組成的數據批量。
- 在 x 上運行網絡,得到預測值 y_pred。
- 計算網絡在這批數據上的損失,用於衡量 y_pred 和 y 之間的距離。
- 計算損失相對於網絡參數的梯度[一次反向傳播(backward pass)]。
- 將參數沿着梯度的反方向移動一點,比如
W -= step * gradient
,從而使這批數據
上的損失減小一點。
這就叫作小批量隨機梯度下降(mini-batch stochastic gradient descent,又稱為小批量SGD)。
術語隨機(stochastic)是指每批數據都是隨機抽取的(stochastic 是random在科學上的同義詞)。
2.2 反向傳播
反向傳播 算法的訓練過程則是根據網絡計算得到的 Y_out 和實際的真實結果 Y_label 來計算誤差,並且沿着網絡反向傳播來調整公式中的所有 Wi 和 bi,使誤差達到最小。強調一下,深度學習里面 BP 的本質目標是讓誤差達到最小,所以要用誤差對中間出現過的所有影響因素求偏導。
通過反向傳播算法優化神經網絡是一個迭代的過程。
- 在每次迭代的開始,首先需要選取一小部分訓練數據,這一小部分數據叫做一個batch。
- 然后,這個batch 的樣例會通過前向傳播算法得到神經網絡模型的預測結果。因為訓練數據都是有正確答案標注的,所以可以計算出當前神經網絡模型的預測答案與正確答案之間的差距,計算誤差和損失函數。
- 最后,基於這預測值和真實值之間的差距,反向傳播算法會相應更新神經網絡參數的取值,使得在這個batch 上神經網絡模型的預測結果和真實答案更加接近。即首先計算輸出層神經元損失函數的梯度,然后計算隱藏層神經元損失函數的梯度。接下來用梯度更新權重。
前向求導是從第一層開始,逐層計算梯度 ∂ / ∂X 到最后一層。反向求導是從最后一層開始,逐層計算梯度 ∂Z / ∂ 到第一層。前向求導關注的是輸入是怎么影響到每一層的,反向求導則是關注於每一層是怎么影響到最終的輸出結果的。
2.3 自動求導
自動求導就是每一個op/layer自己依據自己的輸入和輸出做前向計算/反向求導,而框架則負責組裝調度這些op/layer,表現出來就是你通過框架去定義網絡/計算圖,框架自動前向計算並自動求導。
常見的深度學習框架里每個op(op指的是最小的計算單元,caffe里叫layer)都預先定義好了 forward 和backward(或者叫grad)兩個函數,這里的 backward 也就是求導。也就是說每個op的求導都是預先定義好的,或者說是人手推的。
當你定義好了一個神經網絡,常見的深度學習框架將其解釋為一個dag(有向無環圖),dag里每個節點就是op,從loss function這個節點開始,通過鏈式法則一步一步從后往前計算每一層神經網絡的梯度,整個dag梯度計算的最小粒度就是op的 backward 函數(這里是手動的),而鏈式法則則是自動的。
TensorFlow也是如此。
TensorFlow 提供的是聲明式的編程接口,用戶不需要關心求導的細節,只需要定義好模型得到一個loss方程,然后使用TensorFlow實現的各種Optimizer來進行運算即可。
這要求TensorFlow本身提供了每個op的求偏導方法,而且雖然我們使用的是Python的加減乘除運算符,實際上是TensorFlow重載了運算符實際上會創建“Square”這樣的op,可以方便用戶更容易得構建表達式。
因此TensorFlow的求導,實際上是先提供每一個op求導的數學實現,然后使用鏈式法則求出整個表達式的導數。
具體我們可以參見RegisterGradient的實現,以及nn_grad.py,math_grad.py等幾個文件
這些文件的所有的函數都用RegisterGradient裝飾器包裝了起來,這些函數都接受兩個參數,op和grad。其他的只要注冊了op的地方也有各種使用這個裝飾器,例如batch。
RegisterGradient使用舉例如下:
@ops.RegisterGradient("Abs")
def _AbsGrad(op, grad):
x = op.inputs[0]
return grad * math_ops.sign(x)
RegisterGradient定義如下,就是注冊op梯度函數的裝飾器:
class RegisterGradient(object):
def __init__(self, op_type):
if not isinstance(op_type, six.string_types):
raise TypeError("op_type must be a string")
self._op_type = op_type
def __call__(self, f):
"""Registers the function `f` as gradient function for `op_type`."""
_gradient_registry.register(f, self._op_type)
return f
0x03 優化器
道理說着還不錯,但是神經網絡是究竟怎么反向傳遞更新呢?這就需要看Optimizer了。
回到 TensorFlow 的 Python 代碼層面,自動求導的部分是靠各種各樣的 Optimizer 串起來的:
- 構圖的時候只需要寫完前向的數據流圖部分,TensorFlow 的做法是每一個 Op 在建圖的時候就同時包含了它的梯度計算公式,構成前向計算圖的時候會自動建立反向部分的計算圖,前向計算出來的輸入輸出會保留下來,留到后向計算的時候用完了才刪除。
- 然后在最后加上一個 Optimizer(例如 GradientDescentOptimizer、AdamOptimizer).
- 最后調用它的
minimize()
方法就會自動完成反向部分的數據流圖構建。
在DIEN這里,代碼如下:
ctr_loss = - tf.reduce_mean(tf.log(self.y_hat) * self.target_ph)
self.loss = ctr_loss
if self.use_negsampling:
self.loss += self.aux_loss
self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss)
3.1 Optimizer基類
TF的optimizer都繼承自Optimizer這個類,這個類的方法非常多,幾個重要方法是 minimize、compute_gradients、apply_gradients、slot系列。
- compute_gradients: 傳入loss,如果不傳入var_list,那么默認就是所有trainable的variable,返回的是 list of (gradient, variable) pairs。
- apply_gradients: 傳入 (gradient, variable) pairs,將梯度apply到變量上。具體梯度如何更新到變量,由 _apply_dense、_resource_apply_dense、_apply_sparse、_resource_apply_spars這四個方法實現。
- minimize:就是compute_gradients + apply_gradients
- slot系列: 輸入變量和name,得到的是一個 trainable=False的變量,用來記錄optimizer中的中間值,比如在Momentum中,記錄momentum。
Optimizer 基類的這個方法為每個實現子類預留了_create_slots()
,_prepare()
,_apply_dense()
,_apply_sparse()
四個接口出來,后面新構建的 Optimizer 只需要重寫或者擴展 Optimizer 類的某幾個函數即可;
3.2 反向傳播過程
整個反向傳播過程可分為三步,這三步僅需通過一個minimize()函數完成:
- 逐層計算每一個部分的梯度,
compute_gradients()
; - 根據需要對梯度進行處理;
- 把梯度更新到參數上,
apply_gradients();
即往最小化 loss 的方向更新 var_list 中的每一個參數;
代碼如下:
def minimize(self, loss, global_step=None, var_list=None,
gate_gradients=GATE_OP, aggregation_method=None,
colocate_gradients_with_ops=False, name=None,
grad_loss=None):
grads_and_vars = self.compute_gradients(
loss, var_list=var_list, gate_gradients=gate_gradients,
aggregation_method=aggregation_method,
colocate_gradients_with_ops=colocate_gradients_with_ops,
grad_loss=grad_loss)
vars_with_grad = [v for g, v in grads_and_vars if g is not None]
return self.apply_gradients(grads_and_vars, global_step=global_step, name=name)
3.2.1 compute_gradients
該函數用於計算loss對於可訓練變量val_list的梯度,最終返回的是元組列表,即 [(gradient, variable),...]。
參數含義:
- loss: 需要被優化的Tensor
- val_list: Optional list or tuple of
tf.Variable
to update to minimizeloss
. Defaults to the list of variables collected in the graph under the keyGraphKeys.TRAINABLE_VARIABLES
.
基本邏輯如下:
- 根據原本計算圖中所有的 op 創建一個順序的 var_list。即自動找到計算圖中所有的 trainable_variables 放到 var_list 里面去,這些就是整個網絡中的參數;
- 反向遍歷這個list,對每個需要求導並且能夠求導的op(即已經定義好了對應的梯度函數的op)調用其梯度函數;
- 然后沿着原本計算圖的方向反向串起另一部分的計算圖(輸入輸出互換,原本的數據 Tensor 換成梯度 Tensor);即,往圖中插入一個 gradients 的 Op,所以反向求導的這個串圖的過程就是在這里完成的了;
其中,_get_processor函數可理解為一種快速更新variables的方法,每個processor都會包含一個update_op這樣的函數來進行variable更新操作。
變量更新公式:
代碼如下:
def compute_gradients(self, loss, var_list=None,
gate_gradients=GATE_OP,
aggregation_method=None,
colocate_gradients_with_ops=False,
grad_loss=None):
self._assert_valid_dtypes([loss])
if grad_loss is not None:
self._assert_valid_dtypes([grad_loss])
if var_list is None:
var_list = (
variables.trainable_variables() +
ops.get_collection(ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))
else:
var_list = nest.flatten(var_list)
var_list += ops.get_collection(ops.GraphKeys._STREAMING_MODEL_PORTS)
processors = [_get_processor(v) for v in var_list]
var_refs = [p.target() for p in processors]
grads = gradients.gradients(
loss, var_refs, grad_ys=grad_loss,
gate_gradients=(gate_gradients == Optimizer.GATE_OP),
aggregation_method=aggregation_method,
colocate_gradients_with_ops=colocate_gradients_with_ops)
if gate_gradients == Optimizer.GATE_GRAPH:
grads = control_flow_ops.tuple(grads)
grads_and_vars = list(zip(grads, var_list))
return grads_and_vars
3.2.2 gradients
gradients 的實際定義在 tensorflow/python/ops/gradients_impl.py
中。把整個求導過程抽象成一個 ys=f(xs) 的函數。
簡單說,它就是為了計算一組輸出張量ys = [y0, y1, ...]
對輸入張量xs = [x0, x1, ...]
的梯度,對每個xi
有grad_i = sum[dy_j/dx_i for y_j in ys]
。默認情況下,grad_loss
是None
,此時grad_ys
被初始化為全1向量。
gradients 部分參數如下:
- xs 就是 var_list 里面輸入的變量列表(在這個過程中其實這里存的是每個變量對應過來在計算圖中的 op)。
- 參數中的ys是loss,是是計算損失值的張量,也就是用戶業務邏輯最后生成的Tensor的最終節點,從這個節點反推,可以導出全部Graph。
grad_ys
存儲計算出的梯度;gate_gradients
是一個布爾變量,指示所有梯度是否在使用前被算出,如果設為True
,可以避免競爭條件;
這個方法會維護兩個重要變量
- 一個隊列
queue
,隊列里存放計算圖里所有出度為0的操作符 - 一個字典
grads
,字典的鍵是操作符本身,值是該操作符每個輸出端收到的梯度列表
反向傳播求梯度時,每從隊列中彈出一個操作符,都會把它輸出變量的梯度加起來(對應全微分定理)得到out_grads
,然后獲取對應的梯度計算函數grad_fn
。操作符op
本身和out_grads
會傳遞給grad_fn
做參數,求出輸入的梯度。
基本邏輯如下:
- 根據原本計算圖中所有的 op 創建一個順序的 list,這個順序在圖上來說其實也是拓撲序;
- 反向遍歷這個 list,對每個需要求導並且能夠求導的 op(即已經定義好了對應的梯度函數的 op)調用其梯度函數;
- 然后沿着原本圖的方向反向串起另一部分的計算圖即可(輸入輸出互換,原本的數據 Tensor 換成梯度 Tensor);
具體代碼如下:
def gradients(ys,
xs,
grad_ys=None,
name="gradients",
colocate_gradients_with_ops=False,
gate_gradients=False,
aggregation_method=None,
stop_gradients=None):
to_ops = [t.op for t in ys]
from_ops = [t.op for t in xs]
grads = {}
# Add the initial gradients for the ys.
for y, grad_y in zip(ys, grad_ys):
_SetGrad(grads, y, grad_y)
# Initialize queue with to_ops.
queue = collections.deque()
# Add the ops in 'to_ops' into the queue.
to_ops_set = set()
for op in to_ops:
ready = (pending_count[op._id] == 0)
if ready and op._id not in to_ops_set:
to_ops_set.add(op._id)
queue.append(op)
while queue:
# generate gradient subgraph for op.
op = queue.popleft()
with _maybe_colocate_with(op, colocate_gradients_with_ops):
if loop_state:
loop_state.EnterGradWhileContext(op, before=True)
out_grads = _AggregatedGrads(grads, op, loop_state, aggregation_method)
if loop_state:
loop_state.ExitGradWhileContext(op, before=True)
if has_out_grads and (op._id not in stop_ops):
if is_func_call:
func_call = ops.get_default_graph()._get_function(op.type)
grad_fn = func_call.python_grad_func
else:
try:
grad_fn = ops.get_gradient_function(op)
for i, (t_in, in_grad) in enumerate(zip(op.inputs, in_grads)):
if in_grad is not None:
if (isinstance(in_grad, ops.Tensor) and
t_in.dtype != dtypes.resource):
try:
in_grad.set_shape(t_in.get_shape())
_SetGrad(grads, t_in, in_grad)
if loop_state:
loop_state.ExitGradWhileContext(op, before=False)
3.2.3 apply_gradients
該函數的作用是將compute_gradients()
返回的值作為輸入參數對variable進行更新,即根據前面求得的梯度,把梯度進行方向傳播給weights和biases進行參數更新。
那為什么minimize()
會分開兩個步驟呢?原因是因為在某些情況下我們需要對梯度做一定的修正,例如為了防止梯度消失(gradient vanishing)或者梯度爆炸(gradient explosion),我們需要事先干預一下以免程序出現Nan的尷尬情況;有的時候也許我們需要給計算得到的梯度乘以一個權重或者其他亂七八糟的原因,所以才分開了兩個步驟。
基本邏輯如下:
- 對於g, v, p(grads, vars, processors),把它們整合在 tuple(converted_grads_and_vars);
- 遍歷參數列表 v,對於每一個參數應用 self._create_slots函數,以創建一些優化器自帶的一些參數;
- 調用 prepare()函數,在apply梯度前創建好所有必須的tensors;
- 遍歷
grad, var, processor in converted_grads_and_vars
,應用 ops.colocate_with(var),作用是保證每個參數var的更新都在同一個device上; - ops.control_dependencies()函數用來控制計算流圖的,給圖中的某些節點指定計算的順序;
- 對每個 variable 本身應用 assign,體現在
update_ops.append(processor.update_op(self, grad))
,如果有global_step
的話,global_step需加個1。 - 最后將返回一個
train_op
。train_op
是通常訓練過程中,client為session的fetches提供的參數之一,也就是這個Operation被執行之后,模型的參數將會完成更新,並開始下一個batch的訓練。那么這也就意味着,這個方法中涉及到的計算圖將會實現說明文檔中的訓練邏輯。
具體代碼是:
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
grads_and_vars = tuple(grads_and_vars) # Make sure repeat iteration works.
converted_grads_and_vars = []
for g, v in grads_and_vars:
if g is not None:
# Convert the grad to Tensor or IndexedSlices if necessary.
g = ops.convert_to_tensor_or_indexed_slices(g)
p = _get_processor(v)
converted_grads_and_vars.append((g, v, p))
converted_grads_and_vars = tuple(converted_grads_and_vars)
var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
with ops.control_dependencies(None):
self._create_slots([_get_variable_for(v) for v in var_list])
update_ops = []
with ops.name_scope(name, self._name) as name:
self._prepare()
for grad, var, processor in converted_grads_and_vars:
if grad is None:
continue
scope_name = var.op.name if context.in_graph_mode() else ""
with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
update_ops.append(processor.update_op(self, grad))
if global_step is None:
apply_updates = self._finish(update_ops, name)
else:
with ops.control_dependencies([self._finish(update_ops, "update")]):
with ops.colocate_with(global_step):
apply_updates = state_ops.assign_add(global_step, 1, name=name).op
train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
if apply_updates not in train_op:
train_op.append(apply_updates)
return apply_updates
3.3 AdamOptimizer
DIEN使用的是AdamOptimizer優化器。
Adam 這個名字來源於自適應矩估計(Adaptive Moment Estimation),也是梯度下降算法的一種變形,但是每次迭代參數的學習率都有一定的范圍,不會因為梯度很大而導致學習率(步長)也變得很大,參數的值相對比較穩定。
概率論中矩的含義是:如果一個隨機變量 X 服從某個分布,X 的一階矩是 E(X),也就是樣本平均值,X 的二階矩就是 E(X^2),也就是樣本平方的平均值。
Adam 算法利用梯度的一階矩估計和二階矩估計動態調整每個參數的學習率。TensorFlow提供的tf.train.AdamOptimizer可控制學習速度,經過偏置校正后,每一次迭代學習率都有個確定范圍,使得參數比較平穩。
在利用計算好的導數對權重進行修正時,對Embedding矩陣的梯度進行特殊處理,只更新局部,見optimization.py中Adagrad.update函數。
3.3.1 _prepare
在_prepare
函數中通過convert_to_tensor
方法來存儲了輸入參數的 Tensor 版本。
def _prepare(self):
self._lr_t = ops.convert_to_tensor(self._lr, name="learning_rate")
self._beta1_t = ops.convert_to_tensor(self._beta1, name="beta1")
self._beta2_t = ops.convert_to_tensor(self._beta2, name="beta2")
self._epsilon_t = ops.convert_to_tensor(self._epsilon, name="epsilon")
3.3.2 _create_slots
_create_slots
函數用來創建參數,比如 _beta1_power,_beta2_power
def _create_slots(self, var_list):
first_var = min(var_list, key=lambda x: x.name)
create_new = self._beta1_power is None
if not create_new and context.in_graph_mode():
create_new = (self._beta1_power.graph is not first_var.graph)
if create_new:
with ops.colocate_with(first_var):
self._beta1_power = variable_scope.variable(self._beta1,
name="beta1_power",
trainable=False)
self._beta2_power = variable_scope.variable(self._beta2,
name="beta2_power",
trainable=False)
# Create slots for the first and second moments.
for v in var_list:
self._zeros_slot(v, "m", self._name)
self._zeros_slot(v, "v", self._name)
函數_apply_dense
和_resource_apply_dense
的實現中分別使用了training_ops.apply_adam
和training_ops.resource_apply_adam
方法。
函數_apply_sparse
和_resource_apply_sparse
主要用在稀疏向量的更新操作上,而具體的實現是在函數_apply_sparse_shared
中。
_apply_sparse_shared
函數,首先獲取所需要的參數值並存儲到變量里,接着按照 Adam 算法的流程,首先計算學習率,接着計算兩個 Momentum ,由於是稀疏 tensor 的更新,所以在算出更新值之后要使用
scatter_add
來完成加法操作, 最后將var_update
和m_t
、v_t
的更新操作放進control_flow_ops.group
中。
0x04 Session.run
優化器已經搭建好,剩下就是調用 session.run
進行更新。
調用一次 run 是執行一遍數據流圖, 在 TensorFlow 的訓練代碼中通常是在一個循環中多次調用 sess.run()
,一次 run 即為訓練過程中的一步。
fetches 是 run 方法的一個輸入參數,這個參數可以是很多種形式的數據,run 最后的 返回值也會和 fetches 有相同的結構。
至此,DIN分析暫時告一段落,下篇開始 DIEN 的分析,敬請期待。
0xFF 參考
TensorFlow SyncReplicasOptimizer 解讀
tensorflow中有向圖(計算圖、Graph)、上下文環境(Session)和執行流程
TensorFlow 拆包(一):Session.Run ()
tensorflow源碼分析(五)session.run()
Tensorflow中優化器--AdamOptimizer詳解
【TensorFlow】優化器AdamOptimizer的源碼分析
TensorFlow中Session、Graph、Operation以及Tensor詳解
TensorFlow 拆包(二):TF 的數據流模型實現以及自動求導
分布式Tensorflow中同步梯度更新tf.train.SyncReplicasOptimizer解讀(backup_worker的用法)
TensorFlow學習筆記之--[compute_gradients和apply_gradients原理淺析]