pytorch(分布式)數據並行個人實踐總結——DataParallel/DistributedDataParallel


pytorch的並行分為模型並行、數據並行

源碼詳見我的github: TextCNN_parallel,個人總結,還有很多地方理解不到位,求輕噴。

 

  左側模型並行:是網絡太大,一張卡存不了,那么拆分,然后進行模型並行訓練。

  右側數據並行:多個顯卡同時采用數據訓練網絡的副本。

  

 

模型並行

  

 

  

數據並行

數據並行的操作要求我們將數據划分成多份,然后發送給多個 GPU 進行並行的計算。

注意:多卡訓練要考慮通信開銷的,是個trade off的過程,不見得四塊卡一定比兩塊卡快多少,可能是訓練到四塊卡的時候通信開銷已經占了大頭

下面是一個簡單的示例。要實現數據並行,第一個方法是采用 nn.parallel 中的幾個函數,分別實現的功能如下所示:

  • 復制(Replicate):將模型拷貝到多個 GPU 上;

  • 分發(Scatter):將輸入數據根據其第一個維度(通常就是 batch 大小)划分多份,並傳送到多個 GPU 上;

  • 收集(Gather):從多個 GPU 上傳送回來的數據,再次連接回一起;

  • 並行的應用(parallel_apply):將第三步得到的分布式的輸入數據應用到第一步中拷貝的多個模型上。

  • 實現代碼如下
# Replicate module to devices in device_ids
replicas = nn.parallel.replicate(module, device_ids)
# Distribute input to devices in device_ids
inputs = nn.parallel.scatter(input, device_ids)
# Apply the models to corresponding inputs
outputs = nn.parallel.parallel_apply(replicas, inputs)
# Gather result from all devices to output_device
result = nn.parallel.gather(outputs, output_device)  
  • 事實上PyTorch也提供了簡單的函數,只用幾行代碼可實現簡單高效的並行GPU計算。

  ①nn.parallel.data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None)

  ②class torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

  可見二者的參數十分相似,通過device_ids參數可以指定在哪些GPU上進行優化,output_device指定輸出到哪個GPU上。唯一的不同就在於前者直接利用多GPU並行計算得出結果,而后者則返回一個新的module,能夠自動在多GPU上進行並行加速。

# method 1
output= nn.parallel.data_parallel(new_net, input, device_ids=[0, 1])
# method 2
new_net= nn.DataParallel(net, device_ids=[0, 1])
output= new_net(input)

 

並行數據加載

流行的深度學習框架(例如Pytorch和Tensorflow)為分布式培訓提供內置支持。從廣義上講,從磁盤讀取輸入數據開始,加載數據涉及四個步驟:

  1. 將數據從磁盤加載到主機
  2. 將數據從可分頁內存傳輸到主機上的固定內存。請參閱有關分頁和固定的內存更多信息。
  3. 將數據從固定內存傳輸到GPU
  4. 在GPU上向前和向后傳遞

PyTorch中的Dataloader提供使用多個進程(通過將num_workers> 0設置)從磁盤加載數據以及將多頁數據從可分頁內存到固定內存的能力(通過設置) pin_memory = True)。

一般的,對於大批量的數據,若僅有一個線程用於加載數據,則數據加載時間占主導地位,這意味着無論我們如何加快數據處理速度,性能都會受到數據加載時間的限制。現在,設置num_workers = 4以及pin_memory = True。這樣,可以使用多個進程從磁盤讀取不重疊的數據,並啟動生產者-消費者線程以將這些進程讀取的數據從可分頁的內存轉移到固定的內存。

 

 

多個進程能夠更快地加載數據,並且當數據處理時間足夠長時,流水線數據加載幾乎可以完全隱藏數據加載延遲。這是因為在處理當前批次的同時,將從磁盤讀取下一個批次的數據,並將其傳輸到固定內存。如果處理當前批次的時間足夠長,則下一個批次的數據將立即可用。這個想法還建議如何為num_workers參數設置適當的值。應該設置此參數,以使從磁盤讀取批處理數據的速度比GPU處理當前批處理的速度更快(但不能更高,因為這只會浪費多個進程使用的系統資源)。

請注意,到目前為止,我們僅解決了從磁盤加載數據以及從可分頁到固定內存的數據傳輸問題。從固定內存到GPU的數據傳輸(tensor.cuda())也可以使用CUDA流進行流水線處理。

 

現在將使用GPU網絡檢查數據並行處理。基本思想是,網絡中的每個GPU使用模型的本地副本對一批數據進行正向和反向傳播。反向傳播期間計算出的梯度將發送到服務器,該服務器運行reduce歸約操作以計算平均梯度。然后將平均梯度結果發送回GPU,GPU使用SGD更新模型參數。使用數據並行性和有效的網絡通信軟件庫(例如NCCL),可以實現使訓練時間幾乎線性減少。

 

數據並行DataParallel

PyTorch 中實現數據並行的操作可以通過使用 torch.nn.DataParallel

並行處理機制

DataParallel系統通過將整個小型批處理加載到主線程上,然后將子小型批處理分散到整個GPU網絡中來工作。

具體是將輸入一個 batch 的數據均分成多份,分別送到對應的 GPU 進行計算。與 Module 相關的所有數據也都會以淺復制的方式復制多份。每個 GPU 在單獨的線程上將針對各自的輸入數據獨立並行地進行 forward 計算。然后在主GPU上收集網絡輸出,並通過將網絡輸出與批次中每個元素的真實數據標簽進行比較來計算損失函數值。接下來,損失值分散給各個GPU,每個GPU進行反向傳播以計算梯度。最后,在主GPU上歸約梯度、進行梯度下降,並更新主GPU上的模型參數。由於模型參數僅在主GPU上更新,而其他從屬GPU此時並不是同步更新的,所以需要將更新后的模型參數復制到剩余的從屬 GPU 中,以此來實現並行。

DataParallel會將定義的網絡模型參數默認放在GPU 0上,所以dataparallel實質是可以看做把訓練參數從GPU拷貝到其他的GPU同時訓練,這樣會導致內存和GPU使用率出現很嚴重的負載不均衡現象,即GPU 0的使用內存和使用率會大大超出其他顯卡的使用內存,因為在這里GPU0作為master來進行梯度的匯總和模型的更新,再將計算任務下發給其他GPU,所以他的內存和使用率會比其他的高。

具體流程見下圖:

 

 

使用代碼

注意我這里的代碼時一個文本分類的,模型叫TextCNN

1.單gpu(用做對比)

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

#在訓練函數和測試函數中,有兩個地方要判斷是否用cuda,將模型和數據搬到gpu上去
model = TextCNN(args)
if args.cuda: 
  model.cuda()
。。。

for batch_idx, (data, target) in enumerate(train_loader):  
  if args.cuda:
    data, target = data.cuda(), target.cuda()

2.多gpu,DataParallel使用

#device_ids = [0,1,2,3]
如果不設定好要使用的device_ids的話, 程序會自動找到這個機器上面可以用的所有的顯卡用於訓練。
如果想要限制使用的顯卡數,怎么辦呢?
在代碼最前面使用:
os.environ['CUDA_VISIBLE_DEVICES'] == '0,5'
或者
CUDA_VISIBLE_DEVICES=1,2,3 python
# 限制代碼能看到的GPU個數,這里表示指定只使用實際的0號和5號GPU
# 注意:這里的賦值必須是字符串,list會報錯

————————————————下面是重點

if args.cuda:
    model =  model.cuda()    #這里將模型復制到gpu
if len(device_ids)>1:
  model = nn.DataParallel(model)

  
#when train and test
data = data.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)

稍微完整一點:

# 這里要 model.cuda()
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
      # 這里要 images/target.cuda()
      data = data.cuda(non_blocking=True)
      target = target.cuda(non_blocking=True)
      ...
      output = model(data)
      loss = criterion(output, target)
      ...
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

3.cuda()函數解釋

.cuda()函數返回一個存儲在CUDA內存中的復制,其中device可以指定cuda設備。 但如果此storage對象早已在CUDA內存中存儲,並且其所在的設備編號與cuda()函數傳入的device參數一致,則不會發生復制操作,返回原對象。

cuda()函數的參數信息:

  • device (int) – 指定的GPU設備id. 默認為當前設備,即 torch.cuda.current_device()的返回值。

  • non_blocking (bool) – 如果此參數被設置為True, 並且此對象的資源存儲在固定內存上(pinned memory),那么此cuda()函數產生的復制將與host端的原storage對象保持同步。否則此參數不起作用。

