使用浪潮AI計算平台之分布式計算(Tensorflow框架下 PS/Worker模式下的異步計算)


雖然Tensorflow一直都是支持分布式計算的,但是由於只有一台電腦,一個GPU,所以別說分布式的tensorflow的使用了,就是單機多卡都是沒有使用過的,由於后來可以有機會使用這個浪潮的AI計算平台於是在上面試了試tensorflow的分布式計算,這里說的是其中的PS/Worker模式下的異步計算。

 

 

看下圖,可以知道這個平台是支持單機計算,以及分布式的 PS/Worker模式和MPI模式的:

 

 

其中使用的代碼為:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import os
import argparse
from tensorflow.examples.tutorials.mnist import input_data
import tensorflow as tf

FLAGS = None

def weight_variable(shape):
    init = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(init)


def bias_variable(shape):
    init = tf.constant(0.1, shape=shape)
    return tf.Variable(init)


def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding="SAME")


def max_pool(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")


def main(_):

    tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
    task_config = tf_config.get('task', {})
    task_type = task_config.get('type')
    task_index = task_config.get('index')

    FLAGS.job_name = task_type
    print('job_name:%s' %(task_type))
    FLAGS.task_index = task_index

    #ps_hosts = FLAGS.ps_hosts.split(",")
    #worker_hosts = FLAGS.worker_hosts.split(",")

    cluster_config = tf_config.get('cluster', {})
    ps_hosts = cluster_config.get('ps')
    worker_hosts = cluster_config.get('worker')
    
    ps_hosts_str = ','.join(ps_hosts)
    worker_hosts_str = ','.join(worker_hosts)
    
    FLAGS.ps_hosts = ps_hosts_str
    FLAGS.worker_hosts = worker_hosts_str
    
    # Construct the cluster and start the server
    ps_spec = FLAGS.ps_hosts.split(",")
    worker_spec = FLAGS.worker_hosts.split(",")
    
    # Get the number of workers.
    num_workers = len(worker_spec)


    # Create a cluster from the parameter server and worker hosts.
    #cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index,
                             start=True)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # Import data
        mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
        
        FLAGS.batch_size = 550
        step = mnist.train.num_examples / 500 - 1
        print("train examples: %d, step: %d" % (mnist.train.num_examples, step) )
        
        

        with tf.device(tf.train.replica_device_setter(
                #worker_device="/job:worker/task:%d" % FLAGS.task_index,
                worker_device="/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, 0),
                cluster=cluster)):

            x = tf.placeholder(tf.float32, [None, 784])
            y_actual = tf.placeholder(tf.float32, [None, 10])
            keep_prob = tf.placeholder(tf.float32)


            x_image = tf.reshape(x, [-1, 28, 28, 1])

            W_conv1 = weight_variable([5, 5, 1, 32])
            b_conv1 = bias_variable([32])
            h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)  
            h_pool1 = max_pool(h_conv1) 

            W_conv2 = weight_variable([5, 5, 32, 64])
            b_conv2 = bias_variable([64])
            h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)  
            h_pool2 = max_pool(h_conv2)  

            W_fc1 = weight_variable([7 * 7 * 64, 1024])
            b_fc1 = bias_variable([1024])
            h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])  # reshape
            h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)  

            # dropout
            h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)  # dropout

            W_fc2 = weight_variable([1024, 10])
            b_fc2 = bias_variable([10])
            y_predict = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)  # softmax, [-1, 10]

            cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_actual * tf.log(y_predict), 1))  
            global_step = tf.train.get_or_create_global_step()
            #global_step = tf.train.get_global_step()
            optimizer = tf.train.GradientDescentOptimizer(FLAGS.learning_rate)
            train_op = optimizer.minimize(cross_entropy, global_step=global_step)

            cross_prediction = tf.equal(tf.argmax(y_predict, 1), tf.argmax(y_actual, 1))
            accuracy = tf.reduce_mean(tf.cast(cross_prediction, tf.float32))  

            # tensorboard
            tf.summary.scalar('cost', cross_entropy)
            tf.summary.scalar("accuracy", accuracy)
            summary_op = tf.summary.merge_all()

            # The StopAtStepHook handles stopping after running given steps.
            #hooks = [tf.train.StopAtStepHook(last_step=400)]
            hooks = [tf.train.StopAtStepHook(last_step=(50000+step))]

            config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False,
                #device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index]
                device_filters=["/job:ps", "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, 0)]
            )

            # The MonitoredTrainingSession takes care of session initialization,
            # restoring from a checkpoint, saving to a checkpoint, and closing when done
            # or an error occurs.
            # master="grpc://" + worker_hosts[FLAGS.task_index]
            
            with tf.train.MonitoredTrainingSession(master=server.target,
                                                   config=config,
                                                   is_chief=(FLAGS.task_index == 0),
                                                   hooks=hooks,
                               max_wait_secs = 120) as mon_sess:
                while not mon_sess.should_stop():
                  
                    # Run a training step asynchronously.
                    # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                    # perform *synchronous* training.
                    # mon_sess.run handles AbortedError in case of preempted PS.
                    #batch_x, batch_y = mnist.train.next_batch(64)
                    batch_x, batch_y = mnist.train.next_batch(FLAGS.batch_size)
#                    step, _ = mon_sess.run([global_step, train_op], feed_dict={
#                        x: batch_x,
#                        y_actual: batch_y,
#                        keep_prob: 0.8})
                    

                    #print("global_step: %f" % step)
                    #if step > 0 and step % 10 == 0:
                    step, _, loss, acc = mon_sess.run([global_step, train_op, cross_entropy, accuracy], feed_dict={
                        x: batch_x,
                        y_actual: batch_y,
                        keep_prob: 1.0})
                    print("step: %d, loss: %f, acc: %f" % (step, loss, acc))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.register("type", "bool", lambda v: v.lower() == "true")
    # Flags for defining the tf.train.ClusterSpec

    parser.add_argument(
        "--data_dir",
        type=str,
        default="/home/guojun/MNIST_data/",
        help="data directory"
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=600,
        help="batch size"
    )
    parser.add_argument(
        "--max_step",
        type=int,
        default=100,
        help="step num"
    )

    parser.add_argument(
        "--ps_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--worker_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    parser.add_argument(
#        "--checkpoint_dir",
        "--model_save_path",
        type=str,
        default="/testsoft01/models/logs/log_tf_mnist_dist/",
        help="path to a directory where to restore variables."
    )

    parser.add_argument(
        "--learning_rate",
        type=float,
        default=0.001,
        help="learning rate"
    )

    FLAGS, _ = parser.parse_known_args()
    tf.app.run(main=main)

這里不對代碼做解釋,這是比較簡單的PS/Worker模式下的異步計算代碼。

 

 

 

 

給出配置:(以三台服務器為示例,每個進程分別運行在一台服務器上,IP及端口分別為

ps進程:           "10.233.113.70:2222",
worker進程:       "10.233.112.166:2223","10.233.91.193:2223"

 

環境變量設置如下:

export TF_CONFIG='{"cluster":{"ps":["10.233.113.70:2222"],"worker":["10.233.112.166:2223","10.233.91.193:2223"]},"task":{"type":"ps","index":0},"environment":"cloud"}'

export TF_CONFIG='{"cluster":{"ps":["10.233.113.70:2222"],"worker":["10.233.112.166:2223","10.233.91.193:2223"]},"task":{"type":"worker","index":0},"environment":"cloud"}'

export TF_CONFIG='{"cluster":{"ps":["10.233.113.70:2222"],"worker":["10.233.112.166:2223","10.233.91.193:2223"]},"task":{"type":"worker","index":1},"environment":"cloud"}'

 

 

如果指定某個進程對GPU的使用限制時使用如下配置:

不使用顯卡,      export CUDA_VISIBLE_DEVICES=-1

使用0號顯卡,    export CUDA_VISIBLE_DEVICES=0

使用1號顯卡,    export CUDA_VISIBLE_DEVICES=1

 

 

 

 

 

 

 

 

=========================================================

 

 第一種類型試驗:

task為worker的兩個task同時運行在同一台電腦上(這台電腦有2個GPU),每個task分別使用一個GPU;task為ps的task運行在另一台電腦上:
運行異步aggregate(A電腦運行2個worker,每個worker使用一個GPU; B電腦為ps)
31m12.532s (ps 做匯聚時使用CPU運算, CPU有 90%利用率)
31m21.559s (ps 做匯聚時使用CPU運算, CPU有 90%利用率)


 


38m21.106s (ps 做匯聚時使用GPU運算 )
38m11.901s (ps 做匯聚時使用GPU運算 )
38m9.435s   (ps 做匯聚時使用GPU運算 )
38m35.333s (ps 做匯聚時使用GPU運算 )
38m30.296s (ps 做匯聚時使用GPU運算 )
38m24.708s (ps 做匯聚時使用GPU運算 )
39m34.298s (ps 做匯聚時使用GPU運算 )
39m30.530s (ps 做匯聚時使用GPU運算 )
39m18.943s (ps 做匯聚時使用GPU運算 )
39m38.846s (ps 做匯聚時使用GPU運算 )

 

可以看到ps進程使用  CPU做匯聚時竟然比GPU做匯聚時效率高,當然這里由於只使用了兩個Worker並不能完全說明問題,但是多個分布的worker不太有條件也就只能做到這里了。個人觀點是分布式的數據需要相互交換,從網卡出來的數據傳到內存的數據比較快,而傳到顯卡的速度會稍微慢些,個人感覺這里的顯卡可能並沒有使用nccl, 這樣的話網卡數據可以直接傳到顯存中而不用再通過內存了,這樣的話使用GPU做匯聚的時間可能就不會和CPU做匯聚有顯着差距了,當然這也只是個人的猜測(計算平台明確說明使用了nccl,所以這里為什么gpu做匯聚效率不及cpu還是難以說清的)。當然這個結果還是比較有限的,如果你不是使用分布式的計算而是單機多卡,而且卡比較多的話還是推薦使用horovod框架的,畢竟

Ring Allreduce 這個東西理論上是OK的,但實際效果沒用過。

在這個AI計算平台上horovod框架指的就是MPI模式的分布式計算。

 

 

 

---------------------------------------------------

 

 

第二種試驗:

 

task類型為worker的兩個task分別運行在兩台電腦,每個worker利用1個GPU;task類型為ps的task運行在另一台電腦:
運行異步aggregate(A電腦1個worker,並使用一個GPU, B電腦1個worker,並使用一個GPU, C電腦為ps)

27m31.965s (ps CPU運算 100%) (1個CPU)
28m7.096s (ps CPU運算 100%) (1個CPU)
28m54.965s (ps CPU運算 100%) (1個CPU)
28m55.451s (ps CPU運算 100%) (1個CPU)
29m0.394s (ps CPU運算 100%) (1個CPU)
26m30.772s (ps CPU運算 100%) (1個CPU)
26m45.423s (ps CPU運算 100%) (1個CPU)
27m2.666s (ps CPU運算 100%) (1個CPU)

22m52.598s(ps CPU運算   )  (多個CPU)
23m27.257s (ps CPU運算  )  (多個CPU)
23m21.331s(ps CPU運算   )  (多個CPU)
23m35.570s(ps CPU運算   )  (多個CPU)
23m5.596s  (ps CPU運算   )  (多個CPU)

 

 

 


27m19.363s (ps GPU運算)
27m8.633s   (ps GPU運算)
27m8.849s   (ps GPU運算)
27m40.217s (ps GPU運算)
27m26.957s (ps GPU運算)
27m19.115s (ps GPU運算)

 

可以看到第二種試驗是將第一種試驗中的運行在同一個電腦上的worker分別運行在兩個電腦上,這樣竟然提高了計算速度。

個人觀點是兩個worker在同一個電腦上對於網絡傳輸的負載較大,所以導致了網絡傳輸的速度慢了下來,這樣也就解釋了第一種試驗中PS上用CPU做匯聚時只使用了90%,因為兩個worker占用一個網卡傳輸數據但是整體的傳輸速度慢了下來,ps上做匯聚時存在等待數據的情況,於是沒有把CPU跑滿。而在第二種試驗中兩個worker分別在不同電腦上所以傳輸數據較快,ps電腦上利用CPU做匯聚時不需要等待,因為worker傳輸數據已經傳過來了,網絡傳輸數據的數據快於ps上CPU的匯聚速度,於是ps上CPU跑滿了,速度也就提升了。在第二種試驗中,ps上使用CPU做匯聚和使用GPU做匯聚也存在一定速度差異,gpu做匯聚操作還是要差於cpu做匯聚,說明網絡傳輸的數據到底是傳輸到內存還是顯存還是會一定程度的影響計算效率的。

 

這里傳輸的數據的數據量沒有過大還在CPU的計算能力范圍內,如果網絡數據的傳輸速度足夠快,而且數據足夠大時(大於CPU匯聚速度並且小於單GPU顯存時)GPU的速度會快於CPU匯聚速度,但是在數據傳輸速度足夠快而數據量過大時(大於GPU顯存時)那么只能使用CPU做匯聚了。

 

在第二種試驗中依然可以看到GPU做匯聚時要慢於CPU,個人觀點依然是數據從網卡傳輸到顯卡的速度要稍微慢於網卡數據傳輸到內存的速度(或許網卡數據不能直接傳輸到顯存還是要中間經過內存呢,這樣是不是nccl並沒有真的支持呢)。

 個人猜測,在網絡速度足夠快的理想條件下(待匯聚的速度足夠多),數據量在顯存可以容納的情況下,GPU做匯聚的速度會隨着數據量增加而提升的。

 

 

---------------------------------------------------

 

 

第三種試驗:

 

task為worker的兩個task運行在同一台電腦(2個GPU),每個task利用一個GPU,task為ps的task運行在同一台電腦:
運行異步aggregate (A電腦2個worker,每個worker一個GPU, A電腦也運行task類型為ps的進程,且ps只使用CPU做匯聚操作)
12m31.103s
12m15.255s
12m11.336s
12m15.212s
10m27.511s
10m38.285s
10m32.073s
10m36.449s
10m20.578s
10m25.668s
10m49.644s
10m54.085s
10m33.264s
10m59.180s
10m47.720s
10m11.117s

 

第三種試驗與第一,二種試驗不同的地方是把PS端和worker端放在了同一個電腦上,ps和worker之間的數據傳輸不通過網卡而是直接在內核層進行交換。發現第三種試驗的速度是前三種中最快的。前三種試驗之所以速度不同是因為數據的網絡傳輸導致的,不經過網卡的數據傳輸是最快的;每個worker都獨占一個網卡進行數據傳輸是第二快的,兩個worker共享一個網卡來傳輸數據是最慢的。可以看到由於網絡傳輸造成的速度差距有幾倍之多,可以講能單機多卡的情況下絕對不使用多機多卡的。

 

 

 

這里給個網絡測速的代碼,因為服務器上不能連接外網不能安裝測速軟件,於是自己寫了個簡單的網絡傳輸代碼:

服務端,數據接收端:

import socket

sk = socket.socket()
ip_port=('0.0.0.0',9999)
sk.bind(ip_port)
sk.listen(5)

s = 8096*4048

for _ in range(1):
    conn,address = sk.accept()
    while True:
        data = conn.recv(s)
        #print(data)
        if not data:
            break
    print('文件接收完成')
sk.close()

 

 

客戶端,數據發送端:

 

import socket
import time


sk = socket.socket()
#ip_port=('10.233.119.52',9999)
ip_port=('127.0.0.1',9999)
sk.connect(ip_port)


with open('xxx.tar','rb') as f:
data = f.read()

start = time.time()
sk.send(data)
end = time.time()

print('over')
print("time", end-start)

 

 

可以看到,這里的傳輸數據為44GB大,傳輸數據時把數據完全加載到服務器上(服務器內存不到500GB)。

 

發送端和接收端都在同一主機上(數據傳輸不經過網卡而是經過內核時):

time:(秒)

1.0947113037109375

1.9611408710479736

1.1916041374206543

1.6991233825683594

1.500579833984375

 

 

 平均傳輸速度為29.808885595040923 GB/s

 

 

 

 

 

發送端和接收端分別在兩個主機上(數據傳輸經過網卡時):

平均傳輸速度為9.711139239030665 GB/s

 

可以看到經過網卡的數據傳輸速度是不經過網卡的3分之一

 

 

而這個速度是什么概念呢,可以參照PCIE3.0*16的速度:

PCIE3.0*16 通道單方向速度為16GB/s。

 

也就是說經過網卡的數據傳輸速度9.71 GB/s比PCI3.0*8的單向速度8GB/s快些,但是遠低於PCI3.0*16的單向速度16GB/s。而不經過網卡的數據傳輸,從內存經過內核再回到內存的速度為 29.80 GB/s 比內存和顯存之間的PCIE*16的速度 16GB/s 還要快出很多。這也解釋了試驗第一種時PS上用CPU做匯聚時速度稍微快於GPU做匯聚的速度。

 

 

 

9.71 GB/s 的網絡傳輸速度 也就意味着  網卡的傳輸速度在 9.71*8Gb/s=77.68Gb/s 以上,由於是估計的算法,保守的看網卡速度在 50Gb/s  以上,從服務器的網卡配置上來看這個速度也是可以的了,但是這個速度的網卡還是不能完全發揮出多機顯卡的性能。

 

但是,單機多卡的話可能顯卡的卡槽還是有限的,這個平台上單機多卡的話最多是8卡,如果再多的卡就只能多機多卡了。

 

多機網絡傳輸速度在  PCI3.0*8 這個級別上, 雖然達不到  PCI3.0*16的速度,但是現在來看也是極高的配置了,網絡傳輸速度如果想要達到PCI3.0*16這個級別不僅是網卡,網絡協議,甚至主板南橋的傳輸速度都是一個天然的限制(一般主板南橋和北橋的數據交互通道中   是硬盤和網卡共享帶寬的)。

 

 雖然內存數據通過內核回到內存的速度高於PCIE3.0*16, 但是GPU數據到內存或GPU間數據傳輸的最大速度還是PCIE3.0*16, 也就是說如果理想情況下,單機多卡的數據傳輸速度最高為PCIE3.0*16,  而多機多卡的(多機間速度傳輸速度)最高為PCIE3.0*8。

 

 

---------------------------------------------------

 

 

第四種試驗:

 

 

task為worker的一個task運行在一台電腦(1個GPU),每個task利用一個GPU,task為ps的task運行在同一台電腦:
運行異步aggregate (A電腦運行1個worker類型task,worker類型task使用一個GPU; 同時A電腦也運行ps類型task,ps類型task使用CPU做匯聚操作)
24m59.654s
24m6.746s
23m22.713s
22m26.681s
24m36.398s
24m3.182s
22m54.110s
19m37.633s
20m27.468s
21m8.842s
20m46.086s
19m23.438s
21m21.057s
21m21.145s
20m11.565s
21m25.062s
21m18.727s
21m16.270s
21m13.142s
22m19.513s
21m27.789s
21m37.047s

 

第三種試驗和第四種試驗都是worker類型task和ps類型task運行在同一台電腦上,並且匯聚操作也只使用CPU,也就是說第三種和第四種的試驗worker與ps之間的數據傳輸沒有經過網卡,worker的梯度數據由GPU通過PCIE3.0*16直接傳輸到內存,ps用傳輸到內存的梯度更新網絡參數並將更新好的參數從內存通過PCIE3.0*16傳輸會GPU顯存中。第三種和第四種試驗中可以看到ps和worker運行在同一電腦上時運行加速比約為worker的個數。也就是說當PS和WORKER在同一主機上時,隨着worker個數的增加運算時間成倍的縮減。一個worker時時20分鍾,兩個worker時時10分鍾,那么4個worker時就是5分鍾。

 

於是又給第三、四種試驗補充了一個試驗,就是4個worker,1個ps在同個主機上,測試運行時間:

 

 

 

 

 

 

 

可以看到最后的結果也是5分鍾多些,大致符合前面的猜測(運行加速比等於worker數),但是這種情況不會一直隨着worker數的增加而增加,因為增加到一定數量的worker后,CPU或GPU做匯聚操作的速度將會小於worker提供梯度的速度從而導致加速比下降。當然一般我們也不會一直在一個主機上增加worker數,因為一個主機的顯卡數是固定的(8卡,10卡基本就差不多這個數了),如果顯卡的顯存足夠雖然我們可以在一個顯卡上同時跑多個worker但是這樣的設計本身就不對,如果要要在一個顯卡上跑多個worker的話我們不如盡量的在一個顯卡上跑較少的worker(如1個worker, 2個worker),而每個worker我們可以盡量的增加它的batch_size, 這樣的話既提高了顯卡的利用率又減少了ps進程做匯聚的工作負載。

 

 

---------------------------------------------------

 

 

 

第五種試驗:

 

task為worker的兩個task運行在兩台電腦,每個task利用一個GPU,task為ps的task運行在其中一台電腦:
運行異步aggregate(A電腦1個worker,並使用一個GPU; B電腦1個worker,並使用一個GPU; A電腦上同時運行task為ps的進程)
17m17.498s
16m16.028s
16m9.360s
14m36.767s
15m39.217s
14m33.790s
15m28.215s
15m19.840s
14m33.529s
14m50.261s
15m40.956s
13m59.947s
15m32.166s
14m43.113s
15m17.454s
14m54.340s
14m42.791s
14m34.187s
14m24.599s

 

第三種試驗中兩個task和ps運行在同一個電腦上用時約11min;第二種試驗中兩個worker分別運行在兩個電腦上,ps運行在第三台電腦上,用時約28min;而在第五種試驗中ps和一個worker運行在一台電腦上,另一個worker運行在第二台電腦上,用時約15min。

 

 

---------------------------------------------------

 

 

第六種試驗:

 

task為worker的一個task運行在一台電腦,並使用一個GPU;task為ps的task運行在另一台電腦:
運行異步aggregate(A電腦1個worker,並使用一個GPU, B電腦運行task類型為ps)

 

44m21.566s

45m6.509s

45m25.585s

45m53.505s

45m49.932s

46m26.030s

 

 

================================================

 

 

根據上面的試驗我們知道:

ps和worker在一個主機上時,一個worker用時22min, 兩個worker用時11min,四個worker用時5.5min。

ps和worker, worker和worker之間都不在一台主機上時, 一個worker用時45min,兩個worker用時23min。(如果一個worker和ps在一台主機上,第二個worker在另一個主機上時用時16min)

 

如果兩個worker在一個主機上,ps在另一個主機上時用時32min。

示意圖如下:

 

 

 

-------------------------------------------

其中,一個worker和ps在一個主機,第二個worker在另一個主機時耗時16min,而兩個worker和ps均在同一主機時用時22min,這樣的話我們可以假設worker和ps在同一個主機時處理數據的速率為a, 而worker和ps不在一個主機時數據的處理速率為a*x, 那么假設總工作量為s,可得  s/2a=11min,  s/(a+ax)=16min, 那么x=3/8=0.375,

那么s/(a*x)=47min, 而實際中一個worker時,且worker和ps不在同一主機時用時45min, 由於這里的計算本身就是使用少數試驗來估計的,所以姑且可以認為對x的估計是相當的或者說是基本准確的。

 

既然ps和worker不在一個主機,worker之間也不在一個主機情況下,只有一個worker時需要耗時45min,兩個worker時需要耗時23min,可以看到加速比略微小於worker數,當worker數為2時加速比45/23=1.95, 此時加速比與worker數的比值為1.95。如果worker數為3時加速比與worker數的比值仍為1.95/2,那么worker為3時加速比為2.93,耗時為預估為15.33min。當然,這里得到15.33min是假設加速比與worker數的比值不隨worker數增加而改變的,具體 我們可以再加一種試驗。

 

 

第七種試驗

ps和task,task和task均各自在一個主機上運行,這里的具體設置為A電腦運行ps(只使用CPU), B電腦運行worker0(使用GPU),  C電腦運行worker1(使用GPU), D電腦運行worker2(使用GPU)。

 

 

 

 

 

 

可以看到第七種試驗得到的結果與我們剛才的假設(如果worker數為3時加速比與worker數的比值仍為1.95/2,那么worker為3時加速比為2.93,耗時為預估為15.33min)基本吻合。也就是說ps和worker,worker與worker均各自獨占一台物理主機時加速比與worker數相當。而通過三、四種試驗我們可以看到ps與worker均在同一主機時加速比也與worker數相當。但是,同時我們也可以發現worker個數相同時,ps和worker在同一主機的運算效率是ps與worker,worker與worker均獨占一個主機的運算效率的2倍左右,在第六種試驗中我們假設過這個倍數的數值約為8/3=2.66, 這里看的話這個估計的倍數還是不准的,8/4=2倍,這個2倍的倍數估計是更為貼近的。

 

 

但是通過2,6,7種試驗,ps與worker均獨占一個主機時,一個worker用時45min,兩個worker用時23min,三個worker用時16min, 雖然加速比近似於worker個數,但是如果ps所在主機的網卡速率最高為PCIE3.0*8的,多個worker都是共享這個帶寬的,如果這個帶寬是瓶頸的話加速比不應該近似於worker個數的,對此也是不好解釋的。

因為網卡最高的速率是固定的,一個網卡最高提供PCI3.0*8的速度,而這個速度是所有網絡連接共享的,而不是GPU顯存和內存之間各自獨占PCIE3.0*16或*8的通道),因此再多的worker所提供的數據對於ps來說應該是沒有辦法提升速度的,而這個事情這里就互相違背了,所以還是沒法解釋這個問題。

 

 

 

 

-----------------------------------------

 試驗1中 : 

 主機A運行PS,主機B運行兩個worker,每個worker分別使用一個GPU,  用時32分多些。

再加一個試驗:   

 主機A運行PS,主機B運行兩個worker,每個worker分別使用一個GPU,  主機C運行兩個worker,每個worker分別使用一個GPU, 用時16分多些:

 

 

 也就是說worker與ps在同一個主機的情況下加速比約等於worker數;worker與ps不在一個主機上,worker所在的主機同樣克隆的話(每個worker所在主機具有相同worker個數),那么加速比約等於worker所在的主機個數。

 

 worker與ps在同一個主機速度最快; worker與ps不在同一個主機,但是在總worker數固定的情況下worker所在的主機上worker數越少(worker主機越多),那么速度越快。且加速比都是線性的(在worker數較少的情況下)。

 --------------------------------------------------------------------------------

ps和worker不在一個主機時:

ps運行在A主機,只有一個worker運行在B主機時,用時45min;

ps運行在A主機,兩個worker運行在B主機時,用時32min;

 

 

ps運行在A主機,三個worker運行在B主機時,用時 29min;

 

 

 

 

ps運行在A主機,四個worker運行在B主機時,用時28min;

 

 

 

可以看到如果多個worker運行在同一個主機上時加速性能非常差,隨着worker數的增加加速性趨於停滯。

 

 

------------------------------------------------------------------------------

給出下面的一種試驗:

ps獨立運行在A電腦上(只使用CPU)

 

worker0運行在B電腦上(使用GPU)

 

worker1運行在C電腦上(使用GPU0)

worker2運行在C電腦上(使用GPU1)

 

 worker3運行在D電腦上(使用GPU0)

 worker4運行在D電腦上(使用GPU1)

 worker5運行在D電腦上(使用GPU2)

 worker6運行在D電腦上(使用GPU3)

 

 根據前面的現象:多個worker運行在同一個主機上時加速性能非常差,隨着worker數的增加加速性趨於停滯。

只有一個worker運行在獨立主機時需要45min,兩個worker運行在同一主機時需要32min,四個worker運行在同一主機時需要28min,而ps端不會因為worker的增加而影響匯聚操作的性能的話,可得:

 

 

也就是說這次的試驗,如果ps端不會因為worker的增加而影響匯聚操作的性能的話,可以預估時間為11.21min。而下面的試驗發現,預估的時間基本相仿:

 

 

 

 

 

 

 

 可以看出7個worker都不和ps在一個主機上,這時用11min多些時間,還沒有ps和worker在同一主機但是有兩個worker的速度快。

 

----------------------------------------------------

 

 

給出下面的一種試驗:

ps獨立運行在D電腦上(只使用CPU)

 

worker0運行在B電腦上(使用GPU)

 

worker1運行在C電腦上(使用GPU0)

worker2運行在C電腦上(使用GPU1)

 

 worker3運行在D電腦上(使用GPU0)

 worker4運行在D電腦上(使用GPU1)

 worker5運行在D電腦上(使用GPU2)

 worker6運行在D電腦上(使用GPU3)

 

 如果上面的預估方式正確,那么這個預估時間:

 

 

 

 

 

 

 

 

可以看到最終的結果還是要比預估的高些,這個運行時間與 ps和worker同一主機,共運行4個worker的情況是相當的,一個ps與4個worker共同主機耗時5分30左右,由此可以看到這里使用7個顯卡的運算效率與4個顯卡的運算效率相當,這也進一步說明了如果我們不會選擇合適的多主機配置極有可能無法得到進一步的性能提升,比如這里我們其實相當於浪費了三個顯卡。

總之,還是選擇一個適合的主機配置才對。

 

 分布式多主機時因為我們盡量要把ps放在性能性能最高的worker主機上,如果各個worker主機性能不同就可能會浪費掉性能較弱主機的性能,就像這里弱性能(2個GPU,1個GPU)的主機基本就沒有在整個運行中發揮多大的作用。而不運行ps的主機上如果有worker也盡量只運行一個worker,因為非ps運行的主機上worker的數量增加對性能提升是由瓶頸的,比如這里不運行ps的主機不論運行多少worker運算時間都在30min左右,不運行ps的主機上如果有10個顯卡也是需要30min左右的時間,而如果有10個不運行ps的worker主機每個主機一個GPU,那么這樣的worker主機需要45min的運行時間,而運行ps的主機仍然需要5.5分鍾的話,可得:

 

A主機,一個ps,4個worker(4GPU)

B主機, 10個worker(10GPU)

預估時間:

 

 

 

A主機,一個ps,4個worker(4GPU)

B1,2,3,4,5,6,7,8,9,10主機, 各運行一個worker,共10個worker,(共10GPU)

預估時間:(推薦的配置)

 

 

 -------------------------------------------------------------------------

 

 

上面做的這些試驗可以有效的說明以下問題:

1. worker與ps在同一主機時性能最高;worker和ps不在同一主機時,多個worker各自在獨立的主機上(一個主機一個worker)性能最高,如果多個worker在同一個主機上時性能的提升會有一個瓶頸,上面的測試代碼&環境下多個worker在一個主機時性能最高提升到30min左右就不會有明顯提升了。

2. worker與ps在同一主機上時,加速比約等於worker個數;

    worker與ps不在同一主機上時,且各worker獨占一個主機時,加速比約等於worker個數;

     worker與ps不在同一主機上時,多個worker運行在一個主機上時,如果各個worker所在的主機運行相同個數的worker,那么加速比約等於運行worker的主機個數。

 

 

從上面我們可以學到,寫分布式的深度學習代碼是盡量將ps和worker放在一個主機上,如果非要利用其它主機上的worker也盡量在其它主機上運行較少的worker。比如運行worker的主機上有4個GPU,我們在worker所在主機上啟4個worker的性能會有瓶頸(次數4個worker的性能或許和2個worker的差不太多),而我們在worker主機上只啟一個worker類型的task,在該task上同時管理4個顯卡,使用多顯卡梯度規約的方法(也就是單主機多顯卡中的方法)會有效提升運行效率,這樣我們將在worker主機上已經規約好的梯度再同其他worker主機在ps上進行再次的規約。在一個顯卡上啟動多個worker進行規約不如在該顯卡上增加batch_size, 這樣更有利於提升運行效率。

 

 

 

 

 

注:這里的GPU型號全部為V100,每個進程都是獨占一個物理顯卡的。

 

 

 

------------

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM