[源碼解析] PyTorch 流水線並行實現 (3)--切分數據和運行時系統


[源碼解析] PyTorch 流水線並行實現 (3)--切分數據和運行時系統

0x00 摘要

前幾篇文章我們介紹了 PyTorch 流水線並行的基本知識和自動平衡機制,本文我們介紹如何切分數據和運行時系統。

流水線並行其他文章鏈接如下:

[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現

[源碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積

[源碼解析] 深度學習流水線並行 GPipe(3) ----重計算

[源碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段

[源碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分區

[源碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型

[源碼解析] 深度學習流水線並行 PipeDream(4)--- 運行時引擎

[源碼解析] 深度學習流水線並行 PipeDream(5)--- 通信模塊

[源碼解析] 深度學習流水線並行 PipeDream(6)--- 1F1B策略

[源碼解析] PyTorch 流水線並行實現 (1)--基礎知識

[源碼解析] PyTorch 流水線並行實現 (2)--如何划分模型

最后得出運行時系統如下:

0x01 分割小批次

我們首先看看如何把一個 mini-batch 分割為多個 micro-batches。

1.1 使用

從下面示例代碼可以看出來,具體使用scatter方法進行了分割。

# Divide a mini-batch into micro-batches.
batches = microbatch.scatter(input, self.chunks)

# Run pipeline parallelism.
pipeline = Pipeline(batches,
                            self.partitions,
                            self.devices,
                            copy_streams,
                            self._skip_layout,
                            checkpoint_stop)

pipeline.run()

# Merge the micro-batches into one mini-batch.
output = microbatch.gather(batches)
return output

1.2 PyTorch 基礎

我們先看看 PyTorch 的一些基礎代碼。

1.2.1 chunk

chunk方法可以對張量分塊,返回一個張量列表,其參數是:

  • ensor :要分割的張量。
  • chunks : 分割的塊數
  • dim :沿着哪個軸分塊

具體舉例如下:

import numpy as np
import torch

data = torch.from_numpy(np.random.rand(3, 5))
print(str(data))

for i, data_i in enumerate(data.chunk(3, 0)): # 沿0軸分為3塊
    print(str(data_i))
    
輸出
tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410],
        [0.7889, 0.4480, 0.7607, 0.7903, 0.4118],
        [0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)

tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410]], dtype=torch.float64)
tensor([[0.7889, 0.4480, 0.7607, 0.7903, 0.4118]], dtype=torch.float64)
tensor([[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)

1.2.2 cat

cat 的用法則是把張量拼接在一起,或者把一個張量列表拼接起來。

Z = torch.cat( (X,Y),0 )  # 按維數0拼接,就是豎着拼
Z = torch.cat( (X,Y),1 )  # 按維數1拼接,就是橫着拼

我們用示例看看:

X = torch.ones(2, 5)
Y = torch.ones(4, 5)
Z = torch.cat((X, Y), 0)
print(Z)

結果是:

tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]])

1.3 分割 & 聚合

具體回到分割批次,我們來看看Scatter 代碼。

def scatter(input: TensorOrTensors, chunks: int) -> List[Batch]:
    """Splits an input mini-batch into multiple micro-batches."""
    inputs: Iterable[TensorOrTensors]

    if isinstance(input, Tensor):
        inputs = input.chunk(chunks) # 如果是張量,則直接分割
    else:
        rotated: List[Tensors] = []

        for tensor in input: # 如果是張量數組,則遍歷
            tensors = tensor.chunk(chunks) # 對於每一個張量進行分割
            rotated.append(cast(Tensors, tensors)) # 分割結果映射為 Tuple list

        inputs = zip(*rotated) # 把 list 之中的Tuple 分別聚合

    return [Batch(x) for x in inputs] # 映射成 Batch 列表返回

gather 方法則是把scatter的結果重新聚集起來,就是一個逆向操作。