許多低效率之處:

  • 冗余數據副本
    • 數據從主機復制到主GPU,然后將子微型批分散在其他GPU上
  • 在前向傳播之前跨GPU進行模型復制
    • 由於模型參數是在主GPU上更新的,因此模型必須在每次正向傳遞的開始時重新同步
  • 每批的線程創建/銷毀開銷
    • 並行轉發是在多個線程中實現的(這可能只是PyTorch問題)
  • 梯度減少流水線機會未開發
    • 在Pytorch 1.0數據並行實現中,梯度下降發生在反向傳播的末尾。
  • 在主GPU上不必要地收集模型輸出output
  • GPU利用率不均
    • 在主GPU上執行損失loss計算
    • 梯度下降,在主GPU上更新參數 

    

分布式數據並行 DistributedDataParallel

並行處理機制

DistributedDataParallel,支持 all-reduce,broadcast,send 和 receive 等等。通過 MPI 實現 CPU 通信,通過 NCCL 實現 GPU 通信。可以用於單機多卡也可用於多機多卡, 官方也曾經提到用 DistributedDataParallel 解決 DataParallel 速度慢,GPU 負載不均衡的問題。

效果比DataParallel好太多!!!torch.distributed相對於torch.nn.DataParalle 是一個底層的API,所以我們要修改我們的代碼,使其能夠獨立的在機器(節點)中運行。

與 DataParallel 的單進程控制多 GPU 不同,在 distributed 的幫助下,我們只需要編寫一份代碼,torch 就會自動將其分配給n個進程,分別在 n 個 GPU 上運行。不再有主GPU,每個GPU執行相同的任務。對每個GPU的訓練都是在自己的過程中進行的。每個進程都從磁盤加載其自己的數據。分布式數據采樣器可確保加載的數據在各個進程之間不重疊。損失函數的前向傳播和計算在每個GPU上獨立執行。因此,不需要收集網絡輸出。在反向傳播期間,梯度下降在所有GPU上均被執行,從而確保每個GPU在反向傳播結束時最終得到平均梯度的相同副本。

 

 

區別:DDP通過多進程實現的。也就是說操作系統會為每個GPU創建一個進程,從而避免了Python解釋器GIL帶來的性能開銷。而DataParallel()是通過單進程控制多線程來實現的。

對比DataParallel,DistributedDataParallel的優勢如下:

1.每個進程對應一個獨立的訓練過程,且只對梯度等少量數據進行信息交換。

在每次迭代中,每個進程具有自己的 optimizer ,並獨立完成所有的優化步驟,進程內與一般的訓練無異。

在各進程梯度計算完成之后,各進程需要將梯度進行匯總平均,然后再由 rank=0 的進程,將其 broadcast 到所有進程。之后,各進程用該梯度來獨立的更新參數。

而 DataParallel是梯度匯總到gpu0,反向傳播更新參數,再廣播參數給其他的gpu

由於各進程中的模型,初始參數一致 (初始時刻進行一次 broadcast),而每次用於更新參數的梯度也一致,因此,各進程的模型參數始終保持一致。

而在 DataParallel 中,全程維護一個 optimizer,對各 GPU 上梯度進行求和,而在主 GPU 進行參數更新,之后再將模型參數 broadcast 到其他 GPU

相較於 DataParalleltorch.distributed 傳輸的數據量更少,因此速度更快,效率更高。

2.每個進程包含獨立的解釋器和 GIL。

一般使用的Python解釋器CPython:是用C語言實現Pyhon,是目前應用最廣泛的解釋器。全局鎖使Python在多線程效能上表現不佳,全局解釋器鎖(Global Interpreter Lock)是Python用於同步線程的工具,使得任何時刻僅有一個線程在執行。

由於每個進程擁有獨立的解釋器和 GIL,消除了來自單個 Python 進程中的多個執行線程,模型副本或 GPU 的額外解釋器開銷和 GIL-thrashing ,因此可以減少解釋器和 GIL 使用沖突。這對於嚴重依賴 Python runtimemodels 而言,比如說包含 RNN 層或大量小組件的 models 而言,這尤為重要。

分布式幾個概念:

  • group:

    即進程組。默認情況下,只有一個組,一個 job 即為一個組,也即一個 world。

    當需要進行更加精細的通信時,可以通過 new_group 接口,使用 word 的子集,創建新組,用於集體通信等。

  • world size :

    表示全局進程個數。

  • rank:

    表示進程序號,用於進程間通訊,表征進程優先級。rank = 0 的主機為 master 節點。

  • local_rank:

    進程內,GPU 編號,非顯式參數,由 torch.distributed.launch 內部指定。例如, rank = 3,local_rank = 0 表示第 3 個進程內的第 1 塊 GPU。

使用流程

Pytorch 中分布式的基本使用流程如下:

  1. 在使用 distributed 包的任何其他函數之前,需要使用 init_process_group 初始化進程組,同時初始化 distributed 包。

  2. 如果需要進行小組內集體通信,用 new_group 創建子分組

  3. 創建分布式並行(DistributedDataParallel)模型 DDP(model, device_ids=device_ids)

  4. 為數據集創建 Sampler

  5. 使用啟動工具 torch.distributed.launch 在每個主機上執行一次腳本,開始訓練

  6. 使用 destory_process_group() 銷毀進程組

使用代碼

1. 添加參數  --local_rank
#每個進程分配一個 local_rank 參數,表示當前進程在當前主機上的編號。例如:rank=2, local_rank=0 表示第 3 個節點上的第 1 個進程。
# 這個參數是torch.distributed.launch傳遞過來的,我們設置位置參數來接受,local_rank代表當前程序進程使用的GPU標號
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')
args = parser.parse_args()
print(args.local_rank))

2.初始化使用nccl后端
dist.init_process_group(backend='nccl') 
# When using a single GPU per process and per
# DistributedDataParallel, we need to divide the batch size
# ourselves based on the total number of GPUs we have
device_ids=[1,3]
ngpus_per_node=len(device_ids)
args.batch_size = int(args.batch_size / ngpus_per_node)
#ps 檢查nccl是否可用
#torch.distributed.is_nccl_available ()

3.使用DistributedSampler
#別忘了設置pin_memory=true
#使用 DistributedSampler 對數據集進行划分。它能幫助我們將每個 batch 划分成幾個 partition,在當前進程中只需要獲取和 rank 對應的那個 partition 進行訓練

train_dataset = MyDataset(train_filelist, train_labellist, args.sentence_max_size, embedding, word2id)
train_sampler = t.utils.data.distributed.DistributedSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, 
                                  pin_memory=true,
                                shuffle=(train_sampler is None),
                                batch_size=args.batch_size, 
                                num_workers=args.workers,
                                sampler=train_sampler    )
#DataLoader:num_workers這個參數決定了有幾個進程來處理data loading。0意味着所有的數據都會被load進主進程

#注意 testset不用sampler

4.分布式訓練
#使用 DistributedDataParallel 包裝模型,它能幫助我們為不同 GPU 上求得的梯度進行 all reduce(即匯總不同 GPU 計算所得的梯度,並同步計算結果)。
#all reduce 后不同 GPU 中模型的梯度均為 all reduce 之前各 GPU 梯度的均值. 注意find_unused_parameters參數!
net = textCNN(args,vectors=t.FloatTensor(wvmodel.vectors)) if args.cuda: # net.cuda(device_ids[0]) net.cuda() if len(device_ids)>1: net=torch.nn.parallel.DistributedDataParallel(net,find_unused_parameters=True)
5.最后,把數據和模型加載到當前進程使用的 GPU 中,正常進行正反向傳播: for batch_idx, (data, target) in enumerate(train_loader): if args.cuda: data, target = data.cuda(), target.cuda() output = net(images) loss = criterion(output, target) ... optimizer.zero_grad() loss.backward() optimizer.step() 6.在使用時,命令行調用 torch.distributed.launch 啟動器啟動: #pytorch 為我們提供了 torch.distributed.launch 啟動器,用於在命令行分布式地執行 python 文件。 #--nproc_per_node參數指定為當前主機創建的進程數。一般設定為=NUM_GPUS_YOU_HAVE當前主機的 GPU 數量,每個進程獨立執行訓練腳本。 #這里是單機多卡,所以node=1,就是一台主機,一台主機上--nproc_per_node個進程 CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

# 如果是2機3卡, nnode=2, 就是兩台主機, 一台主機上--nproc_per_node=3個進程,命令應該如下(未測試過)
python torch.distributed.launch --nprocs_per_node=3 --nnodes=2 --node_rank=0 --master_addr="master-ip" --master_port=6005 main.py --my arguments
python torch.distributed.launch --nprocs_per_node=3 --nnodes=2 --node_rank=1 --master_addr="master-ip" --master_port=6005 main.py --my arguments

一些解釋

啟動輔助工具 Launch utility

啟動實用程序torch.distributed.launch,此幫助程序可用於為每個節點啟動多個進程以進行分布式訓練,它在每個訓練節點上產生多個分布式訓練進程。

這個工具可以用作CPU或者GPU,如果被用於GPU,每個GPU產生一個進程Process。該工具既可以用來做單節點多GPU訓練,也可用於多節點多GPU訓練。如果是單節點多GPU,將會在單個GPU上運行一個分布式進程,據稱可以非常好地改進單節點訓練性能。如果用於多節點分布式訓練,則通過在每個節點上產生多個進程來獲得更好的多節點分布式訓練性能。如果有Infiniband接口則加速比會更高。

在 單節點分布式訓練 或 多節點分布式訓練 的兩種情況下,該工具將為每個節點啟動給定數量的進程(--nproc_per_node)。如果用於GPU培訓,則此數字需要小於或等於當前系統上的GPU數量(nproc_per_node),並且每個進程將在從GPU 0到GPU(nproc_per_node - 1)的單個GPU上運行。

 

NCCL 后端

NCCL 的全稱為 Nvidia 聚合通信庫(NVIDIA Collective Communications Library),是一個可以實現多個 GPU、多個結點間聚合通信的庫,在 PCIe、Nvlink、InfiniBand 上可以實現較高的通信速度。

NCCL 高度優化和兼容了 MPI,並且可以感知 GPU 的拓撲,促進多 GPU 多節點的加速,最大化 GPU 內的帶寬利用率,所以深度學習框架的研究員可以利用 NCCL 的這個優勢,在多個結點內或者跨界點間可以充分利用所有可利用的 GPU。

NCCL 對 CPU 和 GPU 均有較好支持,且 torch.distributed 對其也提供了原生支持。

對於每台主機均使用多進程的情況,使用 NCCL 可以獲得最大化的性能。每個進程內,不許對其使用的 GPUs 具有獨占權。若進程之間共享 GPUs 資源,則可能導致 deadlocks。

 

DistributedSampler

原型

torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None)

參數

  • dataset

進行采樣的數據集

  • num_replicas

分布式訓練中,參與訓練的進程數

  • rank

當前進程的 rank 序號(必須位於分布式訓練中)

 

說明:

對數據集進行采樣,使之划分為幾個子集

一般與 DistributedDataParallel 配合使用。此時,每個進程可以傳遞一個 DistributedSampler 實例作為一個 Dataloader sampler,並加載原始數據集的一個子集作為該進程的輸入。每個進程都應加載數據的非重疊副本

Dataparallel 中,數據被直接划分到多個 GPU 上,數據傳輸會極大的影響效率。相比之下,在 DistributedDataParallel 使用 sampler 可以為每個進程划分一部分數據集,並避免不同進程之間數據重復。

注意:在 DataParallel 中,batch size 設置必須為單卡的 n 倍,因為一個batch的數據會被主GPU分散為minibatch給其他GPU,但是在 DistributedDataParallel 內,batch size 設置於單卡一樣即可,因為各個GPU對應的進程獨立從磁盤中加載數據。

 

兩種方法的使用情況,負載和訓練時間

DataParallel

PID相同的是一個程序

在兩張GPU上,本人用了3種不同batchsize跑的結果如下,

batch=64(藍色),主卡3678MB,副卡1833MB

batch=128(黑色),主卡3741MB,副卡1863MB

batch=256(紅色),主卡3821MB,副卡1925MB

 總體看來,主卡,默認是指定的卡中序號排在第一個的,內存使用情況是副卡的2倍,負載不太均衡

 

 batch=512時,訓練時間和准確率

 

 

DistributedDataParallel

 

其中PID為113673、113674的是我的程序,總體有兩個進程,每個進程都用了兩張卡,可以看到,負載相對很均衡了。 

運行腳本: CUDA_VISIBLE_DEVICES=1,3 python -m torch.distributed.launch --nproc_per_node=2 main.py

 

batch=500時,訓練時間和准確率:

 

相比DataParallel,DistributedDataParallel訓練時間縮減了好幾倍!推薦大家使用分布式數據並行

源碼詳見我的github: TextCNN_parallel

 

---------------------------------------------------- 

參考:https://blog.csdn.net/zwqjoy/article/details/89415933

https://www.ctolib.com/tczhangzhi-pytorch-distributed.html

https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/    https://zhuanlan.zhihu.com/p/76638962 👍


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM