[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現


[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現

0x00 摘要

GPipe是一個基於 Lingvo (Lingvo 是 Google 基於 TensorFlow 二次開發的重點針對序列模型的框架)開發的,支持超大規模模型的神經網絡訓練並行庫,本文介紹其基本功能和流水線機制。

0x01 概述

1.1 什么是GPipe

GPipe是一個基於 Lingvo (Lingvo 是 Google 基於 TensorFlow 二次開發的重點針對序列模型的框架 https://github.com/tensorflow/lingvo)開發的,支持超大規模模型的神經網絡訓練並行庫,其特點如下:

  • GPipe 把一個L層的網絡,切分成 K個 composite layers。每個composite layer 運行在單獨的TPU core上。
  • 這 K個 core composite layers只能順序執行,但是GPipe 引入了流水並行策略來緩解這個順序執行的性能問題,把 mini-batch細分為多個更小的macro-batch,提高並行程度。
  • GPipe 還用recomputation這個簡單有效的技巧來降低內存,進一步允許訓練更大的模型。

1.2 挑戰

深度學習框架本質上是一個基於張量(Tensor)之間的計算(Operator)表達式所組成的計算圖(Graph)編譯執行引擎,提供了一系列張量的定義、一元操作、二元操作等數學原語,並根據反向傳播算法(Back Propagation)進行梯度自動求導以及模型更新。在大量數據分批次流入計算圖進行模型訓練之后,使得模型學習到數據中的內在關聯關系,從而獲得對應場景中的“智能”感知與判斷能力。

DNN訓練的目標是在盡可能短的時間內獲得一個高精度的模型。這一目標可以通過兩個指標來實現:

  • 統計效率,達到預期准確度所需的歷元數;
  • 硬件效率,完成單個歷元所需的時間。達到期望精度水平的總訓練時間只是這兩個指標的乘積;

GPU最主要提供的是兩種資源:計算資源顯存帶寬資源。所以訓練大型模型有兩個基本挑戰:顯存效率計算效率

深度學習框架性能優化的最終目標是深度學習模型訓練最快,從而使得完成訓練的時間最短,節省模型訓練開發周期和用戶的時間成本。

0x02 並行機制

業界采用一些並行機制達到優化的目的。

2.1 機制分類與權衡

本節以下主要參考如下文章:

Efficient Large-Scale Language Model Training on GPU Clusters

DeepSpeed: Extreme-scale model training for everyone

[譯] DeepSpeed:所有人都能用的超大規模模型訓練工具

PipeDream: Fast and Efficient Pipeline Parallel DNN Training

在 "Efficient Large-Scale Language Model Training on GPU Clusters" 論文中, NVIDIA 介紹了分布式訓練超大規模模型的三種必須的並行技術:

  • 數據並行(Data Parallelism)
  • 模型並行(Tensor Model Parallelism)
  • 流水並行(Pipeline Model Parallelism)

2.1.1 數據並行

數據並行(Data Parallelism)是最常見的方法。其特點如下:

  • 模型在多個worker機器上復制,每個GPU都維護模型的完整副本。
  • 輸入數據集可以跨多個gpu進行分區。每批輸入的訓練數據都在數據並行的 worker 之間划分。每個worker處理訓練數據的一個子集。
  • 使用集合通信原語或參數服務器定期與其他GPU同步權重。
  • 反向傳播后需要通信並規約梯度,以保證優化器在各個 worker 上進行相同的更新。即,對單個worker計算的權重更新進行聚合,以獲得反映所有輸入更新的最終權重更新。
  • 每個聚合傳遞的數據量與模型的大小成比例。

數據並行性具有幾個明顯的優勢,包括計算效率高和實現起來工作量小,這使得數據並行訓練在一些流行的具有高計算通信比的模型上運行良好,但有幾個重要的趨勢威脅着它的有效性:

  • 顯存效率:數據並行會在所有 worker 之間進行模型和優化器的復制,因此顯存效率不高。
  • 計算效率:隨着我們提高並行度,每個 worker 執行的計算量是恆定的。數據並行可以在小規模上實現近乎線性擴展。但是,在 worker 之間規約梯度的通信開銷跟模型大小成正相關,所以當模型很大或通信帶寬很低時,計算效率會受限。而GPU計算能力的快速增長進一步將訓練的瓶頸轉移到跨模型的通信上。另外,參數同步的頻率影響統計和硬件效率。
  • 伸縮性:不斷增長的模型大小增加了每次聚合的通信量。事實上,一些廣泛使用的模型足夠大,使得通信超過了計算時間,限制了伸縮性並支配了總的訓練時間。而且,數據並行的 batch 大小隨 worker 數量提高,而我們往往無法在不影響收斂性的情況下一直增加 batch 大小。

2.1.2 模型並行

模型並行在傳統上用於訓練過程中太大而無法保存在工作者內存或緩存中的模型。其特點如下:

  • 模型並行涉及到將模型在worker之間進行划分,以便每個worker僅對模型參數的一個子集進行評估和更新。這樣就可以分為層間並行和層內模型並行。
  • 層間模型並行會在多個 worker 之間划分模型的各個層。
  • 層內模型並行把每層的模型參數切分到多個設備。層內模型並行在有的論文里被叫做 "Tensor 級別的模型並行" ,是對某一層(如 Linear/Dense Layer 里的 Variable )的模型 Tensor 切分,從而將大的模型 Tensor 分成多個相對較小的 Tensor 進行並行計算;
  • 層間值(激活和梯度)往往是需要跨機器通信的唯一參數。

就其本質而言,模型並行性的計算和通信因模型結構而異,因此在實現上有很大的工作量。

然而,即使模型並行能夠訓練非常大的模型,傳統的模型並行也會導致計算資源的嚴重利用率不足,因為它一次只主動使用一個worker(如果每個層被分配給一個worker),或者不能重疊計算和通信(如果每個層被分區)。

  • 顯存效率:模型並行DNN訓練導致GPU資源的嚴重利用不足。模型並行通過在模型並行 worker 之間划分激活顯存,會根據 worker 數量成比例地減少顯存使用量。至關重要的是,這是減少單個網絡層的激活顯存的唯一方法。
  • 計算效率:由於每次前向和反向傳播中都需要額外通信激活值,模型並行的計算效率很低。模型並行需要高通信帶寬,並且不能很好地擴展到通信帶寬受限的節點。此外,每個模型並行worker 都會減少每個通信階段之間執行的計算量,從而影響計算效率。模型並行性通常與數據並行性結合使用,以在內存和計算效率之間進行權衡。
  • 開發效率:跨多個GPU划分模型的負擔留給了程序員,即使對於最有經驗的機器學習實踐者來說,確定如何在工作者中最好地划分DNN模型也是一項具有挑戰性的任務,這往往會導致額外的效率低下。最近有些工作探索了如何使用增強學習來自動確定模型並行性的設備位置。不幸的是,這樣的在線決策技術是時間和資源密集型的;它們也不能無縫地結合流水線、數據並行和模型並行。

2.1.3 流水線並行

流水並行(Pipeline Model Parallelism)在有的論文里叫做流水線級別的模型並行,其特點是:

  • 將整個網絡分段(stage),不同段在不同的設備上,前后階段流水分批工作,通過一種“接力”的方式並行。
  • 流水線並行將模型的各層划分為可以並行處理的階段。當一個階段完成一個 micro-batch 的正向傳遞時,激活內存將被通信至流水線的下一個階段。類似地,當下一階段完成反向傳播時,將通過管道反向通信梯度。必須同時計算多個 micro-batch 以確保流水線的各個階段能並行計算。
  • 流水線並行訓練有可能在數據並行性困難時提供較高的DNN訓練性能。特別是,工作人員之間的通信可以限制在分配給不同工作人員的相鄰層之間的激活(在前向通道上)和梯度(后向)上。

但是流水線並行依然有一些問題:

  • 顯存效率:流水線並行減少的顯存與流水線的階段數成正比,使模型的大小可以隨 worker 的數量線性擴展。但是,流水線並行不會減少每一層的激活函數的顯存占用量。此外,每個 worker 必須存儲同時運行的各個 micro-batch 的激活值。這導致流水線第一階段的激活內存與單個 mirco batch 的總激活內存大致相同。
  • 計算效率:流水線並行具有最低的通信量,因為它的通信量只和在各階段邊界的各層的激活值大小成正比。但是,它不能無限擴展。像模型並行一樣,增加流水線大小會減少每個流水線階段的計算量,這會降低計算與通信的比率。如果要實現好的計算效率,流水線並行還要求其每個階段的計算負載完美的均衡。此外,流水線並行性會在每個 batch 的開始和結束時因為需要重新填充或排空流水線而產生 bubble overhead。
  • 開發效率:DNN的雙向性(正向傳遞后反向傳遞相同的層)使得流水線具有挑戰性,更重要的是,一個簡單的流水線機制引入了過時權重的最新計算,導致最終模型的精度低於數據並行訓練。

2.2 如何使用

給定一個特定的神經網絡模型和一批計算資源,從任務到設備之間的映射有多種方式,但不同的映射方案運行效率不同。哪種方案最優既取決於作業本身的特性,也取決於底層硬件的拓撲。

神經網絡由很多局部計算搭建組成,一般來說,同一個神經網絡的不同算子可能適合不同的並行模式。一個局部計算是采用數據並行,還是模型並行取決於這個局部任務的計算傳輸比。比如:

  • 某些算子(比如卷積) 運算參數量很小,但中間結果量大,為了較少傳輸量,所以最划算的方法是數據並行。即對數據進行切分,不同的設備處理不同的數據,在設備之間偶爾進行參數同步;
  • 某些算子,中間計算結果相對於參數量更少,就適合模型並行。
  • 還有一些算子,網絡參數量/中間計算結果都很大,可能采用流水並行(也就是接力的形式)是最優的。
  • 相比於一個算子只使用單一的並行模式,一個算子也可以同時使用多樣的並行模式可能進一步地減少傳輸量,譬如在隱藏層比較大的地方,就可能同時對數據矩陣切割以及對模型矩陣切割。

因此,對於每一個任務選擇最優的並行模式是一個非常復雜的問題,需要具體情況具體分析。

0x03 Pytorch 手動指定並行方式

目前已有的深度學習框架,大多數提供了對數據並行的原生支持,但是對模型並行支持的還不完善。如果用戶想要將模型參數分配到不同設備上,往往會遇到需要人工指定模型切分方式、手工編寫數據通信邏輯代碼等問題。

我們就看看 Pytorch 如何手動指定,主要摘錄(翻譯):

https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

3.1 基礎知識

PyTorch以Tensor為基本單元,更符合算法工程師寫Python腳本的直覺,以面向對象的方式進行模型搭建和訓練。對Tensor進行賦值、切片,就像numpy一樣易用。

PyTorch 是單卡視角,一個設備上的 Tensor、模型腳本跟另一個設備上的 Tensor、模型腳本並無直接關系,對於每個設備上的模型腳本都完全對稱的(Mirror)最簡單的數據並行來說,PyTorch 這樣的設計沒有什么明顯的缺陷。每個設備上的腳本運行到相同 batch 的模型更新部分(Optimizer),統一做一次模型同步(AllReduce 操作)就完成了數據並行,這就是 PyTorch 的 DDP(DistributedDataParallel)模塊。

但在分布式情況下想把一個Tensor切分到不同機器上,需要手動構建傳輸過程,相當於直接對物理編程,所以對分布式使用的門檻更高。

3.2 特點

PyTorch 模型並行將單個模型拆分到不同的GPU上,而不是在每個GPU上復制整個模型(具體來說,假設模型 m包含10層。如果使用 DataParallel,則每個GPU都具有這10層中每個層的副本,而如果在兩個GPU上使用模型並行時,每個GPU可以托管5層)。

模型並行的高級思想是將模型的不同子網絡放置在不同的設備上,並相應地實現該forward方法以跨設備移動中間輸出。由於模型的一部分只在任何單個設備上運行,因此一組設備可以共同服務於一個更大的模型。

3.3 基本用法

讓我們從包含兩個線性層的玩具模型(toy model)開始。要在兩個GPU上運行此模型,只需將每個線性層放在不同的GPU上,然后移動輸入(input)中間輸出(intermediate outputs)以匹配層設備(layer devices)

import torch
import torch.nn as nn
import torch.optim as optim

class ToyModel(nn.Module):
  def __init__(self):
    super(ToyModel, self).__init__()
    self.net1 = torch.nn.Linear(10, 10).to('cuda:0')  # 將net1放置在第1個GPU上
    self.relu = torch.nn.ReLU()
    self.net2 = torch.nn.Linear(10, 5).to('cuda:1')   # 將net2放置在第2個GPU上

  def forward(self, x):
    x = self.relu(self.net1(x.to('cuda:0')))
    return self.net2(x.to('cuda:1'))

請注意對於 ToyModel ,除了五個用於將線性層(linear layers)和張量(tensors)放置在適當的設備上的to(device)調用之外,以上內容與在單個GPU上實現該功能非常相似。這是模型中唯一需要更改地方(即to(device) )。 backward()torch.optim 會自動關注梯度(gradients),就好像模型是一個GPU一樣。調用損失函數時,只需確保標簽(label)與輸出(output)在同一設備(on the same device)上。

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.paraeters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1') # ToyMode 的 output 是在 'cuda:1' 上,此處的 label 也應該置於 'cuda:1' 上
loss_fn(outputs,labels).backward()
optimizer.step()

3.4 將模型並行化應用於現有模塊

只需更改幾行,就可以在多個GPU上運行現有的單GPU模塊。以下代碼顯示了如何分解 torchvision.models.reset50() 為兩個GPU。思想是從現有 ResNet模塊繼承,並在構建過程中將層拆分為兩個GPU。然后,覆蓋 forward方法來縫合兩個子網,通過相應地移動中間輸出。

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to('cuda:0')  # 放置在第1個GPU上

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')  # 放置在第2個GPU上

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

對於模型太大而無法放入單個GPU的情況,上述實現解決了該問題。但是,你可能已經注意到,如果模型合適,它(model parallel)將比在單個GPU上運行要慢。這是因為在任何時間點,兩個GPU中只有一個在工作,而另一個在那兒什么也沒做。在 layer2layer3之間,中間輸出需要從 cuda:0 復制到 cuda:1,這使得性能進一步惡化。

在整個執行過程中,兩個GPU中的一個會處於空閑狀態。為了解決這個問題,有一種選擇是將每個批次進一步划分為拆分流水線,以便當一個拆分到達第二子網時,可以將下一個拆分饋入第一子網。這樣,兩個連續的拆分可以在兩個GPU上同時運行。

3.5 通過流水線輸入(Pipelining Inputs)加速

在以下實驗中,我們將每批次 120-image 進一步划分為 20-image 。當PyTorch異步啟動CUDA操作時,該實現無需生成多個線程即可實現並發。

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
     'mp_vs_rn_vs_pp.png')

請注意,設備到設備的張量復制操作在源設備和目標設備上的當前流(current streams)上同步。如果創建多個流,則必須確保復制操作正確同步。在完成復制操作之前寫入源張量或讀取/寫入目標張量可能導致不確定的行為。上面的實現僅在源設備和目標設備上都使用默認流,因此沒有必要強制執行其他同步。

0x04 關鍵技術

因為每個模型的並行策略候選集合是指數級的,純手工從中挑出一種合適的並行策略,需要耗費算法工程師大量的時間以及計算資源,而且算法工程師需要考慮的相關事宜太多,比如:如何分配內存,層之間如何交互,如何減少通信代價,分割的張量不能破壞原有數學模型,如何確定確定張量shape,如何確定輸入輸出等等。

所以自動並行技術(如何從框架層次自動解決並行策略選擇問題)成為一個研究熱點

自動並行通過建立代價模型來預測並挑選一個較優的並行策略(暫時無法保證是最優的策略,因為挑出最優的策略是個NP-Hard的問題),有希望將算法工程師從並行策略的選擇和配置中解放出來。

因此,目前分布式模型訓練有幾個必要並行技術:

  • 流水並行,尤其是如何自動設定流水;
  • 梯度累加;
  • 后向重計算;
  • 1F1B 策略(我們將采用PipeDream分析);

下面我們結合 Gpipe代碼看看這些技術如何使用。

0x05 基礎知識 & 支撐系統

5.1 Lingvo框架

5.1.1 核心組件

Lingvo 的核心組件如下:

  • Models : 一個model是一個抽象collection,包含一個或者多個tasks。Model相當於對於Task的一層wrapper。對於multi-tasks模型,Model將控制哪些variable將在Task之間共享,以及訓練時每個task如何采樣。
  • Tasks :一個task就是關於一個完整優化問題的描述,比如圖片分類或者語音識別。包含input generator。
  • Layers :一個Layer代表一個包含有可訓練參數的隨機函數。一個Layer可以包含其他的Layer作為孩子。Softmax, LSTM, Attension甚至一個task都是Layer的例子。
  • Params :此對象包含了模型的超參數。Layers,Tasks以及models都是通過Params中的specifications來構建的。 Params是層級的,一個對象的params中可以包含其child對象的params。
  • NestedMap :一個dictionary結構,用於傳遞數據。代碼中大部分代碼中的Python對象要么是Tensor的實例,要么就是BaseLayer或者NestedMap的一個子類。

5.1.2 模型定義

在Lingvo中,網絡是一個層的嵌套結構。Lingvo中的大多數類都是[Lingvo/core/base_layer.py] BaseLayer 的子類。

  • Params :用來配置類,定義了配置所需要的keys,這些keys在對象創建時都應該被定義。Params對象還可以包含用於配置子層的Params對象。每個layer類都會有一個params的classmethod,這個方法將會創建一個新的params對象,並且通過定義的keys來配置這個layer,同時為這些keys給出一些合理的默認值。

    Params對象中屬性包括:

    • cls: tParams對象關聯的python類。這可以用來構造類的實例;
    • name: 該層的名稱;
    • dtype: 創建變量時使用的默認數據類型。
  • __init__ constructor :所有子層和變量都應該在這里創建。

  • CreateVariable :創建變量的方法。每個Layer負責創建和管理它自己的variable。

  • CreateChild :創建子層的方法。

  • FProp : 所有的layers都有一個FProp() 函數, 實現該層的前向傳播,在計算的前向step時將會被調用。 因為可以在分布式訓練時在不同的設備上執行,出於性能的考慮,Lingvo通過theta參數訪問variable,而不是通過self.vars或者self.theta。

  • FPropMeta : 返回該層關於FProp計算的元數據。其中 meta.flops在得到一個輸入張量時給出估計的floating point operations數目。

對於模型算法的落地,有兩個指標特別重要:

  • 前向傳播時所需的計算力,它反應了對硬件如GPU性能要求的高低。
  • 參數個數,它反應所占內存大小。

接下來,我們需要看看如何計算模型訓練的內存大小,以及如何計算算力(后續流水線並行需要)。

5.2 計算內存

5.2.1 總體分析

我們主要參考了 ZeRO: Memory Optimization Towards Training A Trillion Parameter Models 這篇論文的思路。

在模型訓練期間,大部分內存被以下三種情況之一消耗:

  • i)激活。
  • ii)OGP狀態,即由優化器狀態、參數梯度和參數本身組成的張量。
  • iii)臨時緩沖區。

輸入數據所占用的顯存其實並不大,這是因為我們往往采用迭代器的方式讀取數據,這意味着我們其實並不是一次性的將所有數據讀入顯存,而這保證每次輸入所占用的顯存與整個網絡參數來比是微不足道的。

我們逐一分析。

5.2.2 激活函數

對於激活函數,有如下特點:

  • 激活函數額外消耗的顯存隨 batch 大小而增加,batch 設置為1的情況下,訓練萬億參數模型就會產生超過 1 TB 的激活函數用的顯存。
  • 業界已有方案如論文 Training deep nets with sublinear memory cost ,可以以33%的重新計算開銷為代價,幾乎可以消除激活所需的所有內存。這個技巧叫做 gradient checkpointing, 有時還叫做 rematerialization, re-forward。

5.2.3 OGP狀態

5.2.3.1 模型自身參數

模型自身的參數指的就是各個網絡層的 Weight 和Bias,這部分顯存在模型加載完成之后就會被占用。另外需要注意到的是,有些層是有參數的,如CNN, RNN; 而有些層是無參數的, 如激活層, 池化層等。

5.2.3.2 優化器參數

優化器參數指的是模型在優化過程即反向傳播中所產生的參數, 這部分參數主要指的就是 dw, 即梯度,在SGD中, 其大小與參數一樣, 因此在優化期間, 模型參數所占用的顯存會翻倍。

值得注意的是,不同的優化器其所需保存的優化參數不同, 對於 Adam, 由於其還需要保存其余參數, 模型的參數量會在優化區間翻 4 倍。

5.2.3.3 樣例

對於OGP狀態,讓我們以ADAM為例。使用ADAM對具有Ψ個參數的模型進行混合精度訓練。

  • 模型參數:參數的fp16副本,內存要求2Ψ字節。
  • 梯度 :梯度的fp16副本,內存要求2Ψ字節。
  • 優化器狀態:參數、Momentum和Variance的fp32副本,內存需求分別為4Ψ、4Ψ和4Ψ字節。讓我們用K來表示優化器狀態的內存乘數,也就是說,存儲它們所需的額外內存是KΨ字節。

總的來說,OGP狀態有2Ψ+2Ψ+KΨ=16Ψ字節(混合精度ADAM的K=12)的內存需求。

具體如下:

藍色是參數,橙色是梯度,綠色是優化器狀態。

在內存消耗公式中,Ψ表示模型大小(參數個數),K表示優化器狀態的內存乘數,Nd表示數據並行度。在這個例子中,我們假設模型大小為Ψ=75億,基於Adam優化器的混合精度訓練,數據並行度為Nd=64(即64個GPU),K=12。

對於一個擁有15億個參數的GPT-2這樣的模型,這導致了至少24gb的內存需求,遠遠高於單獨保存fp16參數所需的3gb內存。

5.2.4 臨時緩沖區

臨時緩沖區是用於存儲臨時結果的緩沖區,例如,對於參數為15億的GPT-2模型, fp32緩沖區將需要6GB的內存。

5.3 計算算力

5.3.1 背景知識

  • FLOPS:注意全大寫,是floating point operations per second的縮寫,意指每秒浮點運算次數,理解為計算速度。是一個衡量硬件性能的指標。
  • FLOPs:注意s小寫,是floating point operations的縮寫(s表復數),意指浮點運算數,理解為計算量。可以用來衡量算法/模型的復雜度。

前向傳播時所需的計算力就是由FLOPs體現,那么FLOPs該怎么計算呢?

我們知道,在一個模型進行前向傳播的時候,會進行卷積、池化、BatchNorm、Relu、Upsample等操作。這些操作的進行都會有其對應的計算力消耗產生,其中,卷積所對應的計算力消耗是所占比重最高的。所以,我們以卷積操作為例,看看卷積所對應的計算力。

推導過程:卷積層 wx + b 需要計算兩部分,首先考慮前半部分 wx 的計算量:

令 :

  • k 表示卷積核大小;
  • c 表示輸入 feature map 的數量;

則對於輸出 feature map 上的單個 Unit 有:

k * k * c 次乘法,以及 k * k * c - 1 次加法

如果輸出 feature map 的分辨率是 H * W ,且輸出 o 個 feature map,則輸出 feature map 包含 Unit的總數就是 H * W * o。

因此,該卷積層在計算 wx 時有:

k * k * c * H * W * o 次乘法          --(1)
(k * k * c - 1) * H * W * o 次加法    --(2)

再考慮偏置項 b 包含的計算量:

由於 b 只存在加法運算,輸出 feature map 上的每個 Unit 做一次偏置項加法。因此,該卷積層在計算偏置項時總共包含:

H * W * o 次加法      --(3)

將該卷積層的 wx 和 b 兩部分的計算次數累計起來就有:

式(1) 次乘法:

k * k * c * H * W * o 次乘法

式(2) + 式(3) 次加法:

(k * k * c - 1) * H * W * o  + H * W * o  = k * k * c * H * W * o

可見,式(2) + 式(3) = 式 (1)

因此,對於帶偏置項的卷積層,該層的計算力消耗 為:

k * k * c * H * W * o

5.3.2 lingvo中實現

在 Lingvo 之中,具體計算算力,就是通過每個類的 FPropMeta完成,這些方法都是每個類根據自己的特點實現。我們具體找幾個例子看看如何計算FLOPS。

Conv2DLayerNoPadding如下計算:

  @classmethod
  def FPropMeta(cls, p, inputs):
    py_utils.CheckShapes((inputs,))
    b, h, w, c = inputs
    fh, fw, ic, oc = p.filter_shape
    assert ic == c
    sh, sw = p.filter_stride
    if p.padding == 'SAME':
      oh = sympy.ceiling(h / sh)
      ow = sympy.ceiling(w / sw)
    else:
      oh = sympy.ceiling((h - fh + 1) / sh)
      ow = sympy.ceiling((w - fw + 1) / sw)
    flops = b * oh * ow * fh * fw * ic * oc * 2  # mul/add counts as 2 flop.
    outputs = tshape.Shape([b, oh, ow, oc])
    return py_utils.NestedMap(flops=flops, out_shapes=(outputs,))

DropoutLayer 如下計算:

  @classmethod
  def FPropMeta(cls, p, inputs, *args):
    py_utils.CheckShapes((inputs,))
    flops_per_element = 10  # Approximately 10 flops per element.
    return py_utils.NestedMap(
        flops=inputs.num_elements() * flops_per_element, out_shapes=(inputs,))

BatchNormLayer 的 FLOPS 如下計算。

  @classmethod
  def FPropMeta(cls, p, inputs, padding=None):
    py_utils.CheckShapes((inputs,))
    return py_utils.NestedMap(
        flops=inputs.num_elements() * _BN_FLOPS_PER_ELEMENT,
        out_shapes=(inputs,))

ActivationLayer 如下計算:

  @classmethod
  def FPropMeta(cls, p, inputs):
    py_utils.CheckShapes((inputs,))
    return py_utils.NestedMap(
        flops=inputs.num_elements() * GetFlops(p.activation),
        out_shapes=(inputs,))

0x06 流水線

6.1 背景知識

6.1.1 問題點

通信問題

無論是數據並行還是模型並行,都會在相應的機器之間進行全連接的通信,當機器數量增大時,通信開銷和時延會大到難以忍受。

比如參數服務器使用中,三段式流程如下:Pull weight ---> Compute new weight ---> Push new weight

如果三段式流程串行的進行通信和計算,無論這個通信是快是慢,這個時間開銷都會導致在分布式環境下每個iteration的時間比單機版要長(Ethernet的帶寬高低或者latency大小只會影響這個時間的長短,但並不能把這個時間降到零)。所以,把通信和計算重疊(overlap)起來以便 “掩蓋” 通信時間幾乎是一個必須的步驟。如何重疊計算和傳輸從而提高設備利用率就非常有挑戰。

無法充分利用

原則上我們可以通過並行計算在GPU或者TPU上訓練更大的DNN模型。但是由於DNN的順序性,這種方法可能導致在計算期間只有一個加速器處於活動狀態,不能充分利用設備的計算能力

6.1.2 如何設計系統

回到神經網絡的訓練過程上,怎么設計系統來重疊計算和通信?

在后向傳播之中有兩個特點可以利用:

  • 首先,神經網絡的計算是一層接着一層完成的,不管是前向還是后向傳播,算完本層才能算下一層;
  • 另一方面,在后向傳播的過程中,一旦后一層拿到前一層的輸入,這一層的計算就不再依賴於前一層了。

因此,根據這個特點,人們引入了流水線並行。

數據並行與模型並行都是讓設備執行同一個層次的計算,流水並行則是把任務划分為幾個有明確先后順序的階段,把不同的階段分給不同的計算設備,使得單設備只負責網絡中部分層的計算。模型網絡分布在各個設備上是非對稱的,各個設備“接力”執行網絡的一部分。每個階段(stage) 和下一個階段之間僅有相鄰的某一個 Tensor 數據需要傳輸,每台機器的數據傳輸量跟總的網絡大小、機器總數、並行規模無關。

在這種多設備接力完成一個網絡計算的模式下,可以支持更大的模型或者支持更大的 Batch Size。如果通信和計算重疊(overlap)好,又可以解決了機器之間的通信開銷的問題,

總的來說,流水並行在通常大模型訓練情況下具有優勢。流水並行的數據傳輸量少,僅為階段之間需要傳輸的數據量之和,不像數據並行與模型並行那樣大,傳輸量與整個計算圖都有關,因此對於帶寬較小的機器,會趨於使用流水並行。但某些情況下,流水並行與模型並行的結合則會優於單一的模型並行與流水並行。同時,在數據並行與模型並行中也存在計算時間掩蓋傳輸時間的優化。

6.2 Gpipe流水線綜述

6.2.1 關鍵要點

從概念上講,GPipe是一個分布式機器學習庫,它使用同步隨機梯度下降和流水線並行的方式進行訓練,適用於任何由多個有序的層組成的深度神經網絡(Deep Neural Networks, DNN)。 Gpipe通過跨不同的加速器來分割模型,並自動將一小批訓練示例划分成更小的批量。 該模型允許GPipe的加速器進行並行操作,最大限度地提高了訓練過程中的可擴展性。

GPipe 有幾個關鍵要點:

  • Network partition(網絡分片):將一個N層的網絡划分成K個partition, 每個partition在單獨的TPU上執行,partition之間需要插入一些網絡通信操作。

  • Pipeline parallelism(流水線並行):把CPU里的流水線並發技術用在了深度學習上,主要是把計算和網絡通信兩種操作,更好地重排列。即自動將mini-batch的訓練樣本分成更小的micro-batch,並在pipeline中運行,使TPU核心能夠並行操作。

  • Gradient Accumulation(梯度累積) :梯度一直在micro-batch中累積,因此分區數量不會影響模型質量。

  • Re-Materialization(重計算) :Re-Materialization具體是指在前向計算過程中,GPipe只記錄stage划分處的輸出,在計算梯度時,GPipe會重新執行前向計算邏輯,從而得到各個算子的前向結果,然后再計算梯度結果。跟OpenAI開源的gradient-checkpointing一樣,只不過GPipe是在TPU上實現的,OpenAI的只能運行在GPU上。

6.2.2 圖示

  • 下圖左端是原始模型。
  • 右端說明了具有多個有序層的神經網絡的GPipe模型被划分到了四個加速器上。 Fk是kth分區的復合正向計算函數。 Bk是其相對應的反向傳播函數。 Bk依賴於來自上層的Bk+1和Fk的中間激活函數。
    • 上面的模型中,我們可以看到網絡的順序性是如何導致資源利用不足的。
    • 下面的模型則顯示了GPipe方法,在該方法中,輸入的小批量示例被划分為更小的微批量,這些微批量可以由加速器同時處理。

6.2.3 問題

我們根據論文思路,提出了幾個問題,爭取以后按圖索驥。

  • 如何划分 stage?
    • 將模型划分為連續的幾個stage,每個stage各自對應一個設備。這樣就使得模型的大小可以突破單個設備內存的大小,因為一台設備只需要能夠容納部分模型的參數和計算;
    • 因為划分了stage,所以整個系統中,處理最慢的stage會成為瓶頸。所以應該平均分配算力
  • 依據什么分割做流水?
    • 如何將mini-batch進一步划分成更小的micro-batch,同時利用pipipline方案,每次處理一個micro-batch的數據,得到結果后,將該micro-batch的結果發送給下游設備,同時開始處理后一個 micro-batch的數據,通過這套方案減小設備中的Bubble(設備空閑的時間稱為 Bubble)。
  • 如何做Re-Materialization?

6.3 依據算力分割Stage

神經網絡有一個特點:對不同的輸入,其運行時間相差不大,因此可以預估其算力,時間,參數大小等等。Gpipe就是依據算力對圖進行了分割,從而把不同層分配到不同的設備上。

6.3.1 PartitionSequentialLayers

PartitionSequentialLayers 把一個包括sequential layers的層分解,目的是讓每一個partition都大致擁有同樣的 flops。最終目的是讓每個 GPU 都擁有盡量同樣的算力

  • 輸入是:一個layer param 或者 一個 layer param 列表;
  • 輸出是:一個 FeatureExtractionLayer params 列表;

邏輯是:

  • 如果params只是一個layer,那么就把這個layer構建成一個包含sub-layers的列表 subs;
  • 利用 FPropMeta 計算出來這個 subs 列表的shapes和總flops,賦值給了 histo;
  • 利用 histo 計算出來一個層代價(layer's cost)的歸一化累積直方圖
  • 構建一個parts變量;
    • 該變量是一個num_partitions大小的數組;數組每個item也是一個數組;
    • 依據直方圖把subs分到parts中的每個item之中,這樣每個parts[i]都擁有部分layers,一些算力小的算子被合並到一個 part 之中,目的是讓最終 parts 每個item的算力盡量相同
  • 把parts轉換成一個 FeatureExtractionLayer param 列表;
def PartitionSequentialLayers(params, num_partitions, *shapes):
  r"""Partition a layer composed of sequential layers.

  This routine strives to partition layers so that each partition costs roughly
  the same flops given the input shapes.

  Args:
    params: A layer param or a list of layer param.
    num_partitions: The desired number of partitions.
    *shapes: A tuple of tshape.Shape representing input tensors to the first
      layer.

  Returns:
    A list of FeatureExtractionLayer params.
  """

  # Recursively concatenate SequentialLayer into a list.
  # SequentialLayer 是一個層,其作用是把若干層按順序連接起來
  def FlattenSeq(p):
    if isinstance(p, list): # 已經是列表則返回
      return p
    if p.cls not in [builder_layers.SequentialLayer, FeatureExtractionLayer]:
      return [p.Copy()]
    subs = []
    for _ in range(p.repeat): # 把p包含的所有層都組裝成一個層列表
      for s in p.sub:
        subs += FlattenSeq(s)
    return subs

  # 如果params是一個layer,那么就依據這個layer,構建一個包含sub-layers的新列表subs,如果是列表則直接返回
  subs = FlattenSeq(params)

  assert len(shapes) == 1
  tf.logging.info('num_partitions: {} input_shape: {}'.format(
      num_partitions, shapes[0]))

  # 利用 FPropMeta 計算出來這個 subs 列表的shapes和總flops,賦值給了 histo
  # Computes the estimate cost for each sub layer.
  # 假設有7個sub-layers,其flops分別是 10,40,30,10,20,50,10
  total, histo, output_shapes = 0, [], []
  for i, s in enumerate(subs):
    s.name = 'cell_%03d' % i
    meta = s.cls.FPropMeta(s, *shapes) # 
    total += meta.flops
    histo.append(total)
    output_shapes.append(meta.out_shapes)
    shapes = meta.out_shapes
  tf.logging.vlog(1, 'len %d histogram = %s', len(subs), histo)
  # 則對應的histo 為:[10,50,80,90,110,160, 170],total為170

  # 利用 histo 計算出來一個層代價(layer's cost)的歸一化累積直方圖
  # Computes the normalized cumulative histogram of the layer's cost.
  histo_pct = [float(x / total) for x in histo]
  tf.logging.vlog(1, 'cost pct = %s', histo_pct)
  # histo_pct 為 [1/17,5/17,8/17,9/17,11/17,16/17, 1], 
  # 假設 num_partitions = 3

  # 構建一個parts變量,該變量是一個num_partitions大小的數組,數組每個item也是一個數組
  # 依據直方圖把subs分到parts中的每個item之中,這樣每個parts[i]都擁有部分layers,目的是讓最終 parts 每個item的算力盡量相同
  # i-th sub layer is put into partition j, where j is roughly i-th cumulative
  # histogram times num_partitions.

  parts = [[] for _ in range(num_partitions)]
  parts_cost = [0] * num_partitions
  pre_hist_cost = 0
  for i, s in enumerate(subs):
    # 從histogram數組中找出s對應cost的index,j也就是s對應的partition
    # 對於i,s,則 histo_pct[i] * num_partitions 分別為: [3/17, 15/17, 24/17, 27/17, 33/17, 48/17,3],j分別為[0,0,1,1,1,2,2]
    j = min(int(histo_pct[i] * num_partitions), num_partitions - 1)
    # The boundary at parts[j] where j > 0
    if j > 0 and not parts[j]:
      parts_cost[j - 1] = histo_pct[i - 1] - pre_hist_cost
      pre_hist_cost = histo_pct[i - 1]
    parts[j].append(s) # 把s加入到對應的partition
    # 三個桶內容分別為:[1,2],[3,4,5],[6,7]
    # 對應每個桶的flops為: [60,280,330]
    
  # 把parts轉換成一個 FeatureExtractionLayer 列表
  parts_cost[num_partitions - 1] = 1.0 - pre_hist_cost
  seqs = []
  for i, pa in enumerate(parts):
    tf.logging.info('Partition %d #subs %d #cost %.3f', i, len(pa),
                         parts_cost[i])
    seqs.append(FeatureExtractionLayer.Params().Set(name='d%d' % i, sub=pa))
  return seqs

6.3.2 FeatureExtractionLayer

上面代碼中使用了FeatureExtractionLayer,其功能就是返回一個層的序列。

FeatureExtractionLayer 從一個層序列中提取特征,具體特點是:

  • 把一些層連接成一個序列;
  • 可以 得到&傳遞 激活點;
class FeatureExtractionLayer(base_layer.BaseLayer):
  """A layer that extrac features from a sequence of layers.

  FeatureExtractionLayer is a layer which connects a few layers in a sequence.
  It is also capable of fetching and forwarding activation endpoints.
  # TODO(huangyp): Make it a sublayer of builder_layers.SequentialLayer
  """

  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('variable_name_prefix', '',
             'Prefix for variable names in sub layers')
    p.Define('sub', [], 'A list of layers\' params.')
    p.Define('num_act_inputs', 0, 'Number of activation inputs.')
    p.Define('num_act_outputs', 0, 'Number of activation outputs.')
    p.Define('act_fetch_layers', [],
             'Names of fetch layers that cached extra activations')
    return p

  def __init__(self, params):
    super().__init__(params)
    p = self.params
    assert p.num_act_inputs >= 0
    assert p.num_act_outputs >= 0
    p.act_fetch_layers = p.act_fetch_layers or []
    assert p.num_act_outputs == p.num_act_inputs + len(p.act_fetch_layers)
    self._seq = []
    for sub in p.sub:
      assert sub.name
      sub.name = p.variable_name_prefix + sub.name
      self.CreateChild(sub.name, sub)
      self._seq.append((sub.name, self.children[sub.name])) # 把一些層連接成一個序列

  def FProp(self, theta, *args): # 實現該層的前向傳播,在計算的前向step時將會被調用
    p = self.params
    assert len(args) > p.num_act_inputs
    out_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
    extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
    for (name, ch) in self._seq:
      th = theta[name]
      out_args = _ToTuple(out_args)
      out_args = ch.FProp(th, *out_args)
    # Append fetched activations to fprop outputs.
    for fetch_layer in p.act_fetch_layers:
      assert fetch_layer in self.children
      activation = self.children[fetch_layer].activation # 子層激活點
      if isinstance(activation, (tuple, list)):
        activation = activation[0] # 如果是list,得到相應激活點
      extra_args += (activation,) # 把激活點添加進來
    if extra_args:
      out_args = _ToTuple(out_args) + extra_args # 最終返回所有激活點
    return out_args

  @classmethod
  def FPropMeta(cls, p, *args): # 返回該層關於`FProp`計算的元數據
    assert len(args) > p.num_act_inputs
    seq_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
    extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
    total = 0
    act_fetch_metas = {}
    for sub in p.sub:
      meta = sub.cls.FPropMeta(sub, *seq_args)
      if sub.name in p.act_fetch_layers:
        act_fetch_metas[sub.name] = meta.out_shapes[0]
      total += meta.flops
      seq_args = meta.out_shapes
    for fetch_layer in p.act_fetch_layers:
      extra_args += (act_fetch_metas[fetch_layer],)
    return py_utils.NestedMap(flops=total, out_shapes=seq_args + extra_args)

計算過程如下圖,里面具體數值請參見上面幾段代碼之中的舉例:

  +--------------+   +--------------+   +---------------+
  |              |   |              |   |               |
  |  sub-layer 1 |   |  sub-layer 2 |   |  sub-layer n  |
  |              |   |              |   |               |
  +-------+------+   +-------+------+   +--------+------+
          |                  |                   |
          |FPropMeta         |FPropMeta          |FPropMeta
          |                  |                   |
          v                  v                   v
       flops 1            flops 2             flops n
          +                  +                   +
          |                  |                   |
          |                  |                   |
          +--------------------------------------+
                             |
                             |
                             v
                  for i, s in enumerate(subs):
                     total += meta.flops
                     histo.append(total)
                  histo=[10,50,80,90,110,160,170]
                             +
                             |
                             |
                             v
Computes the normalized cumulative histogram of the layer's cost
        histo_pct = [float(x / total) for x in histo]
       histo_pct=[1/17,5/17,8/17,9/17,11/17,16/17,1]
                             +
                             |
                             |
                             +
           Assign layers to partition based on histogram
                   [1,2],[3,4,5],[6,7]
                             +
                             |
                             |
                             v
      +----------------------+----------------------------+
      | parts                                             |
      |                                                   |
      | +--------------+  +------------+  +-------------+ |
      | | sub-layer 1  |  |sub-layer 3 |  | sub-layer 6 | |
      | |              |  |            |  |             | |
      | | sub-layer 2  |  |sub-layer 4 |  | sub-layer 7 | |
      | |              |  |            |  |             | |
      | |              |  |sub-layer 5 |  |             | |
      | +--------------+  +------------+  +-------------+ |
      +---------------------------------------------------+

6.4 流水線分配

6.4.1 基礎類 SeqLayer

該層的目的是:用 Round-robin 策略把 cell_tpl之中的每個children cell 部署到 工作設備之上

Params 包括:

  • before_tpl :配置在流水線之前運行的CNN層;
  • cell_tpl :FeatureExtractionLayer 列表;
6.4.1.1 初始化

初始化函數的邏輯是:

  • 遍歷before_tpl,對於每個item調用CreateChild構建其子層,把item添加到 _before_layers 之中;
  • 遍歷cell_tpl,對於每個item調用CreateChild構建其子層,把item添加到 _cells 之中;
  def __init__(self, params):
    super().__init__(params)
    p = self.params
    self._before_layers = []
    self._cells = []
    # 遍歷before_tpl,對於每個item調用CreateChild構建其子層,把item添加到 _before_layers 之中
    for l in p.before_tpl:
      self.CreateChild(l.name, l)
      self._before_layers.append((l.name, self.children[l.name]))
    # 遍歷cell_tpl,對於每個item調用CreateChild構建其子層,把item添加到 _cells 之中  
    for l in p.cell_tpl:
      self.CreateChild(l.name, l)
      self._cells.append((l.name, self.children[l.name]))
6.4.1.2 _CreateChildrenVariables

構建變量。邏輯如下:

  • 如果使用 tpu,則
    • 利用 cluster.WorkerDeviceInModelSplit(0) 來構建 before_tpl_device,即用集群的第一個設備作為 before_tpl_device;
    • 遍歷集群的其他設備,分配給cell_devices;
  • 遍歷 _before_layers,把其中每個變量部署在 before_tpl_device;
  • 遍歷 _cells,把其中每個變量部署在 cell_devices;
  def _CreateChildrenVariables(self):
    p = self.params

    num_cells = len(p.cell_tpl)
    before_tpl_device = ''
    cell_devices = [''] * num_cells
    if py_utils.use_tpu(): # 如果使用 tpu
      # 利用 `cluster.WorkerDeviceInModelSplit(0)` 來構建 before_tpl_device,即用集群的第一個設備作為 before_tpl_device
      cluster = self.cluster
      before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
      # 遍歷集群的其他設備,分配給cell_devices
      cell_devices = [
          cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
      ]

    # 遍歷 _before_layers,把其中每個變量部署在 before_tpl_device
    for unused_name, l in self._before_layers:
      with tf.device(before_tpl_device):
        l.InstantiateVariables()

    # 遍歷 _cells,把其中每個變量部署在 cell_devices
    for i, (unused_name, l) in enumerate(self._cells):
      with tf.device(cell_devices[i]):
        l.InstantiateVariables()

    super()._CreateChildrenVariables()
6.4.1.3 FProp

前向傳播代碼,具體邏輯如下:

  • 遍歷 _before_layers,對於其中每層調用其FProp;
  • 遍歷 _cells,對於其中每層,在cluster.WorkerDeviceInModelSplit(i)之上調用其FProp;
  def FProp(self, theta, *args):
    """Round-robin every children cells in cell_tpl among worker devices.

    Args:
      theta: A NestedMap object containing weights' values of this layer and its
        children layers.
      *args: Input args

    Returns:
      A list contains one tensor of [batch_size, feature_height, feature_width,
        channel].
    """
    num_layers = len(self.params.cell_tpl)
    cluster = self.cluster

    # 遍歷 _before_layers,對於其中每層調用其FProp	
    for (name, l) in self._before_layers:
      l_theta = theta[name]
      args = _ToTuple(args)
      args = l.FProp(l_theta, *args)
    # 遍歷 _cells,對於其中每層,在`cluster.WorkerDeviceInModelSplit(i)`之上調用其FProp  
    for i in range(num_layers):
      with tf.device(cluster.WorkerDeviceInModelSplit(i)):
        cell_name, cell = self._cells[i]
        args = _ToTuple(args)
        args = cell.FProp(theta[cell_name], *args)

    return args
6.4.1.4 具體實現

SeqLayer 全部代碼如下:

class SeqLayer(base_layer.BaseLayer):
  """Round-robin every children cells in cell_tpl among worker devices."""

  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('before_tpl', [],
             'Config for the CNN layers that runs before pipelining.')
    p.Define('cell_tpl', [], 'A list of FeatureExtractionLayer layers.')
    return p

  def __init__(self, params):
    super().__init__(params)
    p = self.params
    self._before_layers = []
    self._cells = []
    for l in p.before_tpl:
      self.CreateChild(l.name, l)
      self._before_layers.append((l.name, self.children[l.name]))
    for l in p.cell_tpl:
      self.CreateChild(l.name, l)
      self._cells.append((l.name, self.children[l.name]))

  def _CreateChildrenVariables(self):
    p = self.params

    num_cells = len(p.cell_tpl)
    before_tpl_device = ''
    cell_devices = [''] * num_cells
    if py_utils.use_tpu():
      cluster = self.cluster
      before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
      cell_devices = [
          cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
      ]

    for unused_name, l in self._before_layers:
      with tf.device(before_tpl_device):
        l.InstantiateVariables()

    for i, (unused_name, l) in enumerate(self._cells):
      with tf.device(cell_devices[i]):
        l.InstantiateVariables()

    super()._CreateChildrenVariables()

  def FProp(self, theta, *args):
    """Round-robin every children cells in cell_tpl among worker devices.

    Args:
      theta: A NestedMap object containing weights' values of this layer and its
        children layers.
      *args: Input args

    Returns:
      A list contains one tensor of [batch_size, feature_height, feature_width,
        channel].
    """
    num_layers = len(self.params.cell_tpl)
    cluster = self.cluster

    for (name, l) in self._before_layers:
      l_theta = theta[name]
      args = _ToTuple(args)
      args = l.FProp(l_theta, *args)
    for i in range(num_layers):
      with tf.device(cluster.WorkerDeviceInModelSplit(i)):
        cell_name, cell = self._cells[i]
        args = _ToTuple(args)
        args = cell.FProp(theta[cell_name], *args)

    return args

6.4.2 具體分配 PipeliningLayer

PipeliningLayer 是 SeqLayer 的派生類

  • 在流水線最前面是device[0],負責處理前置條件。
  • 流水線中間是一系列 device,負責處理具體的 micro batches。
  • 流水線最后是 device[-1],負責整理形狀,最后輸出一個最終張量。
6.4.2.1 得到中間層輸出形狀

_CalculateOutputShapes 計算出中間層的output shape。具體邏輯如下:

  • 遍歷 _before_layers,對其中每層調用其FPropMeta,得到 output shapes,插入 state_shapes 數組之中;
  • 遍歷 _cells,對其中每層調用其FPropMeta,得到 output shapes,插入 state_shapes 數組之中;
  def _CalculateOutputShapes(self, input_shapes):
    """Calcuate the output shape of intermediate layers.

    Given the FPropMeta function in each FeatureExtractionLayer, calcuates
    the shapes of outputs of that layer. This is used to recover the shape
    information in StackedRecurrent.

    Args:
      input_shapes: NestedMap or tuple of input TensorShapes.

    Returns:
      Return a list of K + 1 NestedMaps or lists of tShape where K is
      the number of partitions.
    """
    p = self.params
    shapes = []

    # Converts TensorShape to tshape.Shape.
    def _ToTShape(x):
      if x is None:
        return None
      return tshape.Shape(x.as_list())

    shapes = py_utils.Transform(_ToTShape, input_shapes)
    shapes = _ToTuple(shapes)

    state_shapes = []
    # 遍歷_before_layers,對其中每層調用其FPropMeta,得到 output shapes,插入 state_shapes 數組之中
    for (_, cell) in self._before_layers:
      shapes = cell.FPropMeta(cell.params, *shapes).out_shapes

    state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)

    # 遍歷 _cells,對其中每層調用其FPropMeta,得到 output shapes,插入 state_shapes 數組之中
    for (_, cell) in self._cells:
      shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
      state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)

    return state_shapes
6.4.2.2 得到數據類型

_get_state_dtype 的作用是得到數據類型。

  def _get_state_dtype(self, *args):
    if self.params.state_dtype:
      return self.params.state_dtype
    if self.params.nested_map_fprop:
      inputs = args[0].Filter(lambda x: x is not None)
      return py_utils.Flatten(inputs)[0].dtype
    return args[0].dtype
6.4.2.3 得到輸入形狀

Gpipe 會首先將一個小批量的訓練樣本(mini-batch)分割成更小的小批量(micro-batches),然后將每組小批量的執行通過管道傳送到單元上。

_get_input_shapes作用是得到輸入的shapes,具體邏輯如下:

  • 從 args 得到輸入 input_tensors;
  • 遍歷 input_tensors,找出第一個不為空的張量,獲取這個張量的 batch size,賦給 mini_batch_size;
  • 從參數中得到 micro_batch_size,設置到 micro_batch_size;
  • 如果 micro_batch_size 沒有意義,則:
    • 如果 p.num_micro_batches 大於 mini_batch_size,則 p.num_micro_batches 為 mini_batch_size;
    • 把 micro_batch_size 設置為 mini_batch_size // p.num_micro_batches;
  • 建立一個 input_shapes 集合,遍歷 input_tensors,對於每個張量,得到其shapes列表 input_shape,並且設置 input_shape 的 batch_dim 為 micro_batch_size;
  • 如果設置了 p.nested_map_fprop,則把 input_shapes 構建成一個遞歸嵌套的結構;
  • 返回 input_shapes;
  def _get_input_shapes(self, *args):
    p = self.params
    if p.nested_map_fprop:
      assert len(args) == 1
      assert isinstance(args[0], py_utils.NestedMap)
      input_tensors = py_utils.Flatten(args[0])
    else:
      input_tensors = _ToTuple(args)
    
    # 遍歷 input_tensors,找出第一個不為空的張量,獲取這個張量的 batch size,賦給 mini_batch_size
    # Get batch size from the first tensor which is not None.
    mini_batch_size = None
    for input_tensor in input_tensors:
      if input_tensor is not None:
        mini_batch_size = input_tensor.get_shape().as_list()[p.batch_dim]
    assert mini_batch_size is not None
    micro_batch_size = p.micro_batch_size
    
    if not micro_batch_size: # 如果 micro_batch_size 沒有意義
      # 如果 p.num_micro_batches 大於 mini_batch_size,則 p.num_micro_batches 為 mini_batch_size
      if p.num_micro_batches > mini_batch_size:
        p.num_micro_batches = mini_batch_size
      # 把 micro_batch_size 設置為 mini_batch_size // p.num_micro_batches  
      micro_batch_size = mini_batch_size // p.num_micro_batches
    if mini_batch_size is not None:
      if micro_batch_size * p.num_micro_batches != mini_batch_size:
        raise ValueError('micro_batch_size * num_micro_batches != batch_size.')

    # 遍歷 input_tensors,對於每個張量,得到其shapes列表 input_shape,並且設置 input_shape 的 batch_dim 為 micro_batch_size
    input_shapes = ()
    for input_tensor in input_tensors:
      if input_tensor is not None:
        input_shape = input_tensor.get_shape().as_list()
        input_shape[p.batch_dim] = micro_batch_size
        input_shapes += (tf.TensorShape(input_shape),)
      else:
        input_shapes += (None,)

    # 如果設置了 p.nested_map_fprop,則把 input_shapes 構建成一個遞歸嵌套的結構    
    if p.nested_map_fprop:
      input_shapes = py_utils.Pack(args[0], input_shapes)
    return input_shapes
6.4.2.4 FProp

前向傳播函數,用流水線方式在多個設備上運行多個 children cells。具體邏輯如下:

  • 做一些准備工作,比如:
    • 得到 children cell個數;
    • 得到集群;
    • 得到 輸入shapes,dtypes;
    • 利用 輸入shapes 計算出 輸出shapes;
  • 遍歷處理中間層:
    • 對於具體cell,把cell加入到累積層中,對於每個cell,得到對應的function;
    • 為后續的 StackedRecurrent 運行設置其初始狀態;
    • 把cell_idx對應的設備加入到devices列表;
  • 為流水線中間(去除頭尾)的各個設備設定一些變量;
  • 在第一個設備上執行如下操作:
    • 遍歷_before_layers,運行每層的FProp,最終得到 previous;
    • 對於 previous 繼續操作,構建出 inputs,即利用_StackAndSplit分割張量;
    • 為流水線后續設備設置其輸入;
  • 在中間設備上執行recurrent.StackedRecurrent操作 ;
  • 在最后一個設備上把micro_batches的形狀聚合,最終得到輸出張量:
    • 如果嵌套,則返回最后一個形狀;
    • 否則遍歷輸出,聚合各個輸出的形狀;
  def FProp(self, theta, *args):
    """Run multiple cells in different devices in a pipelining manner.

    Args:
      theta: A NestedMap object containing weights' values of this layer and its
        children layers.
      *args: Non-keyworded variable length argument list of input tensors.

    Returns:
      A list of output tensors
    """
    # TODO(huangyp): handle optional None inputs.
    p = self.params
    if self.do_eval and self.cluster.num_devices_per_split == 1: # 如果設置了 do_eval 並且集群的 num_devices_per_split 為 1
      outputs = copy.copy(args)
      # 就直接串行執行
      for (name, l) in self._before_layers + self._cells:
        outputs = _ToTuple(outputs)
        outputs = l.FProp(theta[name], *outputs)
      return outputs

    num_cells = len(p.cell_tpl) # 得到 children cell個數
    cluster = self.cluster # 得到集群

    # Compute shapes of input and output tensors.
    # 得到 輸入shapes,dtypes
    input_shapes = self._get_input_shapes(*args)
    state_dtype = self._get_state_dtype(*args)
    # 利用 輸入shapes 計算出 輸出shapes
    state_shapes = self._CalculateOutputShapes(input_shapes)
    tf.logging.info('state_shapes={}'.format(state_shapes))

    def GetCellFn(i): # 對於第 i 個層,返回一個對應的函數,這個函數將在 StackedRecurrent 內部執行
      """Get the ith feature extraction layer."""

      def CellFn(theta, state0, inputs):
        """A cell fn is exectued inside of StackedRecurrent."""
        # 沒有深入研究StackedRecurrent,只從此函數看,作用是利用cell.FProp計算輸出,並且得到一個state,其中包括輸出和micro batch tensor
        del state0

        def _FPropInputSetShape(name, t_shape): # 給輸入設置shape
          if t_shape is None:
            return None
          inputs[name].set_shape(t_shape.ToTensorShape().as_list())
          return inputs[name]

        if p.nested_map_fprop:
          # pylint: disable=protected-access
          fprop_inputs = state_shapes[i]._RecursiveMap(_FPropInputSetShape)
          # pylint: enable=protected-access
        else:
          fprop_inputs = []
          for input_idx, input_shape in enumerate(state_shapes[i]):
            name = 's{}'.format(input_idx)
            fprop_inputs.append(_FPropInputSetShape(name, input_shape))

        with py_utils.RemoveAssertContext(remove=True):
          with CellFnFPropOpReplacementWrapper():
            tf.logging.info('cell {} input {}'.format(i, fprop_inputs))
            mb_tensor = inputs[_MICRO_BATCH_STATE_NAME] # 得到輸入的 micro batch tensor
            SetOverWriteGlobalStep(mb_tensor)
            _, cell = self._cells[i]
            fprop_inputs = _ToTuple(fprop_inputs)
            outputs = cell.FProp(theta, *fprop_inputs) # 計算輸出

        if p.nested_map_fprop:
          assert py_utils.IsCompatible(outputs, state_shapes[i + 1])
          state1 = outputs.Filter(lambda x: x is not None)
        else:
          state1 = py_utils.NestedMap()
          outputs = _ToTuple(outputs)
          assert len(outputs) == len(state_shapes[i + 1])
          for output_idx in range(len(outputs)):
            if outputs[output_idx] is not None:
              name = 's{}'.format(output_idx)
              state1[name] = outputs[output_idx]
        state1[_MICRO_BATCH_STATE_NAME] = mb_tensor
        return state1, py_utils.NestedMap()

      return CellFn

    cell_fns = []
    accumulator_layers = [] # 為了梯度累積
    thetas = []
    init_states = []
    devices = []
    # 遍歷,把cell_idx對應的設備加入到devices列表
    for cell_idx in range(num_cells): # 遍歷 children cell
      cell_name, cell = self._cells[cell_idx] # 得到具體一個 cell
      accumulator_layers.append(cell) # 把cell加入到累積層中
      cell_fns.append(GetCellFn(cell_idx)) # 對於每個cell,得到對應的function
      thetas.append(theta[cell_name]) # 添加 theta

      # 返回一個帶有形狀t_shape的,類型為state_dtype的張量,並且所有元素都設為零.
      def _TfZeros(t_shape):
        if t_shape is None:
          return None
        return tf.zeros(t_shape.ToTensorShape().as_list(), dtype=state_dtype)

      # 為后續的 StackedRecurrent 運行設置其初始狀態
      if p.nested_map_fprop:
        init_state = py_utils.Transform(_TfZeros, state_shapes[cell_idx + 1])
        init_state = init_state.Filter(lambda x: x is not None)
      else:
        init_state = py_utils.NestedMap()
        for output_idx, state in enumerate(state_shapes[cell_idx + 1]):
          state = _TfZeros(state)
          if state is not None:
            name = 's{}'.format(output_idx)
            init_state[name] = state
      init_state[_MICRO_BATCH_STATE_NAME] = tf.cast(0, dtype=state_dtype)
      init_states.append(init_state)

      # 把cell_idx對應的設備加入到devices列表
      devices.append(cluster.WorkerDeviceInModelSplit(cell_idx))

    # 為流水線中間(去除頭尾)的各個設備設定一些變量
    cell_grads = [None] * num_cells
    cell_outs = [lambda x: x] * num_cells
    cell_out_grads = [lambda x: x] * num_cells

    # 在第一個設備上執行如下操作
    with tf.device(devices[0]): 
      previous = _ToTuple(args)
      for (name, l) in self._before_layers: # 遍歷_before_layers,運行每層的FProp,最終得到 previous
        previous = l.FProp(theta[name], *previous)
        previous = _ToTuple(previous)

      def _StackAndSplit(x): # 把張量分割成
        # Split tensors into microbatches.
        if x is None:
          return None
        # tf.split按照行或者列分割一個矩陣
        return tf.stack(tf.split(x, p.num_micro_batches, axis=p.batch_dim))

      # 對於 previous 繼續操作,構建出 inputs,即利用_StackAndSplit分割張量
      if p.nested_map_fprop: # 嵌套情況,只選取previous[0]做處理
        inputs = py_utils.Transform(_StackAndSplit, previous[0]) #利用_StackAndSplit分割張量
        inputs = inputs.Filter(lambda x: x is not None)
      else: # 非嵌套
        inputs = py_utils.NestedMap()
        for output_idx, output_tensor in enumerate(previous): # 遍歷第一層的輸出
          output_tensor = _StackAndSplit(output_tensor) # 利用_StackAndSplit分割張量
          if output_tensor is not None:
            name = 's{}'.format(output_idx)
            inputs[name] = output_tensor
      gs_tensor = py_utils.GetGlobalStep()
      # 為流水線后續設備設置其輸入
      inputs[_MICRO_BATCH_STATE_NAME] = tf.stack([
          tf.cast(gs_tensor * p.num_micro_batches + t, dtype=state_dtype)
          for t in range(p.num_micro_batches)
      ])
      
    # 在中間設備上執行操作    
    tf.logging.info('pipeline input = {}'.format(inputs))
    output_state, _ = recurrent.StackedRecurrent( 
        devices=devices,
        cell_fns=cell_fns,
        cell_grads=cell_grads,
        cell_outs=cell_outs,
        cell_out_grads=cell_out_grads,
        thetas=thetas,
        init_states=init_states,
        inputs=inputs,
        accumulator_layers=accumulator_layers,
        unused_acc_state=True)

    # 在最后一個設備上執行如下操作,最終得到輸出張量
    with tf.device(devices[-1]):
      def _ReshapeRetVal(name, t_shape): # 把micro_batches的形狀聚合,得到最終輸出
        """Restore shape for tensors in microbatches."""
        if t_shape is None:
          return None
        output_tensor = output_state[name]
        if p.batch_dim != 0:
          perm = list(range(1, p.batch_dim + 1)) + [0]
          perm += list(range(p.batch_dim + 1, t_shape.rank + 1))
          output_tensor = tf.transpose(output_tensor, perm=perm)
        output_shape = t_shape.ToTensorShape().as_list()
        output_shape[p.batch_dim] *= p.num_micro_batches
        output_tensor = tf.reshape(output_tensor, output_shape)
        return output_tensor

      # Construct the final return values from output_state.
      if p.nested_map_fprop: # 如果嵌套,則返回最后一個形狀
        # pylint: disable=protected-access
        output_tensors = state_shapes[-1]._RecursiveMap(_ReshapeRetVal) # 聚合形狀
        # pylint: enable=protected-access
      else:
        output_tensors = []
        # 遍歷輸出,聚合各個輸出的形狀
        for output_idx, state_shape in enumerate(state_shapes[-1]): 
          output_name = 's{}'.format(output_idx)
          output_tensor = _ReshapeRetVal(output_name, state_shape) # 聚合形狀
          output_tensors.append(output_tensor)
        if len(output_tensors) == 1:
          output_tensors = output_tensors[0]
        else:
          output_tensors = tuple(output_tensors)
        
      tf.logging.info('pipeline output = {}'.format(output_tensors))
      return output_tensors
6.4.2.5 類定義

具體代碼如下:

class PipeliningLayer(SeqLayer):
  """Pipelining a sequence of layers on multiple devices."""

  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('num_micro_batches', 1, 'Number of micro batches.')
    p.Define('micro_batch_size', None, 'Size of a micro batch.')
    p.Define('batch_dim', 0, 'The batch dimension.')
    p.Define('state_dtype', None, 'Externally specify dtype for states.')
    p.Define(
        'nested_map_fprop', False, 'Whether arguments and returns of '
        'cell fprop functions are nested maps')
    return p

具體FProp函數邏輯如下圖:

+--------------------------------------------------------------+
| FProp             _CalculateOutputShapes                     |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|                        state_shapes                          |
|                             +                                |
|                             |                                |
|                             |                                |
|                             |                                |
|                             v                                |
|                for cell_idx in range(num_cells):             |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|       devices.append(WorkerDeviceInModelSplit(cell_idx))     |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|                  with tf.device(devices[0])                  |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|             recurrent.StackedRecurrent(cell_outs)            |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|                 with tf.device(devices[-1])                  |
|                             +                                |
|                             |                                |
|                             |                                |
|                             v                                |
|                       output_tensors                         |
|                                                              |
+--------------------------------------------------------------+

Device流水線邏輯如下:

                   devices[0]
                       +
                       |
                       |
                       |
                       v
+----------------------+-------------------------+
|Pipeline                                        |
|                         devices[1]             |
|                             +                  |
|                             |                  |
|                             |                  |
|                             v                  |
|  cell_grads[1~n]        devices[2]             |
|                             +                  |
|  cell_outs[1~n]             |                  |
|                             |                  |
|  cell_out_grads[1~n]        v                  |
|                         devices[3]             |
|                             +                  |
|                             |                  |
|                             |                  |
|                             v                  |
|                         devices[4]             |
|                                                |
+----------------------+-------------------------+
                       |
                       |
                       |
                       v
                   devices[-1]
6.4.2.6 使用

源碼中給出的例子是 GPipeBatchMajorTransformerStack,目前看來,繼承PipeliningLayer即可。

class GPipeBatchMajorTransformerStack(PipeliningLayer):
  """Stacked self- multi-head attention and fully connected layers.

  With optional layer normalization applied to the final output.

  See 'Attention Is All You Need' https://arxiv.org/abs/1706.03762
  for details. 

  Implements a gipe stack for the batch major transformer variant.
  """

GPipeBatchMajorTransformerStack 的 FProp 返回一個輸出張量的列表,其中下面代碼調用了PipeliningLayer的功能。

logits = super().FProp(theta, source_input, source_paddings, target_input,
                       target_paddings, encoder_self_atten_segment_mask,
                       decoder_self_atten_segment_mask,
                       decoder_cross_atten_segment_mask, source_segment_pos,
                       target_segment_pos)

具體代碼如下:

  def FProp(self,
            theta,
            source_input,
            source_paddings,
            target_input=None,
            target_paddings=None,
            source_segment_id=None,
            target_segment_id=None,
            labels=None,
            label_weights=None,
            source_segment_pos=None,
            target_segment_pos=None):

    p = self.params
    if p.num_decoder_layers > 0:
      assert target_input is not None
      assert target_paddings is not None
      target_time = tf.shape(target_input)[1]
      batch = tf.shape(target_input)[0]
    encoder_self_atten_segment_mask = None
    decoder_self_atten_segment_mask = None
    decoder_cross_atten_segment_mask = None

    # Prepare segment masks from segment ids.
    if p.packed_input:
      dtype = py_utils.FPropDtype(p)
      assert source_segment_id is not None, (
          'Need to specify src_segment_id if packed input is supported.')
      assert source_segment_pos is not None, (
          'Need to specify src_segment_pos for packed input and embeddings.')
      encoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
          source_segment_id, source_segment_id, dtype, False)
      if target_segment_id is not None:
        decoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
            target_segment_id, target_segment_id, dtype, False)
        causal_padding = tf.expand_dims(
            tf.tile(
                tf.expand_dims(
                    batch_major_attention.CausalPadding(
                        target_time, dtype=dtype), 0), [batch, 1, 1]), 1)
        decoder_self_atten_segment_mask = tf.math.maximum(
            causal_padding, decoder_self_atten_segment_mask)
        decoder_cross_atten_segment_mask = batch_major_attention.SegmentMask(
            target_segment_id, source_segment_id, dtype, False)

    # FProp through the gpipe pipeline.
    # 這里調用了基類的PipeliningLayer,完成流水線操作。
    logits = super().FProp(theta, source_input, source_paddings, target_input,
                           target_paddings, encoder_self_atten_segment_mask,
                           decoder_self_atten_segment_mask,
                           decoder_cross_atten_segment_mask, source_segment_pos,
                           target_segment_pos)
            
    label_weights = tf.reshape(label_weights, [-1])
    target_probs = None
    if p.label_smoothing:
      target_probs = self.smoother.FProp(
          theta.smoother, target_paddings, labels, target_ids=None)
      target_probs = tf.reshape(target_probs, [-1, p.softmax_tpl.num_classes])
    reshaped_logits = tf.reshape(logits, [-1, p.softmax_tpl.num_classes])
    tgt_labels = tf.reshape(labels, [-1])
    num_splits = len(p.splits)
    softmax = self.children['cell_{}'.format(num_splits - 1)].softmax
    softmax_theta = theta['cell_{}'.format(num_splits - 1)].softmax
    per_example_xent, _ = softmax.XentLossFromLogits(
        softmax_theta,
        reshaped_logits,
        class_weights=tf.reshape(label_weights, [-1]),
        class_ids=tgt_labels,
        class_probabilities=target_probs)
    xent_shape = tf.shape(logits)[:2]
    per_example_xent = tf.reshape(per_example_xent, xent_shape)
    return per_example_xent, logits

0xFF 參考

僅此一文讓你掌握OneFlow框架的系統設計(上篇)

DeepSpeed: Extreme-scale model training for everyone

[譯] DeepSpeed:所有人都能用的超大規模模型訓練工具

訓練GPT-3,為什么原有的深度學習框架吃不消?

GPT-3模型為何難以復現?這也許是分布式AI框架的最優設計

FLOPs與模型推理速度

深度學習中parameters個數和FLOPS計算(以CNN中經典的AlexNet網絡結構為例)

CNN 模型所需的計算力flops是什么?怎么計算?

CNN中計算量FLOPs的計算

有關FLOPS的定義與計算

論文解讀系列第十三篇:ZeRO——面向萬億級參數的模型訓練方法

模型並行最佳實踐(PyTorch)

Tensorflow: Model parallelism 模型並行計算

https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

pytorch 模型並行 model parallel

https://arxiv.org/pdf/1802.09941.pdf

深度學習的模型並行是什么原理?

https://www.microsoft.com/en-us/research/uploads/prod/2019/08/fiddle_pipedream_sosp19.pdf

論文解讀系列第五篇:微軟斯坦福等PipeDream快速訓練大規模神經網絡

微軟提出 DNN 並行訓練新方法 PipeDream,比傳統方法快四倍

如何評價Google 開源的並行加速庫 GPipe?

論文解讀系列第四篇:谷歌GPipe訓練超大規模神經網絡

如何減少神經網絡的內存?

訓練時顯存優化技術——OP合並與gradient checkpoint

顯存不夠時,如何利用GPU訓練數據

GPU 顯存不足怎么辦?

模型訓練太慢?顯存不夠?這個方法讓你的GPU聯手CPU

TF-Replicator, GPipe, Mesh-Tensorflow 三個庫對比

深度神經網絡訓練中的數據並行(Data Parallelism)總結

【新】Facebook的深度學習推薦系統

https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html

https://github.com/pytorch/tutorials/blob/master/intermediate_source/model_parallel_tutorial.py

分布式TensorFlow入門教程

PyTorch中在反向傳播前為什么要手動將梯度清零?

Model.zero_grad() or optimizer.zero_grad()?

A trick to use bigger batches for training: gradient accumulation

Training Neural Nets on Larger Batches: Practical Tips for 1-GPU, Multi-GPU & Distributed setups

lingvo框架走讀筆記

分布式訓練從入門到放棄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM