TensorFlow for distributed


TensorFlow for distributed 

本目錄包括了運行時分布式TensorFlow的實現,其底層使用了gRPC 作為進程內通信的支持庫。

Quick start

首先,需要構建一個TensorFlow的服務端可執行版本(grpc_tensorflow_server) 以及一個基於gRPC的客戶端。目前只能基於源代碼進行自構建, 但是會包含在未來發布的二進制版本中。可以使用如下命令進行構建:

# CPU-only build. $ bazel --output_base=/data/tensorflow_distributed   build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server bazel --output_base=/home/data/tensorflow_distributed build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server # GPU build.  

 

$ cd /data/tensorflow
 
$ bazel --output_base=/data/tensorflow_distributed build -c opt --config=cuda //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server

如果是從最新的源代碼創建的Python依賴包,它會自動包含一個基於gRPC的客戶端。如果使用的是一個之前發布的二進制版本,需要根據這個安裝說明來重新編譯安裝。在你成功地構建了分布式的TensorFlow組件之后,可以通過如下方式來啟動服務器並且判斷你的安裝是否成功:

# Start a TensorFlow server as a single-process "cluster". 
#

啟動示例1:

 
$ /data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf1:2222;tf1:2223' --job_name=ps --task_index=0 &
$ /data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf1:2222;tf1:2223' --job_name=ps --task_index=1 &

啟動示例2:

$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225'  --job_name=ps --task_id=0 &(tf3上啟動)

$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225'  --job_name=ps --task_id=1 &(tf4上啟動)

$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225'  --job_name=wk --task_id=0 &(tf3上啟動)

$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server   --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225'  --job_name=wk --task_id=1 &(tf4上啟動)

 

然后啟動Python的交互器並且啟動一個Session:

$ python
>>> import tensorflow as tf >>> c = tf.constant("Hello, distributed TensorFlow!") >>> sess = tf.Session("grpc://localhost:2222") >>> sess.run(c) 'Hello, distributed TensorFlow!'

補充:安裝完后需要重新登錄服務器,否則出現無法import  tensorflow

集群定義

命令行參數 grpc_tensorflow_server 定義了集群之間的關系. 參數 --cluster_spec 決定了集群中工作對象的多少, 譬如有一系列的jobs, 而每個jobs又包含了多個task 終端。 所有集群中的處理過程都必須設置相同的 --cluster_spec參數, 例子如下:

`local\ localhost:2222` /job:local/task:0
`local\ localhost:2222;localhost:2223` /job:local/task:0`/job:local/task:1`
`worker\ worker0:2222;worker1:2222;worker2:2222,``ps\ ps0:2222;ps1:2222` /job:worker/task:0`/job:worker/task:1/job:worker/task:2/job:ps/task:0`/job:ps/task:1
--cluster_spec='...'
Available tasks

還有 --job_name 與 --task_index 標志位指明了哪些任務會運行在當前處理過程上。 具體而言,
--job_name=local --task_index=0 意思就是該過程會被標志為
/job:local/task:0, 然后所有在該過程上的TensorFlow的設備都會使用這個前綴。

N.B. 
手動來指明這些運行參數可能是非常冗長的,特別是對一個大型集群而言。我們正在研發可以程式化啟動的工具,譬如使用一些類似於Kubernetes集群管理器。如果有啥集群管理工具你覺得挺好的希望加入進來,可以在GitHub issue上提出你的建議。

標注模型中的分布式設備

為了將某個操作放在某個特殊的處理過程上,在分布式環境下依然可以使用
tf.device()
函數,之前是用來指明是放在CPU還是GPU上的。譬如:

with tf.device("/job:ps/task:0"): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...) with tf.device("/job:ps/task:1"): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...) with tf.device("/job:worker/task:7"): input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ... with tf.Session("grpc://worker7:2222") as sess: for _ in range(10000): sess.run(train_op)

在上面的例子中,Variables在job ps的兩個task上被創建,然后計算密集型的部分創建在job work上。TensorFlow會自動地在不同的job之間傳輸數據。(從jobwork是前向傳遞,而從workerps是梯度應用)。

Replicated Computation

一個常見的訓練配置(數據並行訓練)包含了job ps上共享參數以及job work上的多個任務來訓練相同的模型。每個task一般會運行在不同的機器上。現在還是有很多辦法可以在TensorFlow中來實現這一種結構的,我們未來也會提供更簡單的實現方式,主要途徑有:

  • 構建單一的包含了一系列參數的圖(in tf.Variable nodes pinned to /job:ps), 並且創建多個模型的副本來映射到/job:worker中的不同tasks。每個model的副本有一個不同的train_op,並且對於每個worker i而言一個或者多個的客戶端線程可以調用sess.run(train_ops[i])。這種方法使用了單一的tf.Session,它的工作目標是集群中的某個workers。

  • As above, but where the gradients from all workers are averaged. See the
    CIFAR-10 multi-GPU trainer

for an example of this form of replication. The implements synchronous training

  • 另一種分布式訓練器的方法使用多張圖,一張圖對應一個worker,並且每張圖都包含了一系列的參數的集合(/job:ps)和一份模型的賦值。而容器的機制就是在不同的圖之間共享變量:一旦某個變量構造完成,可選的container參數會由圖中每份復制的相同值來決定。對於較大的模型而言,這種方法會更加有效,畢竟整個圖更小了一點。
    這種方法使用多個tf.Session對象:每個worker過程都會包含一個,不過不同的Session會指向不同的目標worker。這個tf.Session對象即可以在單一的Python客戶端中創建,也可以在多個客戶端中創建。

術語

Client
一個典型的客戶端一般會構建一個TensorFlow的圖並且使用tensorflow::Session來完成與集群的交互。客戶端一般會用Python或者C++編寫,一般來說一個客戶端可以同時與多個服務端進行交互(參考上文的重復訓練),並且一個服務端也可以同時服務於多個客戶端。

Cluster
一個TensorFlow集群會包含一個或者多個TensorFlow的服務端,被切分為一系列命名的job,而每個job又會負責一系列的tasks。一個集群一般會專注於一個相對高層的目標,譬如用多台機器並行地訓練一個神經網絡。

Job
一個job會包含一系列的致力於某個相同目標的task。譬如,一個叫ps(意思是參數服務)的job會用於處理存儲於更新Variables相關的工作。而一個叫worker的job會用於承載那些用於計算密集型的無狀態節點。一般來說一個job中的tasks會運行在不同的機器中。

Master service
Master Service是一個RPC服務用於與一系列遠端的分布式設備進行交互。Master Service實現了tensorflow::Session 接口, 並且用來協調多個worker service。

Task
一個Task一般會關聯到某個單一的TensorFlow服務端的處理過程,屬於一個特定的job並且在該job的任務列表中有個唯一的索引。

TensorFlow server
用於運行grpc_tensorflow_server的處理過程,是一個集群中的一員,並且想外暴露了一個Master Service與一個Worker Service。

Worker service
一個執行部分TensorFlow圖部分內容的RPC服務。

 

————————————————————————————————————————————

運行實例

部署文件  mlp_mnist_dist.py 到安裝tensorflow的機器例如 10.100.208.23/24

分別在機器上運行

在 10.100.208.23上運行

python   mlp_mnist_dist.py  --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=ps --task_index=0

python   mlp_mnist_dist.py  --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=ps --task_index=1

在10.100.208.24上運行

python   mlp_mnist_dist.py  --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=worker --task_index=0

python   mlp_mnist_dist.py  --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=worker --task_index=1

 

啟動時需要互相依賴,所以會有連接失敗的error,待所連接的進程啟動后就可以了

運行后 ,只有一個worker會完成,日志如下

Extracting ./MNIST_data/train-images-idx3-ubyte.gz
Extracting ./MNIST_data/train-labels-idx1-ubyte.gz
Extracting ./MNIST_data/t10k-images-idx3-ubyte.gz
Extracting ./MNIST_data/t10k-labels-idx1-ubyte.gz
job : worker/0 step : 0 ,training accuracy : 0.96

——————————————————————————————————————————————

簡單測試用例:

  

import tensorflow as tf

def main() :
   with tf.device("/job:ps/task:0/cpu:0"):
      a = tf.Variable(1)
   with tf.device("/job:ps/task:0/cpu:1"):
      b = tf.Variable(1)
   with tf.Session("grpc://localhost:22223",config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True)) as sess:
      init_ab = tf.initialize_all_variables();
      sess.run(init_ab)
      result = sess.run(a+b)
      print(result)

if __name__ == '__main__':
main()

 

——————————————————————————————————————————————

問題解決:

1.如果並沒有 grpc_tensorflow_server

  可執行文件的話可以用 ./tensorflow/tools/dist_test/server/grpc_tensorflow_server.py 代替
2.在指定device時指定了cpu/gpu如果報錯

tensorflow.python.framework.errors.InvalidArgumentError: Cannot assign a device to node 'Variable_1': Could not satisfy explicit device specification '/job:ps/task:0/device:CPU:1' because no devices matching that specification are registered in this process; available devices: /job:ps/replica:0/task:0/cpu:0, /job:worker/replica:0/task:0/cpu:0
[[Node: Variable_1 = Variable[container="", dtype=DT_INT32, shape=[], shared_name="", _device="/job:ps/task:0/device:CPU:1"]()]]

解決方法 https://github.com/tensorflow/tensorflow/issues/2285

就是在生命session時加入參數 allow_soft_placement=True

 例如 :with tf.Session("grpc://localhost:22223",config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True)) as sess:

如果是用Supervisor 

   sess = sv.prepare_or_wait_for_session(server.target,config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True))

 

3.如果編譯時發生錯誤

ERROR: error loading package 'tensorflow/core/distributed_runtime/rpc': Extension file not found. Unable to load package for '//google/protobuf:protobuf.bzl': BUILD file not found on package path.

在源碼目錄下 執行 git submodule update --init  命令來下載依賴

https://github.com/bazelbuild/bazel/issues/605


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM