TensorFlow for distributed
本目錄包括了運行時分布式TensorFlow的實現,其底層使用了gRPC 作為進程內通信的支持庫。
Quick start
首先,需要構建一個TensorFlow的服務端可執行版本(grpc_tensorflow_server
) 以及一個基於gRPC的客戶端。目前只能基於源代碼進行自構建, 但是會包含在未來發布的二進制版本中。可以使用如下命令進行構建:
# CPU-only build. $ bazel --output_base=/data/tensorflow_distributed build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server bazel --output_base=/home/data/tensorflow_distributed build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server # GPU build.
$ cd /data/
tensorflow
$ bazel
--output_base=/data/tensorflow_distributed build -c opt --config=cuda //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server
如果是從最新的源代碼創建的Python依賴包,它會自動包含一個基於gRPC的客戶端。如果使用的是一個之前發布的二進制版本,需要根據這個安裝說明來重新編譯安裝。在你成功地構建了分布式的TensorFlow組件之后,可以通過如下方式來啟動服務器並且判斷你的安裝是否成功:
# Start a TensorFlow server as a single-process "cluster".
#
啟動示例1:
$
/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf1:2222;tf1:2223' --job_name=ps --task_index=0 &
$
/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf1:2222;tf1:2223' --job_name=ps --task_index=1 &
啟動示例2:
$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225' --job_name=ps --task_id=0 &(tf3上啟動)
$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225' --job_name=ps --task_id=1 &(tf4上啟動)
$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225' --job_name=wk --task_id=0 &(tf3上啟動)
$/data/tensorflow_distributed/tensorflow/bazel-out/local_linux-opt/bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server --cluster_spec='ps|tf3:2222;tf4:2223','wk|tf3:2224;tf4:2225' --job_name=wk --task_id=1 &(tf4上啟動)
然后啟動Python的交互器並且啟動一個Session:
$ python
>>> import tensorflow as tf >>> c = tf.constant("Hello, distributed TensorFlow!") >>> sess = tf.Session("grpc://localhost:2222") >>> sess.run(c) 'Hello, distributed TensorFlow!'
補充:安裝完后需要重新登錄服務器,否則出現無法import tensorflow

集群定義
命令行參數 grpc_tensorflow_server
定義了集群之間的關系. 參數 --cluster_spec
決定了集群中工作對象的多少, 譬如有一系列的jobs, 而每個jobs又包含了多個task 終端。 所有集群中的處理過程都必須設置相同的 --cluster_spec
參數, 例子如下:
`local\ | localhost:2222` | /job:local/task:0 |
|
`local\ | localhost:2222;localhost:2223` | /job:local/task:0 `/job:local/task:1` |
|
`worker\ | worker0:2222;worker1:2222;worker2:2222,``ps\ | ps0:2222;ps1:2222` | /job:worker/task:0 `/job:worker/task:1/job:worker/task:2 /job:ps/task:0`/job:ps/task:1 |
--cluster_spec='...'
|
Available tasks
|
---|
還有 --job_name
與 --task_index
標志位指明了哪些任務會運行在當前處理過程上。 具體而言,--job_name=local --task_index=0
意思就是該過程會被標志為/job:local/task:0
, 然后所有在該過程上的TensorFlow的設備都會使用這個前綴。
N.B.
手動來指明這些運行參數可能是非常冗長的,特別是對一個大型集群而言。我們正在研發可以程式化啟動的工具,譬如使用一些類似於Kubernetes集群管理器。如果有啥集群管理工具你覺得挺好的希望加入進來,可以在GitHub issue上提出你的建議。
標注模型中的分布式設備
為了將某個操作放在某個特殊的處理過程上,在分布式環境下依然可以使用tf.device()
函數,之前是用來指明是放在CPU還是GPU上的。譬如:
with tf.device("/job:ps/task:0"): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...) with tf.device("/job:ps/task:1"): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...) with tf.device("/job:worker/task:7"): input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ... with tf.Session("grpc://worker7:2222") as sess: for _ in range(10000): sess.run(train_op)
在上面的例子中,Variables在job ps
的兩個task上被創建,然后計算密集型的部分創建在job work
上。TensorFlow會自動地在不同的job之間傳輸數據。(從job
到work
是前向傳遞,而從worker
到ps
是梯度應用)。
Replicated Computation
一個常見的訓練配置(數據並行訓練)包含了job ps
上共享參數以及job work
上的多個任務來訓練相同的模型。每個task一般會運行在不同的機器上。現在還是有很多辦法可以在TensorFlow中來實現這一種結構的,我們未來也會提供更簡單的實現方式,主要途徑有:
-
構建單一的包含了一系列參數的圖(in
tf.Variable
nodes pinned to/job:ps
), 並且創建多個模型的副本來映射到/job:worker
中的不同tasks。每個model的副本有一個不同的train_op
,並且對於每個workeri
而言一個或者多個的客戶端線程可以調用sess.run(train_ops[i])
。這種方法使用了單一的tf.Session
,它的工作目標是集群中的某個workers。 -
As above, but where the gradients from all workers are averaged. See the
CIFAR-10 multi-GPU trainer
for an example of this form of replication. The implements synchronous training
-
另一種分布式訓練器的方法使用多張圖,一張圖對應一個worker,並且每張圖都包含了一系列的參數的集合(
/job:ps
)和一份模型的賦值。而容器的機制就是在不同的圖之間共享變量:一旦某個變量構造完成,可選的container
參數會由圖中每份復制的相同值來決定。對於較大的模型而言,這種方法會更加有效,畢竟整個圖更小了一點。
這種方法使用多個tf.Session
對象:每個worker過程都會包含一個,不過不同的Session會指向不同的目標worker。這個tf.Session
對象即可以在單一的Python客戶端中創建,也可以在多個客戶端中創建。
術語
Client
一個典型的客戶端一般會構建一個TensorFlow的圖並且使用tensorflow::Session
來完成與集群的交互。客戶端一般會用Python或者C++編寫,一般來說一個客戶端可以同時與多個服務端進行交互(參考上文的重復訓練),並且一個服務端也可以同時服務於多個客戶端。
Cluster
一個TensorFlow集群會包含一個或者多個TensorFlow的服務端,被切分為一系列命名的job,而每個job又會負責一系列的tasks。一個集群一般會專注於一個相對高層的目標,譬如用多台機器並行地訓練一個神經網絡。
Job
一個job會包含一系列的致力於某個相同目標的task。譬如,一個叫ps
(意思是參數服務)的job會用於處理存儲於更新Variables相關的工作。而一個叫worker
的job會用於承載那些用於計算密集型的無狀態節點。一般來說一個job中的tasks會運行在不同的機器中。
Master service
Master Service是一個RPC服務用於與一系列遠端的分布式設備進行交互。Master Service實現了tensorflow::Session
接口, 並且用來協調多個worker service。
Task
一個Task一般會關聯到某個單一的TensorFlow服務端的處理過程,屬於一個特定的job並且在該job的任務列表中有個唯一的索引。
TensorFlow server
用於運行grpc_tensorflow_server的處理過程,是一個集群中的一員,並且想外暴露了一個Master Service與一個Worker Service。
Worker service
一個執行部分TensorFlow圖部分內容的RPC服務。
————————————————————————————————————————————
運行實例
部署文件 mlp_mnist_dist.py 到安裝tensorflow的機器例如 10.100.208.23/24
分別在機器上運行
在 10.100.208.23上運行
python mlp_mnist_dist.py --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=ps --task_index=0
python mlp_mnist_dist.py --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=ps --task_index=1
在10.100.208.24上運行
python mlp_mnist_dist.py --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=worker --task_index=0
python mlp_mnist_dist.py --ps_hosts=10.100.208.23:2222,10.100.208.23:2223 --worker_hosts=10.100.208.24:2224,10.100.208.24:2225 --job_name=worker --task_index=1
啟動時需要互相依賴,所以會有連接失敗的error,待所連接的進程啟動后就可以了
運行后 ,只有一個worker會完成,日志如下
Extracting ./MNIST_data/train-images-idx3-ubyte.gz
Extracting ./MNIST_data/train-labels-idx1-ubyte.gz
Extracting ./MNIST_data/t10k-images-idx3-ubyte.gz
Extracting ./MNIST_data/t10k-labels-idx1-ubyte.gz
job : worker/0 step : 0 ,training accuracy : 0.96
——————————————————————————————————————————————
簡單測試用例:
import tensorflow as tf
def main() :
with tf.device("/job:ps/task:0/cpu:0"):
a = tf.Variable(1)
with tf.device("/job:ps/task:0/cpu:1"):
b = tf.Variable(1)
with tf.Session("grpc://localhost:22223",config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True)) as sess:
init_ab = tf.initialize_all_variables();
sess.run(init_ab)
result = sess.run(a+b)
print(result)
if __name__ == '__main__':
main()
——————————————————————————————————————————————
問題解決:
1.如果並沒有 grpc_tensorflow_server
可執行文件的話可以用 ./tensorflow/tools/dist_test/server/grpc_tensorflow_server.py 代替
2.在指定device時指定了cpu/gpu如果報錯
tensorflow.python.framework.errors.InvalidArgumentError: Cannot assign a device to node 'Variable_1': Could not satisfy explicit device specification '/job:ps/task:0/device:CPU:1' because no devices matching that specification are registered in this process; available devices: /job:ps/replica:0/task:0/cpu:0, /job:worker/replica:0/task:0/cpu:0
[[Node: Variable_1 = Variable[container="", dtype=DT_INT32, shape=[], shared_name="", _device="/job:ps/task:0/device:CPU:1"]()]]
解決方法 https://github.com/tensorflow/tensorflow/issues/2285
就是在生命session時加入參數 allow_soft_placement=True
例如 :with tf.Session("grpc://localhost:22223",config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True)) as sess:
如果是用Supervisor
sess = sv.prepare_or_wait_for_session(server.target,config=tf.ConfigProto(allow_soft_placement=True,log_device_placement=True))
3.如果編譯時發生錯誤
ERROR: error loading package 'tensorflow/core/distributed_runtime/rpc': Extension file not found. Unable to load package for '//google/protobuf:protobuf.bzl': BUILD file not found on package path.
在源碼目錄下 執行 git submodule update --init 命令來下載依賴