Pytorch中的Distributed Data Parallel與混合精度訓練(Apex)


Pytorch中的Distributed Data Parallel與混合精度訓練(Apex)

磐創AI 昨天
 

以下文章來源於Smarter ,作者薰風初入弦

Smarter
Smarter

Smarter做CV界最優質的內容輸出

磐創AI推薦搜索
 
Keras
PyTorch
Github

 


 

  磐創AI分享  

來源 | Smarter

作者 | 薰風初入弦

 

【導讀】之前我在並行訓練的時候一直用的是DataParallel,而不管是同門師兄弟還是其他大佬一直推薦Distributed DataParallel。前兩天改代碼的時候我終於碰到坑了,各種原因導致單進程多卡的時候只有一張卡在進行運算。痛定思痛,該學習一下傳說中的分布式並行了。

 

基本上是一篇教程的翻譯,原文鏈接:

Distributed data parallel training in Pytorchyangkky.github.io圖片

后續等我把這些並行計算的內容捋清楚了,會再自己寫一份更詳細的tutorial~

注意:需要在每一個進程設置相同的隨機種子,以便所有模型權重都初始化為相同的值。

1. 動機

加速神經網絡訓練最簡單的辦法就是上GPU,如果一塊GPU還是不夠,就多上幾塊。

事實上,比如BERT和GPT-2這樣的大型語言模型甚至是在上百塊GPU上訓練的。

為了實現多GPU訓練,我們必須想一個辦法在多個GPU上分發數據和模型,並且協調訓練過程。

2. Why Distributed Data Parallel?

Pytorch兼顧了主要神經網絡結構的易用性和可控性。而其提供了兩種辦法在多GPU上分割數據和模型:即

nn.DataParallel 以及 nn.DistributedDataParallel。

nn.DataParallel 使用起來更加簡單(通常只要封裝模型然后跑訓練代碼就ok了)。但是在每個訓練批次(batch)中,因為模型的權重都是在 一個進程上先算出來 然后再把他們分發到每個GPU上,所以網絡通信就成為了一個瓶頸,而GPU使用率也通常很低。

除此之外,nn.DataParallel 需要所有的GPU都在一個節點(一台機器)上,且並不支持 Apex 的 混合精度訓練.

3. 現有文檔的局限性

 

總的來說,Pytorch的文檔是全面且清晰的,特別是在1.0版本的那些。完全通過文檔和教程就可以自學Pytorch,這並不是顯示一個人有多大佬,而顯然更多地反映了Pytorch的易用性和優秀的文檔。

但是好巧不巧的,就是在(Distributed)DataParallel這個系列的文檔講的就不甚清楚,或者干脆沒有/不完善/有很多無關內容。以下是一些例子(抱怨)。

    • Pytorch提供了一個使用AWS(亞馬遜網絡服務)進行分布式訓練的教程,這個教程在教你如何使用AWS方面很出色,但甚至沒提到 nn.DistributedDataParallel 是干什么用的,這導致相關的代碼塊很難follow。

    • 而另外一篇Pytorch提供的教程又太細了,它對於一個不是很懂Python中MultiProcessing的人(比如我)來說很難讀懂。因為它花了大量的篇幅講 nn.DistributedDataParallel 中的復制功能(數據是怎么復制的)。然而,他並沒有在高層邏輯上總結一下都在扯啥,甚至沒說這個DistributedDataParallel是咋用的?

    • 這里還有一個Pytorch關於入門分布式數據並行的(Distributed data parallel)教程。這個教程展示了如何進行一些設置,但並沒解釋這些設置是干啥用的,之后也展示了一些講模型分到各個GPU上並執行一個優化步驟(optimization step)。然而,這篇教程里的代碼是跑不同的(函數名字都對不上),也沒告訴你怎么跑這個代碼。和之前的教程一樣,他也沒給一個邏輯上分布式訓練的工作概括。

    • 而官方給的最好的例子,無疑是ImageNet的訓練,然而因為這個例子要 素 過 多,導致也看不出來哪個部分是用於分布式多GPU訓練的。

    • Apex提供了他們自己的ImageNet的訓練例。例子的文檔告訴大家他們的 nn.DistributedDataParallel 是自己重寫的,但是如果連最初的版本都不會用,更別說重寫的了。

    • 而這個教程很好地描述了在底層, nn.DistributedDataParallel 和 nn.DataParallel 到底有什么不同。然而他並沒有如何使用 nn.DataParallel 的例程。

4. 大綱

 

本教程實際上是針對那些已經熟悉在Pytorch中訓練神經網絡模型的人的,本文不會詳細介紹這些代碼的任何一部分。

本文將首先概述一下總體情況,然后展示一個最小的使用GPU訓練MNIST數據集的例程。之后對這個例程進行修改,以便在多個gpu(可能跨多個節點)上進行訓練,並逐行解釋這些更改。重要的是,本文還將解釋如何運行代碼。

另外,本文還演示了如何使用Apex進行簡單的混合精度分布式訓練。

 

5.大圖景(The big picture)

使用 nn.DistributedDataParallel 進行Multiprocessing可以在多個gpu之間復制該模型,每個gpu由一個進程控制。(如果你想,也可以一個進程控制多個GPU,但這會比控制一個慢得多。也有可能有多個工作進程為每個GPU獲取數據,但為了簡單起見,本文將省略這一點。)這些GPU可以位於同一個節點上,也可以分布在多個節點上。每個進程都執行相同的任務,並且每個進程與所有其他進程通信。

只有梯度會在進程/GPU之間傳播,這樣網絡通信就不至於成為一個瓶頸了。

 

圖片

訓練過程中,每個進程從磁盤加載自己的小批(minibatch)數據,並將它們傳遞給自己的GPU。每個GPU都做它自己的前向計算,然后梯度在GPU之間全部約簡。每個層的梯度不僅僅依賴於前一層,因此梯度全約簡與並行計算反向傳播,進一步緩解網絡瓶頸。在反向傳播結束時,每個節點都有平均的梯度,確保模型權值保持同步(synchronized)。

上述的步驟要求需要多個進程,甚至可能是不同結點上的多個進程同步和通信。而Pytorch通過它的 distributed.init_process_group 函數實現。這個函數需要知道如何找到進程0(process 0),一邊所有的進程都可以同步,也知道了一共要同步多少進程。每個獨立的進程也要知道總共的進程數,以及自己在所有進程中的階序(rank),當然也要知道自己要用那張GPU。總進程數稱之為 world size。最后,每個進程都需要知道要處理的數據的哪一部分,這樣批處理就不會重疊。而Pytorch通過 nn.utils.data.DistributedSampler 來實現這種效果。

6. 最小例程與解釋

為了展示如何做到這些,這里有一個在MNIST上訓練的例子,並且之后把它修改為可以在多節點多GPU上運行,最終修改的版本還可以支持混合精度運算。

首先,我們import所有我們需要的庫

import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp

之后,我們訓練了一個MNIST分類的簡單卷積網絡

 class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)

def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out

這個 main() 函數會接受一些參數並運行訓練函數。

 def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
train(0, args)

而這部分則是訓練函數

 def train(gpu, args):
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Data loading code
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True)

start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)

# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i + 1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
epoch + 1,
args.epochs,
i + 1,
total_step,
loss.item())
)
if gpu == 0:
print("Training complete in: " + str(datetime.now() - start))

最后,我們要確保 main() 函數會被調用

if __name__ == '__main__':
main()
上述代碼中肯定有一些我們還不需要的額外的東西(例如gpu和節點的數量),但是將整個框架放置到位是很有幫助的。之后在命令行輸入

python src/mnist.py -n 1 -g 1 -nr 0

就可以在一個結點上的單個GPU上訓練啦~

7. 加上MultiProcessing

我們需要一個腳本,用來啟動一個進程的每一個GPU。每個進程需要知道使用哪個GPU,以及它在所有正在運行的進程中的階序(rank)。而且,我們需要在每個節點上運行腳本。

現在讓我們康康每個函數的變化,這些改變將被單獨框出方便查找。

 def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1,
type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int,
metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
#########################################################
args.world_size = args.gpus * args.nodes #
os.environ['MASTER_ADDR'] = '10.57.23.164' #
os.environ['MASTER_PORT'] = '8888' #
mp.spawn(train, nprocs=args.gpus, args=(args,)) #
#########################################################

上一節中一些參數在這個地方才需要

    • args.nodes 是我們使用的結點數

    • args.gpus 是每個結點的GPU數.

    • args.nr 是當前結點的階序rank,這個值的取值范圍是 0 到 args.nodes - 1.

OK,現在我們一行行看都改了什么

    • Line 14:基於結點數以及每個結點的GPU數,我們可以計算 world_size 或者需要運行的總進程數,這和總GPU數相等。

    • Line 15:告訴Multiprocessing模塊去哪個IP地址找process 0以確保初始同步所有進程。

    • Line 16:同樣的,這個是process 0所在的端口

    • Line 17:現在,我們需要生成 args.gpus 個進程, 每個進程都運行 train(i, args), 其中 i 從 0 到 args.gpus - 1。注意, main() 在每個結點上都運行, 因此總共就有 args.nodes * args.gpus = args.world_size 個進程.

除了14,15行的設置,也可以在終端中運行

export MASTER_ADDR=10.57.23.164 和 export MASTER_PORT=8888

接下來,需要修改的就是訓練函數了,改動的地方依然被框出來啦。

 def train(gpu, args):
############################################################
rank = args.nr * args.gpus + gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank
)
############################################################

torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)

###############################################################
# Wrap the model
model = nn.parallel.DistributedDataParallel(model,
device_ids=[gpu])
###############################################################

# Data loading code
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,
transform=transforms.ToTensor(),
download=True
)
################################################################
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=args.world_size,
rank=rank
)
################################################################

train_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
##############################
shuffle=False, #
##############################
num_workers=0,
pin_memory=True,
#############################
sampler=train_sampler) #
#############################
...

為了簡單起見,上面的代碼去掉了簡單循環並用 ... 代替,不過你可以在這里看到完整腳本 。

    • Line3:這里是該進程在所有進程中的全局rank(一個進程對應一個GPU)。這個rank在Line6會用到

    • Line4~6:初始化進程並加入其他進程。這就叫做“blocking”,也就是說只有當所有進程都加入了,單個進程才會運行。這里使用了 nccl 后端,因為Pytorch文檔說它是跑得最快的。init_method 讓進程組知道去哪里找到它需要的設置。在這里,它就在尋找名為 MASTER_ADDR 以及 MASTER_PORT 的環境變量,這些環境變量在 main 函數中設置過。當然,本來可以把world_size 設置成一個全局變量,不過本腳本選擇把它作為一個關鍵字參量(和當前進程的全局階序global rank一樣)

    • Line23:將模型封裝為一個 DistributedDataParallel 模型。這將把模型復制到GPU上進行處理。

    • Line35~39:nn.utils.data.DistributedSampler 確保每個進程拿到的都是不同的訓練數據切片。

    • Line46/Line51:因為用了 nn.utils.data.DistributedSampler 所以不能用正常的辦法做shuffle。

 

要在4個節點上運行它(每個節點上有8個gpu),我們需要4個終端(每個節點上有一個)。在節點0上(由 main 中的第13行設置):

python src/mnist-distributed.py -n 4 -g 8 -nr 0

而在其他的節點上:

python src/mnist-distributed.py -n 4 -g 8 -nr i

其中 i∈1,2,3. 換句話說,我們要把這個腳本在每個結點上運行腳本,讓腳本運行 args.gpus 個進程以在訓練開始之前同步每個進程。

注意,腳本中的batchsize設置的是每個GPU的batchsize,因此實際的batchsize要乘上總共的GPU數目(worldsize)。

8. 使用Apex進行混合混合精度訓練

混合精度訓練,即組合浮點數 (FP32)和半精度浮點數 (FP16)進行訓練,允許我們使用更大的batchsize,並利用NVIDIA張量核進行更快的計算。AWS p3實例使用了8塊帶張量核的NVIDIA Tesla V100 GPU。

我們只需要修改 train 函數即可,為了簡便表示,下面已經從示例中剔除了數據加載代碼和反向傳播之后的代碼,並將它們替換為 ... ,不過你可以在這看到完整腳本。

    rank = args.nr * args.gpus + gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank)

torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Wrap the model
##############################################################
model, optimizer = amp.initialize(model, optimizer,
opt_level='O2')
model = DDP(model)
##############################################################
# Data loading code
...
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)

# Backward and optimize
optimizer.zero_grad()
##############################################################
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
##############################################################
optimizer.step()
...
    • Line18:amp.initialize 將模型和優化器為了進行后續混合精度訓練而進行封裝。注意,在調用 amp.initialize 之前,模型模型必須已經部署在GPU上。opt_level 從 O0 (全部使用浮點數)一直到 O3 (全部使用半精度浮點數)。而 O1 和 O2 屬於不同的混合精度程度,具體可以參閱APEX的官方文檔。注意之前數字前面的是大寫字母O。

    • Line20:apex.parallel.DistributedDataParallel 是一個 nn.DistributedDataParallel 的替換版本。我們不需要指定GPU,因為Apex在一個進程中只允許用一個GPU。且它也假設程序在把模型搬到GPU之前已經調用了 torch.cuda.set_device(local_rank)(line 10) .

    • Line37-38:混合精度訓練需要縮放損失函數以阻止梯度出現下溢。不過Apex會自動進行這些工作。

這個腳本和之前的分布式訓練腳本的運行方式相同。

圖片

 

掃碼看好書,滿100減50超值優惠活動等你哦

圖片

✄------------------------------------------------

看到這里,說明你喜歡這篇文章,請點擊「在看」或順手「轉發」「點贊」。

歡迎微信搜索「panchuangxx」,添加小編磐小小仙微信,每日朋友圈更新一篇高質量推文(無廣告),為您提供更多精彩內容。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM