TensorBoard計算加速
0. 寫在前面
參考書
《TensorFlow:實戰Google深度學習框架》(第2版)
工具
python3.5.1,pycharm
1. TensorFlow使用GPU
1. 如何使用log_device_placement參數來打印運行每一個運算的設備。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test1.py
@time: 2019/5/14 19:40
@desc: 如何使用log_device_placement參數來打印運行每一個運算的設備
"""
import tensorflow as tf
a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
c = a + b
# 通過log_device_placement參數來輸出運行每一個運算的設備。
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))
運行結果:
2. 通過tf.device手工指定運行設備的樣例。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test2.py
@time: 2019/5/14 19:54
@desc: 通過tf.device手工指定運行設備的樣例。
"""
import tensorflow as tf
# 通過tf.device將運行指定到特定的設備上。
with tf.device('/cpu:0'):
a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
with tf.device('/gpu:1'):
c = a + b
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))
由於我並沒有GPU,所以只是尬碼代碼。。。
3. 不是所有的操作都可以被放在GPU上,如果強行將無法放在GPU上的操作指定到GPU上,那么程序將會報錯。
4. 為了避免這個問題,TensorFlow在生成會話時,可以指定allow_soft_placement參數,當這個參數為True時,如果運算無法由GPU執行,那么TensorFlow會自動將它放到CPU上執行。
2. 深度學習訓練並行模式
- 常用的並行化深度學習模型訓練方式有兩種,同步模式和異步模式。
- 可以簡單的認為異步模式就是單機模式復制了多份,每一份使用不同的訓練數據進行訓練。
- 在異步模式下,不同設備之間是完全獨立的。
- 使用異步模式訓練的深度學習模型有可能無法達到較優的訓練結果。
- 在同步模式下,所有的設備同時讀取參數的取值,並且當反向傳播算法完成之后同步更新參數的取值。
- 總結來說就是,異步模式達到全局最優要難一些,但是速度快;同步模式更能達到全局最優,但墨跡。兩者都有應用。
3. 多GPU並行
在多GPU上訓練深度學習模型解決MNIST問題。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test3.py
@time: 2019/5/15 10:35
@desc: 在多GPU上訓練深度學習模型解決MNIST問題。
"""
from datetime import datetime
import os
import time
import tensorflow as tf
import BookStudy.book2.mnist_inference as mnist_inference
# 定義訓練神經網絡時需要用到的模型。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99
N_GPU = 2
# 定義日志和模型輸出的路徑
MODEL_SAVE_PATH = 'logs_and_models/'
MODEL_NAME = 'model.ckpt'
# 定義數據存儲的路徑。因為需要為不同的GPU提供不同的訓練數據,所以通過placeholder的方式
# 就需要手動准備多份數據。為了方便訓練數據的獲取過程,可以采用前面介紹的Dataset的方式從
# TFRecord中讀取數據。於是在這里提供的數據文件路徑為將MNIST訓練數據轉化為TFRecord格式
# 之后的路徑。如何將MNIST數據轉化為TFRecord格式在前面有詳細介紹,這里不再贅述。
DATA_PATH = 'output.tfrecords'
# 定義輸入隊列得到的訓練數據,具體細節可以參考前面。
def get_input():
dataset = tf.data.TFRecordDataset([DATA_PATH])
# 定義數據解析格式。
def parser(record):
features = tf.parse_single_example(
record,
features={
'image_raw': tf.FixedLenFeature([], tf.string),
'pixels': tf.FixedLenFeature([], tf.int64),
'label': tf.FixedLenFeature([], tf.int64),
}
)
# 解析圖片和標簽信息。
decoded_image = tf.decode_raw(features['image_raw'], tf.uint8)
reshaped_image = tf.reshape(decoded_image, [784])
retyped_image = tf.cast(reshaped_image, tf.float32)
label = tf.cast(features['label', tf.int32])
return retyped_image, label
# 定義輸入隊列
dataset = dataset.map(parser)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(10)
dataset = dataset.batch(BATCH_SIZE)
iterator = dataset.make_one_shot_iterator()
features, labels = iterator.get_next()
return features, labels
# 定義損失函數。對於給定的訓練數據、正則化損失計算規則和命名空間,計算在這個命名空間下的總損失。
# 之所以需要給定命名空間就是因為不同的GPU上計算得出的正則化損失都會加入名為loss的集合,如果不
# 通過命名空間就會將不同GPU上的正則化損失都加進來。
def get_loss(x, y_, regularizer, scope, reuse_variables=None):
# 沿用前面定義的函數來計算神經網絡的前向傳播結果。
with tf.variable_scope(tf.get_variable_scope(), reuse=reuse_variables):
y = mnist_inference.inference(x, regularizer)
# 計算交叉熵損失。
cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=y_))
# 計算當前GPU上計算得到的正則化損失。
regularization_loss = tf.add_n(tf.get_collection('losses', scope))
# 計算最終的總損失。
loss = cross_entropy + regularization_loss
return loss
# 計算每一個變量梯度的平均值。
def average_gradients(tower_grads):
average_grads = []
# 枚舉所有的變量和變量在不同GPU上計算得出的梯度。
for grad_and_vars in zip(*tower_grads):
# 計算所有GPU上的梯度平均值
grads = []
for g, _ in grad_and_vars:
expanded_g = tf.expand_dims(g, 0)
grads.append(expanded_g)
grad = tf.concat(grads, 0)
grad = tf.reduce_mean(grad, 0)
v = grad_and_vars[0][1]
grad_and_var = (grad, v)
# 將變量和它的平均梯度對應起來。
average_grads.append(grad_and_var)
# 返回所有變量的平均梯度,這個將被用於變量的更新。
return average_grads
# 主訓練過程。
def main(argv=None):
# 將簡單的運算放在CPU上,只有神經網絡的訓練過程在GPU上。
with tf.Graph().as_default(), tf.device('/cpu:0'):
# 定義基本的訓練過程
x, y_ = get_input()
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step, 60000/BATCH_SIZE, LEARNING_RATE_DECAY)
opt = tf.train.GradientDescentOptimizer(learning_rate)
tower_grads = []
reuse_variables = False
# 將神經網絡的優化過程跑在不同的GPU上。
for i in range(N_GPU):
# 將優化過程指定在一個GPU上
with tf.device('/gpu:%d' % i):
with tf.name_scope('GPU_%d' % i) as scope:
cur_loss = get_loss(x, y_, regularizer, scope, reuse_variables)
# 在第一次聲明變量之后,將控制變量重用的參數設置為True。這樣可以讓不同的GPU更新同一組參數
reuse_variables = True
grads = opt.compute_gradients(cur_loss)
tower_grads.append(grads)
# 計算變量的平均梯度
grads = average_gradients(tower_grads)
for grad, var in grads:
if grad is not None:
tf.summary.histogram('gradients_on_average/%s' % var.op.name, grad)
# 使用平均梯度更新參數。
apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
for var in tf.trainable_variables():
tf.summary.histogram(var.op.name, var)
# 計算變量的滑動平均值。
variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
variable_to_average = (tf.trainable_variables() + tf.moving_average_variables())
variable_averages_op = variable_averages.apply(variable_to_average)
# 每一輪迭代需要更新變量的取值並更新變量的滑動平均值
train_op = tf.group(apply_gradient_op, variable_averages_op)
saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init = tf.global_variables_initializer()
with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) as sess:
# 初始化所有變量並啟動隊列。
init.run()
summary_writer = tf.summary.FileWriter(MODEL_SAVE_PATH, sess.graph)
for step in range(TRAINING_STEPS):
# 執行神經網絡訓練操作,並記錄訓練操作的運行時間。
start_time = time.time()
_, loss_value = sess.run([train_op, cur_loss])
duration = time.time() - start_time
# 每隔一段時間輸出當前的訓練進度,並統計訓練速度
if step != 0 and step % 10 == 0:
# 計算使用過的訓練數據個數。因為在每一次運行訓練操作時,每一個GPU都會使用一個batch的訓練數據,
# 所以總共用到的訓練數據個數為batch大小 X GPU個數。
num_examples_per_step = BATCH_SIZE * N_GPU
# num_example_per_step為本次迭代使用到的訓練數據個數,duration為運行當前訓練過程使用的時間,
# 於是平均每秒可以處理的訓練數據個數為num_examples_per_step / duration
examples_per_sec = num_examples_per_step / duration
# duration為運行當前訓練過程使用的時間,因為在每一個訓練過程中,每一個GPU都會使用一個batch的
# 訓練數據,所以在單個batch上的訓練所需要的時間為duration / GPU個數
sec_per_batch = duration / N_GPU
# 輸出訓練信息。
format_str = '%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)'
print(format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch))
# 通過TensorBoard可視化訓練過程。
summary = sess.run(summary_op)
summary_writer.add_summary(summary, step)
# 每隔一段時間保存當前的模型。
if step % 1000 == 0 or (step + 1) == TRAINING_STEPS:
checkpoint_path = os.path.join(MODEL_SAVE_PATH, MODEL_NAME)
saver.save(sess, checkpoint_path, global_step=step)
if __name__ == '__main__':
tf.app.run()
由於我依然沒有GPU,所以只是尬碼代碼。。。
4. 分布式TensorFlow
4.1 分布式TensorFlow原理
- 創建一個最簡單的TensorFlow集群。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test4.py
@time: 2019/5/15 22:19
@desc: 創建一個最簡單的TensorFlow集群
"""
import tensorflow as tf
c = tf.constant("Hello, distributed TensorFlow!")
# 創建一個本地的TensorFlow集群
server = tf.train.Server.create_local_server()
# 在集群上創建一個會話。
sess = tf.Session(server.target)
# 輸出Hello,distributed TensorFlow!
print(sess.run(c))
輸出得到:
- 在本地運行有兩個任務的TensorFlow集群。
第一個任務代碼:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test5.py
@time: 2019/5/15 22:27
@desc: 在本地運行有兩個任務的TensorFlow集群。第一個任務的代碼。
"""
import tensorflow as tf
c = tf.constant("Hello from server1!")
# 生成一個有兩個任務的集群,一個任務跑在本地2222端口,另外一個跑在本地2223端口。
cluster = tf.train.ClusterSpec({"local": ["localhost: 2222", "localhost: 2223"]})
# 通過上面生成的集群配置生成Server,並通過job_name和task_index指定當前所啟動的任務。
# 因為該任務是第一個任務,所以task_index為0.
server = tf.train.Server(cluster, job_name="local", task_index=0)
# 通過server.target生成會話來使用TensorFlow集群中的資源。通過設置
# log_device_placement可以看到執行每一個操作的任務。
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))
server.join()
第二個任務代碼:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test6.py
@time: 2019/5/16 10:14
@desc: 在本地運行有兩個任務的TensorFlow集群。第二個任務的代碼。
"""
import tensorflow as tf
c = tf.constant("Hello from server2!")
# 和第一個程序一樣的集群配置。集群中的每一個任務需要采用相同的配置。
cluster = tf.train.ClusterSpec({"local": ["localhost: 2222", "localhost: 2223"]})
# 指定task_index為1,所以這個程序將在localhost: 2223啟動服務。
server = tf.train.Server(cluster, job_name="local", task_index=1)
# 剩下的代碼都和第一個任務的代碼一致。
# 通過server.target生成會話來使用TensorFlow集群中的資源。通過設置
# log_device_placement可以看到執行每一個操作的任務。
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))
server.join()
啟動第一個任務后,可以得到類似下面的輸出:
從第一個任務的輸出中可以看到,當只啟動第一個任務時,程序會停下來等待第二個任務啟動。當第二個任務啟動后,可以得到如下輸出:
值得注意的是:第二個任務中定義的計算也被放在了同一個設備上,也就是說這個計算將由第一個任務來執行。
使用分布式TensorFlow訓練深度學習模型一般有兩種方式:
- 一種方式叫做計算圖內分布式(in-graph replication)。優點:同步更新參數比較容易控制。缺點:當數據量太大時,中心節點容易造成性能瓶頸。
- 另外一種叫做計算圖之間分布式(between-graph replication)。優點:並行程度更高。缺點:同步更新困難。
4.2 分布式TensorFlow模型訓練
異步模式樣例程序
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test7.py
@time: 2019/5/16 14:01
@desc: 實現異步模式的分布式神經網絡訓練過程。
"""
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import mnist_inference
# 配置神經網絡的參數。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 20000
MOVING_AVERAGE_DECAY = 0.99
# 模型保存的路徑。
MODEL_SAVE_PATH = "./log1"
# MNIST數據路徑。
DATA_PATH = "D:/Python3Space/BookStudy/book2/MNIST_data"
# 通過flags指定運行的參數。在前面對於不同的任務(task)給出了不同的程序。
# 但這不是一種可擴展的方式。在這一節中將使用運行程序時給出的參數來配置在
# 不同任務中運行的程序。
FLAGS = tf.app.flags.FLAGS
# 指定當前運行的是參數服務器還是計算服務器。參數服務器只負責TensorFlow中變量的維護
# 和管理,計算服務器負責每一輪迭代時運行反向傳播過程。
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
# 指定集群中的參數服務器地址。
tf.app.flags.DEFINE_string(
'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
'Comma-separated list of hostname:port for the parameter server jobs.'
' e.g. "tf-ps0:2222,tf-ps1:1111" '
)
# 指定集群中的計算服務器地址。
tf.app.flags.DEFINE_string(
'worker_hosts', ' tf-worker0:2222, tf-worker1:1111',
'Comma-separated list of hostname:port for the worker jobs. '
'e.g. "tf-worker0:2222,tf-worker1:1111" '
)
# 指定當前程序的任務ID。TensorFlow會自動根據參數服務器/計算服務器列表中的端口號來啟動服務。
# 注意參數服務器和計算服務器的編號都是從0開始的。
tf.app.flags.DEFINE_integer(
'task_id', 0, 'Task ID of the worker/replica running the training.'
)
# 定義TensorFlow的計算圖,並返回每一輪迭代時需要運行的操作。這個過程和前面的主函數基本一致,
# 但為了使處理分布式計算的部分更加突出,這里將此過程整理為一個函數。
def build_model(x, y_, is_chief):
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
# 通過前面給出的mnist_inference計算神經網絡前向傳播的結果。
y = mnist_inference.inference(x, regularizer)
global_step = tf.train.get_or_create_global_step()
# 計算損失函數並定義反向傳播的過程。
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
cross_entropy_mean = tf.reduce_mean(cross_entropy)
loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
learning_rate = tf.train.exponential_decay(
LEARNING_RATE_BASE,
global_step,
60000 / BATCH_SIZE,
LEARNING_RATE_DECAY
)
train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
# 定義每一輪迭代需要運行的操作。
if is_chief:
# 計算變量的滑動平均值。
variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
variable_averages_op = variable_averages.apply(tf.trainable_variables())
with tf.control_dependencies([variable_averages_op, train_op]):
train_op = tf.no_op()
return global_step, loss, train_op
def main(argv=None):
# 解析flags並通過tf.train.ClusterSpe配置TensorFlow集群。
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
cluster = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})
# 通過tf.train.ClusterSpec以及當前任務創建tf.train.Server。
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)
# 參數服務器只需要管理TensorFlow中的變量,不需要執行訓練的過程。server.join()會一直停在這條語句上。
if FLAGS.job_name == 'ps':
with tf.device("/cpu:0"):
server.join()
# 定義計算服務器需要運行的操作。
is_chief = (FLAGS.task_id == 0)
mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)
# 通過tf.train.replica_device_setter函數來指定執行每一個運算的設備。
# tf.train.replica_device_setter函數會自動將所有的參數分配到參數服務器上,將
# 計算分配到當前的計算服務器上。
device_setter = tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_id,
cluster=cluster
)
with tf.device(device_setter):
# 定義輸入並得到每一輪迭代需要運行的操作。
x = tf.placeholder(
tf.float32,
[None, mnist_inference.INPUT_NODE],
name='x-input'
)
y_ = tf.placeholder(
tf.float32,
[None, mnist_inference.OUTPUT_NODE],
name='y-input'
)
global_step, loss, train_op = build_model(x, y_, is_chief)
hooks = [tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]
sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
# 通過tf.train.MonitoredTrainingSession管理訓練深度學習模型的通用功能。
with tf.train.MonitoredTrainingSession(
master=server.target,
is_chief=is_chief,
checkpoint_dir=MODEL_SAVE_PATH,
hooks=hooks,
save_checkpoint_secs=60,
config=sess_config
) as mon_sess:
print("session started.")
step = 0
start_time = time.time()
# 執行迭代過程。在迭代過程中tf.train.MonitoredTrainingSession會幫助完成初始化、
# 從checkpoint中加載訓練過的模型、輸出日志並保存模型,所以以下程序中不需要再調用
# 這些過程。tf.train.StopAtStepHook會幫忙判斷是否需要退出。
while not mon_sess.should_stop():
xs, ys = mnist.train.next_batch(BATCH_SIZE)
_, loss_value, global_step_value = mon_sess.run(
[train_op, loss, global_step],
feed_dict={x: xs, y_: ys}
)
# 每隔一段時間輸出訓練信息,不同的計算服務器都會更新全局的訓練輪數,
# 所以這里使用global_step_value得到在訓練中使用過的batch的總數。
if step > 0 and step % 100 == 0:
duration = time.time() - start_time
sec_per_batch = duration / global_step_value
format_str = "After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)"
print(format_str % (step, global_step_value, loss_value, sec_per_batch))
step += 1
if __name__ == '__main__':
try:
tf.app.run()
except Exception as e:
print(e)
要啟動一個擁有一個參數服務器、兩個計算服務器的集群,需要現在運行參數服務器的機器上啟動以下命令。
python gpu_test7.py --job_name=ps --task_id=0 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
然后再運行第一個計算服務器的機器上啟動以下命令:
python gpu_test7.py --job_name=worker --task_id=0 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
最后再運行第二個計算服務器的機器上啟動以下命令:
python gpu_test7.py --job_name=worker --task_id=1 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
注意:如果你報錯了:
1. UnknownError: Could not start gRPC server
一定是你的參數問題!檢查你的task_id有沒有寫成taske_id等等,類似的,一定是這樣!!!一定要跟程序中的參數名保持一致!!!
2. 跑的過程中python搞不清楚崩了,兩個計算服務器都會崩,跟時間無關,隨機的那種。。。直接彈出python已停止的那種
聽我的換台電腦或者重裝系統。
3. 報錯‘ps’不存在的,沒定義的注意了!
在cmd(windows系統)中啟動上面三個命令的時候,參數的內容不要加引號!!!書上面加引號真的是坑死了好嗎!
我就是解決了上述三個問題,換了台macbook,才總算功德圓滿了跑出了結果。
左上:參數服務器
右上:計算服務器0
左下:計算服務器1
右下:運行tensorboard,結果如下:
同步模式樣例程序
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8
"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: gpu_test8.py
@time: 2019/5/17 13:52
@desc: 實現同步模式的分布式神經網絡訓練過程。
"""
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import mnist_inference
# 配置神經網絡的參數。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 20000
MOVING_AVERAGE_DECAY = 0.99
# 模型保存的路徑。
MODEL_SAVE_PATH = "./log2"
# MNIST數據路徑。
DATA_PATH = "D:/Python3Space/BookStudy/book2/MNIST_data"
# 和異步模式類似的設置flags。
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
tf.app.flags.DEFINE_string(
'ps_hosts', ' tf-ps0:2222, tf-ps1:1111',
'Comma-separated list of hostname:port for the parameter server jobs.'
' e.g. "tf-ps0:2222,tf-ps1:1111" '
)
tf.app.flags.DEFINE_string(
'worker_hosts', ' tf-worker0:2222, tf-worker1:1111',
'Comma-separated list of hostname:port for the worker jobs. '
'e.g. "tf-worker0:2222,tf-worker1:1111" '
)
tf.app.flags.DEFINE_integer(
'task_id', 0, 'Task ID of the worker/replica running the training.'
)
# 和異步模式類似的定義TensorFlow的計算圖。唯一的區別在於使用。
# tf.train.SyncReplicasOptimizer函數處理同步更新。
def build_model(x, y_, n_workers, is_chief):
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
y = mnist_inference.inference(x, regularizer)
global_step = tf.train.get_or_create_global_step()
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
cross_entropy_mean = tf.reduce_mean(cross_entropy)
loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
learning_rate = tf.train.exponential_decay(
LEARNING_RATE_BASE,
global_step,
60000 / BATCH_SIZE,
LEARNING_RATE_DECAY
)
# 通過tf.train.SyncReplicasOptimizer函數實現同步更新。
opt = tf.train.SyncReplicasOptimizer(
tf.train.GradientDescentOptimizer(learning_rate),
replicas_to_aggregate=n_workers,
total_num_replicas=n_workers
)
sync_replicas_hook = opt.make_session_run_hook(is_chief)
train_op = opt.minimize(loss, global_step=global_step)
if is_chief:
variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
variable_averages_op = variable_averages.apply(tf.trainable_variables())
with tf.control_dependencies([variable_averages_op, train_op]):
train_op = tf.no_op()
return global_step, loss, train_op, sync_replicas_hook
def main(argv=None):
# 和異步模式類似地創建TensorFlow集群。
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
n_workers = len(worker_hosts)
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_id)
if FLAGS.job_name == 'ps':
with tf.device("/cpu:0"):
server.join()
is_chief = (FLAGS.task_id == 0)
mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)
device_setter = tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_id,
cluster=cluster
)
with tf.device(device_setter):
# 定義輸入並得到每一輪迭代需要運行的操作。
x = tf.placeholder(
tf.float32,
[None, mnist_inference.INPUT_NODE],
name='x-input'
)
y_ = tf.placeholder(
tf.float32,
[None, mnist_inference.OUTPUT_NODE],
name='y-input'
)
global_step, loss, train_op, sync_replicas_hook = build_model(x, y_, n_workers, is_chief)
# 把處理同步更新的hook也加進來
hooks = [sync_replicas_hook, tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]
sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
# 訓練過程和異步一致。
with tf.train.MonitoredTrainingSession(
master=server.target,
is_chief=is_chief,
checkpoint_dir=MODEL_SAVE_PATH,
hooks=hooks,
save_checkpoint_secs=60,
config=sess_config
) as mon_sess:
print("session started.")
step = 0
start_time = time.time()
while not mon_sess.should_stop():
xs, ys = mnist.train.next_batch(BATCH_SIZE)
_, loss_value, global_step_value = mon_sess.run(
[train_op, loss, global_step],
feed_dict={x: xs, y_: ys}
)
if step > 0 and step % 100 == 0:
duration = time.time() - start_time
sec_per_batch = duration / global_step_value
format_str = "After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)"
print(format_str % (step, global_step_value, loss_value, sec_per_batch))
step += 1
if __name__ == '__main__':
tf.app.run()
同上運行出來得到:
和異步模式不同,在同步模式下,global_step差不多是兩個計算服務器local_step的平均值。比如在第二個計算服務器還沒有開始之前,global_step是第一個服務器local_step的一般。這是因為同步模式要求收集replicas_to_average份梯度才會開始更新(注意這里TensorFlow不要求每一份梯度來自不同的計算服務器)。同步模式不僅僅是一次使用多份梯度,tf.train.SyncReplicasOptimizer的實現同時也保證了不會出現陳舊變量的問題,該函數會記錄每一份梯度是不是由最新的變量值計算得到的,如果不是,那么這一份梯度將會被丟棄。
5. 寫在最后
總算是把這本書一步一步的實現,一步一步的修改,從頭認認真真的刷了一遍。取其精華,棄其糟粕。學習TensorFlow真的是一件不容易,也是一件很有成就感的過程。還有黑皮書《TensorFlow實戰》和聖經花書《深度學習》要學,希望后面的書不會讓我失望吧。
我的CSDN:https://blog.csdn.net/qq_21579045
我的博客園:https://www.cnblogs.com/lyjun/
我的Github:https://github.com/TinyHandsome
紙上得來終覺淺,絕知此事要躬行~
歡迎大家過來OB~
by 李英俊小朋友