摘要:本系列主要對tf的一些常用概念與方法進行描述。本文主要針對tensorflow的模型訓練Training與測試Testing等相關函數進行講解。為‘Tensorflow一些常用基本概念與函數’系列之四。
1、序言
本文所講的內容主要為以下列表中相關函數。函數training()通過梯度下降法為最小化損失函數增加了相關的優化操作,在訓練過程中,先實例化一個優化函數,比如 tf.train.GradientDescentOptimizer,並基於一定的學習率進行梯度優化訓練:
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
然后,可以設置 一個用於記錄全局訓練步驟的單值。以及使用minimize()操作,該操作不僅可以優化更新訓練的模型參數,也可以為全局步驟(global step)計數。與其他tensorflow操作類似,這些訓練操作都需要在tf.session會話中進行
global_step = tf.Variable(0, name='global_step', trainable=False)
train_op = optimizer.minimize(loss, global_step=global_step)
操作組 | 操作 |
---|---|
Training | Optimizers,Gradient Computation,Gradient Clipping,Distributed execution |
Testing | Unit tests,Utilities,Gradient checking |
2、Tensorflow函數
2.1 訓練 (Training)
一個TFRecords 文件為一個字符串序列。這種格式並非隨機獲取,它比較適合大規模的數據流,而不太適合需要快速分區或其他非序列獲取方式。
█ 優化 (Optimizers)
tf中各種優化類提供了為損失函數計算梯度的方法,其中包含比較經典的優化算法,比如GradientDescent 和Adagrad。
▶▶class tf.train.Optimizer
操作 | 描述 |
---|---|
class tf.train.Optimizer | 基本的優化類,該類不常常被直接調用,而較多使用其子類, 比如GradientDescentOptimizer, AdagradOptimizer 或者MomentumOptimizer |
tf.train.Optimizer.__init__(use_locking, name) | 創建一個新的優化器, 該優化器必須被其子類(subclasses)的構造函數調用 |
tf.train.Optimizer.minimize(loss, global_step=None, var_list=None, gate_gradients=1, aggregation_method=None, colocate_gradients_with_ops=False, name=None, grad_loss=None) |
添加操作節點,用於最小化loss,並更新var_list 該函數是簡單的合並了compute_gradients()與apply_gradients()函數 返回為一個優化更新后的var_list,如果global_step非None,該操作還會為global_step做自增操作 |
tf.train.Optimizer.compute_gradients(loss,var_list=None, gate_gradients=1, aggregation_method=None, colocate_gradients_with_ops=False, grad_loss=None) |
對var_list中的變量計算loss的梯度 該函數為函數minimize()的第一部分,返回一個以元組(gradient, variable)組成的列表 |
tf.train.Optimizer.apply_gradients(grads_and_vars, global_step=None, name=None) | 將計算出的梯度應用到變量上,是函數minimize()的第二部分,返回一個應用指定的梯度的操作Operation,對global_step做自增操作 |
tf.train.Optimizer.get_name() | 獲取名稱 |
▷ class tf.train.Optimizer
用法
# Create an optimizer with the desired parameters. opt = GradientDescentOptimizer(learning_rate=0.1) # Add Ops to the graph to minimize a cost by updating a list of variables. # "cost" is a Tensor, and the list of variables contains tf.Variable objects. opt_op = opt.minimize(cost, var_list=<list of variables>) # Execute opt_op to do one step of training: opt_op.run()
▶▶在使用它們之前處理梯度
使用minimize()操作,該操作不僅可以計算出梯度,而且還可以將梯度作用在變量上。如果想在使用它們之前處理梯度,可以按照以下三步驟使用optimizer :
1、使用函數compute_gradients()計算梯度
2、按照自己的願望處理梯度
3、使用函數apply_gradients()應用處理過后的梯度
例如:
# 創建一個optimizer. opt = GradientDescentOptimizer(learning_rate=0.1) # 計算<list of variables>相關的梯度 grads_and_vars = opt.compute_gradients(loss, <list of variables>) # grads_and_vars為tuples (gradient, variable)組成的列表。 #對梯度進行想要的處理,比如cap處理 capped_grads_and_vars = [(MyCapper(gv[0]), gv[1]) for gv in grads_and_vars] # 令optimizer運用capped的梯度(gradients) opt.apply_gradients(capped_grads_and_vars)
▶▶選通梯度(Gating Gradients)
函數minimize() 與compute_gradients()都含有一個參數gate_gradient,用於控制在應用這些梯度時並行化的程度。
其值可以取:GATE_NONE, GATE_OP 或 GATE_GRAPH
GATE_NONE : 並行地計算和應用梯度。提供最大化的並行執行,但是會導致有的數據結果沒有再現性。比如兩個matmul操作的梯度依賴輸入值,使用GATE_NONE可能會出現有一個梯度在其他梯度之前便應用到某個輸入中,導致出現不可再現的(non-reproducible)結果
GATE_OP: 對於每個操作Op,確保每一個梯度在使用之前都已經計算完成。這種做法防止了那些具有多個輸入,並且梯度計算依賴輸入情形中,多輸入Ops之間的競爭情況出現。
GATE_GRAPH: 確保所有的變量對應的所有梯度在他們任何一個被使用前計算完成。該方式具有最低級別的並行化程度,但是對於想要在應用它們任何一個之前處理完所有的梯度計算時很有幫助的。
█ Slots
一些optimizer的之類,比如 MomentumOptimizer 和 AdagradOptimizer 分配和管理着額外的用於訓練的變量。這些變量稱之為’Slots’,Slots有相應的名稱,可以向optimizer訪問的slots名稱。有助於在log debug一個訓練算法以及報告slots狀態
操作 | 描述 |
---|---|
tf.train.Optimizer.get_slot_names() | 返回一個由Optimizer所創建的slots的名稱列表 |
tf.train.Optimizer.get_slot(var, name) | 返回一個name所對應的slot,name是由Optimizer為var所創建 var為用於傳入 minimize() 或 apply_gradients()的變量 |
class tf.train.GradientDescentOptimizer | 使用梯度下降算法的Optimizer |
tf.train.GradientDescentOptimizer.__init__(learning_rate, use_locking=False, name=’GradientDescent’) |
構建一個新的梯度下降優化器(Optimizer) |
class tf.train.AdadeltaOptimizer | 使用Adadelta算法的Optimizer |
tf.train.AdadeltaOptimizer.__init__(learning_rate=0.001, rho=0.95, epsilon=1e-08, use_locking=False, name=’Adadelta’) |
創建Adadelta優化器 |
class tf.train.AdagradOptimizer | 使用Adagrad算法的Optimizer |
tf.train.AdagradOptimizer.__init__(learning_rate, initial_accumulator_value=0.1, use_locking=False, name=’Adagrad’) |
創建Adagrad優化器 |
class tf.train.MomentumOptimizer | 使用Momentum算法的Optimizer |
tf.train.MomentumOptimizer.__init__(learning_rate, momentum, use_locking=False, name=’Momentum’, use_nesterov=False) |
創建momentum優化器 momentum:動量,一個tensor或者浮點值 |
class tf.train.AdamOptimizer | 使用Adam 算法的Optimizer |
tf.train.AdamOptimizer.__init__(learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-08, use_locking=False, name=’Adam’) |
創建Adam優化器 |
class tf.train.FtrlOptimizer | 使用FTRL 算法的Optimizer |
tf.train.FtrlOptimizer.__init__(learning_rate, learning_rate_power=-0.5, initial_accumulator_value=0.1, l1_regularization_strength=0.0, l2_regularization_strength=0.0, use_locking=False, name=’Ftrl’) |
創建FTRL算法優化器 |
class tf.train.RMSPropOptimizer | 使用RMSProp算法的Optimizer |
tf.train.RMSPropOptimizer.__init__(learning_rate, decay=0.9, momentum=0.0, epsilon=1e-10, use_locking=False, name=’RMSProp’) |
創建RMSProp算法優化器 |
▷ tf.train.AdamOptimizer
Adam 的基本運行方式,首先初始化:
m_0 <- 0 (Initialize initial 1st moment vector)
v_0 <- 0 (Initialize initial 2nd moment vector)
t <- 0 (Initialize timestep)
在論文中的 section2 的末尾所描述了更新規則,該規則使用梯度g來更新變量:
t <- t + 1
lr_t <- learning_rate * sqrt(1 - beta2^t) / (1 - beta1^t)
m_t <- beta1 * m_{t-1} + (1 - beta1) * g
v_t <- beta2 * v_{t-1} + (1 - beta2) * g * g
variable <- variable - lr_t * m_t / (sqrt(v_t) + epsilon)
其中epsilon 的默認值1e-8可能對於大多數情況都不是一個合適的值。例如,當在ImageNet上訓練一個 Inception network時比較好的選擇為1.0或者0.1。
需要注意的是,在稠密數據中即便g為0時, m_t, v_t 以及variable都將會更新。而在稀疏數據中,m_t, v_t 以及variable不被更新且值為零。
█ 梯度計算與截斷(Gradient Computation and Clipping)
TensorFlow 提供了計算給定tf計算圖的求導函數,並在圖的基礎上增加節點。優化器(optimizer )類可以自動的計算網絡圖的導數,但是優化器中的創建器(creators )或者專業的人員可以通過本節所述的函數調用更底層的方法。
操作 | 描述 |
---|---|
tf.gradients(ys, xs, grad_ys=None, name=’gradients’, colocate_gradients_with_ops=False, gate_gradients=False, aggregation_method=None) |
構建一個符號函數,計算ys關於xs中x的偏導的和, 返回xs中每個x對應的sum(dy/dx) |
tf.stop_gradient(input, name=None) | 停止計算梯度, 在EM算法、Boltzmann機等可能會使用到 |
tf.clip_by_value(t, clip_value_min, clip_value_max, name=None) | 基於定義的min與max對tesor數據進行截斷操作, 目的是為了應對梯度爆發或者梯度消失的情況 |
tf.clip_by_norm(t, clip_norm, axes=None, name=None) | 使用L2范式標准化tensor最大值為clip_norm 返回 t * clip_norm / l2norm(t) |
tf.clip_by_average_norm(t, clip_norm, name=None) | 使用平均L2范式規范tensor數據t, 並以clip_norm為最大值 返回 t * clip_norm / l2norm_avg(t) |
tf.clip_by_global_norm(t_list, clip_norm, use_norm=None, name=None) |
返回t_list[i] * clip_norm / max(global_norm, clip_norm) 其中global_norm = sqrt(sum([l2norm(t)**2 for t in t_list])) |
tf.global_norm(t_list, name=None) | 返回global_norm = sqrt(sum([l2norm(t)**2 for t in t_list])) |
█ 退化學習率(Decaying the learning rate)
操作 | 描述 |
---|---|
tf.train.exponential_decay(learning_rate, global_step, decay_steps, decay_rate, staircase=False, name=None) |
對學習率進行指數衰退 |
▷ tf.train.exponential_decay
#該函數返回以下結果 decayed_learning_rate = learning_rate * decay_rate ^ (global_step / decay_steps) ##例: 以0.96為基數,每100000 步進行一次學習率的衰退 global_step = tf.Variable(0, trainable=False) starter_learning_rate = 0.1 learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 100000, 0.96, staircase=True) # Passing global_step to minimize() will increment it at each step. learning_step = ( tf.train.GradientDescentOptimizer(learning_rate) .minimize(...my loss..., global_step=global_step) )
█ 移動平均(Moving Averages)
一些訓練優化算法,比如GradientDescent 和Momentum 在優化過程中便可以使用到移動平均方法。使用移動平均常常可以較明顯地改善結果。
操作 | 描述 |
---|---|
class tf.train.ExponentialMovingAverage | 將指數衰退加入到移動平均中 |
tf.train.ExponentialMovingAverage.apply(var_list=None) | 對var_list變量保持移動平均 |
tf.train.ExponentialMovingAverage.average_name(var) | 返回var均值的變量名稱 |
tf.train.ExponentialMovingAverage.average(var) | 返回var均值變量 |
tf.train.ExponentialMovingAverage.variables_to_restore(moving_avg_variables=None) | 返回用於保存的變量名稱的映射 |
▷ tf.train.ExponentialMovingAverage
# Example usage when creating a training model: # Create variables. var0 = tf.Variable(...) var1 = tf.Variable(...) # ... use the variables to build a training model... ... # Create an op that applies the optimizer. This is what we usually # would use as a training op. opt_op = opt.minimize(my_loss, [var0, var1]) # Create an ExponentialMovingAverage object ema = tf.train.ExponentialMovingAverage(decay=0.9999) # Create the shadow variables, and add ops to maintain moving averages # of var0 and var1. maintain_averages_op = ema.apply([var0, var1]) # Create an op that will update the moving averages after each training # step. This is what we will use in place of the usual training op. with tf.control_dependencies([opt_op]): training_op = tf.group(maintain_averages_op) ...train the model by running training_op... #Example of restoring the shadow variable values: # Create a Saver that loads variables from their saved shadow values. shadow_var0_name = ema.average_name(var0) shadow_var1_name = ema.average_name(var1) saver = tf.train.Saver({shadow_var0_name: var0, shadow_var1_name: var1}) saver.restore(...checkpoint filename...) # var0 and var1 now hold the moving average values
▷ tf.train.ExponentialMovingAverage.variables_to_restore
variables_to_restore = ema.variables_to_restore()
saver = tf.train.Saver(variables_to_restore)
█ 協調器和隊列運行器(Coordinator and QueueRunner)
查看queue中,queue相關的內容,了解tensorflow中隊列的運行方式。
操作 | 描述 |
---|---|
class tf.train.Coordinator | 線程的協調器 |
tf.train.Coordinator.clear_stop() | 清除停止標記 |
tf.train.Coordinator.join(threads=None, stop_grace_period_secs=120) | 等待線程終止 threads:一個threading.Threads的列表,啟動的線程,將額外加入到registered的線程中 |
tf.train.Coordinator.register_thread(thread) | Register一個用於join的線程 |
tf.train.Coordinator.request_stop(ex=None) | 請求線程結束 |
tf.train.Coordinator.should_stop() | 檢查是否被請求停止 |
tf.train.Coordinator.stop_on_exception() | 上下文管理器,當一個例外出現時請求停止 |
tf.train.Coordinator.wait_for_stop(timeout=None) | 等待Coordinator提示停止進程 |
class tf.train.QueueRunner | 持有一個隊列的入列操作列表,用於線程中運行 queue:一個隊列 enqueue_ops: 用於線程中運行的入列操作列表 |
tf.train.QueueRunner.create_threads(sess, coord=None, daemon=False, start=False) |
創建運行入列操作的線程,返回一個線程列表 |
tf.train.QueueRunner.from_proto(queue_runner_def) | 返回由queue_runner_def創建的QueueRunner對象 |
tf.train.add_queue_runner(qr, collection=’queue_runners’) | 增加一個QueueRunner到graph的收集器(collection )中 |
tf.train.start_queue_runners(sess=None, coord=None, daemon=True, start=True, collection=’queue_runners’) | 啟動所有graph收集到的隊列運行器(queue runners) |
▷ class tf.train.Coordinator
#Coordinator的使用,用於多線程的協調 try: ... coord = Coordinator() # Start a number of threads, passing the coordinator to each of them. ...start thread 1...(coord, ...) ...start thread N...(coord, ...) # Wait for all the threads to terminate, give them 10s grace period coord.join(threads, stop_grace_period_secs=10) except RuntimeException: ...one of the threads took more than 10s to stop after request_stop() ...was called. except Exception: ...exception that was passed to coord.request_stop()
▷ tf.train.Coordinator.stop_on_exception()
with coord.stop_on_exception(): # Any exception raised in the body of the with # clause is reported to the coordinator before terminating # the execution of the body. ...body... #等價於 try: ...body... exception Exception as ex: coord.request_stop(ex)
█ 布執行(Distributed execution)
可以閱讀TensorFlow的分布式學習框架簡介 查看更多tensorflow分布式細節。
操作 | 描述 |
---|---|
class tf.train.Server | 一個進程內的tensorflow服務,用於分布式訓練 |
tf.train.Server.init(server_or_cluster_def, job_name=None, task_index=None, protocol=None, config=None, start=True) |
創建一個新的服務,其中job_name, task_index, 和protocol為可選參數, 優先級高於server_or_cluster_def中相關信息 server_or_cluster_def : 為一個tf.train.ServerDef 或 tf.train.ClusterDef 協議(protocol)的buffer, 或者一個tf.train.ClusterSpec對象 |
tf.train.Server.create_local_server(config=None, start=True) | 創建一個新的運行在本地主機的單進程集群 |
tf.train.Server.target | 返回tf.Session所連接的目標服務器 |
tf.train.Server.server_def | 返回該服務的tf.train.ServerDef |
tf.train.Server.start() | 開啟服務 |
tf.train.Server.join() | 阻塞直到服務已經關閉 |
# | |
---|---|
class tf.train.Supervisor | 一個訓練輔助器,用於checkpoints模型以及計算的summaries。該監視器只是一個小的外殼(wrapper),用於Coordinator, a Saver, 和a SessionManager周圍 |
tf.train.Supervisor.__init__(graph=None, ready_op=0, is_chief=True, init_op=0, init_feed_dict=None, local_init_op=0, logdir=None, summary_op=0, saver=0, global_step=0, save_summaries_secs=120, save_model_secs=600, recovery_wait_secs=30, stop_grace_secs=120, checkpoint_basename=’model.ckpt’, session_manager=None, summary_writer=0, init_fn=None) |
創建一個監視器Supervisor |
tf.train.Supervisor.managed_session(master=”, config=None, start_standard_services=True, close_summary_writer=True) | 返回一個管路session的上下文管理器 |
tf.train.Supervisor.prepare_or_wait_for_session(master=”, config=None, wait_for_checkpoint=False, max_wait_secs=7200, start_standard_services=True) | 確保model已經准備好 |
tf.train.Supervisor.start_standard_services(sess) | 為sess啟動一個標准的服務 |
tf.train.Supervisor.start_queue_runners(sess, queue_runners=None) | 為QueueRunners啟動一個線程,queue_runners為一個QueueRunners列表 |
tf.train.Supervisor.summary_computed(sess, summary, global_step=None) | 指示計算的summary |
tf.train.Supervisor.stop(threads=None, close_summary_writer=True) | 停止服務以及協調器(coordinator),並沒有關閉session |
tf.train.Supervisor.request_stop(ex=None) | 參考Coordinator.request_stop() |
tf.train.Supervisor.should_stop() | 參考Coordinator.should_stop() |
tf.train.Supervisor.stop_on_exception() | 參考 Coordinator.stop_on_exception() |
tf.train.Supervisor.Loop(timer_interval_secs, target, args=None, kwargs=None) | 開啟一個循環器線程用於調用一個函數 每經過timer_interval_secs秒執行,target(*args, **kwargs) |
tf.train.Supervisor.coord | 返回監督器(Supervisor)使用的協調器(Coordinator ) |
# | |
---|---|
class tf.train.SessionManager | 訓練的輔助器,用於從checkpoint恢復數據以及創建一個session |
tf.train.SessionManager.__init__(local_init_op=None, ready_op=None, graph=None, recovery_wait_secs=30) | 創建一個SessionManager |
tf.train.SessionManager.prepare_session(master, init_op=None, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None, init_feed_dict=None, init_fn=None) | 創建一個session,並確保model可以被使用 |
tf.train.SessionManager.recover_session(master, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None) | 創建一個session,如果可以的話,使用恢復方法創建 |
tf.train.SessionManager.wait_for_session(master, config=None, max_wait_secs=inf) | 創建一個session,並等待model准備完成 |
# | |
---|---|
class tf.train.ClusterSpec | 將一個集群表示為一系列“tasks”,並整合至“jobs”中 |
tf.train.ClusterSpec.as_cluster_def() | 返回該cluster中一個tf.train.ClusterDef協議的buffer |
tf.train.ClusterSpec.as_dict() | 返回一個字典,由job名稱對應於網絡地址 |
tf.train.ClusterSpec.job_tasks(job_name) | 返回一個給定的job對應的task列表 |
tf.train.ClusterSpec.jobs | 返回該cluster的job名稱列表 |
tf.train.replica_device_setter(ps_tasks=0, ps_device=’/job:ps’, worker_device=’/job:worker’, merge_devices=True, cluster=None, ps_ops=None) | 返回一個設備函數(device function),以在建立一個副本graph的時候使用,設備函數(device function)用在with tf.device(device_function)中 |
▷ tf.train.Server
server = tf.train.Server(...)
with tf.Session(server.target): # ...
▷ tf.train.Supervisor
相關參數:
ready_op : 一維 字符串 tensor。該tensor是用過監視器在prepare_or_wait_for_session()計算,檢查model是否准備好可以使用。如果准備好,將返回一個空陣列,如果為None,該model沒有被檢查。
is_chief : 如果為True,創建一個主監視器用於負責初始化與模型的恢復,若為False,則依賴主監視器。
init_op : 一個操作,用於模型不能恢復時的初始化操作。默認初始化所有操作
local_init_op : 可被所有監視器運行的初始化操作。
logdir : 設置log目錄
summary_op : 一個操作(Operation ),返回Summary 和事件logs,需要設置 logdir
saver : 一個Saver對象
save_summaries_secs : 保存summaries的間隔秒數
save_model_secs : 保存model的間隔秒數
checkpoint_basename : checkpoint保存的基本名稱
- 使用在單進程中
with tf.Graph().as_default(): ...add operations to the graph... # Create a Supervisor that will checkpoint the model in '/tmp/mydir'. sv = Supervisor(logdir='/tmp/mydir') # Get a TensorFlow session managed by the supervisor. with sv.managed_session(FLAGS.master) as sess: # Use the session to train the graph. while not sv.should_stop(): sess.run(<my_train_op>) # 在上下文管理器with sv.managed_session()內,所有在graph的變量都被初始化。 # 或者說,一些服務器checkpoint相應模型並增加summaries至事件log中。 # 如果有例外發生,should_stop()將返回True
- 使用在多副本運行情況中
要使用副本訓練已經部署在集群上的相同程序,必須指定其中一個task為主要,該task處理 initialization, checkpoints, summaries, 和recovery相關事物。其他task依賴該task。
# Choose a task as the chief. This could be based on server_def.task_index, # or job_def.name, or job_def.tasks. It's entirely up to the end user. # But there can be only one *chief*. is_chief = (server_def.task_index == 0) server = tf.train.Server(server_def) with tf.Graph().as_default(): ...add operations to the graph... # Create a Supervisor that uses log directory on a shared file system. # Indicate if you are the 'chief' sv = Supervisor(logdir='/shared_directory/...', is_chief=is_chief) # Get a Session in a TensorFlow server on the cluster. with sv.managed_session(server.target) as sess: # Use the session to train the graph. while not sv.should_stop(): sess.run(<my_train_op>)
如果有task崩潰或重啟,managed_session() 將檢查是否Model被初始化。如果已經初始化,它只需要創建一個session並將其返回至正在訓練的正常代碼中。如果model需要被初始化,主task將對它進行重新初始化,而其他task將等待模型初始化完成。
注意:該程序方法一樣適用於單進程的work,該單進程標注自己為主要的便行
▷ supervisor中master的字符串形式
無論運行在本機或者集群上,都可以使用以下值設定master flag:
- 定義為 ” ,要求一個進程內且沒有使用RPC的session
- 定義為 ‘local’,要求一個使用基於RPC的主服務接口(“Master interface” )的session來運行tensorflow程序。更多細節可以查看 tf.train.Server.create_local_server()相關內容。
- 定義為 ‘grpc://hostname:port’,要求一個指定的RPC接口的session,同時運行內部進程的master接入遠程的tensorflow workers。可用server.target返回該形式
▷ supervisor高級用法
- 啟動額外的服務
managed_session()啟動了 Checkpoint 和Summary服務。如果需要運行更多的服務,可以在managed_session()控制的模塊中啟動他們。
#例如: 開啟一個線程用於打印loss. 設置每60秒該線程運行一次,我們使用sv.loop() ... sv = Supervisor(logdir='/tmp/mydir') with sv.managed_session(FLAGS.master) as sess: sv.loop(60, print_loss, (sess)) while not sv.should_stop(): sess.run(my_train_op)
- 啟動更少的的服務
managed_session() 啟動了 “summary” 和 “checkpoint” 線程,這些線程通過構建器或者監督器默認自動創建了summary_op 和saver操作。如果想運行自己的 summary 和checkpointing方法,關閉這些服務,通過傳遞None值給summary_op 和saver參數。
在chief中每100個step,創建summaries # Create a Supervisor with no automatic summaries. sv = Supervisor(logdir='/tmp/mydir', is_chief=is_chief, summary_op=None) # As summary_op was None, managed_session() does not start the # summary thread. with sv.managed_session(FLAGS.master) as sess: for step in xrange(1000000): if sv.should_stop(): break if is_chief and step % 100 == 0: # Create the summary every 100 chief steps. sv.summary_computed(sess, sess.run(my_summary_op)) else: # Train normally sess.run(my_train_op)
▷ tf.train.Supervisor.managed_session
def train(): sv = tf.train.Supervisor(...) with sv.managed_session(<master>) as sess: for step in xrange(..): if sv.should_stop(): break sess.run(<my training op>) ...do other things needed at each training step...
▷ tf.train.SessionManager
with tf.Graph().as_default(): ...add operations to the graph... # Create a SessionManager that will checkpoint the model in '/tmp/mydir'. sm = SessionManager() sess = sm.prepare_session(master, init_op, saver, checkpoint_dir) # Use the session to train the graph. while True: sess.run(<my_train_op>) #其中prepare_session()初始化和恢復一個模型參數。 #另一個進程將等待model准備完成,代碼如下 with tf.Graph().as_default(): ...add operations to the graph... # Create a SessionManager that will wait for the model to become ready. sm = SessionManager() sess = sm.wait_for_session(master) # Use the session to train the graph. while True: sess.run(<my_train_op>) #wait_for_session()等待一個model被其他進程初始化
▷ tf.train.ClusterSpec
一個tf.train.ClusterSpec表示一系列的進程,這些進程都參與分布式tensorflow的計算。每一個 tf.train.Server都在一個獨有的集群中構建。
創建一個具有兩個jobs及其5個tasks的集群們需要定義從job名稱列表到網絡地址列表之間的映射。
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222"], "ps": ["ps0.example.com:2222", "ps1.example.com:2222"]})
▷ tf.train.replica_device_setter
# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker # jobs on hosts worker0, worker1 and worker2. cluster_spec = { "ps": ["ps0:2222", "ps1:2222"], "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]} with tf.device(tf.replica_device_setter(cluster=cluster_spec)): # Build your graph v1 = tf.Variable(...) # assigned to /job:ps/task:0 v2 = tf.Variable(...) # assigned to /job:ps/task:1 v3 = tf.Variable(...) # assigned to /job:ps/task:0 # Run compute
█ 匯總操作(Summary Operations)
我們可以在一個session中獲取summary操作的輸出,並將其傳輸到SummaryWriter以添加至一個事件記錄文件中。
操作 | 描述 |
---|---|
tf.scalar_summary(tags, values, collections=None, name=None) | 輸出一個標量值的summary協議buffer tag的shape需要與values的相同,用來做summaries的tags,為字符串 |
tf.image_summary(tag, tensor, max_images=3, collections=None, name=None) | 輸出一個圖像tensor的summary協議buffer |
tf.audio_summary(tag, tensor, sample_rate, max_outputs=3, collections=None, name=None) | 輸出一個音頻tensor的summary協議buffer |
tf.histogram_summary(tag, values, collections=None, name=None) | 輸出一個直方圖的summary協議buffer |
tf.nn.zero_fraction(value, name=None) | 返回0在value中的小數比例 |
tf.merge_summary(inputs, collections=None, name=None) | 合並summary |
tf.merge_all_summaries(key=’summaries’) | 合並在默認graph中手機的summaries |
▶▶將記錄匯總寫入文件中(Adding Summaries to Event Files)
操作 | 描述 |
---|---|
class tf.train.SummaryWriter | 將summary協議buffer寫入事件文件中 |
tf.train.SummaryWriter.__init__(logdir, graph=None, max_queue=10, flush_secs=120, graph_def=None) | 創建一個SummaryWriter實例以及新建一個事件文件 |
tf.train.SummaryWriter.add_summary(summary, global_step=None) | 將一個summary添加到事件文件中 |
tf.train.SummaryWriter.add_session_log(session_log, global_step=None) | 添加SessionLog到一個事件文件中 |
tf.train.SummaryWriter.add_event(event) | 添加一個事件到事件文件中 |
tf.train.SummaryWriter.add_graph(graph, global_step=None, graph_def=None) | 添加一個Graph到時間文件中 |
tf.train.SummaryWriter.add_run_metadata(run_metadata, tag, global_step=None) | 為一個單一的session.run()調用添加一個元數據信息 |
tf.train.SummaryWriter.flush() | 刷新時間文件到硬盤中 |
tf.train.SummaryWriter.close() | 將事件問價寫入硬盤中並關閉該文件 |
tf.train.summary_iterator(path) | 一個用於從時間文件中讀取時間協議buffer的迭代器 |
▷ tf.train.SummaryWriter
創建一個SummaryWriter 和事件文件。如果我們傳遞一個Graph進入該構建器中,它將被添加到事件文件當中,這一點與使用add_graph()具有相同功能。
TensorBoard 將從事件文件中提取該graph,並將其顯示。所以我們能直觀地看到我們建立的graph。我們通常從我們啟動的session中傳遞graph:
...create a graph...
# Launch the graph in a session. sess = tf.Session() # Create a summary writer, add the 'graph' to the event file. writer = tf.train.SummaryWriter(<some-directory>, sess.graph)
▷ tf.train.summary_iterator
#打印時間文件中的內容 for e in tf.train.summary_iterator(path to events file): print(e) #打印指定的summary值 # This example supposes that the events file contains summaries with a # summary value tag 'loss'. These could have been added by calling # `add_summary()`, passing the output of a scalar summary op created with # with: `tf.scalar_summary(['loss'], loss_tensor)`. for e in tf.train.summary_iterator(path to events file): for v in e.summary.value: if v.tag == 'loss': print(v.simple_value)
█ 訓練的通用函數及其他(Training utilities)
操作 | 描述 |
---|---|
tf.train.global_step(sess, global_step_tensor) | 一個用於獲取全局step的小輔助器 |
tf.train.write_graph(graph_def, logdir, name, as_text=True) | 將一個graph proto寫入一個文件中 |
# | |
:— | |
class tf.train.LooperThread | 可重復地執行代碼的線程 |
tf.train.LooperThread.init(coord, timer_interval_secs, target=None, args=None, kwargs=None) | 創建一個LooperThread |
tf.train.LooperThread.is_alive() | 返回是否該線程是活躍的 |
tf.train.LooperThread.join(timeout=None) | 等待線程結束 |
tf.train.LooperThread.loop(coord, timer_interval_secs, target, args=None, kwargs=None) | 啟動一個LooperThread,用於周期地調用某個函數 調用函數target(args) |
tf.py_func(func, inp, Tout, stateful=True, name=None) | 將python函數包裝成tf中操作節點 |
▷ tf.train.global_step
# Creates a variable to hold the global_step. global_step_tensor = tf.Variable(10, trainable=False, name='global_step') # Creates a session. sess = tf.Session() # Initializes the variable. sess.run(global_step_tensor.initializer) print('global_step: %s' % tf.train.global_step(sess, global_step_tensor)) global_step: 10
▷ tf.train.write_graph
v = tf.Variable(0, name='my_variable') sess = tf.Session() tf.train.write_graph(sess.graph_def, '/tmp/my-model', 'train.pbtxt')
▷ tf.py_func
#tf.py_func(func, inp, Tout, stateful=True, name=None) #func:為一個python函數 #inp:為輸入函數的參數,Tensor列表 #Tout: 指定func返回的輸出的數據類型,是一個列表 def my_func(x): # x will be a numpy array with the contents of the placeholder below return np.sinh(x) inp = tf.placeholder(tf.float32, [...]) y = py_func(my_func, [inp], [tf.float32])
2.2 測試 (Testing)
TensorFlow 提供了一個方便的繼承unittest.TestCase類的方法,該類增加有關TensorFlow 測試的方法。如下例子:
import tensorflow as tf class SquareTest(tf.test.TestCase): def testSquare(self): with self.test_session(): x = tf.square([2, 3]) self.assertAllEqual(x.eval(), [4, 9]) if __name__ == '__main__': tf.test.main()
█ 共用(Utilities)
操作 | 描述 |
---|---|
tf.test.main() | 運行所有的單元測試 |
tf.test.assert_equal_graph_def(actual, expected) | 斷言 兩個GraphDefs 是否幾乎一樣 |
tf.test.get_temp_dir() | 返回測試期間使用的臨時目錄 |
tf.test.is_built_with_cuda() | 返回是否Tensorflow支持CUDA(GPU)的build |
█ 梯度檢查(Gradient checking)
可對比compute_gradient 和compute_gradient_error函數的用法
操作 | 描述 |
---|---|
tf.test.compute_gradient(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) | 計算並返回理論的和數值的Jacobian矩陣 |
tf.test.compute_gradient_error(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) | 計算梯度的error。在計算所得的與數值估計的Jacobian中 為dy/dx計算最大的error |
注:本教程不是原創 轉自http://blog.csdn.net/lenbow/article/details/52218551
感謝作者 的總結 與付出