PyTorch分布式訓練詳解教程 scatter, gather & isend, irecv & all_reduce & DDP


PyTorch分布式訓練詳解教程 scatter, gather & isend, irecv & all_reduce & DDP

本文將從零入手,簡單介紹如何使用PyTorch中的多種方法進行分布式訓練。

具體而言,我們將使用四種方法,分別是: (1)scatter, gatter; (2)isend, irecv; (3)all_reduce; (4)DataDistributedParallel (DDP).

其簡單原理是將數據集分區(partition data),之后分別發送到不同的節點進行訓練,再將所獲得的數據,例如梯度,發送到同一個節點進行運算如相加求和,再重新將參數分發到不同的結點。

本文將以VGG11模型和Cifar10數據集為例,具體介紹如何使用這四種方法進行分布式訓練。

本文的實驗環境為4節點Ubuntu18環境,分別為node0, node1, node2, node3,其中node0作為master節點,負責發送數據到其他節點,收集其他節點的數據,以及計算。請注意,本文中master節點特指node0,slave節點指node1, node2, node3(即便它們之間並非master和slave的關系)。Python環境為Anaconda下3.8版本,Pytorch 1.4版本。

本文不使用GPU,而是直接用CPU進行訓練。但本文的代碼只需要修改device參數即可移植到GPU上。master ip為10.10.1.1, port為29501。不同節點之間通過該端口進行通信。

請注意,為了使得每次跑得結果類似,本文中設置了PyTorch和Numpy的隨機數,從而使得每次的結果相同,方便比較。

單機訓練VGG11模型

首先,我們先來在單一節點上訓練VGG模型以作為對比,數據集為Cifar10。代碼在此:單機訓練VGG模型

接下來,我們主要關注train_model函數的寫法,這個函數用於訓練模型。通過更改這個函數的內容,我們可以實現分布式模型訓練。

訓練包括三步,根據輸入數據得出輸出數據,將輸出與真值對比以計算loss,根據loss更新權值。請注意,在單機上batch_size=256。

    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # 獲取數據
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        # 計算loss
        train_loss = criterion(output, target)
        # 更新權值
        train_loss.backward()
        optimizer.step()

多節點訓練VGG11模型:總述

接下來,將在多節點進行訓練。PyTorch多節點訓練可參考PyTorch分布式官方文檔寫PyTorch分布式程序

第一步,首先運行 torch.distributed.is_available() 以確保安裝了相對應的package。

接下來, 對於多節點訓練,首先需要初始化多節點進程init_process_group.

這需要3個參數, backend是不同的通訊方式,在本文中,我們將使用gloo進行后端通訊。rank, world_size代表了本機的級別和節點數,因為我們是四個節點的cluster,所以rank分別為0,1,2,3,其中master設置為0, world_size設置為4.

代碼如下:

def init_process(master_ip, rank, size, vgg_model, backend='gloo'):
    """ 初始化環境 """
    os.environ['MASTER_ADDR'] = master_ip
    os.environ['MASTER_PORT'] = '29501'
    dist.init_process_group(backend, rank=rank, world_size=size)
    # 跑模型
    vgg_model(rank, size)

可以看到,首先設置了master節點的ip和port,之后初始化了process group。

接下來,我們需要將數據集分成四份(data partition),並分別發送到四台機器上。

具體的方法是使用DistributedSampler, 並將data_loader中的sampler改成所對應的DistributedSampler:

from torch.utils.data.distributed import DistributedSampler

sampler_d = DistributedSampler(training_set) if torch.distributed.is_available() else None
train_loader = torch.utils.data.DataLoader(training_set, num_workers=2, batch_size=batch_size, sampler=sampler_d, pin_memory=True)

之后就可以跑模型了。

請注意,要想在各個節點都跑起來模型,需要將代碼、數據等在每一個機子上都有一份,其內容可以有略微不同,例如,在node0的代碼需要設置為rank=0,node1的代碼需要設置為rank=1, 以此類推。接下來會對此進行詳細解說。

此外,在運行時,需要將所有的代碼在所有的節點運行之后,整個訓練才會開始。例如,如果world_size=4,也就是四個節點,那么需要分別在四個機子上執行代碼,當第四個機子的代碼執行后,全部訓練才會開始。

多節點訓練VGG11模型:使用scatter和gather

接下來就進入正題,如何用不同的方法多節點訓練模型。具體而言,我們將分別用多種方法,分布式計算和更新梯度。

首先,因為單機上batch_size=256,而我們現在有4個節點,也就是四台機器,所以我們設置每台機子上的batch_size=256/4=64,這樣每一個epoch仍然等價於batch_size=256

接下來,應該新建一個組,以確保通信發生在組內:

group = dist.new_group([0, 1, 2, 3])

分布式實現訓練的思路如下,我們將數據分成四份並分別發送到不同的機子上,接下來,在每一台機子上,根據輸入數據得出輸出數據,將輸出與真值對比以計算loss,以上兩步與單機版的思路基本相同。接下來,需要將四台機子的parameters梯度發送到master節點,並計算平均值,以獲得統一的權值,再發送到各個節點進行更新,這樣就能確保所訓練的模型在各個機子上相同。

在這里,我們使用scatter和gather來發送和收集信息。其中,scatter可以將信息從master節點傳到所有的其他節點,gather可以將信息從別的節點獲取到master節點。

用法如下:

# master node
var_list = [torch.zeros_like(var) for _ in range(4)]
dist.gather(var, var_list, group=group, async_op=False)
# slave node
dist.gather(var, group=group, async_op=False)

對於gather, 首先需要在master node新建一個空的list來存儲tensor,如果有4個節點則list長度為4,分別存儲rank 0, 1, 2, 3節點的這個變量的值。

接下來,dist.gather()第一個參數指明了需要獲取的每個節點的具體變量名。

而slave node只需要將tensor傳出即可,不需要新建list存儲tensor。

以上這個例子就是在master node用var_list這個list分別收集了node 0~3每個節點var的值。

# master node
var_list = [var for _ in range(4)]
dist.scatter(var, var_list, group=group, src=0, async_op=False)
# slave node
dist.scatter(var, group=group, src=0, async_op=False)

對於scatter, 首先需要在master node新建一個list來存儲tensor,如果有4個節點則list長度為4,分別存儲需要發送到rank 0, 1, 2, 3節點的變量。

接下來,dist.scatter()第一個參數指明了每個節點接收數據的具體變量名。

而slave node只需要將tensor接收即可,接收的變量為var。

以上這個例子就是在master node用var_list這個list分別復制了四個var,再分別發送到了所有node 0~3.

接下來我們看具體的代碼。對於vgg11模型訓練而言,我們首先將四個節點的梯度發送到master節點,求平均值后再分別發送到四個節點。

故master代碼和其他node的代碼不同。代碼在此:scatter, gather on master node, scatter, gather on other nodes

具體代碼master(node0)如下:

    # 新建組
    group = dist.new_group([0, 1, 2, 3])
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        train_loss = criterion(output, target)
        train_loss.backward()
        # 以上均和單機版代碼相同。接下來遍歷本機模型的parameters,並采集其他節點的grad梯度,計算平均值並發送到其他節點
        for p in model.parameters():
            # 新建一個list存儲各個節點的梯度
            grad_list = [torch.zeros_like(p.grad) for _ in range(4)]
            # 獲取所有節點的梯度
            dist.gather(p.grad, grad_list, group=group, async_op=False)

            # 計算所有節點的平均梯度
            grad_sum = torch.zeros_like(p.grad)
            for i in range(4):
                grad_sum += grad_list[i]
            grad_mean = grad_sum / 4

            # 新建一個list存儲將要發到各個節點的平均梯度
            scatter_list = [grad_mean for _ in range(4)]
            # 將所有的值發送到各個節點
            dist.scatter(p.grad, scatter_list, group=group, src=0, async_op=False)
        optimizer.step()

簡單而言,master node首先獲取了所有節點的梯度並計算了平均值,接下來將該平均值分發到了各個slave nodes。

具體代碼slave(node1~3)如下:

    # 新建組
    group = dist.new_group([0, 1, 2, 3])
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        train_loss = criterion(output, target)
        train_loss.backward()
        # 以上均和單機版代碼相同。接下來遍歷本機模型的parameters,並獲取grad梯度,發送到master node,並從master node獲取平均值后更新grad
        for p in model.parameters():
            # 將grad值發送到master node
            dist.gather(p.grad, group=group, async_op=False)
            # 接收master node發送過來的grad值
            dist.scatter(p.grad, group=group, src=0, async_op=False)
        optimizer.step()

slave node首先將該節點的梯度發送到了master node, 之后接收了master node計算的梯度平均值.

多節點訓練VGG11模型:使用isend和irecv

除了scatter和gather, 另一種方法是使用isend和irecv。

isend和irecv屬於點對點通訊,可以指定發送和接收的目標。因此,不需要新建組。

具體思路仍然是將四台機子的parameters梯度發送到master節點,並計算平均值,獲得統一的梯度,再從master node發送到各個節點進行更新.

isend可以將信息從一個節點傳到另一個節點,同時對應的另一個節點需要使用irecv進行接收。

用法如下:

    req = dist.isend(tensor=var, dst=rank)
    req.wait()
    req = dist.irecv(tensor=var, src=rank)
    req.wait()

isend和irecv的第一個參數即為需要傳輸和需要接收的變量名var。對於isend,dst指定了發送時的目標(rank),而對於irecv,src指定了接受時數據的來源(rank)。

在發送和接收后,需要req.wait()以確保目標或者來源機器已經收到或者已經發送了數據,以確保各個機器同步。

接下來我們看具體的代碼。對於vgg11模型訓練而言,我們首先將四個節點的梯度發送到master節點,求平均值后再分別發送到四個節點。

故master代碼和其他node的代碼不同。代碼在此:isend, irecv on master node, isend, irecv on other nodes

具體代碼master(node0)如下:

    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        train_loss = criterion(output, target)
        train_loss.backward()
        # 以上均和單機版代碼相同。接下來遍歷本機模型的parameters,並采集其他節點的grad梯度,計算平均值並發送到其他節點
        for p in model.parameters():
            # 采集其他節點的grad梯度
            grad_1 = torch.zeros_like(p.grad)
            grad_2 = torch.zeros_like(p.grad)
            grad_3 = torch.zeros_like(p.grad)
            req = dist.irecv(tensor=grad_1, src=1)
            req.wait()
            req = dist.irecv(tensor=grad_2, src=2)
            req.wait()
            req = dist.irecv(tensor=grad_3, src=3)
            req.wait()
            # 計算所有節點的梯度平均值
            grad_mean = (p.grad + grad_1 + grad_2 + grad_3)/4
            p.grad = grad_mean
            # 將梯度平均值發送到其他節點
            req = dist.isend(tensor=grad_mean, dst=1)
            req.wait()
            req = dist.isend(tensor=grad_mean, dst=2)
            req.wait()
            req = dist.isend(tensor=grad_mean, dst=3)
            req.wait()
        optimizer.step()

具體代碼slave(node1~3)如下:

    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        train_loss = criterion(output, target)
        train_loss.backward()
        # 以上均和單機版代碼相同。接下來遍歷本機模型的parameters,並獲取grad梯度,發送到master node,並從master node獲取平均值后更新grad
        for p in model.parameters():
            # 將本機的梯度發送到master node
            req = dist.isend(tensor=p.grad, dst=0)
            req.wait()
            # 從master node接收平均梯度
            req = dist.irecv(tensor=p.grad, src=0)
            req.wait()
        optimizer.step()

多節點訓練VGG11模型:使用all_reduce

以上方法雖然完成了不同節點之間的通訊,但由於master node和其他node之間代碼不同,所以仍然比較麻煩。PyTorch提供了一種簡便且高效的方法。具體而言就是all_reduce函數,該函數使用了ring通訊方法,使得通訊效率得到了提升。同時,也完成了組內變量的共享和計算。

用法如下:

dist.all_reduce(var, op=dist.reduce_op.SUM, group=group, async_op=False)

與gather(), scatter()相似,首先需要建立一個組。all_reduce()第一個參數為需要進行運算的變量,第二個參數op則包含了一些方法,例如求和SUM,此外還有MIN, MAX等,可參見這里.

所以以上代碼的意思是計算組內所有節點var變量的總和,且返回該var.

具體思路仍然是將四台機子的parameters梯度發送到master節點,並計算平均值,獲得統一的梯度,再從master node發送到各個節點進行更新.

isend可以將信息從一個節點傳到另一個節點,同時對應的另一個節點需要使用irecv進行接收。

    req = dist.isend(tensor=var, dst=rank)
    req.wait()
    req = dist.irecv(tensor=var, src=rank)
    req.wait()

接下來我們看具體的代碼。對於vgg11模型訓練而言,我們首先將四個節點的梯度分布除以4,再全部相加,即可獲得平均梯度值。在這里,所有機器的代碼均相同。代碼在此:all_reduce.

具體代碼如下:

    # 新建組
    group = dist.new_group([0, 1, 2, 3])
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        train_loss = criterion(output, target)
        train_loss.backward()
        # 以上均和單機版代碼相同。接下來遍歷本機模型的parameters,並計算組內所有機器的梯度平均值
        for p in model.parameters():
            p.grad = p.grad / 4
            dist.all_reduce(p.grad, op=dist.reduce_op.SUM, group=group, async_op=False)
        optimizer.step()

多節點訓練VGG11模型:使用Distributed Data Parallel

PyTorch還提供了最新的Distributed Data Parallel (DDP) API,通過Gradient Bucketing更高效和方便地實現了以上方法。

在這里,所有機器的代碼均相同。代碼在此:DDP.

具體用法如下:

from torch.nn.parallel import DistributedDataParallel as DDP

ddp_model = DDP(model)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.1,
                        momentum=0.9, weight_decay=0.0001)

也就是將model轉換為ddp_model,之后即可和單機模型類似進行訓練。

以上就是使用多種方法進行PyTorch分布式訓練的內容了。所有代碼可以參考:GitHub.
同時,本文基於威斯康星大學麥迪遜分校CS744課程作業改編而成,在此感謝Prof.Shivaram Venkataraman. 同時,本項目由饒錦蒙和我共同完成。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM