概述
Pytorch多GPU訓練本質上是數據並行,每個GPU上擁有整個模型的參數,將一個batch的數據均分成N份,每個GPU處理一份數據,然后將每個GPU上的梯度進行整合得到整個batch的梯度,用整合后的梯度更新所有GPU上的參數,完成一次迭代。
其中多gpu訓練的方案有兩種,一種是利用nn.DataParallel
實現,這種方法是最早引入pytorch的,使用簡單方便,不涉及多進程。另一種是用torch.nn.parallel.DistributedDataParallel
和
torch.utils.data.distributed.DistributedSampler
結合多進程實現,第二種方式效率更高,參考,但是實現起來稍難, 第二種方式同時支持多節點分布式實現。方案二的效率要比方案一高,即使是在單運算節點上,參考pytorch doc:
In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data parallelism, including torch.nn.DataParallel():
本篇文章將詳細介紹這兩種方式的實現,只限於單機上實現,分布式較為復雜,下一篇文章再介紹。
參考:
- https://github.com/dnddnjs/pytorch-multigpu
- https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html
- https://tankzhou.cn/2019/07/07/Pytorch-分布式訓練
方案一
步驟
- 將model用
nn.DataParallel
wrap.
model = nn.DataParallel(model)
- 用
os.environ["CUDA_VISIBLE_DEVICES"]="0"
指定當前程序可以使用GPU設備號,如果不指定將會使用設備上所有的GPU設備。
os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2" #使用3個GPU
- model.cuda()或者model.to("cuda")和data.cuda()或者data.to("cuda")將模型和數據放入GPU上。
訓練過程與使用單GPU一致,使用這種方法,pytorch會自動的將batch數據拆分為N份(N是用os.environ
指定的GPU數量),分別forward,backward,然后自動整合每個GPU上的梯度,在一塊GPU上update參數,最后將參數廣播給其他GPU,完成一次迭代。
測試
代碼:
展開
```python import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader import osdataset
class RandomDataset(Dataset):
def __init__(self, size, length):
self.len = length
self.data = torch.randn(length, size)
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return self.len
model define
class Model(nn.Module):
# Our model
def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)
def forward(self, input):
output = self.fc(input)
print("\tIn Model: input size", input.size(),
"output size", output.size())
return output
if name=="main":
# Parameters
input_size = 5
output_size = 2
batch_size = 30
data_size = 100
dataset = RandomDataset(input_size, data_size)
# dataloader define
rand_loader = DataLoader(dataset=dataset,
batch_size=batch_size, shuffle=True)
# model init
model = Model(input_size, output_size)
# cuda devices
os.environ["CUDA_VISIBLE_DEVICES"]="0,1"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.device_count() > 1:
print("Let's use", torch.cuda.device_count(), "GPUs!")
# dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
model = nn.DataParallel(model)
model.to(device)
for data in rand_loader:
input = data.to(device)
output = model(input)
# loss
# backward
#update
time.sleep(1)#模擬一個比較長的batch時間
print("Outside: input size", input.size(),
"output_size", output.size())
torch.save(model.module.state_dict(), "model.pth")
</details>
- 如果使用一塊GPU,則測試結果為如下,可以看出模型內部與外部輸入輸出是一致的。
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
- 如果使用兩塊GPU,則測試結果如下,可以看出自動進行batch的拆分。
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
### 注意
- 多個GPU有主次之分,主GPU負責梯度的整合和參數的更新,將更新后的參數傳遞給其他GPU,完成一次迭代,所以這個過程中GPU之間既有參數傳遞,又有參數的梯度傳遞。
- 關於Batch Norm的問題,[參考](https://discuss.pytorch.org/t/how-does-dataparallel-handels-batch-norm/14040)。因為一個大的batch被拆分成了多個minibatch,所以normalization只會在minibatch上做,最后測試時用的Noramlization layer的參數是主GPU上的。如果想實現多個GPU上同步Normalization需要用[sync norm](https://github.com/vacancy/Synchronized-BatchNorm-PyTorch)實現。這個normalization的問題在分布式訓練中依然存在。
## 方案二
方案二是用多進程來實現的,其實分布式就是多進程的意思,分布在多個機器上的進程,利用網絡通信協調彼此。關於分布式的處理下一篇文章再詳細介紹。這里主要介紹單機上方案二與方案一的不同。首先每個進程都有獨立的訓練過程,一次迭代后share梯度,整合梯度,獨立更新參數。迭代過程中不會進行參數的傳遞(初始化時會同步所有進程上的參數)。其次進程之間的通信采用了NCCL,當然NCCL已經是pytorch內部支持了,所以一般情況下不用理這個。分布式的細節參考下一篇文章,這里只給出最簡單的實現。
### 步驟
- 需要先初始化進程組,這里采用默認的方式初始化,對於單節點來說這也是最方便的一種初始化方式,初始化的目的是讓所有的進程彼此建立聯系,即知道彼此的位置,狀態等信息。
- dataset prepare,增加`torch.utils.data.distributed.DistributedSampler`. 具體使用參見測試部分的代碼。
- model prepare, 增加`torch.nn.parallel.DistributedDataParallel`. 具體使用參見測試部分的代碼。
- 訓練過程與方案一一致,想象同時有多個進程在同時運行training的代碼即可。
### 測試
代碼與方案一類似,需要初始化進程組,表示本程序是分布式訓練的。多進程的創建通過指定`python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1`來實現的,nnodes為1,因為這里我們是一個計算節點,`nproc_per_node=2`表示需要創建兩個進程來訓練,然后每個進程都獲得分配給它rank號,rank唯一標識一個進程,rank 0為master,其他是slave。當然一般是需要兩個GPU的,測試程序中是根據rank來指定進程使用GPU,即rank 0使用GPU0,rank 1進程使用GPU1。需要根據數據集創建一個分布式的sampler,初始化dataloader的時候要指定這個sampler,模型分布式封裝詳見代碼。
代碼:
<details>
<summary>展開</summary>
```python
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import torch.distributed as dist
import torch.utils.data.distributed
import sys
import time
# dataset
class RandomDataset(Dataset):
def __init__(self, size, length):
self.len = length
self.data = torch.randn(length, size)
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return self.len
# model define
class Model(nn.Module):
# Our model
def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)
def forward(self, input):
output = self.fc(input)
# print("\tIn Model: input size", input.size(),
# "output size", output.size())
return output
if __name__=="__main__":
# Parameters
input_size = 5
output_size = 2
batch_size = 30
data_size = 100
# check the nccl backend
if not dist.is_nccl_available():
print("Error: nccl backend not available.")
sys.exit(1)
# init group
dist.init_process_group(backend="nccl", init_method="env://")
# get the process rank and the world size
rank = dist.get_rank()
world_size = dist.get_world_size()
# prepare the dataset
dataset = RandomDataset(input_size, data_size)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
rand_loader = DataLoader(dataset, batch_size=batch_size//world_size,
shuffle=(train_sampler is None),
sampler=train_sampler)
# dataloader define
# rand_loader = DataLoader(dataset=dataset,
# batch_size=batch_size, shuffle=True)
# model init
model = Model(input_size, output_size)
# cuda devices
# os.environ["CUDA_VISIBLE_DEVICES"]="0"
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# if torch.cuda.device_count() > 1:
# print("Let's use", torch.cuda.device_count(), "GPUs!")
# # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
# model = nn.DataParallel(model)
# model.to(device)
# distribute model define
device = torch.device('cuda', rank)
model = model.to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank], output_device=rank)
print("From rank %d: start training, time:%s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
for data in rand_loader:
input = data.to(device)
output = model(input)
# loss
# backward
#update
time.sleep(1)#模擬一個比較長的batch時間
print("From rank %d: Outside: input size %s, output size %s"%(rank, str(input.size()), str(output.size())),flush=True)
torch.save(model.module.state_dict(), "model_%d.pth"%rank)
print("From rank %d: end training, time: %s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
From rank 0: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 0: end training, time: 2019-09-26 13:20:17
From rank 1: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 1: end training, time: 2019-09-26 13:20:17
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
我直接將測試果貼上來,可以看出有點亂,是由於多進程並行導致的問題,仔細看可以看出有兩個進程並行訓練,每個進程處理半個batch數據。最后的OMP_NUM_THREADS 信息是pytorch lanch的時候打印的,翻譯過來就是我沒有指定OMP多線程的數目,它為了防止系統過負荷,所以貼心的幫我設置為了1,原碼[參考](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py#L214).
## One more Thing
模型的保存與加載,與單GPU的方式有所不同。這里通通將參數以cpu的方式save進存儲, 因為如果是保存的GPU上參數,pth文件中會記錄參數屬於的GPU號,則加載時會加載到相應的GPU上,這樣就會導致如果你GPU數目不夠時會在加載模型時報錯,像下面這樣:
``` shell
RuntimeError: Attempting to deserialize object on CUDA device 1 but torch.cuda.device_count() is 1. Please use torch.load with map_location to map your storages to an existing device.
模型保存都是一致的,不過時刻記住方案二中你有多個進程在同時跑,所以會保存多個模型到存儲上,如果使用共享存儲就要注意文件名的問題,當然一般只在rank0進程上保存參數即可,因為所有進程的模型參數是同步的。
torch.save(model.module.cpu().state_dict(), "model.pth")
模型的加載:
param=torch.load("model.pth")
好了今天就寫到這兒,好久沒有這么認真的寫篇博客了。當然還是有一些地方不夠完善,比如關於模型參數同步的檢驗。如果你有什么問題,或者覺得哪里有不對的地方請在評論區給出,蟹蟹 ^=^。