pytorch中基於DistributedDataParallel實現多卡並行計算


torch.nn.parallel.DistributedDataParallel提供了更為高效的單機多卡和多機多卡訓練接口。

DistributedDataParallel並行庫接口參數參考:

https://www.cnblogs.com/jiangkejie/p/13256115.html

這里給出單機多卡的具體實現步驟

參考:

https://github.com/tczhangzhi/pytorch-distributed

https://blog.csdn.net/zwqjoy/article/details/89415933

https://www.cnblogs.com/yh-blog/p/12877922.html

 

 

 

幾個基本概念

  • group:

    即進程組。默認情況下,只有一個組,一個 job 即為一個組,也即一個 world。

    當需要進行更加精細的通信時,可以通過 new_group 接口,使用 word 的子集,創建新組,用於集體通信等。

  • world size :

    表示全局進程個數。

  • rank:

    表示進程序號,用於進程間通訊,表征進程優先級。rank = 0 的主機為 master 節點。

  • local_rank:

    節點內,GPU 編號,非顯式參數,由 torch.distributed.launch 內部指定。比方說, rank = 3,local_rank = 0 表示第 3 個節點內的第 1 塊 GPU。節點內每個進程會分配一個 local_rank 參數,表示當前進程在當前主機上的編號。例如:rank=2, local_rank=0 表示第 3 個節點上的第 1 個進程。

 

1:  torch.nn.parallel.DistributedDataParallel

這個從名字上就能看出來與DataParallel相類似,也是一個模型wrapper。這個包是實現多機多卡分布訓練最核心東西,它可以幫助我們在不同機器的多個模型拷貝之間平均梯度。

 

2: torch.utils.data.distributed.DistributedSampler

在多機多卡情況下分布式訓練數據的讀取也是一個問題,不同的卡讀取到的數據應該是不同的。dataparallel的做法是直接將batch切分到不同的卡,這種方法對於多機來說不可取,因為多機之間直接進行數據傳輸會嚴重影響效率。於是有了利用sampler確保dataloader只會load到整個數據集的一個特定子集的做法。DistributedSampler就是做這件事的。它為每一個子進程划分出一部分數據集,以避免不同進程之間數據重復

 

使用流程

Pytorch 中分布式的基本使用流程如下:

  1. 在使用 distributed 包的任何其他函數之前,需要使用 init_process_group 初始化進程組,同時初始化 distributed 包。

  2. 如果需要進行小組內集體通信,用 new_group 創建子分組

  3. 創建分布式並行(DistributedDataParallel)模型 DDP(model, device_ids=device_ids)

  4. 為數據集創建distributed Sampler
  5. 使用啟動工具 torch.distributed.launch 在每個主機上執行一次腳本,開始訓練

  6. 使用 destory_process_group() 銷毀進程組

 

單機多卡--DistributedDataParallel

 

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
torch.distributed.init_process_group(backend="nccl")
 
input_size = 5
output_size = 2
batch_size = 30
data_size = 90
 
# 2) 配置每個進程的gpu
local_rank = torch.distributed.get_rank()  # 也可以通過設置args.local_rank得到(見下文)
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
 
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')
 
    def __getitem__(self, index):
        return self.data[index]
 
    def __len__(self):
        return self.len
 
dataset = RandomDataset(input_size, data_size)
# 3)使用DistributedSampler
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         sampler=DistributedSampler(dataset))
 
class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
 
    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output
 
model = Model(input_size, output_size)
 
# 4) 封裝之前要把模型移到對應的gpu
model.to(device)
 
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) 封裝
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)
 
for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data
    else:
        input_var = data
 
    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

torch.distributed.launch 會給模型分配一個args.local_rank的參數,也可以通過torch.distributed.get_rank()獲取進程id。

# 這個參數是torch.distributed.launch傳遞過來的,我們設置位置參數來接受,local_rank代表當前程序進程使用的GPU標號
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int, help='node rank for distributed training')
args = parser.parse_args()
print(args.local_rank))

  

怎么將程序跑起來。這里也有兩種方法:

1. 用 torch.distributed.launch

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

參考:https://pytorch.org/docs/stable/distributed.html#distributed-launch

torch.distributed package在torch. distribued .launch中提供了一個啟動實用程序。

這個輔助工具可以用於為每個節點啟動多個進程,以進行分布式訓練。它是在每個訓練節點上生成多個分布式訓練進程的模塊。

 

一些Notes參見:https://pytorch.org/docs/stable/distributed.html#distributed-launch

 

2. 用 torch.multiprocessing:

import torch.multiprocessing as mp
 
def main(rank, your_custom_arg_1, your_custom_arg_2): # 這里第一個 rank 變量會被 mp.spawn 函數自動填充,可以充當 local_rank 來用(參見前面的代碼)
    pass # 將前面那一堆東西包裝成一個 main 函數
 
mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))

 

多機多卡訓練

參考:https://blog.csdn.net/zwqjoy/article/details/89415933

 

Save and Load Checkpoints

在訓練期間,通常使用torch.save和torch.load來設置檢查點模塊並從檢查點恢復。有關更多詳細信息,請參見保存和加載模型

使用DDP時,一種優化方法是僅在一個進程中保存模型,然后將其加載到所有進程中,從而減少寫入開銷。這是正確的,因為所有進程都從相同的參數開始,並且梯度在向后傳遞中同步,因此優化程序應將不同位置的模型參數設置為相同的值。

如果使用此優化,請確保在保存完成之前所有進程不要開始加載(使用dist.barrier()命令)。

此外,在加載模塊時,您需要提供適當的map_location參數,以防止進程進入其他的設備。

如果缺少map_location,torch.load將首先將模塊加載到CPU,然后將每個參數復制到保存該參數的位置,這將導致同一台計算機上的所有進程使用相同的設備集。

有關更高級的故障恢復和彈性支持,請參閱TorchElastic

示例:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()



class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)




def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM