Tensorflow的一個特色就是分布式計算。分布式Tensorflow是由高性能的gRPC框架作為底層技術來支持的。這是一個通信框架gRPC(google remote procedure call),是一個高性能、跨平台的RPC框架。RPC協議,即遠程過程調用協議,是指通過網絡從遠程計算機程序上請求服務。
分布式原理
Tensorflow分布式是由多個服務器進程和客戶端進程組成。有幾種部署方式,例如單機多卡和多機多卡(分布式)。
單機多卡
單機多卡是指單台服務器有多塊GPU設備。假設一台機器上有4塊GPU,單機多GPU的訓練過程如下:
- 在單機單GPU的訓練中,數據是一個batch一個batch的訓練。 在單機多GPU中,數據一次處理4個batch(假設是4個GPU訓練), 每個GPU處理一個batch的數據計算。
- 變量,或者說參數,保存在CPU上。數據由CPU分發給4個GPU,在GPU上完成計算,得到每個批次要更新的梯度
- 在CPU上收集完4個GPU上要更新的梯度,計算一下平均梯度,然后更新。
- 循環進行上面步驟
多機多卡(分布式)
而分布式是指有多台計算機,充分使用多台計算機的性能,處理數據的能力。可以根據不同計算機划分不同的工作節點。當數據量或者計算量達到超過一台計算機處理能力的上限的話,必須使用分布式。
分布式的架構
當我們知道的基本的分布式原理之后,我們來看看分布式的架構的組成。分布式架構的組成可以說是一個集群的組成方式。那么一般我們在進行Tensorflow分布式時,需要建立一個集群。通常是我們分布式的作業集合。一個作業中又包含了很多的任務(工作結點),每個任務由一個工作進程來執行。
節點之間的關系
一般來說,在分布式機器學習框架中,我們會把作業分成參數作業(parameter job)和工作結點作業(worker job)。運行參數作業的服務器我們稱之為參數服務器(parameter server,PS),負責管理參數的存儲和更新,工作結點作業負責主要從事計算的任務,如運行操作。
參數服務器,當模型越來越大時,模型的參數越來越多,多到一台機器的性能不夠完成對模型參數的更新的時候,就需要把參數分開放到不同的機器去存儲和更新。參數服務器可以是由多台機器組成的集群。工作節點是進行模型的計算的。Tensorflow的分布式實現了作業間的數據傳輸,也就是參數作業到工作結點作業的前向傳播,以及工作節點到參數作業的反向傳播。
分布式的模式
在訓練一個模型的過程中,有哪些部分可以分開,放在不同的機器上運行呢?在這里就要接觸到數據並行的概念。
數據並行
數據並總的原理很簡單。其中CPU主要負責梯度平均和參數更新,而GPU主要負責訓練模型副本。
- 模型副本定義在GPU上
- 對於每一個GPU,都是從CPU獲得數據,前向傳播進行計算,得到損失,並計算出梯度
- CPU接到GPU的梯度,取平均值,然后進行梯度更新
每一個設備的計算速度不一樣,有的快有的滿,那么CPU在更新變量的時候,是應該等待每一個設備的一個batch進行完成,然后求和取平均來更新呢?還是讓一部分先計算完的就先更新,后計算完的將前面的覆蓋呢?這就由同步更新和異步更新的問題。
同步更新和異步更新
更新參數分為同步和異步兩種方式,即異步隨機梯度下降法(Async-SGD)和同步隨機梯度下降法(Sync-SGD)
- 同步隨即梯度下降法的含義是在進行訓練時,每個節點的工作任務需要讀入共享參數,執行並行的梯度計算,同步需要等待所有工作節點把局部的梯度算好,然后將所有共享參數進行合並、累加,再一次性更新到模型的參數;下一個批次中,所有工作節點拿到模型更新后的參數再進行訓練。這種方案的優勢是,每個訓練批次都考慮了所有工作節點的訓練情況,損失下降比較穩定;劣勢是,性能瓶頸在於最慢的工作結點上。
- 異步隨機梯度下降法的含義是每個工作結點上的任務獨立計算局部梯度,並異步更新到模型的參數中,不需要執行協調和等待操作。這種方案的優勢是,性能不存在瓶頸;劣勢是,每個工作節點計算的梯度值發送回參數服務器會有參數更新的沖突,一定程度上會影響算法的收斂速度,在損失下降的過程中抖動較大。
分布式API
創建集群的方法是為每一個任務啟動一個服務,這些任務可以分布在不同的機器上,也可以同一台機器上啟動多個任務,使用不同的GPU等來運行。每個任務都會創建完成以下工作
- 1、創建一個tf.train.ClusterSpec,用於對集群中的所有任務進行描述,該描述內容對所有任務應該是相同的
- 2、創建一個tf.train.Server,用於創建一個任務,並運行相應作業上的計算任務。
Tensorflow的分布式API使用如下:
- tf.train.ClusterSpec()
創建ClusterSpec,表示參與分布式TensorFlow計算的一組進程
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", /job:worker/task:0
"worker1.example.com:2222", /job:worker/task:1
"worker2.example.com:2222"],/job:worker/task:2
"ps": ["ps0.example.com:2222", /job:ps/task:0
"ps1.example.com:2222"]}) /job:ps/task:1
創建Tensorflow的集群描述信息,其中ps和worker為作業名稱,通過指定ip地址加端口創建
- tf.train.Server(server_or_cluster_def, job_name=None, task_index=None, protocol=None, config=None, start=True)
- server_or_cluster_def: 集群描述
- job_name: 任務類型名稱
- task_index: 任務數
創建一個服務(主節點或者工作節點服務),用於運行相應作業上的計算任務,運行的任務在task_index指定的機器上啟動,例如在不同的ip+端口上啟動兩個工作任務
- 屬性:target
- 返回tf.Session連接到此服務器的目標
- 方法:join()
- 參數服務器端等待接受參數任務,直到服務器關閉
tf.device(device_name_or_function):選擇指定設備或者設備函數
- if device_name:
- 指定設備
- 例如:"/job:worker/task:0/cpu:0”
- if function:
- tf.train.replica_device_setter(worker_device=worker_device, cluster=cluster)
- 作用:通過此函數協調不同設備上的初始化操作
- worker_device:為指定設備, “/job:worker/task:0/cpu:0” or "/job:worker/task:0/gpu:0"
- cluster:集群描述對象
分布式案例
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("job_name", "worker", "啟動服務類型,ps或者worker")
tf.app.flags.DEFINE_integer("task_index", 0, "指定是哪一台服務器索引")
def main(argv):
# 集群描述
cluster = tf.train.ClusterSpec({
"ps": ["127.0.0.1:4466"],
"worker": ["127.0.0.1:4455"]
})
# 創建不同的服務
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
else:
work_device = "/job:worker/task:0/cpu:0"
with tf.device(tf.train.replica_device_setter(
worker_device=work_device,
cluster=cluster
)):
# 全局計數器
global_step = tf.train.get_or_create_global_step()
# 准備數據
mnist = input_data.read_data_sets("./data/mnist/", one_hot=True)
# 建立數據的占位符
with tf.variable_scope("data"):
x = tf.placeholder(tf.float32, [None, 28 * 28])
y_true = tf.placeholder(tf.float32, [None, 10])
# 建立全連接層的神經網絡
with tf.variable_scope("fc_model"):
# 隨機初始化權重和偏重
weight = tf.Variable(tf.random_normal([28 * 28, 10], mean=0.0, stddev=1.0), name="w")
bias = tf.Variable(tf.constant(0.0, shape=[10]))
# 預測結果
y_predict = tf.matmul(x, weight) + bias
# 所有樣本損失值的平均值
with tf.variable_scope("soft_loss"):
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y_true, logits=y_predict))
# 梯度下降
with tf.variable_scope("optimizer"):
train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss, global_step=global_step)
# 計算准確率
with tf.variable_scope("acc"):
equal_list = tf.equal(tf.argmax(y_true, 1), tf.argmax(y_predict, 1))
accuracy = tf.reduce_mean(tf.cast(equal_list, tf.float32))
# 創建分布式會話
with tf.train.MonitoredTrainingSession(
checkpoint_dir="./temp/ckpt/test",
master="grpc://127.0.0.1:4455",
is_chief=(FLAGS.task_index == 0),
config=tf.ConfigProto(log_device_placement=True),
hooks=[tf.train.StopAtStepHook(last_step=100)]
) as mon_sess:
while not mon_sess.should_stop():
mnist_x, mnist_y = mnist.train.next_batch(4000)
mon_sess.run(train_op, feed_dict={x: mnist_x, y_true: mnist_y})
print("訓練第%d步, 准確率為%f" % (global_step.eval(session=mon_sess), mon_sess.run(accuracy, feed_dict={x: mnist_x, y_true: mnist_y})))
if __name__ == '__main__':
tf.app.run()
運行參數服務器:
$ python zfx.py --job_name=ps
運行worker服務器:
$ python zfx.py --job_name=worker