def gather(outputs: List[Batch]) -> TensorOrTensors:
    """Concatenates output micro-batches into a mini-batch."""
    output: TensorOrTensors

    if outputs[0].atomic:
        tensors = tuple(b.tensor for b in outputs)
        output = torch.cat(tensors)
    else:
        rotated = [b.tensors for b in outputs]
        output_buf = []

        for tensors in zip(*rotated):
            output_buf.append(torch.cat(tensors))

        output = tuple(output_buf)

    return output

1.4 剖析

我們看看如何使用,下面代碼是把ab這個張量列表打散,分割成兩個塊。

def test_scatter_tuple():
    ab = (torch.ones(2, 1), torch.zeros(4, 2), torch.zeros(6, 3))

    a, b = scatter(ab, chunks=2)

    assert a.tensors[0].size() == (1, 1)
    assert b.tensors[0].size() == (1, 1)
    assert a.tensors[1].size() == (2, 2)
    assert b.tensors[1].size() == (2, 2)
    assert a.tensors[2].size() == (3, 3)
    assert b.tensors[2].size() == (3, 3)

我們畫個圖來看看。

    +-------------------------------------------------------------+
    | ab                                                          |
    |                                                             |
    |    +-----------+         +---------+        +----------+    |
    |    |           |         |         |        |  0 0 0   |    |
    |    |           |         |   0 0   |        |  0 0 0   |    |
    |    |     1     |         |   0 0   |        |  0 0 0   |    |
    |    |     1     |         |   0 0   |        |  0 0 0   |    |
    |    |           |         |   0 0   |        |  0 0 0   |    |
    |    |           |         |         |        |  0 0 0   |    |
    |    +-----------+         +---------+        +----------+    |
    |                                                             |
    +-------------------------------+-----------------------------+
                                    |
                                    |
                                    |
                         a, b = scatter(ab, chunks=2)
                                    |
                                    |
                                    |
                                    |
                                    |
                                    v


+------------------------------+         +-----------------------------+
| a                            |         |b                            |
|  +---+  +-----+  +--------+  |         |  +---+  +-----+ +--------+  |
|  | 1 |  | 0 0 |  | 0 0 0  |  |         |  | 1 |  | 0 0 | | 0 0 0  |  |
|  +---+  | 0 0 |  | 0 0 0  |  |         |  +---+  | 0 0 | | 0 0 0  |  |
|         +-----+  | 0 0 0  |  |         |         +-----+ | 0 0 0  |  |
|                  +--------+  |         |                 +--------+  |
+------------------------------+         +-----------------------------+

使用下面的示例代碼也可以看到如何聚合。

def test_gather_tensors():
    a = torch.zeros(1, 1)
    b = torch.zeros(1, 1)
    ab = gather([Batch(a), Batch(b)])

    assert ab.size() == (2, 1)


def test_gather_tuples():
    a = (torch.zeros(1, 1), torch.zeros(2, 2))
    b = (torch.zeros(1, 1), torch.zeros(2, 2))
    ab = gather([Batch(a), Batch(b)])

    assert isinstance(ab, tuple)
    assert ab[0].size() == (2, 1)
    assert ab[1].size() == (4, 2)

0x02 運行

我們接下來看看運行時的一些基礎設施,具體包括 Stream,Task,Worker。

2.1 Stream

Stream 類是用來封裝 CUDA stream 和 CPU stream。代碼位於:torchgpipe/stream.py。

CUDA流表示一個GPU操作隊列,即某個設備綁定的,按照順序執的核(kernel)序列。我們可以把一個流看作是GPU之上的一個任務。用戶向流的隊列上添加一系列操作,GPU會按照添加到流中的先后順序而依次執行這一系列操作。在同一個流之中,所有操作是串行序列化,因此這些操作永遠不會並行。因此,要想並行,兩個操作必須位於不同的 stream 中。不同流中的核函數可以交錯,甚至可能重疊。

class CPUStreamType:
    pass

# The placeholder on place of streams for the CPU device instead of CUDA.
CPUStream = CPUStreamType()

# It represents both CUDA streams and the CPU stream.
AbstractStream = Union[torch.cuda.Stream, CPUStreamType]

本文用到的相關操作為 use_stream。

torch.cuda.stream(stream) 的作用是選擇給定流的上下文管理器。

@contextmanager
def use_stream(stream: AbstractStream) -> Generator[None, None, None]:
    """:func:`torch.cuda.stream` for either CPU or CUDA stream."""
    if not is_cuda(stream):
        yield
        return

    with torch.cuda.stream(as_cuda(stream)):
        yield
        
def is_cuda(stream: AbstractStream) -> bool:
    """Returns ``True`` if the given stream is a valid CUDA stream."""
    return stream is not CPUStream


def as_cuda(stream: AbstractStream) -> torch.cuda.Stream:
    """Casts the given stream as :class:`torch.cuda.Stream`."""
    return cast(torch.cuda.Stream, stream)        

2.2 Task

Task 表示如何在一個分區上計算微批次數據(micro-batch)。它由兩部分組成:

  • compute應在工作線程中並發執行。
  • finalize應在工作線程完成后執行。

可以理解為一個業務處理邏輯。如果有安卓經驗的同學,可以理解為類似於 業務Message。其實 Android message也叫task,其封裝了本任務攜帶的信息和處理該任務的handler。

這里的 Task 也是類似的,在構建Task 時候,就傳入了 compute 方法和finalize方法,舉例如下:

task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)

或者如下:

def compute(batch: Batch = batch,
            partition: nn.Sequential = partition,
            skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
            ) -> Batch:
    with use_skip_tracker(skip_tracker):
        return batch.call(partition)

task = Task(streams[j], compute=compute, finalize=None)

具體Task定義如下,Task是綁定在 Stream 之上,即可以運行在任何device之上,這就用到了上一節的內容。

class Task:
    """A task represents how to compute a micro-batch on a partition.

    It consists of two parts: :meth:`compute` and :meth:`finalize`.
    :meth:`compute` should be executed in worker threads concurrently.
    :meth:`finalize` should be executed after when worker threads complete to
    execute :meth:`compute`.

    :meth:`compute` might be boosted by worker threads. Because it produces
    several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
    are not serialized through GIL. So more than one CUDA API call can be
    produced at the same time.
    """

    def __init__(self,
                 stream: AbstractStream,
                 *,
                 compute: Callable[[], Batch],
                 finalize: Optional[Callable[[Batch], None]],
                 ) -> None:
        self.stream = stream
        self._compute = compute
        self._finalize = finalize

    def compute(self) -> Batch:
        with use_stream(self.stream): # 綁定在stream之上
            return self._compute() # 調用傳入的業務代碼

    def finalize(self, batch: Batch) -> None:
        if self._finalize is None:
            return
        with use_stream(self.stream): # 綁定在stream之上
            self._finalize(batch)  # 調用傳入的業務代碼

2.3 Worker

worker是用來運行task的,每個 device 有一個 worker 來負責執行這個 device 上的 task。如果有安卓經驗的同學,可以理解為是 Looper。

需要注意,worker只是一個函數,如果運行,還需要一個線程作為寄托。這就是后續 spawn_workers 的工作。

def worker(in_queue: InQueue,
           out_queue: OutQueue,
           device: torch.device,
           grad_mode: bool,
           ) -> None:
    """The main loop of a worker thread."""
    torch.set_grad_enabled(grad_mode)

    with use_device(device):
        while True:
            task = in_queue.get() # 從輸入隊列中獲取task

            if task is None:
                break

            try:
                batch = task.compute() # 計算task
            except Exception:
                exc_info = cast(ExcInfo, sys.exc_info())
                out_queue.put((False, exc_info))
                continue

            out_queue.put((True, (task, batch))) # 把task和計算結果放到輸出隊列

    done = (False, None)
    out_queue.put(done)

2.4 生成 worker

這里使用了 @contextmanager 注解,這是實現了上下文管理協議的對象,主要用於保存和恢復各種全局狀態,關閉文件等,並為try...except...finally提供了一個方便使用的封裝。

spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函數是 worker

spawn_workers 不止生成了若干 workers,也生成了一對消息隊列 (in_queues, out_queues) ,這個 (in_queues, out_queues) 在Pipeline 生命周期之內全程都存在,具體來說是:

  • spawn_workers 內部會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是串行來執行業務操作。
in_queue, out_queue = workers[device]
  • 這些 queues 被添加到 (in_queues, out_queues) 之中。
in_queues.append(in_queue)
out_queues.append(out_queue)
  • 之后就是使用 (in_queues, out_queues) 作為各個task 之間傳遞信息的上下文。

  • in_queues 里面的順序就是 device 的順序,也就是partition的順序。out_queues 亦然。

具體代碼如下:

@contextmanager
def spawn_workers(devices: List[torch.device],
                  ) -> Generator[Tuple[List[InQueue], List[OutQueue]], None, None]:
    """Spawns worker threads. A worker thread is bound to a device."""
    in_queues: List[InQueue] = []
    out_queues: List[OutQueue] = []

    # Spawn workers.
    workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {}

    def normalize_device(device: torch.device) -> torch.device:
        if device.type == 'cuda' and device.index is None:
            return torch.device('cuda', index=torch.cuda.current_device())

        if device.type == 'cpu' and device.index is not None:
            return torch.device('cpu')

        return device

    for device in devices:
        device = normalize_device(device) # 得到使用的設備

        try:
            in_queue, out_queue = workers[device] # 臨時放置queue
        except KeyError: # 如果 device 還沒有生成對應的queues,則生成
            in_queue = Queue() # 生成新的queue
            out_queue = Queue()
            
            # 取出queue
            workers[device] = (in_queue, out_queue) # 賦值給workers

            t = Thread(
                target=worker, # Thread的執行程序是 worker 函數
                args=(in_queue, out_queue, device, torch.is_grad_enabled()),
                daemon=True,
            )
            t.start() # 啟動工作線程

        in_queues.append(in_queue) # 插入queue
        out_queues.append(out_queue) # 插入queue

    try:
        yield (in_queues, out_queues) # 返回給調用者
    finally:
        # Close workers.
        for in_queue in set(in_queues):
            in_queue.put(None)

        # Join running workers.
        running = set(out_queues)
        while running:
            out_queue = running.pop()
            ok, payload = out_queue.get()

            done = (False, None)
            if (ok, payload) == done:
                continue

            running.add(out_queue)

2.5 使用

2.5.1 何時生成worker

使用例子位於 torchgpipe/pipeline.py,在 Pipeline 類之中的 run 函數中會生成workers。我們可以看到,對於 Pipeline 來說,有意義的就是 (in_queues, out_queues)。

    def run(self) -> None:
        """Runs pipeline parallelism.

        It modifies the given batches in place.

        """
        batches = self.batches
        partitions = self.partitions
        devices = self.devices
        skip_layout = self.skip_layout

        m = len(batches)
        n = len(partitions)

        skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]

        with spawn_workers(devices) as (in_queues, out_queues): # 生成 workers,並且得到隊列
            for schedule in clock_cycles(m, n): # 這里是按照算法有次序的運行多個fence, compute
                self.fence(schedule, skip_trackers)
                # 把隊列傳遞進去
                self.compute(schedule, skip_trackers, in_queues, out_queues)

2.5.2 剖析

Torchgpipe 使用了 Python 的 Queue 數據結構。

Queue 類實現了一個基本的先進先出(FIFO)容器。

A multi-producer, multi-consumer queue.

其主要方法是:

  • Queue.get([block, [timeout]]) 讀隊列,從隊列尾部移除元素,timeout為等待時間,如果隊列滿,則阻塞。
  • Queue.put(item, [block, [timeout]]) 寫隊列,將元素添加到序列尾端,timeout為等待時間,如果隊列空,則阻塞。

我個人更習慣於把 (in_queues, out_queues) 理解為類似 Linux 的 管道(Pipe)。

Linux 管道是一種最基本的IPC機制,作用於有血緣關系的進程之間,完成數據傳遞,具體特性如下:

  • 管道是由核函數管理的一個FIFO文件,其實是一個緩沖區,相當於我們放入內存中的一個管道,兩個進程分別處於管道兩端,通過這個管道來傳遞信息。
  • 管道的一端連接一個進程的輸出。這個進程會向管道中放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另一端的進程取出信息。
  • 管道的另一端連接另一個進程的輸入,這個進程取出被放入管道的信息。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另一端的進程放入信息。

具體回到 TorchPipe,我們提前看看論文的內容:

對於這種細粒度的順序控制,torchgpipe把checkpointing 使用兩個單獨的autograd函數Checkpoint和Recompute來實現。在任務 \(F^{'}_{i,j}\) 的執行時間之內,生成一對具有共享內存的Checkpoint和Recompute。該共享內存在向后傳播中被使用,用於將通過執行Recompute生成的本地計算圖傳輸到Checkpoint來進行反向傳播。

於是,這里就有很多並行處理的需求,於是我們可以看到 Pipeline 類的 compute 方法(省略部分代碼)中有向 in_queues 之中放入 Task,從 out_queues 之中去除 Task 的執行結果

    def compute(self,
                schedule: List[Tuple[int, int]],
                skip_trackers: List[SkipTrackerThroughPotals],
                in_queues: List[InQueue],
                out_queues: List[OutQueue],
                ) -> None:


        # With checkpointing, the autograd graph looks like this diagram:
        # ┌─────┸──────┐
        # │    Copy    │
        # └─────┰──────┘   (fence)
        # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
        #       ┃          (compute)
        # ┌─────┸──────┐
        # │    Wait    │ [1] Synchronize the current stream with the copy stream.
        # └─────┰──────┘
        # ┌─────┸──────┐
        # │ Checkpoint │ [2] Compute a partition within checkpointing.
        # └─────┰──────┘
        # ┌─────┸──────┐
        # │    Wait    │ [3] Synchronize the copy stream with the current stream.
        # └─────┰──────┘
        #       ┠ ─ ─ ─ ┐
        #       ┃ ┌─────┴─────┐
        #       ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
        #       ┃ └─────┬─────┘
        #       ┠ ─ ─ ─ ┘
        #       ┃
        # ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
        # ┌─────┸──────┐   (fence)
        # │    Copy    │
        # └─────┰──────┘
        for i, j in schedule: # 並行執行
            batch = batches[i]
            partition = partitions[j]

            # Synchronize with the copied input. ([1] in the diagram)

            # Determine whether checkpointing or not.
            if checkpoint:
                def function(input: TensorOrTensors,
                             partition: nn.Sequential = partition,
                             skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                             ) -> TensorOrTensors:
                    with use_skip_tracker(skip_tracker):
                        return partition(input)

                chk = Checkpointing(function, batch)
                # 生成一個Task
                task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
                del function, chk

            else:
                def compute(batch: Batch = batch,
                            partition: nn.Sequential = partition,
                            skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                            ) -> Batch:
                    with use_skip_tracker(skip_tracker):
                        return batch.call(partition)
								# 生成一個Task
                task = Task(streams[j], compute=compute, finalize=None)
                del compute   
                
            # Compute tasks in parallel. ([2] in the diagram)
            in_queues[j].put(task) # 給第j個partition放入一個新的task。因為 i, j 已經在clock算法中設定了,所以前向傳播就是按照這個來走的。

        for i, j in schedule:
            ok, payload = out_queues[j].get() # 取出第j個partition的運行結果
            # .......
            
        # 省略后續代碼

2.6 總結

我們總結梳理一下大致業務邏輯(后文還會細化):

  1. 系統調用 spawn_workers 來生成若干 workers。
  2. spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函數是 worker。spawn_workers 內部也會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是串行來執行業務操作。
  3. 這些 queues 被添加到 (in_queues, out_queues) 之中。然后把 (in_queues, out_queues) 返回給 Pipeline 主線程。之后就是使用 (in_queues, out_queues) 作為各個task 之間傳遞信息的上下文。
  4. Pipeline 主線程得到 (in_queues, out_queues) 之后,如果要通過 compute 方法運行一個Task,就找到其device對應的in_queue,把Task插進去。
  5. Worker Thread 阻塞在 in_queue 之上,如果發現有內容,就讀取 Task,運行Task。
  6. Worker Thread 把運行結果插入到 out_queue之中。
  7. Pipeline 的 compute 方法會取出 out_queue 之中的運行結果,進行后續處理。

如下圖所示:

                           +-------------------------------------------------------------------------+
                           |                                1                                        |
                           |     +--------------------------------------------------------------+    |
                           |     |               3   (in_queues, out_queues)                    |    |
                           |     v                                                              |    v
+--------------------------------+---------+                                             +------+----+-----------------------------------------------------------------------+
| Pipeline                 |               |                                             | spawn_workers                                                                     |
|                          |               |                                             |                                                                                   |
|                          |               |                                             | +-------------------------------------+                                           |
|                          |               |                                             | | workers                             |                                           |
|                          |               |                                             | |                                     |     t = Thread(                           |
|                          +               |                                             | |                                     |       target=worker,                      |
|                 spawn_workers(devices)   |                                             | |  device 1 : in_queue 1, out_queue 1 |       args=(in_queue, out_queue, device), |
|                                          |                                             | |                                     |       daemon=True,                        |
|                                          |                                             | |  device 2 : in_queue 2, out_queue 2 |     )                                     |
| +--------------------------------------+ |                                             | |                                     |     t.start()                             |
| | compute                              | |                                             | |  device 3 : in_queue 3, out_queue 3 |          +                                |
| |                                      | |                                             | |                                     |          |                                |
| |                                      | |    4                                        | |                                     |          |                                |
| |  in_queues[j].put(task)  +-----------------------+                                   | +-------------------------------------+          |                                |
| |                                      | |         |                                   +-----------------------------------------------------------------------------------+
| |                                      | |         |                                                                                      | 2
| |  ok, payload = out_queues[j].get()<--------+     |         +---------------------+                                                      |
| |                                      | |   |     |         | in_queues           |                                                      v
| +--------------------------------------+ |   |     |         |                     |
|                                          |   |     +------------> in_queue 1 +--------+          +---------------------------------------------------------------------+
+------------------------------------------+   |               |    in_queue 2       |  |          | Thread                                                              |
                                               |               |    in_queue 3       |  |          |                                                                     |
                                               |               |                     |  | 5        |    +------------------------------------------------------------+   |
                                               | 7             +---------------------+  |          |    | Worker                                                     |   |
                                               |               +---------------------+  |          |    |                                                            |   |
                                               |               | out_queues          |  |          |    |        device 1      task = in_queue.get()                 |   |
                                               |               |                     |  |  task    |    |                                                            |   |
                                               +------------------+ out_queue 1 <--+ |  +----------------------> in_queue 1    batch = task.compute()                |   |
                                      (True, (task,,batch))    |    out_queue 2    | |             |    |                                                            |   |
                                                               |    out_queue 3    +---------------------------+ out_queue 1   out_queue.put((True, (task, batch)))  |   |
                                                               |                     |      6      |    |                                                            |   |
                                                               +---------------------+             |    +------------------------------------------------------------+   |
                                                                                                   +---------------------------------------------------------------------+

手機如下:

至此,我們分析了如何切分數據和一些運行時機制,下一篇我們結合論文看看具體實現。

0xFF 參考

Markdown公式用法大全

markdown中公式編輯教程

https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

CUDA學習:基礎知識小結

CUDA隨筆之Stream的使用

NVIDIA解決方案架構師深度解析大規模參數語言模型Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

https://github.com/NVIDIA/apex/

https://github.com/justheuristic/prefetch_generator

https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

https://pytorch.org/docs/stable/autograd.html

https://pytorch.org/docs/notes/cuda.html

https://zhuanlan.zhihu.com/p/61765561

https://pytorch.apachen.org/docs/1.7/64.html

https://zhidx.com/p/217999.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM