用華為MindSpore進行分布式訓練


技術背景

分布式和並行計算,在計算機領域是非常重要的概念。對於一些行外人來說,總覺得這是一些很簡單的工作,但是如果我們縱觀計算機的硬件發展史,從CPU到GPU,再到TPU和華為的昇騰(NPU),乃至當下的熱點量子計算機(QPU),其實就是一個分布式與並行計算的發展史。從簡單的數據並行,到算法並行,到圖的並行,最后是量子疊加所帶來的物理並行。因此能否做好分布式與並行的技術,很大程度上決定了一個工具的性能上限,本文我們一起來研究一下MindSpore分布式訓練的方法。

環境部署

在前面的博客中,我們探討過用Docker和Singularity容器等方案來安裝MindSpore的CPU版本和GPU版本,感興趣的讀者可以翻一翻這些歷史博客。而這篇文章中,我們將默認已經在本地安裝好一個MindSpore的GPU環境,以此為前提進行探討在單機多GPU卡的環境下去使用MindSpore的分布式功能。比較完整的介紹可以參考這個官方地址,里面包含了完整的安裝部署和使用的介紹。這里我們僅針對本地Ubuntu的環境介紹基本安裝和使用方法。

安裝openmpi

這里一共需要安裝2個軟件,我們都是采取了源碼安裝的方法,首先到這個MindSpore給出的下載鏈接中下載對應版本的源碼:

按照版本配套我們下載這個4.0.3的tar.gz的壓縮包:

下載完成后可以大致的按照這里面的操作指導進行源碼安裝:

安裝成功的話運行mpirun --version可以看到版本號。有這里面有兩個需要提醒的點是:1. 解壓縮不一定用這里面給出的指令,可以用自己的;2. 如果按照這個指引的prefix,后面需要在LD_LIBRARY_PATH這個環境變量中加入/usr/local/lib/這個路徑,否則會有一些so庫無法被識別。

安裝NCCL

MindSpore在GPU上的分布式通信是采用了NCCL這個工具,同樣的我們先去這里面給出的鏈接找源碼包下載安裝:

這里推薦使用Local的安裝方案:

找到適配自己的系統版本的軟件包之后,就可以按照下述的指引一步一步安裝:

其中如果在apt-key這一步執行一次失敗的話,可以多執行兩次看看。

環境測試

安裝成功后,openmpi和NCCL都安裝成功后,可以用如下的初始化示例測試一下環境的部署情況:

# test-init.py

from mindspore import context
from mindspore.communication.management import init

if __name__ == "__main__":
    context.set_context(mode=context.GRAPH_MODE, device_target="GPU")
    init("nccl")

如果執行的結果如下所示,沒有任何的報錯信息,則說明環境部署成功:

dechin@ubuntu2004:~/projects/mindspore/test$ mindspore test-init.py 
dechin@ubuntu2004:~/projects/mindspore/test$

MindSpore分布式訓練

首先可能會有人好奇,上一個章節中最后用的mindspore是個啥指令?這其實是我自己在本地配置的一個mindspore-gpu版本的快捷命令:

dechin@ubuntu2004:~/projects/$ cat ~/.bashrc | grep mindspore
alias mindspore='singularity exec --nv /home/dechin/tools/singularity/mindspore-gpu_1.2.0.sif python'

如果覺得還算方便的話,讀者也可以按照自己的喜好去alias一個快捷命令。

接下來我們要看一個實現的案例,這個代碼來自於前面的這一篇博客,只是我們在這個代碼基礎之上,加入了上一個章節中的初始化代碼init(),完整代碼如下所示:

# test_nonlinear.py

from mindspore import context
from mindspore.communication.management import init
context.set_context(mode=context.GRAPH_MODE, device_target="GPU")
init()
import numpy as np
from mindspore import dataset as ds
from mindspore import nn, Tensor, Model
import time
from mindspore.train.callback import Callback, LossMonitor

def get_data(num, a=2.0, b=3.0, c=5.0):
    for _ in range(num):
        x = np.random.uniform(-1.0, 1.0)
        y = np.random.uniform(-1.0, 1.0)
        noise = np.random.normal(0, 0.03)
        z = a * x ** 2 + b * y ** 3 + c + noise
        yield np.array([[x**2], [y**3]],dtype=np.float32).reshape(1,2), np.array([z]).astype(np.float32)

def create_dataset(num_data, batch_size=16, repeat_size=1):
    input_data = ds.GeneratorDataset(list(get_data(num_data)), column_names=['xy','z'])
    input_data = input_data.batch(batch_size)
    input_data = input_data.repeat(repeat_size)
    return input_data

data_number = 160
batch_number = 16
repeat_number = 200

ds_train = create_dataset(data_number, batch_size=batch_number, repeat_size=repeat_number)
dict_datasets = next(ds_train.create_dict_iterator())

class LinearNet(nn.Cell):
    def __init__(self):
        super(LinearNet, self).__init__()
        self.fc = nn.Dense(2, 1, 0.02, 0.02)

    def construct(self, x):
        x = self.fc(x)
        return x

net = LinearNet()
model_params = net.trainable_params()
print ('Param Shape is: {}'.format(len(model_params)))
for net_param in net.trainable_params():
    print(net_param, net_param.asnumpy())
net_loss = nn.loss.MSELoss()

optim = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.6)
model = Model(net, net_loss, optim)

epoch = 1
model.train(epoch, ds_train, callbacks=[LossMonitor(10)], dataset_sink_mode=False)

for net_param in net.trainable_params():
    print(net_param, net_param.asnumpy())

此時我們需要改成用mpirun去運行這個代碼:

dechin@ubuntu2004:~/projects/gitlab/dechin/src/mindspore$ mpirun -n 2 singularity exec --nv /home/dechin/tools/singularity/mindspore-gpu_1.2.0.sif python test_nonlinear.py

運行過程中我們就可以看到,有兩個python的任務分別跑在兩個不同的GPU卡上:

這個監控方式其實還是nvidia-smi的指令,只不過為了長期監視GPU狀態,我是使用了watch -n 1 nvidia-smi的指令,每隔1s的時間就刷新一次nvidia-smi的狀態。但是這里需要注意的也有兩點:1. 這個代碼直接用mindspore指令也可以跑在單GPU卡上,但是如果要用mpirun來運行,那么我們就不能使用剛才alias的mindspore指令,而需要手動寫上完整的指令,除非把新的指令再alias一個;2. 上述的代碼因為只是初始化了一下,所以雖然跑在兩張卡上,但是實際上訓練過程並沒有互相通信,是兩個獨立的任務。那么如果要構造一個完整的自動化的分布式訓練,就需要像如下代碼一樣,再加入context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL)這樣的一個字樣,以及checkpoint中的一些配置,新的完整代碼如下所示:

# test_nonlinear.py

from mindspore import context
from mindspore.communication.management import init
context.set_context(mode=context.GRAPH_MODE, device_target="GPU")
init()

import numpy as np
from mindspore import dataset as ds
from mindspore import nn, Tensor, Model
import time
from mindspore.train.callback import Callback, LossMonitor, ModelCheckpoint, CheckpointConfig
from mindspore.context import ParallelMode
import mindspore as ms
ms.common.set_seed(0)

start_time = time.time()
def get_data(num, a=2.0, b=3.0, c=5.0):
    for _ in range(num):
        x = np.random.uniform(-1.0, 1.0)
        y = np.random.uniform(-1.0, 1.0)
        noise = np.random.normal(0, 0.03)
        z = a * x ** 2 + b * y ** 3 + c + noise
        yield np.array([[x**2], [y**3]],dtype=np.float32).reshape(1,2), np.array([z]).astype(np.float32)

def create_dataset(num_data, batch_size=16, repeat_size=1):
    input_data = ds.GeneratorDataset(list(get_data(num_data)), column_names=['xy','z'])
    input_data = input_data.batch(batch_size)
    input_data = input_data.repeat(repeat_size)
    return input_data

data_number = 160
batch_number = 16
repeat_number = 20

context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL)
ds_train = create_dataset(data_number, batch_size=batch_number, repeat_size=repeat_number)
dict_datasets = next(ds_train.create_dict_iterator())

class LinearNet(nn.Cell):
    def __init__(self):
        super(LinearNet, self).__init__()
        self.fc = nn.Dense(2, 1, 0.02, 0.02)

    def construct(self, x):
        x = self.fc(x)
        return x

net = LinearNet()
model_params = net.trainable_params()
print ('Param Shape is: {}'.format(len(model_params)))
for net_param in net.trainable_params():
    print(net_param, net_param.asnumpy())
net_loss = nn.loss.MSELoss()

optim = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.6)
ckpt_config = CheckpointConfig()
ckpt_callback = ModelCheckpoint(prefix='data_parallel', config=ckpt_config)

model = Model(net, net_loss, optim)

epoch = 10
model.train(epoch, ds_train, callbacks=[ckpt_callback], dataset_sink_mode=True)

for net_param in net.trainable_params():
    print(net_param, net_param.asnumpy())

print ('The total time cost is: {}s'.format(time.time() - start_time))

這個代碼的運行結果如下所示:

dechin@ubuntu2004:~/projects/gitlab/dechin/src/mindspore$ mpirun -n 2 singularity exec --nv /home/dechin/tools/singularity/mindspore-gpu_1.2.0.sif python test_nonlinear.py 
Param Shape is: 2
Parameter (name=fc.weight, shape=(1, 2), dtype=Float32, requires_grad=True) [[0.02 0.02]]
Parameter (name=fc.bias, shape=(1,), dtype=Float32, requires_grad=True) [0.02]
Param Shape is: 2
Parameter (name=fc.weight, shape=(1, 2), dtype=Float32, requires_grad=True) [[0.02 0.02]]
Parameter (name=fc.bias, shape=(1,), dtype=Float32, requires_grad=True) [0.02]
[WARNING] ME(2528801:139843698521024,MainProcess):2021-06-10-09:45:37.603.010 [mindspore/train/callback/_checkpoint.py:428] OSError, failed to remove the older ckpt file /home/dechin/projects/gitlab/dechin/src/mindspore/data_parallel-1_200.ckpt.
[WARNING] ME(2528799:139709496722368,MainProcess):2021-06-10-09:45:37.713.232 [mindspore/train/callback/_checkpoint.py:428] OSError, failed to remove the older ckpt file /home/dechin/projects/gitlab/dechin/src/mindspore/data_parallel-2_200.ckpt.
[WARNING] ME(2528799:139709496722368,MainProcess):2021-06-10-09:45:37.824.271 [mindspore/train/callback/_checkpoint.py:428] OSError, failed to remove the older ckpt file /home/dechin/projects/gitlab/dechin/src/mindspore/data_parallel-3_200.ckpt.
[WARNING] ME(2528799:139709496722368,MainProcess):2021-06-10-09:45:37.943.749 [mindspore/train/callback/_checkpoint.py:428] OSError, failed to remove the older ckpt file /home/dechin/projects/gitlab/dechin/src/mindspore/data_parallel-4_200.ckpt.
[WARNING] ME(2528801:139843698521024,MainProcess):2021-06-10-09:45:38.433.85 [mindspore/train/callback/_checkpoint.py:428] OSError, failed to remove the older ckpt file /home/dechin/projects/gitlab/dechin/src/mindspore/data_parallel-5_200.ckpt.
Parameter (name=fc.weight, shape=(1, 2), dtype=Float32, requires_grad=True) [[0.12186428 0.21167319]]
Parameter (name=fc.bias, shape=(1,), dtype=Float32, requires_grad=True) [5.561276]
The total time cost is: 8.412446737289429s
Parameter (name=fc.weight, shape=(1, 2), dtype=Float32, requires_grad=True) [[0.12186428 0.21167319]]
Parameter (name=fc.bias, shape=(1,), dtype=Float32, requires_grad=True) [5.561276]
The total time cost is: 8.439369916915894s

雖然運行成功了,但是有一點需要注意的是,分布式的這個案例耗時為8.44s,而單卡的訓練耗時為8.15s,分布式的訓練速度甚至比單卡的要慢,我們在總結里面對這個現象進行一個解釋。

總結概要

這篇文章我們主要探討如何去部署一個基於MindSpore框架的分布式訓練環境,在MindSpore環境已經配置好的情況下,我們只需要安裝好openmpi和nccl這兩個工具就可以實現分布式的訓練,在文中我們已經給出了相應的示例。雖然分布式與並行技術的目的是為了提升性能,但不是說對所有的場景都能夠起到加速的作用,比如文章中的案例就沒有加速的效果。這其實是因為我們的用例場景太簡單了,縱觀整個訓練過程,GPU的使用率不到10%,在這種情況下還要考慮上通信的開銷,自然是沒有放在同一個卡上去訓練來得快。這也給我們一個啟發,考慮使用分布式和並行計算的技術時,一定也要先評估好問題本身是否適用於並行化的處理,否則是達不到預期的加速的目的的。

版權聲明

本文首發鏈接為:https://www.cnblogs.com/dechinphy/p/dms.html

作者ID:DechinPhy

更多原著文章請參考:https://www.cnblogs.com/dechinphy/

打賞專用鏈接:https://www.cnblogs.com/dechinphy/gallery/image/379634.html

騰訊雲專欄同步:https://cloud.tencent.com/developer/column/91958


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM