[源碼解析] PyTorch 分布式(5) ------ DistributedDataParallel 總述&如何使用
0x00 摘要
本文是 PyTorch 分布式系列的第五篇,以幾篇官方文檔的翻譯為基礎,加入了自己的一些思考,帶領大家進入DistributedDataParallel,在后續會用5~6篇左右做深入分析。
本系列其他文章如下:
[源碼解析]PyTorch如何實現前向傳播(1) --- 基礎類(上)
[源碼解析]PyTorch如何實現前向傳播(2) --- 基礎類(下)
[源碼解析] PyTorch如何實現前向傳播(3) --- 具體實現
[源碼解析] Pytorch 如何實現后向傳播 (1)---- 調用引擎
[源碼解析] Pytorch 如何實現后向傳播 (2)---- 引擎靜態結構
[源碼解析] Pytorch 如何實現后向傳播 (3)---- 引擎動態邏輯
[源碼解析] PyTorch 如何實現后向傳播 (4)---- 具體算法
[源碼解析] PyTorch 分布式(1)------歷史和概述
[源碼解析] PyTorch 分布式(2) ----- DataParallel(上)
[源碼解析] PyTorch 分布式(3) ----- DataParallel(下)
[源碼解析] PyTorch 分布式(4)------分布式應用基礎概念
0x01 數據並行
因為DistributedDataParallel 是數據並行,所以我們首先通過兩個圖,復習一下什么是數據並行。
第一個圖片來自 https://www.cnblogs.com/yh-blog/p/12877922.html,其原始出處未知。
我們可以看到,模型並行與數據並行的區別。

第二張圖來自fairscale github源碼,清晰的給出了一個數據並行的運行模式,具體包括:
模型分片,本地前向計算,本地反向傳播,AllReduce來同步梯度,本地更新梯度這幾步。

0x02 DDP 運行邏輯
Torch.distributed 包 為多個計算節點的 PyTorch 提供多進程並行通信原語,可以並行化跨進程和跨集群的計算。torch.nn.parallel.DistributedDataParallel基於torch.distributed 包的功能提供了一個同步分布式訓練wrapper,這個wrapper可以對 PyTorch 模型封裝進行訓練。其核心功能是基於多進程級別的通信,與Multiprocessing package - torch.multiprocessing 和 DataParrallel 提供的並行性有明顯區別。
以下是 DDP 的整體架構,大家可以看到ddp在整個架構之中的位置,依賴項等等。圖片來自來自源碼。

我們通過一個圖來說明 DDP 的運行邏輯。
圖片來自 https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/

具體邏輯如下:
- 加載模型階段。每個GPU都擁有模型的一個副本,所以不需要拷貝模型。rank為0的進程會將網絡初始化參數broadcast到其它每個進程中,確保每個進程中的模型都擁有一樣的初始化值。
- 加載數據階段。DDP 不需要廣播數據,而是使用多進程並行加載數據。在 host 之上,每個worker進程都會把自己負責的數據從硬盤加載到 page-locked memory。DistributedSampler 保證每個進程加載到的數據是彼此不重疊的。
- 前向傳播階段。在每個GPU之上運行前向傳播,計算輸出。每個GPU都執行同樣的訓練,所以不需要有主 GPU。
- 計算損失。在每個GPU之上計算損失。
- 反向傳播階段。運行后向傳播來計算梯度,在計算梯度同時也對梯度執行all-reduce操作。
- 更新模型參數階段。因為每個GPU都從完全相同的模型開始訓練,並且梯度被all-reduced,因此每個GPU在反向傳播結束時最終得到平均梯度的相同副本,所有GPU上的權重更新都相同,也就不需要模型同步了。注意,在每次迭代中,模型中的Buffers 需要從rank為0的進程廣播到進程組的其它進程上。
0x03 VS DataParallel
3.1 本質區別
既然 DataParallel 可以進行數據並行訓練,那么為什么還需要提出 DistributedDataParallel呢?這里我們就需要知道兩種方法的實現原理與區別:
-
大型模型訓練。
- 如果模型太大而無法容納在單個 GPU 上,則必須使用模型並行將其拆分到多個 GPU 中。
- DataParallel 因為必須將模型放入單塊 GPU 中,所以難以完成大型模型的訓練,即,無法和模型並行(跨多個 GPU 拆分單個模型)一起合作。
- DistributedDataParallel 可以只包括大型模型的一部分,因此可以與模型並行一起合作。
- 如果數據太大而無法容納在一台計算機上,則需要使用數據並行。
- 在這種情況下,每個 DistributedDataParallel 進程都可以並行使用模型,而所有進程都將並行使用數據。此時與 DP 沒有太大區別。
- 如果您的模型需要跨越多台機器,或者您的用例不適合數據並行性范式,請參閱 RPC API ,以獲得更多通用的分布式訓練支持。
- 如果模型太大而無法容納在單個 GPU 上,則必須使用模型並行將其拆分到多個 GPU 中。
-
多進程還是多線程:
- DataParallel 是單進程,多線程的並行訓練方式,並且只能在單台機器上運行。
- 而DistributedDataParallel 是多進程,並且適用於單機和多機訓練。DistributedDataParallel 還預先復制模型,而不是在每次迭代時復制模型,並避免了全局解釋器鎖定。
- 每個進程維護自己的優化器,並且在每次迭代中執行一個完整的優化步驟。由於梯度已經聚合(gather)並跨進程平均,因此梯度對於每個進程都是相同的,這就不需要廣播參數步驟,因此減少了在節點之間傳輸張量的時間。
- 每個進程包含一個獨立的 Python 解釋器,因而消除了單個 Python 進程驅動多個執行線程、模型副本或者 GPU 的額外解釋器開銷和"GIL 顛簸"(
GIL-thrashing)。對於嚴重依賴 Python 運行時的模型(比如說包含RNN層或大量小組件的models)這尤其重要。
- 即使在單台機器上,
DataParallel通常也比DistributedDataParallel慢,這是因為跨線程的 GIL 爭用,每次迭代復制的模型以及分散輸入和收集輸出所帶來的額外開銷。
3.2 實現區別
DDP 與DP在具體實現上的區別如下:
- 關於優化器:
- DDP :在每次迭代之中,DDP 的每個進程都有自己的
optimizer,每個進程都獨立完成所有優化步驟,這和非分布式訓練一樣。 - DP :在 DP 中只有一個
optimizer,在主線程執行。其對各GPU上梯度進行求和,而在主GPU進行參數更新,之后再將模型參數broadcast到其他GPU。
- DDP :在每次迭代之中,DDP 的每個進程都有自己的
- 關於梯度。
- DDP :每個進程在自己 GPU之上計算損失,運行后向傳播來計算梯度,在計算梯度同時對梯度執行all-reduce操作。
- DP :在各進程梯度計算完成之后,各進程需要將梯度進行匯總規約到主進程,主進程用梯度來更新模型權重,然后其
broadcast模型到所有進程(其他GPU)進行下一步訓練。
- 關於傳播數據:
- DDP :只對梯度等少量數據進行交換。由於各進程中的模型,初始參數一致 (初始時刻進行一次
broadcast),而每次用於更新參數的梯度也一致,因此,各進程的模型參數始終保持一致。相較於DataParallel來說,torch.distributed傳輸的數據量更少,因此速度更快,效率更高。 - DP :每次迭代,有大量交互,比如模型,前向輸出,損失,梯度等。
- DDP :只對梯度等少量數據進行交換。由於各進程中的模型,初始參數一致 (初始時刻進行一次
0x04 使用
Pytorch 中分布式的基本使用流程如下:
- 首先需要使用
init_process_group初始化進程組,同時初始化distributed包,然后才能使用distributed包的其他函數。 - 如果需要進行組內集體通信,用
new_group創建子分組。 - 使用
DDP(model, device_ids=device_ids)創建 DistributedDataParalle 模型。 - 為數據集創建分布式
Sampler。 - 使用啟動工具
torch.distributed.launch在每個主機上執行腳本,開始訓練。 - 使用
destory_process_group()銷毀進程組。
4.1 基本示例
首先,我們使用 https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 來看看。
4.1.1 設置進程組
在示例的最開始,我們首先要正確設置進程組。
init_process_group 的參數解釋如下:
- "gloo" 說明后端使用 "gloo"。
- rank 是本進程對應的rank,如果是0,則說明本進程是 master 進程,負責廣播模型狀態等工作。
- world_size 指的是總的並行進程數目,如果連接的進程數小於world_size,進程就會阻塞在 init_process_group之上,如果達到了 world_size,程序才會繼續運行。如果 batch_size = 16,那么總體的batch size 就是 16 * world_size。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size) # 這條命令之后,master進程就處於等待狀態
def cleanup():
dist.destroy_process_group()
4.1.2 簡單模型
現在,讓我們創建一個簡單模塊,用 DDP 包裝它,並用一些虛擬輸入數據饋送它。請注意,由於 DDP 將模型狀態從 rank 0 進程廣播到 DDP 構造函數中的所有其他進程,因此對於所有 DDP 進程來說,它們的起始模型參數是一樣的,用戶無需擔心不同的 DDP 進程從不同的模型參數初始值開始。
+-----------+
| |
| Rank 0 |
| |
+-----+-----+
|
| Model Parameters
|
|
+---------------+---------v----------------------+
| | |
| | |
| | |
| | |
v v v
+----+-----+ +----+-----+ +---+-------+
| | | | | |
| Rank 1 | | Rank 2 | ...... | Rank n |
| | | | | |
+----------+ +----------+ +-----------+
DDP 包裝了較低級別的分布式通信細節,並提供了一個干凈的 API,就好像它是一個本地模型一樣。梯度同步通信發生在反向傳播期間,並與反向計算重疊。當backward()返回時,param.grad已經包含同步梯度張量。因為DDP 封裝了分布式通信原語,所以模型參數的梯度可以進行 all-reduce。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
具體如下圖
+--------------------------+ +------------------------+
| torch.optim.SGD | | DDP |
| | parameters() | |
| | | +------------+ |
| | <-----------------+ | | |
| | | | ToyModel | |
| | | | | |
| | | +------------+ |
| | | |
+--------------------------+ +--------+---------------+
|
|
| forward outputs
|
|
v
+-------------------------+
| nn.MSELoss() |
| |
| |
| |
| |
+-------------------------+
4.1.3 處理速度偏差
在 DDP 中,構造函數、前向傳遞和后向傳遞是分布式同步點。我們期望不同的進程會啟動相同數量的同步操作,並在大致相同的時間以相同的順序到達這些同步點。否則,進度快的進程可能會提前到達同步點,如果快進程等待落后者的時間過長,那么先到的進程會超時。
因此,用戶需要負責平衡進程間的工作負載分布。有時,由於網絡延遲,資源爭用,不可預測的工作負載峰值等原因,處理速度的偏差是不可避免的。為避免在這些情況下超時,請確保在調用 init_process_group 時。timeout這個參數傳遞足夠大的值 。
4.1.4 保存和加載檢查點
一般來說,用戶可以使用torch.save和torch.load作為checkpoints,以便從檢查點恢復訓練。
在使用 DDP 時,一種優化是只在一個進程中保存模型,然后在所有進程中加載模型,從而減少寫入開銷(這其實很像數據庫中的讀寫分離)。因為所有進程都從相同的參數開始,並且在反向傳遞中同步梯度,所以優化器應該將參數設置為相同的值。如果使用此優化,請確保在保存完成之前所有進程都不會開始加載。
此外,在加載模塊時,您需要提供適當的map_location 參數,以防止一個進程進入他人的設備。如果map_location 缺失,torch.load將首先將模塊加載到 CPU,然后將每個參數復制到它之前保存的地方,這將導致同一台機器上的所有進程使用相同的一組設備。
有關更高級的故障恢復和彈性支持,請參閱TorchElastic。后續也會有專門系列介紹彈性部分。
從下圖可以看出來,Rank 0 負責保存模型到存儲之上,其他 Rank 會加載模型到其本地。
+-----------+
| |
| Rank 0 |
| |
+-----+-----+
|
save | Model Parameters
|
|
v
+-------+------+
| |
+-----------+ Model file +---------------------+
| | | |
| +---+----------+ |
| | |
| | |
| | |
| | |
|load |load load |
| | |
| | |
| | |
| | |
v v v
+----+-----+ +----+-----+ +---+-------+
| | | | | |
| Rank 1 | | Rank 2 | ...... | Rank n |
| | | | | |
+----------+ +----------+ +-----------+
具體如下:
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location))
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn = nn.MSELoss()
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
4.2 將 DDP 與模型並行相結合
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html 后半部分是與模型並行的結合,我們一起來看看。
DDP 也適用於多 GPU 模型。DDP 在使用大數據訓練大模型時候特別有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
注意,當把一個多GPU 模型傳遞給DDP時候,不能設置device_ids和output_device。
輸入和輸出數據將通過應用程序或模型forward()方法來放置在適當的設備中。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = (rank * 2) % world_size
dev1 = (rank * 2 + 1) % world_size
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
run_demo(demo_model_parallel, world_size)
請注意,這里沒有使用 Sampler,正常在使用之中,需要用DistributedSampler來配合 DDP 使用,DistributedSampler 會把數據集樣本針對每個進程來划分,這樣每個進程就讀取到了自己應該使用的樣本,而且 DistributedSampler 會為 DDP 模式使用 set_epoch 來shuffle數據集。
0x05 如何多進程啟動
前面提到,如果應用程序需要跨機器邊界進行擴展,需要使用多機 DistributedDataParallel 和 啟動腳本。torch.nn.parallel.DistributedDataParallel() 支持多個通過網絡互聯的機器,用戶必須為每個進程顯式啟動一個主訓練腳本。
我們下面就看看這個啟動腳本 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。以下就是這個md文件的翻譯。
在本教程中,我們將演示如何構建分布式模型訓練應用程序,這樣它可以在多個節點上方便地啟動。這里每個節點都有多個 GPU,並且使用 PyTorch 的分布式啟動程序腳本 https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py 啟動實用程序torch.distributed.launch,此腳本程序可用於為每個節點啟動多個進程以進行分布式訓練,它在每個訓練節點上產生多個分布式訓練進程。
這個工具可以用作CPU訓練或者GPU 訓練,如果被用於GPU,每個GPU產生一個進程Process。該工具既可以用來做單節點多GPU訓練,也可用於多節點多GPU訓練。
- 如果是單節點多GPU,將會在單個GPU上運行一個分布式進程,據稱可以非常好地改進單節點訓練性能。
- 如果用於多節點分布式訓練,則通過在每個節點上產生多個進程來獲得更好的多節點分布式訓練性能。如果有Infiniband接口則加速比會更高。
在 單節點分布式訓練 或 多節點分布式訓練 的兩種情況下,該工具將為每個節點啟動給定數量的進程(--nproc_per_node)。如果用於GPU培訓,則此數字需要小於或等於當前系統上的GPU數量(nproc_per_node),每個進程將在從GPU 0到GPU(nproc_per_node - 1)的單個GPU上運行。
5.1 先決條件
多個worker通過處理大型數據集的不同部分來訓練同一個全局模型,每個worker將獨立計算局部梯度(也稱為子梯度 sub-gradients),然后使用 AllReduce 原語來同步梯度。因為同一個程序在所有應用上運行,但每個應用都在訓練數據集的不同部分上運行,所以在 HPC 術語中,這種執行模型稱為單程序多數據或 SPMD,
5.2 應用進程拓撲
一個分布式數據並行 (DDP) 應用程序可以在多個節點上執行,其中每個節點可以由多個 GPU 設備組成。每個節點依次可以運行 DDP 應用程序的多個副本,每個副本在多個 GPU 上處理其模型。
設N為運行應用程序的節點數, G為每個節點的 GPU 數。同時在所有節點上運行的應用程序進程總數稱為 World Size,簡寫為W。在每個節點上運行的進程數稱為Local World Size,簡寫為L。
每個應用進程都分配了兩個 ID:local rank 取值在 [0, L -1] 中,global rank 取值在 [0, W -1] 之中。
為了闡明上面定義的術語,我們考慮在兩個節點上啟動 DDP 應用程序的情況,每個節點都有四個 GPU。然后我們希望每個進程跨越(span)兩個 GPU。進程到節點的映射如下圖所示:
下面圖片也出自於 https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md。

雖然有很多方法可以將進程映射到節點,但一個好的經驗法則是讓一個進程跨越(span)單個 GPU。這使得 DDP 應用程序能夠擁有與 GPU 一樣多的並行讀取流,並且在現實中也提供了 I/O 和計算成本之間的良好平衡。
5.3 准備和啟動 DDP 應用程序
無論 DDP 應用程序采用何種啟動方式,每個進程都需要一種機制來了解其全局和本地等級。所以,所有進程會創建一個ProcessGroup,基於ProcessGroup可以使它們能夠參與諸如 AllReduce 之類的集合通信操作。
有一種便捷的方法可以啟動多個 DDP 進程,並且可以初始化所有參數(這些數值是建立一個ProcessGroup 所需要的),這就是使用PyTorch 提供的分布式 腳本launch.py。
這個 Launcher 可以在本地torch 安裝目錄的distributed子目錄下找到。這是在任何操作系統上獲取launch.py路徑的快捷方法 :
python -c " from os import path; import torch; print(path.join(path.dirname(torch.__file__), 'distributed', 'launch.py')) "
這將打印如下內容:
/home/username/miniconda3/envs/pytorch/lib/python3.8/site-packages/torch/distributed/launch.py
當 DDP 應用程序通過 launch.py啟動時,它通過環境變量將 world size、 global rank、local rank,master address 和端口作為命令行參數傳遞給每個實例。要使用 Launcher,應用程序需要遵守以下約定:
- 必須為單個 worker提供入口點函數。例如,它不應該使用
torch.multiprocessing.spawn啟動子進程。 - 必須使用環境變量來初始化進程組。
為簡單起見,應用程序可以假設每個進程映射到單個 GPU,但在下一節中,我們還將展示如何用更通用的辦法來執行進程到 GPU 的映射。
5.4 示例應用
此示例 DDP 應用程序基於 DDP 教程 的 “Hello, World” 應用。
5.4.1 參數傳遞約定
DDP 應用程序采用兩個命令行參數:
--local_rank: 此參數將通過launch.py傳入。--local_world_size:這是明確傳遞的,通常是數字 \(1\) 或每個節點的 GPU 數量。
應用程序解析這些並調用spmd_main入口點:
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=0)
parser.add_argument("--local_world_size", type=int, default=1)
args = parser.parse_args()
spmd_main(args.local_world_size, args.local_rank)
在 spmd_main之中,進程組使用后端(NCCL 或 Gloo)進行初始化。集合點(rendezvous )所需的其余信息來自launch.py設置的環境變量:
def spmd_main(local_world_size, local_rank):
# These are the parameters used to initialize the process group
env_dict = {
key: os.environ[key]
for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
}
print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
dist.init_process_group(backend="nccl")
print(
f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
+ f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
)
demo_basic(local_world_size, local_rank)
# Tear down the process group
dist.destroy_process_group()
給定 local rank 和 world size,訓練函數demo_basic將通過device_ids在本地節點的一組 GPU 上初始化DistributedDataParallel模型:
def demo_basic(local_world_size, local_rank):
# setup devices for this process. For local_world_size = 2, num_gpus = 8,
# rank 0 uses GPUs [0, 1, 2, 3] and
# rank 1 uses GPUs [4, 5, 6, 7].
n = torch.cuda.device_count() // local_world_size
device_ids = list(range(local_rank * n, (local_rank + 1) * n))
print(
f"[{os.getpid()}] rank = {dist.get_rank()}, "
+ f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
)
model = ToyModel().cuda(device_ids[0])
ddp_model = DDP(model, device_ids)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_ids[0])
loss_fn(outputs, labels).backward()
optimizer.step()
該應用程序可以通過launch.py以下方式在一個 8 GPU 的節點上啟動,每個 GPU 一個進程:
python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8
並產生類似於下圖所示的輸出:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
[238627] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '8'}
[238630] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '3', 'WORLD_SIZE': '8'}
[238628] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '1', 'WORLD_SIZE': '8'}
[238634] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[238631] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[238632] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[238629] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '2', 'WORLD_SIZE': '8'}
[238633] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[238633] world_size = 8, rank = 6, backend=nccl
[238628] world_size = 8, rank = 1, backend=nccl
[238629] world_size = 8, rank = 2, backend=nccl
[238631] world_size = 8, rank = 4, backend=nccl
[238630] world_size = 8, rank = 3, backend=nccl
[238632] world_size = 8, rank = 5, backend=nccl
[238634] world_size = 8, rank = 7, backend=nccl
[238627] world_size = 8, rank = 0, backend=nccl
[238633] rank = 6, world_size = 8, n = 1, device_ids = [6]
[238628] rank = 1, world_size = 8, n = 1, device_ids = [1]
[238632] rank = 5, world_size = 8, n = 1, device_ids = [5]
[238634] rank = 7, world_size = 8, n = 1, device_ids = [7]
[238629] rank = 2, world_size = 8, n = 1, device_ids = [2]
[238630] rank = 3, world_size = 8, n = 1, device_ids = [3]
[238631] rank = 4, world_size = 8, n = 1, device_ids = [4]
[238627] rank = 0, world_size = 8, n = 1, device_ids = [0]
同樣,它可以使用一個跨越(span)所有 8 個 GPU 的單進程來啟動:
python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=1 example.py --local_world_size=1
為當前主機創建 nproc_per_node 個進程,每個進程獨立執行訓練腳本,同時還為每個進程分配一個 local_rank 參數,表示當前進程在當前主機上的編號。
比如 node_rank = 2, local_rank = 0,表示 node_rank 第2個節點,上第一個進程。
依次產生以下輸出
[262816] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '1'}
[262816]: world_size = 1, rank = 0, backend=nccl
[262816] rank = 0, world_size = 1, n = 8, device_ids = [0, 1, 2, 3, 4, 5, 6, 7]
5.5 結論
作為分布式數據並行應用程序的作者,您的代碼需要了解兩種類型的資源:計算節點和每個節點內的 GPU。但是需要跟蹤GPU集如何映射到應用程序進程,這個簿記(bookkeeping )工作可能既乏味又容易出錯。
所以我們希望通過按照本示例所示的方法,使用 launcher 來構建您的應用程序,這樣可以顯著簡化分布式訓練的設置。
5.6 啟動腳本的背后
知道了啟動腳本的作用依然不夠,我們還需要知道其內部做了什么。
5.6.1 launch.py
launch.py 位於 torch/distributed/launch.py,但是實際上,它的大部分功能都被轉移到了 torch/distributed/run.py 之中。
def main(args=None):
logger.warn(
"The module torch.distributed.launch is deprecated "
"and going to be removed in future."
"Migrate to torch.distributed.run"
)
args = parse_args(args)
run(args)
所以我們要看看 run.py。
5.6.2 run.py
可以看到,run.py 的基本思路就是:使用 config_from_args 來從命令行之中提取信息,構建了對應的配置,執行語句和其參數,然后調用 elastic_launch 來執行。由此可見,彈性訓練是未來趨勢。我們后續也有系列來分析彈性訓練。
def run(args):
if args.standalone:
args.rdzv_backend = "c10d"
args.rdzv_endpoint = "localhost:29400"
args.rdzv_id = str(uuid.uuid4())
log.info(
f"\n**************************************\n"
f"Rendezvous info:\n"
f"--rdzv_backend={args.rdzv_backend} "
f"--rdzv_endpoint={args.rdzv_endpoint} "
f"--rdzv_id={args.rdzv_id}\n"
f"**************************************\n"
)
config, cmd, cmd_args = config_from_args(args)
elastic_launch(
config=config,
entrypoint=cmd,
)(*cmd_args)
run.py 也可以獨立運行,比如。
>>> python -m torch.distributed.run
--nnodes=$NUM_NODES
--nproc_per_node=$NUM_TRAINERS
--rdzv_id=$JOB_ID
--rdzv_backend=c10d
--rdzv_endpoint=$HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
5.6.3 定義
因為run.py 有很多配置參數,所以我們大致看一下。
-
Node- 物理實例或容器;映射到與 job manager 所協調的單元。 -
Worker- 分布式培訓環境中的worker。 -
WorkerGroup- 執行相同功能的一組worker(例如trainer)。 -
LocalWorkerGroup- 在同一節點上運行的工作組中的workers子集。 -
RANK- 工作組中worker的rank,是全局rank,可以認為是一個全局GPU資源列表。 -
LOCAL_RANK- 本地工作組中,某個worker 的 rank,可以認為是當前節點上的GPU資源列表。 -
GROUP_RANK- worker group的rank。介於0和“最大節點數”之間的數字。如果每個節點運行一個單一工作組,那就是這個節點的rank。 -
ROLE_RANK- 對於具有相同角色worker來說,他們之間共享的rank,角色在“WorkerSpec”中被指定。 -
WORLD_SIZE- 工作組中worker的總數。因為節點會加入/離開,所以WORLD_SIZE會變化,不能依賴WORLD_SIZE的穩定性進行編碼。 -
LOCAL_WORLD_SIZE- 本地工作組的大小,即本地運行的worker數目,等於在torch.distributed.run運行時候指定的--nproc_per_node。目前,torch/distributed/run.py 僅支持同構的LOCAL_WORLD_SIZE。也就是說,假設所有節點運行相同數量的本地工作者(每個角色)。 -
ROLE_WORLD_SIZE- 具有同樣角色的workers總數,在WorkerSpec之中被指定。 -
rdzv_id- 用戶定義的id,用於唯一標識作業的工作組。這個id在每個節點加入特定工作組時候使用。 -
rdzv_backend-rendezvous 的后端(例如“c10d”)。這通常是一個強一致性的鍵值存儲。 -
rdzv_endpoint- rendezvous 后端端點;通常以“<host>:<port>”的形式出現。 -
run_id: 用戶定義的id,它唯一地標識分布式應用程序的一個實例。它通常映射到作業id並用於允許節點加入正確的分布式應用程序。
-
TORCHELASTIC_RESTART_COUNT- 迄今為止,工作組重啟的次數。 -
TORCHELASTIC_MAX_RESTARTS- 配置的最大重啟數目。 -
TORCHELASTIC_RUN_ID- 與 rendezvousrun_id相等,即唯一的job id。
我們后面會有專門系列來介紹彈性訓練,所以就此略過。下一篇我們開始介紹通信所需要的store概念,敬請期待。
0xFF 參考
https://github.com/pytorch/examples/blob/master/distributed/ddp/README.md
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
