[源碼解析] PyTorch 流水線並行實現 (1)--基礎知識


[源碼解析] PyTorch 流水線並行實現 (1)--基礎知識

0x00 摘要

本系列開始介紹PyTorch的流水線並行實現。實質上,PyTorch就是 GPipe 的PyTorch版本。這些開源軟件在互相借鑒思路,互相學習,從 PyTorch 的源碼注釋中,可以見到我們之前介紹的部分框架/庫的引用或者論文鏈接。

流水線並行其他文章鏈接如下:

[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現

[源碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積

[源碼解析] 深度學習流水線並行 GPipe(3) ----重計算

[源碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段

[源碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分區

[源碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型

[源碼解析] 深度學習流水線並行 PipeDream(4)--- 運行時引擎

[源碼解析] 深度學習流水線並行 PipeDream(5)--- 通信模塊

[源碼解析] 深度學習流水線並行 PipeDream(6)--- 1F1B策略

本文圖來自論文和github源碼。

0x01 歷史

我們首先介紹一下來龍去脈。

1.1 GPipe

從前面系列文章我們知道,GPipe是Google Brain發布的可伸縮流水線並行庫,它允許高效地訓練大型的消耗內存的模型。其論文是:

`GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism
<https://arxiv.org/abs/1811.06965>`

GPipe實質是一個模型並行的庫,當模型的大小對於單個GPU來說太大時,訓練大型模型可能會導致內存不足。為了訓練如此大的模型,GPipe把一個多層網絡分割成若干個復合層,然后每個復合層被部署到GPU/TPU之上。但是這若干個復合層只能順序並行,這就嚴重影響了訓練速度。所以GPipe引入了流水線並行機制(pipeline parallelism),在不同的GPU設備之間對層進行流水線處理。另外,GPipe 也使用了重新計算這個技巧來降低內存,這樣可以訓練更大的模型。

Gpipe首先將模型分片到不同的設備上,其中每個設備承載模型的一個分片。碎片可以是單個層或一系列層。然后 Gpipe將一小批數據分割成微批次,並將微批次給承載第一個碎片的設備。每個設備上的層做如下操作:

  • 對接受到的微批次進行處理,並將輸出發送到后續設備。
  • 同時,它已准備好處理來自上一個設備的微批次。

通過以這種方式對輸入進行管道連接,Gpipe能夠減少設備的空閑時間。可以簡化大型模型的訓練。

1.2 torchgpipe

因為 GPipe 是基於 TensorFlow 的庫(這是Google的產品嘛),所以kakaobrain的一些工程師就用PyTorch 來實現了 GPipe,並且開源出來,這就是 torchgpipe,其地址為:https://github.com/kakaobrain/torchgpipe,用戶可以通過 pip install torchgpipe 進行安裝使用。

該作者團隊還發表了一篇論文,具體如下:https://arxiv.org/pdf/2004.09910.pdf

1.3 fairscale

FairScale是Facebook的一個PyTorch擴展庫,用於高性能和大規模培訓。該庫擴展了PyTorch的基本功能,同時添加了新的SOTA縮放技術。FairScale以可組合模塊和簡易API的形式來提供了最新的分布式訓練技術。這些API可以作為研究人員的基本工具,因為其可以使用有限的資源來訓練大模型。

其開源地址為:https://github.com/facebookresearch/fairscale.

從其內部源碼結構和文檔看,個人認為其是Facebook 內部對於市面上各種最新深度學習庫的一個收集/實驗/探索的試驗田。如果實驗成熟,Facebook 就把部分代碼合並到 PyTorch 之中

比如從下面可以看到,Facebook 正在/曾經試驗如下:

fairscale.nn.pipe is forked from torchgpipe, Copyright 2019, Kakao Brain, licensed under Apache License.

fairscale.nn.model_parallel is forked from Megatron-LM, Copyright 2020, NVIDIA CORPORATION, licensed under Apache License.

fairscale.optim.adascale is forked from AdaptDL, Copyright 2020, Petuum, Inc., licensed under Apache License.

fairscale.nn.misc.flatten_params_wrapper is forked from PyTorch-Reparam-Module, Copyright 2018, Tongzhou Wang, licensed under MIT License.

1.4 PyTorch

從 2021-03-18,PyTorch 1.8.0 Release Notes 之中可以看到。

  • Upstream fairscale.nn.Pipe into PyTorch as torch.distributed.pipeline (#44090)

這里正式引入了 pipeline。

https://github.com/pytorch/pytorch/pull/44090 的內容是:

Stack from ghstack:

  • #44090 Pull in fairscale.nn.Pipe into PyTorch.

This is an initial commit pulling in the torchgpipe fork at https://github.com/facebookresearch/fairscale.

The purpose of this commit is to just pull in the code and ensure all tests and builds work fine. We will slowly modify this to match our intended API mentioned in #44827. Follow up PRs would
address further changes needed on top of the initial commit..

We're pulling the code into the torch.distributed._pipeline.sync package. The package is private on purpose since there is a lot of work (ex: docs, API changes etc.) that needs to go in before we can actually officially support this.

需要注意一點是,torchgpipe 這部分代碼被合並到 torch/distributed/pipeline/sync 之下,這說明后續 PyTorch 也許會合並一個 async 實現,沒准就是 PipeDream

1.5 基礎版本

因為這部分源碼在 PyTorch 之中基本未做改變。所以,我們還是以 torchgpipe 原始代碼作為例子來進行說明。

本文只是選取重要功能進行講解,像 Deferred Batch NormalizationSkip Connections就不會分析,讀者如果有興趣,可以自行研究。

0x02 基礎知識

在 torchgpipe 和 fairscale 的源碼內部,都涉及和介紹了大量相關知識,了解這些知識有助於我們更好的學習源碼,也可以回頭對 GPipe 的分析做相應印證。

某些知識可能與之前文章有所重復,但是torchpipe和fairscale給出了自己的見解,我們可以再學習一下。

2.1 流水線並行

GPipe將一個模型拆分為多個分區,並將每個分區放置在不同的設備之上,這樣可以增加內容容量。例如,我們可以拆分一個占用40GB CUDA內存的模型分為4個分區,每個分區占用10GB。

這種方法稱為“模型並行"。然而,典型的深度學習模型由連續的層組成。換句話說,后面的層在前一層完成之前是不會工作的。如果一個模型是由完全連續的層構成,即使我們將模型擴展到兩個或多個層上,同一時間也只能使用一個設備。

GPipe將一個小批量(mini-batch)拆分為多個微批量(micro-batches),以使設備盡可能並行工作,這被稱為“流水線並行"。

基本上,流水線並行是一個小型數據並行的棧。當每個分區處理完一個微批次后,它可以將輸出拋到下一個分區並立即開始下一個微批次的工作,這樣分區就可以重疊。

因為每個分區都必須等待前一個分區輸入作為第一個微批次來處理,所以流水線之上仍然有空閑時間,我們稱之為 “bubble"。

通過選擇較小尺寸的微批次,可以減少“bubble"。但通常,較大的批量可以更有效地利用GPU。因此,如果選擇的微批量太小,GPU可能未得到充分利用。另外,更快的分區應該等待相鄰的較慢分區,分區之間的不平衡也可能導致GPU利用率不足。因此,總體性能由最慢的分區決定。

2.2 Checkpointing

2.2.1 基本概念

Checkpointing 是一種用於減少訓練期間GPU內存使用的技術。這是通過避免在向前傳遞期間存儲中間激活張量來實現的。具體而言,Checkpointing 在正向傳播過程中,只會記住分區邊界處的張量,所有其他中間張量都不會記住,而是在向后傳播過程中跟蹤原始輸入來重新計算向前傳播。因此,隱藏層消耗的內存僅為帶有檢查點的單個微批次所需要的數量。

Checkpointing 是性能和內存之間的折衷,因為如果完全重計算,則所花費的時間與正向傳播所花費的時間相同。但 Checkpointing 減少了存儲大型激活張量的需要,從而允許我們增加批量大小,增加模型的凈吞吐量。

2.2.2 使用

在 GPipe之中,Checkpointing 應用於每個分區,以最小化模型的總體內存消耗。

Checkpointing 會極大減少內存使用,但總體訓練速度會降低25%左右。您可以處理如何在模型上應用檢查點。Checkpointing 只有三種選擇,不能夠指定某些特定點:

  • "always" :在所有微批次上應用檢查點。

  • "except_last" : 在最后一個微批次之外應用檢查點。

  • "never" :從不應用檢查點。

@pytest.mark.parametrize('checkpoint', ['never', 'always', 'except_last'])

通常,在最后一個微批次上的檢查點可能沒有用處,因為保存的內存將立即重建。這就是為什么我們選擇"except_last"作為默認選項。如果您決定根本不使用檢查點,那么<torch.nn.DataParallel>可能比GPipe更有效。

2.2.3 實現概述

Checkpointing 已經作為“torch.utils.checkpoint.checkpoint_wrapper"API的一部分實現,通過該API可以包裝前向過程中的不同模塊。

Checkpointing 通過重寫“torch.autograd.Function"來實現。在處理模塊前向傳遞的“forward"函數中,如果使用“no_grad",我們可以在很長一段時間內(即直到反向傳播之前)防止正向圖的創建和中間激活張量的物化。相反,在后向傳播期間,會再次執行前向傳播,然后執行后向傳播。

前向傳播過程的輸入使用上下文對象保存,然后在后向傳播過程中訪問該上下文對象以檢索原始輸入。PyTorch還保存了RNG(Random Number Generator)的狀態,用於前向傳播和后向傳播,如 Dropout layers 所需。

以下是幾個注意點:

  1. 內存節省完全取決於檢查點所包裝的模型和分段。每個backprop由幾個迷你前向傳播(mini-forward)和backprop過程組成。收益完全取決於每層激活值的內存占用。

  2. 使用BatchNormalization時,您可能需要凍結統計數據的計算,因為我們運行了兩次正向傳遞。

  3. 確保輸入張量的'requires_grad'字段設置為True。為了觸發后向傳播功能,輸出需要設置此字段。通過在輸入張量設置這個字段,我們可以確保將其傳播到輸出,並觸發'backward'函數。

2.3 微批次的數目

微批量大小的選擇會影響GPU的利用率。較小的微批量可以減少等待先前微批次輸出的延遲,但較大的微批量可以更好地利用GPU。因此,關於微批次數量,存在了一個權衡,即每個微批次的GPU利用率和bubble總面積之間的權衡,用戶需要為模型找到最佳的微批次數量。

與大的微批次相比,在處理許多小的微批次時,GPU可能會減慢速度。如果每個CUDA內核太便宜而無法計算,那么GPU將無法得到充分利用,因此太小的微批次將導致利用率不足。另一方面,當每個微批次的尺寸減小時,氣泡的面積也相應減少。理想情況下,用戶應該選擇可以提高GPU利用率的最大數量的微批次。

作為補充說明,批次尺寸越小,性能越差。大量的微批次可能會對使用BatchNorm的模型的最終性能產生負面影響,就像 torch.nn.DataParallel 那樣。

2.4 檢查重計算

GPipe中的檢查點執行兩次前向傳播。第二個前向傳播稱為“重新計算"。

諸如<torch.nn.BatchNorm2d>之類的模塊在每次正向傳播時,如果更新其批處理統計信息,可能就會導致問題。因此,在重新計算期間,不應再次更新正在運行的估計值。為了避免再次更新運行估計,模塊的“forward"方法需要能夠檢測到這是重新計算。

~torchgpipe.is_recomputing方法可以檢測重新計算,在重運行期間,這個方法會返回True

   class Counter(nn.Module):
       def __init__(self):
           super().__init__()
           self.counter = 0

       def forward(self, input):
           if not is_recomputing():
               self.counter += 1
           return input

另外,如果把~torchgpipe.GPipe 的成員變量 deferred_batch_norm設置為 True,則可以阻止再次更新運行統計。

0x03 使用

3.1 示例

要使用GPipe訓練模塊,只需將其用 torchgpipe.GPipe 來包裝即可,但是用戶的模塊必須是<torch.nn.Sequential> 的實例。

GPipe 會將自動將模塊分割為多個分區,分區是在單個設備上一起運行的一組連續層,其中:

balance參數確定每個分區中的層數。

chunks參數指定微批處理的數量。

下面的示例代碼顯示了如何將具有四層的模塊拆分為兩個分區,每個分區有兩層。此代碼還將一個小批次 mini-batch 拆分為8個微批次(micro-batches)

   from torchgpipe import GPipe

   model = nn.Sequential(a, b, c, d)
   model = GPipe(model, balance=[2, 2], chunks=8)

   # 1st partition: nn.Sequential(a, b) on cuda:0
   # 2nd partition: nn.Sequential(c, d) on cuda:1

   for input in data_loader:
       output = model(input)

~torchgpipe.GPipe使用CUDA進行訓練。用戶不需要自己將模塊移動到GPU,因為~torchgpipe.GPipe自動把每個分區移動到不同的設備上。默認情況下,可用的GPU從cuda:0開始,並且按順序為每個分區選擇可用GPU。用戶也可以利用device 參數指定使用的GPU。

   model = GPipe(model,
                 balance=[2, 2],
                 devices=[4, 2],  # Specify GPUs.
                 chunks=8)

3.2 輸入輸出

與典型module不同,GPipe之中,輸入設備與輸出設備不同,除非只有一個分區。這是因為第一個分區和最后一個分區被放置在不同的設備上。因此,必須將輸入和目標移動到相應的設備。可以通過 torchgpipe.GPipe.devices 的屬性來完成,這個屬性保存了每個分區的設備列表.

   in_device = model.devices[0]
   out_device = model.devices[-1]

   for input, target in data_loader:
       # input on in_device
       input = input.to(in_device, non_blocking=True)

       # target on out_device
       target = target.to(out_device, non_blocking=True)

       # output on out_device
       output = model(input)
       loss = F.cross_entropy(output, target)
       loss.backward()
       ...

3.3 嵌套序列(Nested Sequentials)

~torchgpipe.GPipe拆分一個<torch.nn.Sequential>module時候,它將模塊的每個子模塊視為單一的、不可分割的層。然而,模型事實上並不一定這樣,有些子模塊可能是另一個順序模塊,可能需要進一步拆分它們。

GPipe 不會支持這些嵌套的 Sequentials module,所以用戶需要把module打平(flatten the module)。還好,這在PyTorch中並不難。以下代碼段顯示了嵌套順序模塊如何展平:

   _3_layers = nn.Sequential(...)  # len(_3_layers) == 3
   _4_layers = nn.Sequential(...)  # len(_4_layers) == 4
   model = nn.Sequential(_3_layers, _4_layers)  # len(model) == 2

   def flatten_sequential(module):
       def _flatten(module):
           for name, child in module.named_children():
               if isinstance(child, nn.Sequential):
                   for sub_name, sub_child in _flatten(child):
                       yield (f'{name}_{sub_name}', sub_child)
               else:
                   yield (name, child)
       return nn.Sequential(OrderedDict(_flatten(module)))

   model = flatten_sequential(model)  # len(model) == 7
   model = GPipe(model, balance=[2, 3, 2], chunks=4)

3.4 典型模型並行

典型的模型並行(Typical Model Parallelism)是GPipe的一個特例。模型並行性是相當於禁用了微批處理和檢查點的GPipe,可以通過chunks=1checkpoint='never' 來做到。

model = GPipe(model, balance=[2, 2], chunks=1, checkpoint='never')

至此,庫的歷史和基本知識已經介紹完畢,下一篇我們介紹 Auto balance。

0xFF 參考

Markdown公式用法大全

markdown中公式編輯教程

https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

CUDA學習:基礎知識小結

CUDA隨筆之Stream的使用

NVIDIA解決方案架構師深度解析大規模參數語言模型Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

https://github.com/NVIDIA/apex/

https://github.com/justheuristic/prefetch_generator

https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

https://pytorch.org/docs/stable/autograd.html

https://pytorch.org/docs/notes/cuda.html

https://zhuanlan.zhihu.com/p/61765561

https://pytorch.apachen.org/docs/1.7/64.html

https://zhidx.com/p/217999.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM