[源碼解析] PyTorch 流水線並行實現 (5)--計算依賴
0x00 摘要
前幾篇文章我們介紹了 PyTorch 流水線並行的基本知識,自動平衡機制和切分數據等,本文我們結合論文內容來看看如何實現流水線依賴,核心就是如何建立這些小批次之間的跨設備依賴關系。
流水線並行其他文章鏈接如下:
[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現
[源碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積
[源碼解析] 深度學習流水線並行 GPipe(3) ----重計算
[源碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段
[源碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分區
[源碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型
[源碼解析] 深度學習流水線並行 PipeDream(4)--- 運行時引擎
[源碼解析] 深度學習流水線並行 PipeDream(5)--- 通信模塊
[源碼解析] 深度學習流水線並行 PipeDream(6)--- 1F1B策略
[源碼解析] PyTorch 流水線並行實現 (1)--基礎知識
[源碼解析] PyTorch 流水線並行實現 (2)--如何划分模型
[源碼解析] PyTorch 流水線並行實現 (3)--切分數據和運行時系統
[源碼解析] PyTorch 流水線並行實現 (4)--前向計算
本文圖片來自論文和github源碼。
0x01 前文回顧
為了更好的理解本文,我們首先看看前文之中的關鍵部分。
- 原始流水線狀態如下:
- 管道並行的策略是根據分區索引 j 分配任務,以便第 j 個分區完全位於第 j 個設備中。
- 持有模型后期部分的設備必須等待,直到持有模型早期部分的設備計算結束。
- 目標流水線狀態如下:
-
目前問題:
- 如果分成若干個微批次,則需要強制要求 \(F_{i,j}\) 必須在 \(F_{i+1,j}\) 之前完成,以及 \(B{i,j}\) 必須在執行\(B{i-1,j}\) 之前完成。
- 后向傳播的計算圖是在前向傳播過程中動態構造的。PyTorch既不記錄正向計算圖,也不維護一個梯度磁帶(gradient tape),PyTorch的自動微分(autograd)引擎僅對計算圖進行反向傳播。這意味着自動加載引擎可能不會完全按照與正向過程相反的執行順序運行,除非由圖的結構強制執行。
-
目前難點:
- 如何在每個設備中以正確的順序發布那些綁定到設備的任務,以避免由於Python解釋器未能提前請求而延遲在設備上(與CPU異步)執行任務。[這個前文已經介紹]
- 如何建立這些小批次之間的跨設備依賴關系。
-
實現方案:
- 如何保證正確執行順序?torchgpipe引入了確定性時鍾周期(deterministic clock-cycle),它給出了任務的總體順序。[這個前文已經介紹]
- 如何保證計算圖中的動態顯式依賴關系?針對clock_cycles產生的每一個運行計划:
- 利用 fence 函數調用“fork”和“join”,以此在向后計算圖中動態創建顯式后向傳播依賴關系。
- 利用 compute(schedule, skip_trackers, in_queues, out_queues) 進行計算。
因為前文已經介紹了執行順序方案,所以本文介紹如何計算依賴。
0x02 計算依賴
+-----------------------------------------------------------------------------------------+
| |
| Layer 1 +---> Layer 2 +-----> Layer 3 +-----> Layer 4 +-----> Layer 5 +---> Layer 6 |
| |
+--------------------------+---------------------------+----------------------------------+
+
|
|
v
+------------------------------------------------------------------------------------+
| +--------------------+ +---------------------+ +--------------------+ |
| |Partition 1 | |Partition 2 | |Partition 3 | |
| | | | | | | |
| | Layer 1 | +---------> Layer 4 | | | |
| | + | | | + | +-------> Layer 6 | |
| | | | | | | | | | | |
| | v | | | | | | | | |
| | Layer 2 | | | | | | | | |
| | + | | | v | | | | |
| | | | | | Layer 5 +---------+ | | |
| | v | | | | | | |
| | Layer 3 +---------+ | | | | |
| | | | | | | |
| +---------+----------+ +---------+-----------+ +-----------+--------+ |
| |
+------------------------------------------------------------------------------------+
為什么需要計算依賴?
- 因為模型已經被分層,模型的不同部分拆開放到不同設備上,數據也分成微批次,所以本來模型內部是線性依賴關系,現在需要變成流水線依賴關系。因此原始計算圖不能滿足需求,因此需要有針對性的補充。就像上圖那樣,6個層被分成了三個partitions,這三個partitons 之間的依賴如何構建?
- 之前的線性依賴關系其實是在模型定義時候就基本確定了,現在則需要每次運行時候建立一個動態依賴關系。
所以針對流水線並行,torchgpipe需要自己補充一個本機跨設備偽分布式依賴關系。torchgpipe 通過在前向計算圖和后向計算圖做各種調整來達到目的。計算圖就意味着各種依賴邏輯,依賴邏輯的補足就是依靠本節介紹的 Fork 和 Join 兩個函數完成的。
這里最初有一個疑問,就是Torchgpipe怎么在不使用 PyTorch RPC 和 p2p的情況下,構建出來一個異地反向計算圖。后來發現,原來是我想多了,因為Torchgpipe沒有考慮到這種情況,它針對都是在同一個主機之上的GPU,不涉及異地多機器計算。
Torchgpipe 本質上還是一個進程內運行多個線程進行計算,是 DP 的替代。比如源碼中就有對比如下:
### ResNet-101 Accuracy Benchmark
Batch size | torchgpipe | nn.DataParallel | Goyal et al.
---------- | ---------: | --------------: | -----------:
256 | 21.99±0.13 | 22.02±0.11 | 22.08±0.06
1K | 22.24±0.19 | 22.04±0.24 | N/A
4K | 22.13±0.09 | N/A | N/A
再比如代碼中明確提到:
If you decide not to use checkpointing at all, :class:`nn.DataParallel
<torch.nn.DataParallel>` might be more efficient than GPipe.
0x03 反向傳播依賴
我們首先看看反向傳播依賴,這個是論文的重點。
2.1 解析
我們還是要回憶一下前面兩個圖例。
圖1
圖2
這里需要完成兩種依賴:
- 行間依賴,就是 batch 之間的依賴,就是設備內的依賴。從圖上看,就是藍色列內的 \(F_{1,1}\) 必須在 \(F_{2,1}\)之前完成,\(B_{2,1}\) 必須在\(B_{1,1}\) 之前完成。
- 列間依賴,就是 partitions(設備) 之間的依賴。從圖上看,就是藍色 \(F_{1,1}\) 必須在黃色 \(F_{1,2}\)之前完成,即第一個設備必須在第二個設備之前完成,而且第一個設備的輸出是第二個設備的輸入。
假定我們依據確定性時鍾周期(deterministic clock-cycle)算法來運行一個前向傳播。即使前向傳播是按照在第j個設備上應該執行的順序來執行任務 \(F_{1,j},...,F_{m,j}\) ,得到的后向傳播結果計算圖看起來也更像圖1而非圖2,
從圖1上看,PyTorch 的 autograd 引擎不知道 \(B_{i+1,j}\) 必須在 \(B_{i,j}\) 之前運行,因此會打亂后向傳播的時間流。因此,虛擬依賴(圖2的虛線箭頭)必須在前向傳播中被顯式繪制出來。
我們再仔細分析一下圖2。圖2之中,每一行都表示一個 micro-batch 在訓練中的運行流,這個流的前向是由clock算法確定的。后向關系是由前向傳播中自動確定完成的。
現在的問題是:一個 mini-batch 被分成了4個 micro-batch,分別在不同時鍾周期進入訓練。就是每一列。這一列由上到下的傳播也是由clock算法確定,但是反向傳播(由下自上)目前是不確定的。比如最后一列中,反向傳播的順序應是:\(B_{4,1},B_{3,1},B_{2,1},B_{1,1}\)。但是這個目前從前向傳播的結果來看,無法確定這個順序。
所以需要依靠本節介紹的 Fork 和 Join 兩個函數完成這個依賴關系。圖中斜線表示checkpoint之中需要先有一個重計算,然后才能由下往上走。
因此,torchpipe定義兩個基礎函數,Fork 和 Join 來表達這種依賴關系:
-
Fork 是 auto grad 函數,其把一個張量 x 映射到 pair(x, \(\phi\)),這里 \(\phi\) 是一個空張量。
-
Join 是 auto grad 函數,其把 pair(x, \(\phi\)) 映射到一個張量 x ,這里 \(\phi\) 是一個空張量。
現在,\(F_{i+1,j}\) 對於 \(F_{i,j}\) 的依賴(其在后向傳播計算圖中被轉換為 \(B_{i,j}\) 到 $B_{i+1,j} $ 的依賴關系)可以被如下表示:
所以,圖中這里實線都是前向傳播時候構建的,虛線是由 fork & join 構建的。
原則上,表示虛擬依賴關系的張量可以是任意的。然而,torchgpipe選擇使用空張量,以消除由張量引起的任何不必要的計算,例如PyTorch中的梯度累積。
具體如下圖。就是使用 Fork 和 Join 的后向計算圖。圖中,不同顏色對應不同的設備。箭頭依據后向傳播圖的方向來繪制,這些聯系是在前向傳播中被構建的。因此,\(F^{'}_{i,j}\) 對於 \(B_{i+1,j}\) 的虛擬依賴通過 Fork 和 Join 被構建出來,用虛線表示。
2.2 基礎功能
2.2.1 Function
首先,我們要看看 torch.autograd.Function 的作用。
torch.autograd.Function類實際上是一個操作函數的基礎父類,這樣的操作函數必須具備兩個基本的過程,即前向的運算過程和反向的求導過程,
如果某些操作無法通過 PyTorch 已有的層或者是已有的方法實現不了,就需要實現一個新的方法對 PyTorch 進行拓展。當不使用自動求導機制,需要自定義求導規則的時候,就應該拓展torch.autograd.Function類。 由於pytorch不再提供自動求導機制,就要用戶自己定義實現前向傳播和反向傳播的計算過程,這就是 "Extending torch.autograd"。
我們接下來介紹Backward Dependency 的關鍵算法:Fork and Join。
2.2.2 Fork
Fork 是auto grad 函數,其把一個張量 x 映射到 pair(x, \(\phi\)),這里 \(\phi\) 是一個空張量。Fork 方法就是拓展了torch.autograd.Function
。
def fork(input: Tensor) -> Tuple[Tensor, Tensor]:
"""Branches out from an autograd lane of the given tensor."""
if torch.is_grad_enabled() and input.requires_grad:
input, phony = Fork.apply(input)
else:
phony = get_phony(input.device, requires_grad=False)
return input, phony
class Fork(torch.autograd.Function):
@staticmethod
def forward(ctx: 'Fork', input: Tensor) -> Tuple[Tensor, Tensor]: # type: ignore
phony = get_phony(input.device, requires_grad=False)
return input.detach(), phony.detach()
@staticmethod
def backward(ctx: 'Fork', grad_input: Tensor, grad_grad: Tensor) -> Tensor: # type: ignore
return grad_input
2.2.3 Join
Join 是auto grad 函數,其把 pair(x, \(\phi\)) 映射到一個張量 x ,這里 \(\phi\) 是一個空張量。Join 方法也是拓展了torch.autograd.Function
。
def join(input: Tensor, phony: Tensor) -> Tensor:
"""Merges two autograd lanes."""
if torch.is_grad_enabled() and (input.requires_grad or phony.requires_grad):
input = Join.apply(input, phony)
return input
class Join(torch.autograd.Function):
@staticmethod
def forward(ctx: 'Join', input: Tensor, phony: Tensor) -> Tensor: # type: ignore
return input.detach()
@staticmethod
def backward(ctx: 'Join', grad_input: Tensor) -> Tuple[Tensor, None]: # type: ignore
return grad_input, None
2.2.4 Phony
Phony是沒有空間的張量,因為它不需要任何梯度累積,所以可以在 autograd 圖中構建任意的依賴。
def get_phony(device: torch.device, *, requires_grad: bool) -> Tensor:
"""Gets a phony. Phony is tensor without space. It is useful to make
arbitrary dependency in a autograd graph because it doesn't require any
gradient accumulation.
.. note::
Phonies for each device are cached. If an autograd function gets a phony
internally, the phony must be detached to be returned. Otherwise, the
autograd engine will mutate the cached phony in-place::
class Phonify(torch.autograd.Function):
@staticmethod
def forward(ctx, input):
phony = get_phony(input.device, requires_grad=False)
return phony.detach() # detach() is necessary.
"""
key = (device, requires_grad)
try:
phony = _phonies[key]
except KeyError:
with use_stream(default_stream(device)):
phony = torch.empty(0, device=device, requires_grad=requires_grad)
_phonies[key] = phony
return phony
2.2.5 detach
在代碼中,經常可以見到 detach 的使用,這個從注釋可以看出來,是為了解決 PyTorch 的一個bug。
# A Python autograd function might fail with this error:
#
# RuntimeError: Returning Variables sharing storage with other Variables
# that require grad is not supported in Python functions. Please submit a
# feature request if you hit this error.
#
# It doesn't look like an essential restriction. But it happens on the
# current PyTorch version. To avoid it, we should detach the tensor before
# returning by identity autograd functions, such as Wait, Fork, and Join.
#
2.3 使用
在 Pipeline 之中我們可以看到具體的使用方法,fence 方法(省略部分代碼)利用 depend 來構建后向傳播的依賴關系,確保 batches[i-1] 在 batches[i] 之后完成。
def fence(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
) -> None:
"""Copies micro-batches after computation for the previous
micro-batches.
"""
batches = self.batches
copy_streams = self.copy_streams
skip_layout = self.skip_layout
for i, j in schedule:
# Ensure that batches[i-1] is executed after batches[i] in
# backpropagation by an explicit dependency.
if i != 0:
depend(batches[i-1], batches[i]) # 在這里建立了后向傳播依賴關系
next_stream = copy_streams[j][i]
for prev_j, ns, name in skip_layout.copy_policy(j):
prev_stream = copy_streams[prev_j][i]
skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)
if j != 0:
prev_stream = copy_streams[j-1][i]
copy(batches[i], prev_stream, next_stream)
具體 depend 代碼如下:
def depend(fork_from: Batch, join_to: Batch) -> None:
fork_from[0], phony = fork(fork_from[0])
join_to[0] = join(join_to[0], phony)
我們結合示例代碼把傳入的參數賦值一下,重新把方法解釋如下,這樣大家就可以更好的理解。
def depend(batches[i-1]: Batch, batches[i]: Batch) -> None:
batches[i-1][0], phony = fork(batches[i-1][0])
batches[i][0] = join(batches[i][0], phony)
具體邏輯如下,通過 phony 完成了一個橋接,即在正向傳播之中,batches[i] 依賴 batches[i-1] 的執行結果。
+----------------+ +--------------+
| | | |
| batches[i-1] | | batches[i] |
| | | |
+----------+-----+ +-----+--------+
| |
| |
| |
v v
+--------------------------------------------------------+
| depend | | |
| | | |
| | | |
| v | |
| +-----------------------+ | |
| | fork | | | |
| | | get_phony | | |
| | | + | | |
| | | | | | |
| | | | | | |
| +-----------------------+ | |
| | | | |
| | | | |
| | | | |
| v v | |
| +-----------+--+ +--+-----+ | |
| | | | | | |
| | batches[i-1] | | phony | | |
| | | | | | |
| +--------------+ +--+-----+ | |
| | | |
| | | |
| v v |
| +--+------------------+ |
| |Join | | |
| | | | |
| | | | |
| | v | |
| +---------------------+ |
| | |
| | |
| | |
| v |
| +-----+------+ |
| | | |
| | batches[i] | |
| | | |
| +------------+ |
| |
+--------------------------------------------------------+
我們把多個 batches 聯合起來看看,這樣就能看出來一個依賴鏈條。
+----------------------------------------------------------+
| depend |
| |
| +------------+ |
+------------- | |fork | +-----------+ |
| | | | | | | |
|batches[i] +----------------------> | batches[i]| |
| | | | | | | |
+------------- | | | +-----------+ |
| | | +-------+ |
| | +-----------> | Join | |
| | | | | |
| +------------+ | | |
+------------- | | | +--------------+ |
| | | | | | | |
|batches[i+1]+-------------------------------------------->+ batches[i+1] | |
| | | | | | | |
+---------+--- | | | +--------------+ |
| | +-------+ |
| | |
| +----------------------------------------------------------+
| +----------------------------------------------------------+
| | depend |
| | |
| | +-------------+ |
| | |fork | +------------+ |
| | | | | | |
+--------------------------> |batches[i+1]| |
| | | | | |
| | | +------------+ |
| | | +-------+ |
| | +---------> |Join | |
| +-------------+ | | |
+------------+ | | | +-------------+ |
| | | | | | | |
|batches[i+2]+--------------------------------------------> | batches[i+2]| |
| | | | | | | |
+----------+-+ | | | +-------------+ |
| | +-------+ |
| | |
| +----------------------------------------------------------+
|
| +-----------------------------------------------------------+
| | depend |
| | |
+-----------------------------> ...... |
| |
| |
+-----------------------------------------------------------+
這樣,上圖就是前向計算圖,於是在后向傳播之中,batches[i] 就 必須在 batches[i-1] 之前完成了。
我們再結合論文的圖來看看。
本來示例代碼中是:
depend(batches[i-1], batches[i])
為了和論文中的圖對應,我們修改為:
depend(batches[i], batches[i+1])
depend 代碼也變化為:
def depend(batches[i]: Batch, batches[i+1]: Batch) -> None:
batches[i][0], phony = fork(batches[i][0])
batches[i+1][0] = join(batches[i+1][0], phony)
對應下圖,就是在后向傳播計算圖之中 batches[i+1] 通過一個join, 一個fork,排在了 batches[i] 前面,就是下面大箭頭所示,具體細化一下:
-
從這個圖上,PyTorch 的 autograd 引擎不知道 \(B_{i+1,j}\) 必須在 \(B_{i,j}\) 之前運行,因此會打亂后向傳播的時間流。因此,虛擬依賴(前面圖的虛線箭頭)必須在前向傳播中被顯式繪制出來。
-
圖上的實線箭頭依據后向傳播圖的方向來繪制,這些聯系是在前向傳播中被構建的。就是說,對於 \({Batch}_i\) 來說,其反向傳播順序是固定的。就是上面一行內順序是固定的,下面一行內順序也是固定的。
-
但是,上下兩行之間的順序是不可知的,需要用虛線來保證,就是用 Join & Fork 來保證。
0x03 正向傳播依賴
我們回頭再來看正向依賴。因為正向傳播的部分目的就是完成反向傳播依賴,而目前反向傳播只完成了行之間的依賴,列之間的依賴沒有完成,我們現在補全。
列之間的依賴就是設備之間的依賴,即前一個設備的輸出是后一個設備的輸入。
3.1 分割模型
首先還是需要回顧下如何切分模型,從 split_module 可以看到,
GPipe 的 partitions 成員變量是 nn.ModuleList 類型。nn.ModuleList是一個容器,其儲存不同 module,並自動將每個 module 的 parameters 添加到網絡中。但是nn.ModuleList 並沒有定義一個網絡,而只是將不同的模塊儲存在一起,這些模塊之間並沒有什么先后順序,網絡的執行順序是根據 forward 函數來決定的。
def split_module(module: nn.Sequential,
balance: Iterable[int],
devices: List[torch.device],
) -> Tuple[List[nn.Sequential], List[int], List[torch.device]]:
balance = list(balance)
j = 0
partitions = []
layers: NamedModules = OrderedDict()
for name, layer in module.named_children(): # 遍歷模型包含的層
layers[name] = layer # 把新的層加入到數組中
if len(layers) == balance[j]: # 如果數組大小等於balance[j],就是達到了device j應該包含的層數
# Group buffered layers as a partition.
partition = nn.Sequential(layers) # 把層數組組合成一個sequential module
device = devices[j]
partition.to(device) # 把層放置到相關設備之上
partitions.append(partition) # 這個新module加入到分區數組中
# Prepare for the next partition.
layers.clear()
j += 1 # 去下一個device看看
partitions = cast(List[nn.Sequential], nn.ModuleList(partitions))
del devices[j:]
return partitions, balance, devices
隨之而來問題就是:partition內部可以用Sequential來進行一系列的前向操作,但是如何配置partitions 之間的執行順序?
+-----------------------------------------------------------------------------------------+
| |
| Layer 1 +---> Layer 2 +-----> Layer 3 +-----> Layer 4 +-----> Layer 5 +---> Layer 6 |
| |
+-----------------------------------------+-----------------------------------------------+
|
|
|
v
+-----------------------------------------------------------------------------------------+
| +--------------------+ +---------------------+ +--------------------+ |
| |Partition 1 | |Partition 2 | |Partition 3 | |
| | | ??? | | | | |
| | Layer 1 | +----------> Layer 4 | ??? | | |
| | + | | | + | +-------> Layer 6 | |
| | | | | | | | | | | |
| | v | | | | | | | | |
| | Layer 2 | | | | | | | | |
| | + | | | v | | | | |
| | | | | | Layer 5 +------------+ | | |
| | v | | | | | | |
| | Layer 3 +----------+ | | | | |
| | | | | | | |
| +--------------------+ +---------------------+ +--------------------+ |
| |
+-----------------------------------------------------------------------------------------+
3.2 建立依賴
我們還是從論文中入手。假定我們有一個神經網絡,其由一系列子網絡構成。我們假定這些子網絡是 \(f^1,...,f^n\),其參數分別是 \(\theta^1,...,\theta^n\),則整個網絡是:
參數是 \(\theta = (\theta^1,...,\theta^n)\),為了清楚起見,我們稱 \(f^j\) 表示 f 的第 j 個分區,並假設分區的參數是相互不相交的。
在訓練網絡時,基於梯度的方法(如隨機梯度下降法)需要在給定小批量訓練數據 x 和相應損失之后,計算網絡的輸出結果f(x)。以及損失相對於網絡參數 \(\theta\) 的梯度g。這兩個階段分別稱為向前傳播和向后傳播。
既然 f 由其 L 層 子模塊 (\(f^L, f^{L-1},...f^1\)) 順序組成,那么前向傳播\(f(x)\) 可以通過如下方式計算:讓 \(x^0=x\)(就是輸入x),然后順序應用每一個 partition,即 \(x^j = f^j (x^{j-1})\),這里 $ j = 1, ..., L$。就是 \(f(x)\) 可以表示為 :
於是我們知道了,前向傳播的順序是由 \(f(x) = f^L(f^{L-1}(f^{L-2}(... f^1(x))))\) 來確定的。
我們可以針對代碼,進一步解析,看看如何實施partitions之間的順序依賴。
def run(self) -> None:
"""Runs pipeline parallelism.
It modifies the given batches in place.
"""
batches = self.batches
partitions = self.partitions
devices = self.devices
skip_layout = self.skip_layout
m = len(batches)
n = len(partitions)
skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]
with spawn_workers(devices) as (in_queues, out_queues):
for schedule in clock_cycles(m, n): # 這里使用,給出了執行序列計划,后續按照這個來執行
self.fence(schedule, skip_trackers)
self.compute(schedule, skip_trackers, in_queues, out_queues)
解析的目標是 for schedule in clock_cycles(m, n)
這個 for 循環,其:
- 針對clock_cycles產生的每一個運行計划:
- 利用 fence(schedule, skip_trackers) 構建后向傳播依賴關系。
- 利用 compute(schedule, skip_trackers, in_queues, out_queues) 進行計算。
現在我們完成了兩步:
- 確定性時鍾周期算法給定了前向傳播的執行順序,我們只要按照 clock_cycles 方法提供的計划一一運行即可。
- fence 方法通過調用 join 和 fork,我們做到了在后向傳播之中,batches[i] 就 必須在 batches[i-1] 之前完成了,即 \(B_{i+1,j}\) 必須在 \(B_{i,j}\) 之前運行。
對於我們的圖來說,第二步就是完成了下圖的列依賴。
我們的問題是:怎么通過這個 for 循環,做到 \(B_{i,{j+1}}\) 必須在 \(B_{i,j}\) 之前運行?,即怎么安排反向傳播逐次運行?就是怎么完成行內的依賴?
這就要通過 compute 的源碼進行分析。重點說明的是:
- batches[i] 這里是會變化的,比如 batches[0] 在經過 partitions[j] 的計算之后,會變成
batches[0][j]
。 - 對於 compute 方法,關鍵就是在最底部的代碼
batches[i] = batch
。就是把 第 j 個device 對 第 i 個 batch 的計算結果 賦值到 batches[i],賦值之后,batches[i]就是batches[i][j]
,這樣,在下次計算時候,構建的就是 F[i, j+1], 下一次 fence 之中的 depend 操作,就是針對batches[i, j+1]
。 - 因此,在前向計算圖上,通過這個賦值操作, batches[i, j+1] 就依賴 batches[i, j],所以反向計算時候,batches[i, j + 1] 就必須在 batches[i, j] 之前完成。
def compute(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
in_queues: List[InQueue],
out_queues: List[OutQueue],
) -> None:
"""Runs tasks with synchronization to copy streams."""
batches = self.batches
partitions = self.partitions
devices = self.devices
n = len(partitions)
streams = [current_stream(d) for d in devices]
for i, j in schedule: # 針對 schedule 之中的每一對 i,j
batch = batches[i]
partition = partitions[j]
# Synchronize with the copied input. ([1] in the diagram)
# Determine whether checkpointing or not.
if checkpoint:
# 忽略
else:
def compute(batch: Batch = batch,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> Batch:
with use_skip_tracker(skip_tracker):
return batch.call(partition) # 前向計算,計算以 partition為單位計算,partition內部的層是順序計算,由 Sequential保證。
task = Task(streams[j], compute=compute, finalize=None)
del compute
# Compute tasks in parallel. ([2] in the diagram)
in_queues[j].put(task) # 讓 worker計算
for i, j in schedule:
ok, payload = out_queues[j].get() # 獲取 worker 的前向計算結果,就是 第 j 個device 對 第 i 個 batch 的計算結果
task, batch = cast(Tuple[Task, Batch], payload)
# The copy stream synchronizes to copy the output. ([3] in the
# diagram)
# Finalize tasks. If checkpointing is enabled, here the
# recomputation is scheduled at backpropagation. ([4] in the
# diagram)
# 第 j 個device 對 第 i 個 batch 的計算 就是 F[i,j]
batches[i] = batch # 這里是關鍵,就是把 第 j 個device 對 第 i 個 batch 的計算結果 賦值到 batches[i],batches[i]就是 batches[i][j],在下次計算時候,構建的就是 F[i,j+1], 下一次 fence 之中的 depend 操作,就是針對 batches[i,j+1]
關於這個賦值操作,其對應的grad_fn 是 PermuteBackward,比如:
a = torch.tensor([2., 3.], requires_grad=True)
c = a
c.backward(gradient=external_grad)
print(c)
具體是:
c = {Tensor: 2} tensor([2., 3.], requires_grad=True)
T = {Tensor: 2} tensor([2., 3.], grad_fn=<PermuteBackward>)
現在,我們把下圖進行升級。
+-------------------------------------------------------------------+
| depend |
| |
| +---------------+ |
| |fork | |
+------------- | | | +-----------+ |
| | | | | | | |
|batches[i] +-------------------------> | batches[i]| |
| | | | | | | |
+------------- | | | +-----------+ |
| | | |
| | | |
| | | +--------+ +-------+ |
| | get_phony +------> | +--->+ Join | |
| | | | phony | | | |
| +---------------+ | | | | |
| +--------+ | | |
| | | |
+------------- | | | +--------------+ |
| | | | | | | |
|batches[i+1]+----------------------------------------------------->+ batches[i+1] | |
| | | | | | | |
+------------- | | | +--------------+ |
| +-------+ |
| |
+-------------------------------------------------------------------+
我們進行橫向拓展,得到如下,即一個batch 被分成兩個小批次: batches[i],batches[i+1] ,它們在兩個設備 partitions[j],partitions[j + 1] 之上流水線,這樣行和列都有反向傳播的依賴。
F[i,j] F[i,j+1]
+------------------------------------------------+ +-----------------------------------------------+
| partitions[j] | | partitions[j+1] |
| | | |
| +--------------------+ +------------------+ | | +-------------------+ +------------------+ |
| |fence | | compute | | | | fence | | compute | |
| | | | | | | | | | | |
+--------------+ | | +--------------+ | | +------------+ | | +-----------------+ | | +-------------+ | | +------------+ | | +-----------------+
| | | | | depend | | | |forward | | | | | | | | depend | | | |forward | | | | |
| batches[i] +---------------------------------------------------------> | batches[i][j] +----------------------------------------------------------> | batches[i][j+1] |
| | | | | | | | | | | | | | | | | | | | | | | | | |
+--------------+ | | | | | | | | | | +-----------------+ | | | | | | | | | | +-----------------+
| | | | | | +------------+ | | | | | | | | +------------+ | |
| | | | | | | | | | | | | | | |
+--------------+ | | | | | +------------------+ | +-----------------+ | | | | | +------------------+ | +-------------------+
| | | | | | | | | | | | | | | | | |
| batches[i+1]+---------------------------------------------------------> | batches[i+1][j] +----------------------------------------------------------> | batches[i+1][j+1] |
| | | | | | | | | | | | | | | | | |
+--------------+ | | +--------------+ | | +-----------------+ | | +-------------+ | | +-------------------+
| | | | | | | |
| +--------------------+ | | +-------------------+ |
+------------------------------------------------+ +-----------------------------------------------+
手機如下:
0x04 總結
下圖 $ m = 4, n = 3$。即,模型被分成3個子網絡,小批次被分割成 4個微批次。F 和 B 的下標是 (m, n)。
如上圖,這里需要完成兩種依賴:
- 行間依賴,就是 batch 之間的依賴,就是設備內的依賴。從圖上看是虛線,就是 \(F_{1,1}\) 必須在 \(F_{2,1}\)之前完成,\(B_{2,1}\) 必須在\(B_{1,1}\) 之前完成。
- 列間依賴,就是 partitions(設備) 之間的依賴。從圖上看是實線,就是 \(F_{1,1}\) 必須在 \(F_{1,2}\)之前完成,即第一個設備必須在第二個設備之前完成,而且第一個設備的輸出是第二個設備的輸入。
如上圖,我們需要完成行,列兩方面的依賴。
- 行間依賴是用 Join & Fork 來保證,利用空張量完成了依賴關系的設定,確保 batches[i-1] 在 batches[i] 之后完成。
- 列間依賴是通過
batches[i] = batch
完成,利用 PermuteBackward 來完成了設備之間的依賴。
至此,我們完成了執行順序和依賴關系的設定,下一篇我們介紹如何並行處理。
0x06 更正和補充
6.1 緣起
一位朋友 @劍柳吟風 對下面提出了疑問:
列間依賴是通過
batches[i] = batch
完成,利用 PermuteBackward 來完成了設備之間的依賴。
他認為:通過閱讀源碼,個人感覺應該是自定義的copy 和wait這兩個autograd.Function確定了設備之間的依賴。
我又仔細閱讀了論文和源碼,發現他是正確的,我之前的理解有誤,特此更正如下。
6.2 論文內容
我們首先看看論文內容。
6.2.1 設備級執行順序
論文內容如下:
2.2. Device-wise Execution Order
To summarize, in pipeline parallelism (with checkpointing) each device is assigned with a set of tasks with the prescribed order. Each device will execute the given tasks one-by-one as soon as cross-device dependencies are met. However, there is a missing component in this picture data tranfer between the devices. For illustration, the full execution order that device j must follow is shown in Figure 3. Here data transfer operations are explicitly denoted as ‘receive’ and ‘send’ for emphasis.
翻譯如下:
總之,在流水線並行性(帶有檢查點)中,每個設備都被分配了一組具有指定順序的任務。一旦滿足跨設備依賴關系,每個設備將逐個執行給定的任務。但是,在之前圖之中,設備之間的數據傳輸中缺少一個組件。為了便於說明,設備 j 必須遵循的完整執行順序如圖3 所示。為了更好的說明,在這里,數據傳輸操作被明確表示為“接收”和“發送”。
具體圖例如下:
6.2.2 並行計算與拷貝
論文之中另一部分則論述了Stream 的使用。
Concurrent Copy and Computation: Streams
PyTorch issues every device-bound kernels to the default stream, unless it is specified otherwise. Stream is a device- bound sequence of kernels that is executed in order. Kernels in the same stream are guaranteed to be executed in the pre- scribed order, but kernels in different streams can be inter- leaved, and even can overlap when possible. In particular, nearly all CUDA devices with compute capability 1.1 and higher support concurrent copy and execution: data transfer between devices can always overlap with kernel execution.
因為公式拷貝問題,將這段與原文下一段翻譯如下:
PyTorch將每個綁定到設備的核發布到默認流,除非另有規定。流是按順序執行這些綁定到設備的核序列。同一個流中的內核保證按預先指定的順序執行,但不同流中的核可以相互交錯,甚至可能重疊。特別是,幾乎所有具有計算能力1.1及更高版本的CUDA設備都支持並發復制和執行:設備之間的數據傳輸總是與內核執行重疊。
torchgpipe將每個拷貝核注冊到非默認流中,同時將計算內核保留在默認流中。這允許設備 j 可以並行處理,即 \(F_{i,j}\) 可以同 "發送到設備 \(j+1\) 的 \(x_{i-1}^j\)" 以及/或者 "從設備 \(j-1\) 接受 \(x_i^{j-1}\)" 這兩個操作並行。此外,每個設備對每個微批次使用不同的流。由於不同的微批次之間沒有真正的依賴關系,因此流的這種使用是安全的,這允許盡可能快地進行拷貝。
6.2.3 推論
可見,數據傳輸是通過 Stream 來完成,即構成了實際上的設備間依賴關系,又可以達到數據和拷貝並行的目的。
6.3 實現
我們接下來看看具體實現,依次驗證我們的推論。
6.3.1 _copy_streams
_copy_streams 定義如下:
self._copy_streams: List[List[AbstractStream]] = []
其初始化代碼如下,chunks 是micro-batches 的數目。_ensure_copy_streams 就是針對每一個設備的每一個macro-batch,都生成了一個專用流。
def _ensure_copy_streams(self) -> List[List[AbstractStream]]:
"""Ensures that :class:`GPipe` caches CUDA streams for copy.
It's worth to cache CUDA streams although PyTorch already manages a
pool of pre-allocated CUDA streams, because it may reduce GPU memory
fragementation when the number of micro-batches is small.
"""
if not self._copy_streams:
for device in self.devices:
self._copy_streams.append([new_stream(device) for _ in range(self.chunks)])
return self._copy_streams
假設有3個devices,模型被分成3個子網絡,小批次被分割成 4個微批次。則具體如下:就是說 _copy_streams[i][j]
之中,i 表示 device 的序列,j 表示 batch 序列。(后續的文章之中,有對如何使用的詳述)
+----------------------------------+
| _copy_streams |
| |
| +----------------------+ |
| | | |
| | [1,1] [1,2] [1,3]+--------------------------------+
| | | | |
| | [2,1] [2,2] [2,3]+------------------------------------------+
| | | | | |
+-------------------------+[3,1] [3,2] [3,3] | | | |
| | | | | | |
| | +----------------------+ | | |
| | | | |
| +----------------------------------+ | |
| | |
| v |
| +------------------------------------------------------------------------+------+ |
| | Stream of device 1, Stream of device 1, Stream of device 1, Stream of device 1| |
| +-------------------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------------------+ |
| | Stream of device 2, Stream of device 2, Stream of device 2, Stream of device 2+<-+
| +-------------------------------------------------------------------------------+
|
| +-------------------------------------------------------------------------------+
+-->+ Stream of device 3, Stream of device 3, Stream of device 3, Stream of device 3|
+-------------------------------------------------------------------------------+
6.3.2 Copy 算子
class Copy(torch.autograd.Function):
"""Copies tensors on specific streams."""
@staticmethod
def forward(ctx: Context, # type: ignore
prev_stream: AbstractStream,
next_stream: AbstractStream,
*input: Tensor,
) -> Tensors:
# 這里會把拷貝操作的source,dst 都保存在上下文之中,反向操作時候會取出來
ctx.prev_stream = prev_stream
ctx.next_stream = next_stream
output = []
output_stream = current_stream(get_device(next_stream))
with use_stream(prev_stream), use_stream(next_stream):
for x in input:
y = x.to(get_device(next_stream)) # 進行拷貝操作
output.append(y)
# 'prev_stream' is not where 'x' has been allocated.
record_stream(x, prev_stream)
# 'y' has been allocated on 'next_stream'.
# It might be used on the current stream captured as 'output_stream'.
record_stream(y, output_stream)
return tuple(output)
@staticmethod
def backward(ctx: Context,
*grad_output: Tensor,
) -> Tuple[Optional[Tensor], ...]:
# 取出來上下文保存的拷貝操作的src,dst。
prev_stream = ctx.prev_stream
next_stream = ctx.next_stream
grad_input: Deque[Tensor] = deque(maxlen=len(grad_output))
input_stream = current_stream(get_device(prev_stream))
with use_stream(prev_stream), use_stream(next_stream):
for x in reversed(grad_output):
y = x.to(get_device(prev_stream)) # 進行拷貝操作
grad_input.appendleft(y)
# 'next_stream' is not where 'x' has been allocated.
record_stream(x, next_stream)
# 'y' has been allocated on 'prev_stream'.
# It might be used on the current stream captured as 'input_stream'.
record_stream(y, input_stream)
grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
return grad_streams + tuple(grad_input)
6.3.3 Wait 算子
Wait 算子代碼如下,主要就是起到同步作用,等待拷貝操作的完成。
class Wait(torch.autograd.Function):
"""Synchronizes a stream to another stream.
Place it just before you want to start an operation on the next stream,
provided that all operations on the previous stream are done.
"""
@staticmethod
def forward(ctx: Context, # type: ignore
prev_stream: AbstractStream,
next_stream: AbstractStream,
*input: Tensor,
) -> Tensors:
ctx.prev_stream = prev_stream
ctx.next_stream = next_stream
wait_stream(next_stream, prev_stream)
return tuple(x.detach() for x in input)
@staticmethod
def backward(ctx: Context,
*grad_input: Tensor,
) -> Tuple[Optional[Tensor], ...]:
prev_stream = ctx.prev_stream
next_stream = ctx.next_stream
wait_stream(prev_stream, next_stream)
grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
return grad_streams + grad_input
6.3.4 封裝
以下函數對算子進行了封裝。
def copy(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
batch[:] = Copy.apply(prev_stream, next_stream, *batch)
def wait(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
batch[:] = Wait.apply(prev_stream, next_stream, *batch)
6.3.5 建立依賴關系
fence 簡化代碼如下,其建立了圖例之中的行,列 兩種依賴關系。
def fence(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
) -> None:
"""Copies micro-batches after computation for the previous
micro-batches.
"""
batches = self.batches
copy_streams = self.copy_streams
skip_layout = self.skip_layout
for i, j in schedule:
# Ensure that batches[i-1] is executed after batches[i] in
# backpropagation by an explicit dependency.
if i != 0:
depend(batches[i-1], batches[i]) # 在這里建立了后向傳播依賴關系
# 拿到dst設備的拷貝流
next_stream = copy_streams[j][i]
# 殘差連接相關設置
for prev_j, ns, name in skip_layout.copy_policy(j):
prev_stream = copy_streams[prev_j][i]
skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)
# 建立跨設備依賴關系,指定了 device[j-1] 的輸出是 device[i] 的輸入
if j != 0:
prev_stream = copy_streams[j-1][i] # 拿到src設備的拷貝流
copy(batches[i], prev_stream, next_stream) # 建立跨設備依賴關系
具體wait操作則是在 compute 之中調用,我們只給出了部分代碼。
def compute(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
in_queues: List[InQueue],
out_queues: List[OutQueue],
) -> None:
"""Runs tasks with synchronization to copy streams."""
batches = self.batches
partitions = self.partitions
devices = self.devices
copy_streams = self.copy_streams
# With checkpointing, the autograd graph looks like this diagram:
# ┌─────┸──────┐
# │ Copy │
# └─────┰──────┘ (fence)
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┃ (compute)
# ┌─────┸──────┐
# │ Wait │ [1] Synchronize the current stream with the copy stream.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Checkpoint │ [2] Compute a partition within checkpointing.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Wait │ [3] Synchronize the copy stream with the current stream.
# └─────┰──────┘
# ┠ ─ ─ ─ ┐
# ┃ ┌─────┴─────┐
# ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
# ┃ └─────┬─────┘
# ┠ ─ ─ ─ ┘
# ┃
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┌─────┸──────┐ (fence)
# │ Copy │
# └─────┰──────┘
for i, j in schedule:
batch = batches[i]
partition = partitions[j]
# Synchronize with the copied input. ([1] in the diagram)
if j != 0:
wait(batch, copy_streams[j][i], streams[j]) # 這里保證了同步完成
6.4 對比圖
論文之中還有一組對比,特此翻譯摘錄:
我們還可視化了每個GPU的時間線,以幫助理解每個組件的角色,如圖所示。每幅圖片的規划概述如下。
- (a) 通過確定性時鍾周期,所有核在前向傳播期間以正確的順序發出。時間線的左側部分對其進行了說明。然而,因為沒有明確對計算圖中依賴關系的明確編碼,autograd引擎將以無法控制的順序處理微批次,因此時間線將混亂不堪。
- (b) 對於后向依賴,內核目前在向后傳播中以正確的、確定的順序發布。
- (c) 通過使用非默認拷貝流,拷貝和計算現在是並發的,如重疊的藍色和紅色條所示。
- (d) Portal移除了因將跳躍張量(skipping tensor)傳輸到其間的所有設備而導致的不必要副本。與(c) 相比,紅色條的長度減少。
6.5 總結
GPipe需要完成兩種依賴:
- 行間依賴,就是 batch 之間的依賴,就是設備內的依賴。從圖上看是虛線,就是 \(F_{1,1}\) 必須在 \(F_{2,1}\)之前完成,\(B_{2,1}\) 必須在\(B_{1,1}\) 之前完成。
- 列間依賴,就是 partitions(設備) 之間的依賴。從圖上看是實線,就是 \(F_{1,1}\) 必須在 \(F_{1,2}\)之前完成,即第一個設備必須在第二個設備之前完成,而且第一個設備的輸出是第二個設備的輸入。
行間依賴對應了論文中的:
Pipeline parallelism’s strategy is to assign tasks with re- spect to the partition index j so that jth partition entirely lies in the jth device. In addition to this, it is enforced that Fi,j must be completed before executing Fi+1,j and Bi,j must be completed before executing Bi−1,j .
如上圖,我們需要完成行,列兩方面的依賴。
- 行間依賴是用 Join & Fork 來保證,利用空張量完成了依賴關系的設定,確保 batches[i-1] 在 batches[i] 之后完成。PermuteBackward 協助完成了這個依賴操作。
- 列間依賴是通過 Copy & Wait 兩個派生的算子來完成了設備之間的依賴。
0xFF 參考
https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior
NVIDIA解決方案架構師深度解析大規模參數語言模型Megatron-BERT
Accelerating Wide & Deep Recommender Inference on GPUs
HugeCTR: High-Performance Click-Through Rate Estimation Training
https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548
https://github.com/NVIDIA/apex/
https://github.com/justheuristic/prefetch_generator
https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html
https://pytorch.org/docs/stable/autograd.html
https://pytorch.org/docs/notes/cuda.html
https://zhuanlan.zhihu.com/p/61765561