[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
0x00 摘要
上文已經分析了如何啟動/接受反向傳播,如何進入分布式autograd 引擎,本文和下文就看看如何分布式引擎如何運作。通過本文的學習,讀者可以對 dist.autograd 引擎基本靜態架構和總體執行邏輯有所了解。
PyTorch分布式其他文章如下:
[源碼解析]PyTorch如何實現前向傳播(1) --- 基礎類(上)
[源碼解析]PyTorch如何實現前向傳播(2) --- 基礎類(下)
[源碼解析] PyTorch如何實現前向傳播(3) --- 具體實現
[源碼解析] Pytorch 如何實現后向傳播 (1)---- 調用引擎
[源碼解析] Pytorch 如何實現后向傳播 (2)---- 引擎靜態結構
[源碼解析] Pytorch 如何實現后向傳播 (3)---- 引擎動態邏輯
[源碼解析] PyTorch 如何實現后向傳播 (4)---- 具體算法
[源碼解析] PyTorch 分布式(1)------歷史和概述
[源碼解析] PyTorch 分布式(2) ----- DataParallel(上)
[源碼解析] PyTorch 分布式(3) ----- DataParallel(下)
[源碼解析] PyTorch 分布式(4)------分布式應用基礎概念
[源碼解析] PyTorch分布式(5) ------ DistributedDataParallel 總述&如何使用
[源碼解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源碼解析] PyTorch 分布式(7) ----- DistributedDataParallel 之進程組
[源碼解析] PyTorch 分布式(8) -------- DistributedDataParallel之論文篇
[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化
[源碼解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer靜態架構
[源碼解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 構建Reducer和Join操作
[源碼解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向傳播
[源碼解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向傳播
[源碼解析] PyTorch 分布式 Autograd (1) ---- 設計
[源碼解析] PyTorch 分布式 Autograd (2) ---- RPC基礎
[源碼解析] PyTorch 分布式 Autograd (3) ---- 上下文相關
[源碼解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎
為了更好的說明,本文代碼會依據具體情況來進行相應精簡。
0x01 支撐系統
我們首先看看一些引擎內部支撐系統。
1.1 引擎入口
引擎入口在 backward 函數中有調用,從 DistEngine::getInstance().execute 進入到引擎,由前文可知,這里是主動調用引擎。
void backward(
int64_t context_id,
const variable_list& roots,
bool retain_graph) {
RECORD_FUNCTION(
kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
try {
DistEngine::getInstance().execute(context_id, roots, retain_graph);
} catch (std::exception& e) {
throw std::runtime_error(e.what());
}
}
1.2 SendRpcBackward
被動調用引擎是從 SendRpcBackward 開始的。SendRpcBackward 是前向傳播之中發送行為對應的反向傳播算子。DistAutogradContext 存儲在一個worker之上的每一個分布式autograd的相關信息,其在分布式 autograd 之中封裝前向和后向傳播,累積梯度,這避免了多個worker在彼此的梯度上互相影響。在上下文 DistAutogradContext 之中有個成員變量,記錄了本 worker 所有發送行為對應的反向傳播算子。
std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>> sendAutogradFunctions_;
sendAutogradFunctions_ 中的內容都是SendRpcBackward。
1.2.1 剖析
SendRpcBackward 作為分布式autograd實現的一部分,每當我們將RPC從一個節點發送到另一個節點時,我們都會向autograd圖添加一個"SendRpcBackward"autograd function。這是一個占位符函數,用於在向后傳播時啟動當前worker的autograd引擎。此autograd function的邊是RPC方法的輸入。
在向后傳播過程中,此函數將在autograd引擎中排隊等待執行,該引擎最終將運行autograd圖的其余部分。
SendRpcBackward實際上是本地節點上autograd圖的根。我們給出之前的示意圖如下:
- SendRpcBackward不會接收任何 "輸入",而是RPC框架將梯度傳遞給該函數以啟動局部autograd計算。
- SendRpcBackward的input邊是RPC方法的輸入,就是梯度。
1.2.2 定義
SendRpcBackward 是 Node 的派生類,因為是 Node,所以有 next_edges,可以看到其新增成員變量是 grads_。
// As part of our distributed autograd implementation, whenever we send an RPC
// from one node to another, we add a 'SendRpcBackward' autograd function to the
// autograd graph. This is more or less a placeholder function that is used to
// kickoff the autograd engine on the current worker on the backward pass. The
// edges for this autograd function are the inputs to the RPC method.
//
// During the backward pass, this function is queued for execution in the
// autograd engine which eventually runs the rest of the autograd graph.
struct TORCH_API SendRpcBackward : public torch::autograd::Node {
public:
torch::autograd::variable_list apply(
torch::autograd::variable_list&& inputs) override;
// SendRpcBackward is actually the root of an autograd graph on the local
// node. As a result, it doesn't receive any 'inputs', but rather the RPC
// framework passes gradients over to this function to kickoff local autograd
// computation.
void setGrads(const torch::autograd::variable_list& grads);
// Retrieve the grads for the function.
const torch::autograd::variable_list& getGrads() const;
private:
torch::autograd::variable_list grads_;
};
1.2.3 構建
在前向傳播過程之中,addSendRpcBackward 會構建一個SendRpcBackward,會把其前向傳播輸入邊作為反向傳播的輸出邊設置在 SendRpcBackward 之中。
void addSendRpcBackward(
const ContextPtr& autogradContext,
const AutogradMetadata& autogradMetadata,
std::vector<torch::Tensor>& tensors) {
// Attach autograd information only for tensors requiring grad.
std::vector<torch::Tensor> tensors_with_grad;
std::copy_if(
tensors.begin(),
tensors.end(),
std::back_inserter(tensors_with_grad),
[](const torch::Tensor& t) { return t.requires_grad(); });
// Attach the appropriate autograd edges.
auto grad_fn = std::make_shared<SendRpcBackward>(); // 構建了 SendRpcBackward
grad_fn->set_next_edges(
torch::autograd::collect_next_edges(tensors_with_grad));
// Add the appropriate input metadata for the grad_fn.
for (const auto& tensor : tensors_with_grad) {
grad_fn->add_input_metadata(tensor); // 添加邊 SendRpcBackward
}
// Record the send autograd function in our current context.
// 插入到上下文
autogradContext->addSendFunction(grad_fn, autogradMetadata.autogradMessageId);
}
1.2.4 grads_
之前看到,SendRpcBackward新增成員變量是 grads_
,我們看看 grads_
如何設置和使用?
SendRpcBackward 提供了 set, get 操作。
void SendRpcBackward::setGrads(const torch::autograd::variable_list& grads) {
grads_ = grads;
}
const torch::autograd::variable_list& SendRpcBackward::getGrads() const {
return grads_;
}
何時會使用?在 torch/csrc/distributed/rpc/request_callback_no_python.cpp 之中有 processBackwardAutogradReq。processBackwardAutogradReq 會:
- 使用 sendFunction->setGrads(gradientsCall.getGrads()) 來設置遠端傳遞來的梯度。
- 調用 DistEngine::getInstance().executeSendFunctionAsync 來執行引擎開始本地后向計算。
對應了設計中如下文字,也就是被動進入引擎的起點:
SendRpcBackward實際上是本地節點上autograd圖的根。因此,它不會接收任何"輸入",而是RPC框架將梯度傳遞給該函數以啟動局部autograd計算。
具體代碼如下:
void RequestCallbackNoPython::processBackwardAutogradReq(
RpcCommandBase& rpc,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture) const {
auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
const auto& autogradMetadata = gradientsCall.getAutogradMetadata();
// Retrieve the appropriate autograd context.
auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
autogradMetadata.autogradContextId);
// Lookup the appropriate 'send' function to enqueue.
std::shared_ptr<SendRpcBackward> sendFunction =
autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);
// Attach the gradients to the send function.
sendFunction->setGrads(gradientsCall.getGrads()); // 這里設置,就是把RPC傳來的梯度賦值
// Now execute the autograd graph using the "distributed engine."
auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 這里使用了 grads_
autogradContext, sendFunction, gradientsCall.retainGraph());
// Our response is satisfied when the rpcs come back.
execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
if (!execFuture.hasError()) {
Message m = std::move(PropagateGradientsResp()).toMessage();
m.setId(messageId);
responseFuture->markCompleted(
IValue(c10::make_intrusive<Message>(std::move(m))));
} else {
responseFuture->setError(execFuture.exception_ptr());
}
});
}
executeSendFunctionAsync 就會用 sendFunction->getGrads() 提取梯度,進行操作。
c10::intrusive_ptr<c10::ivalue::Future> DistEngine::executeSendFunctionAsync(
const ContextPtr& autogradContext,
const std::shared_ptr<SendRpcBackward>& sendFunction,
bool retainGraph) {
// Typically the local autograd engine ensures stream synchronizations between
// nodes in the graph. However, for distributed autograd the sendFunction
// inputs might have been retrieved over the wire on a separate stream and the
// sendFunction itself runs on a different stream. As a result, we need to
// manually synchronize those two streams here.
const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA);
if (send_backward_stream) {
for (const auto& grad : sendFunction->getGrads()) { // 這里有獲取
const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
const auto default_stream = guard.getStream(grad.device());
if (send_backward_stream != default_stream) {
auto event = c10::Event{c10::DeviceType::CUDA};
event.record(default_stream);
send_backward_stream->wait(event);
}
}
}
// 省略后續代碼
具體如下圖:
0x02 定義
2.1 定義
DistEngine 的定義如下,為了更好講解,下面刪除了部分代碼:
class TORCH_API DistEngine {
public:
// Retrieve the singleton instance.
static DistEngine& getInstance();
// Given a list of root variables, start the distributed backwards pass from
// these variables and accumulate all the gradients in the current autograd
// context on each node. This method is used to kickoff distributed autograd
// on a single node.
void execute(
int64_t context_id,
const torch::autograd::variable_list& roots,
bool retainGraph);
// Given a send function to execute in the autograd engine, ensures we compute
// dependencies once for this node and enqueues the send function for execute
// in the engine.
// This method is used to kick off the autograd computation on a node when it
// receives gradients from the corresponding 'recv' method on another node.
// The gradients are accumulated in the provided autograd context.
c10::intrusive_ptr<c10::ivalue::Future> executeSendFunctionAsync(
const ContextPtr& autogradContext,
const std::shared_ptr<SendRpcBackward>& sendFunction,
bool retainGraph);
// Number of backward passes currently running for the Distributed Engine.
size_t numBackwardPasses() const;
// Returns key-value pairs consisting of useful debugging information related
// to distributed autograd.
std::unordered_map<std::string, int> getDebugInfo() const;
// Validates the input roots for the backward computations and retrieves the
// appropriate root edges and corresponding gradients. Populates root_edges
// with the appropriate gradient edges and grads with the gradients for each
// edge.
void validateRootsAndRetrieveEdges(
const torch::autograd::variable_list& roots,
torch::autograd::edge_list& rootEdges,
torch::autograd::variable_list& grads);
// Given the autograd context, root edges and grads, we compute dependencies
// for the local node and fill out the provided GraphTask and GraphRoot with
// appropriate information for the local autograd engine.
// We also determine all leaf nodes(functions) in the graph and accumulate
// them in outputEdges.
void computeDependencies(
const ContextPtr& context,
const torch::autograd::edge_list& rootEdges,
const torch::autograd::variable_list& grads,
const std::shared_ptr<torch::autograd::Node>& graphRoot,
torch::autograd::edge_list& outputEdges,
bool retainGraph);
// Given a pre-populated GraphTask and a root node, compute the backward pass
// for the autograd graph until the graph task ready queue is empty.
//
// This method assumes that the appropriate GraphTask has already been
// initialized appropriately. It will construct a local ready queue to
// traverse the GraphTask instead of using the GraphTask embedded
// cpu_ready_queue, this is because dist engine might run the same GraphTask
// from different SendFunctions concurrently in different threads. The method
// will only mark the GraphTask as completed when it needes to, which means it
// might not mark as completed for every call as dist engine would like to
// keep the GraphTask alive when it not receives all gradients.
//
// When `incrementOutstandingTasks=false`, the function does not increment
// 'outstanding_tasks_' in the appropriate GraphTask. It is assumed we've
// already done this before hand for this task (to ensure we don't pre-mark
// this graph_task as completed). This is useful in the distributed autograd
// case where we need to increment 'outstanding_tasks_' first to indicate the
// local autograd engine the graph task is not completed until it receives the
// signals from other workers over the network.
//
// XXX: calling this function assumes that we will have NO GPU nodetasks be
// executed for the graph_task, the caller of this function need to ensure
// this otherwise there will be undefined behaviors. A correct way to fix this
// is to re-design the autograd engine so that GPU worker thread to behave the
// same as CPU caller thread, record the operation/thread for the device, and
// reuse it in backward.
// TODO: 1. Add assert in the dist engine to ensure no GPU NodeTasks during
// backward
// 2. properly setup the thread local ready queue to enable reentrant
// backwards
void execute_graph_task_until_ready_queue_empty(
torch::autograd::NodeTask&& node_task,
bool incrementOutstandingTasks = true);
// Run the local autograd engine using the provided graphTask and graphRoot
// and accumulate the gradients part 'outputEdges' in the provided autograd
// context.
c10::intrusive_ptr<c10::ivalue::Future> runEngineAndAccumulateGradients(
const ContextPtr& autogradContext,
const std::shared_ptr<torch::autograd::Node>& graphRoot,
const torch::autograd::edge_list& outputEdges,
bool incrementOutStandingTasks = true);
// Run after the backward pass is done to appropriately cleanup structures.
void cleanupBackwardPass(const ContextPtr& autogradContext);
// Global thread to execute CPU continuations.
void globalCpuThread(
const std::shared_ptr<torch::autograd::ReadyQueue>& ready_queue);
// Set of autograd context_ids, which we have already initialized for
// distributed autograd on this node (e.g.: already computed dependencies)
std::unordered_set<int64_t> initializedContextIds_;
mutable std::mutex initializedContextIdsLock_;
// Reference to local autograd engine.
torch::autograd::Engine& engine_;
// Ready queue used by the CPU thread in distributed engine.
// See Note [GPU to CPU continuations]
// 每個 GraphTask都把 global_cpu_ready_queue_ 設置為自己的 cpu_ready_queue_
std::shared_ptr<torch::autograd::ReadyQueue> global_cpu_ready_queue_;
// See Note [GPU to CPU continuations]
std::thread global_cpu_thread_;
friend class BackwardPassCleanupGuard;
};
2.2 單例
引擎使用了單例模式,這樣每個 worker 之中就只有一個單例在運行。
DistEngine& DistEngine::getInstance() {
// Leaky singleton to avoid module destructor race.
static DistEngine* engine = new DistEngine();
return *engine;
}
2.3 重要注釋
PyTorch 源碼之中有大量詳盡的注釋,我們挑選一些來看看。
2.3.1 成員變量
代碼中定義了兩個 CPU 全局相關成員變量,具體如下,均注明需要看 [GPU to CPU continuations] 這個注釋。
// Ready queue used by the CPU thread in distributed engine.
// See Note [GPU to CPU continuations]
std::shared_ptr<torch::autograd::ReadyQueue> global_cpu_ready_queue_;
// See Note [GPU to CPU continuations]
std::thread global_cpu_thread_;
2.3.2 構建
這兩個成員變量具體初始化位置是在構建函數之中。
DistEngine::DistEngine()
: initializedContextIds_(),
engine_(Engine::get_default_engine()),
global_cpu_ready_queue_(std::make_shared<ReadyQueue>()), // 這里構建了
global_cpu_thread_( // 這里構建了
&DistEngine::globalCpuThread,
this,
global_cpu_ready_queue_) {
// Note [GPU to CPU continuations]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// Initialize a single CPU thread to execute continuations from GPU
// tasks. The multithreaded structure for the distributed engine works
// well only for CPU tasks. If we have an order of tasks like
// CPU->GPU->CPU, distributed autograd has no thread to execute the last
// CPU task on. To fix this, we introduce a global CPU thread to handle
// such situations and it will be responsible for executing these CPU
// tasks. The CPU thread has its own ready_queue which is used as the
// cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU
// to CPU continuations are enqueued on this thread. The global CPU thread
// simply dequeues tasks from the global queue and calls
// "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the
// appropriate task.
global_cpu_thread_.detach(); // detach之后就獨立運行了
}
2.3.3 GPU to CPU continuations
以下是 GPU to CPU continuations 的翻譯和理解。
Continuations 最初應該是在schema語言里面接觸過的,后來也看過不少語言用到,這個概念沒有找到一個很好的延續概念,暫時使用"延續"這個翻譯。
為了執行GPU任務的延續(continuations),所以需要初始化一個單獨的CPU線程來處理。分布式引擎的多線程結構僅適用於CPU任務。如果我們有CPU->GPU->CPU這樣的任務順序,分布式 autograd 就沒有線程來執行最后一個CPU任務。為了解決這個問題,我們引入了一個全局CPU線程來處理這種情況,它將負責執行這些CPU任務。
CPU線程有自己的就緒隊列(ready_queue),它用作DistEngine的所有GraphTask的CPU就緒隊列(cpu_ready_queue)。這確保所有GPU到CPU的延續(continuations)都在此線程上排隊。全局CPU線程只需將任務從全局隊列中取出,並在JIT線程上調用"execute_graph_task_until_ready_queue_empty",以執行相應的任務。
If we have an order of tasks like CPU->GPU->CPU, distributed autograd has no thread to execute the last CPU task on. To fix this, we introduce a global CPU thread to handle such situations and it will be responsible for executing these CPU tasks. The CPU thread has its own ready_queue which is used as the cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU to CPU continuations are enqueued on this thread. The global CPU thread simply dequeues tasks from the global queue and calls "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the appropriate task.
2.3.4 析構
析構函數之中有如下,就是為了引擎結束而做對這兩個成員變量做了相關操作。
DistEngine::~DistEngine() {
// Ensure we shutdown the CPU thread.
TORCH_ASSERT_NO_GIL_WITHOUT_PYTHON_DEP();
global_cpu_ready_queue_->pushShutdownTask();
global_cpu_thread_.join();
}
2.3.5 插入隊列
在哪里往 global_cpu_ready_queue_ 插入?在 DistEngine::computeDependencies 里面會有插入。首先,每個 GraphTask 都把 global_cpu_ready_queue_ 設置為 cpu_ready_queue。GraphTask構造函數這里參數在調用時候傳入的是 global_cpu_ready_queue_。
void DistEngine::computeDependencies(
const ContextPtr& autogradContext,
const edge_list& rootEdges,
const variable_list& grads,
const std::shared_ptr<Node>& graphRoot,
edge_list& outputEdges,
bool retainGraph) {
// Build the graph task and graph root.
// NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask
// as we use execute_graph_task_until_ready_queue_empty, which will build
// a separate ReadyQueue for each call.
auto graphTask = std::make_shared<GraphTask>(
/* keep_graph */ retainGraph,
/* create_graph */ false,
/* depth */ 0,
/* cpu_ready_queue */ global_cpu_ready_queue_,
/* exit_on_error */ true);
// 省略其他 graphTask 初始化
// Let autograd context take ownership of the GraphTask.
// 上下文里面設置了 GraphTask
autogradContext->setGraphTask(std::move(graphTask));
}
所以,如果 GraphTask 最后返回需要 CPU 運行時候,就統一用這個。
2.3.6 工作線程
globalCpuThread 是工作線程,其就是從 ready queue 里面彈出 NodeTask,然后執行。
void DistEngine::globalCpuThread(
const std::shared_ptr<ReadyQueue>& ready_queue) {
while (true) {
NodeTask task = ready_queue->pop();
if (task.isShutdownTask_) {
// Need to shutdown this thread.
C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown");
break;
}
auto graphTask = task.base_.lock();
if (graphTask == nullptr) {
// GraphTask has expired, ignore and continue processing.
continue;
}
// Launch the execution on a JIT thread.
at::launch([this,
graphTask,
graphRoot = task.fn_,
variables =
InputBuffer::variables(std::move(task.inputs_))]() mutable {
InputBuffer inputs(variables.size());
for (size_t i = 0; i < variables.size(); i++) {
inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt);
}
execute_graph_task_until_ready_queue_empty(
/*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)),
/*incrementOutstandingTasks*/ false);
});
}
}
0x03 總體執行
總體執行是在 DistEngine::execute 之中完成,具體分為如下步驟:
- 使用 contextId 得到前向的上下文。
- 使用 validateRootsAndRetrieveEdges 進行驗證。
- 構造一個GraphRoot,用它來驅動后向傳播,可以認為是一個虛擬根。
- 使用 computeDependencies 計算依賴。
- 使用 runEngineAndAccumulateGradients 進行反向傳播計算。
- 使用 clearAndWaitForOutstandingRpcsAsync 等待 RPC 完成。
可以看到,與普通引擎相比較,分布式多了一個計算root邊和生成邊上梯度信息的過程。因為在普通前向傳播過程之中,這些是已經配置好的,但是在分布式計算之中,前向傳播是沒有計算這些,所以需要在反向傳播之前計算出來。
void DistEngine::execute(
int64_t contextId,
const variable_list& roots,
bool retainGraph) {
// Retrieve the context for the given context_id. This will throw if the
// context_id is invalid.
auto autogradContext =
DistAutogradContainer::getInstance().retrieveContext(contextId);
// Perform initial pre-processing.
edge_list rootEdges;
variable_list grads;
validateRootsAndRetrieveEdges(roots, rootEdges, grads);
// 構造一個GraphRoot,用它來驅動后向傳播,可以認為是一個虛擬根
std::shared_ptr<Node> graphRoot =
std::make_shared<GraphRoot>(rootEdges, grads);
edge_list outputEdges;
// Compute dependencies locally, starting from all roots and all 'send'
// functions.
{
std::lock_guard<std::mutex> guard(initializedContextIdsLock_);
// Context should not have been initialized already.
TORCH_INTERNAL_ASSERT(
initializedContextIds_.find(autogradContext->contextId()) ==
initializedContextIds_.end());
// 計算依賴
computeDependencies(
autogradContext, rootEdges, grads, graphRoot, outputEdges, retainGraph);
// Mark the autograd context id as initialized.
initializedContextIds_.insert(autogradContext->contextId());
}
BackwardPassCleanupGuard guard(autogradContext);
// This needs to be blocking and as a result we wait for the future to
// complete.
runEngineAndAccumulateGradients(autogradContext, graphRoot, outputEdges)
->waitAndThrow(); // 反向傳播計算
// Wait for all of the outstanding rpcs to complete.
autogradContext->clearAndWaitForOutstandingRpcsAsync()->waitAndThrow();
}
0x04 驗證節點和邊
我們接下來看看如何做驗證工作。
validateRootsAndRetrieveEdges 被用來驗證節點和邊的有效性,具體邏輯是:
- 驗證根節點的有效性,獲取根節點的邊。
- 看看根節點是否為空。
- 根節點是否需要計算梯度。
- 根節點是否有梯度函數。
- 計算梯度的邊,生成相應的梯度。
- 調用 validate_outputs 來驗證輸出。
void DistEngine::validateRootsAndRetrieveEdges(
const variable_list& roots,
edge_list& rootEdges,
variable_list& grads) {
TORCH_CHECK(!roots.empty(), "No tensors provided for gradient computation.");
TORCH_INTERNAL_ASSERT(rootEdges.empty());
TORCH_INTERNAL_ASSERT(grads.empty());
// Verify roots are all scalar and require gradients.
for (const auto& root : roots) {
TORCH_CHECK(root.requires_grad(), "requires_grad not set on root");
TORCH_CHECK(
root.numel() == 1, // python numel()函數:返回數組中元素的個數
root.name(),
" is not a scalar, all roots need to be scalar");
TORCH_CHECK(
root.grad_fn(),
root.name(),
" does not have a valid gradient function.");
// Compute the root edges and generate the appropriate gradients.
rootEdges.push_back(torch::autograd::impl::gradient_edge(root));
grads.push_back(at::ones_like(root, LEGACY_CONTIGUOUS_MEMORY_FORMAT));
}
// Validate rootEdges and grads.
validate_outputs(
rootEdges, grads, [](const std::string& msg) { return msg; });
}
4.1 gradient_edge
gradient_edge 在本文下面會用到,就是利用一個Variable的梯度和前向傳播的輸出來構建一個Edge。
Edge gradient_edge(const Variable& self) {
// If grad_fn is null (as is the case for a leaf node), we instead
// interpret the gradient function to be a gradient accumulator, which will
// accumulate its inputs into the grad property of the variable. These
// nodes get suppressed in some situations, see "suppress gradient
// accumulation" below. Note that only variables which have `requires_grad =
// True` can have gradient accumulators.
// self.grad_fn() 這里觸發了一個調用,得到了一個Node實例
if (const auto& gradient = self.grad_fn()) {
return Edge(gradient, self.output_nr()); // self.output_nr() 表示本Edge是function的第n個輸入。前向傳播時候的第 n 個輸出在反向傳播時候就是第 n 個輸入。
} else {
return Edge(grad_accumulator(self), 0); // 0表示本Edge是function的第一個輸入
}
}
4.2 validate_outputs
其定義在 torch/csrc/autograd/engine.cpp,原生引擎和分布式引擎都會調用。validate_outputs 之中包含了大量的驗證代碼。
- 如果梯度數量與邊數目不同,則退出。
- 遍歷梯度,對於每個梯度:
- 獲取對應的邊,如果邊無效,則去下一個梯度。
- 使用input_metadata 獲取輸入信息。
- 如果梯度沒有定義,也去下一個梯度。
- 如果梯度尺寸與輸入形狀不同,則退出。
- 對梯度的設備,元數據的設備進行一系列判斷。
具體代碼如下:
void validate_outputs(
const edge_list& edges,
variable_list& grads,
const std::function<std::string(const std::string&)>& format_error) {
if (grads.size() != edges.size()) {
std::stringstream ss;
ss << "invalid number of gradients - expected ";
ss << edges.size() << ", but got " << grads.size();
AT_ERROR(format_error(ss.str()));
}
for (size_t i = 0; i < grads.size(); i++) {
const auto& edge = edges[i];
if (!edge.is_valid()) continue;
const auto& metadata = edge.function->input_metadata(edge.input_nr);
auto& grad = grads[i];
if (!grad.defined()) {
// FIXME: TestJit.test_ge_optimized fails this assertion.
// std::stringstream ss;
// ss << "undefined gradient at index " << i;
// AT_ERROR(format_error(ss.str()));
continue;
}
// 如果梯度尺寸與輸入形狀不同,則退出
if (!grad.sizes().equals(metadata.shape())) {
if (!at::is_expandable_to(metadata.shape(), grad.sizes())) {
std::stringstream ss;
ss << "invalid gradient at index " << i << " - got ";
ss << grad.sizes() << " but expected shape compatible with ";
ss << metadata.shape();
AT_ERROR(format_error(ss.str()));
}
grad = at::sum_to(std::move(grad), metadata.shape());
}
bool input_is_complex = isComplexType(c10::typeMetaToScalarType(metadata.options().dtype()));
bool grad_is_complex = isComplexType(grad.scalar_type());
TORCH_CHECK(isFloatingType(grad.scalar_type()) || (input_is_complex == grad_is_complex));
if (c10::typeMetaToScalarType(metadata.options().dtype()) != grad.scalar_type()) {
grad = grad.to(c10::typeMetaToScalarType(metadata.options().dtype()));
}
if (grad.device() != metadata.device() &&
grad.dim() == 0) {
grad = grad.to(metadata.device());
}
if (!is_compatible_type(metadata.options(), grad.options())) {
std::stringstream ss;
ss << "invalid gradient at index " << i << " - expected type ";
ss << metadata.options() << " but got " << grad.options();
AT_ERROR(format_error(ss.str()));
}
auto grad_device = grad.device();
if (grad_device != metadata.device()) {
std::stringstream ss;
ss << "invalid gradient at index " << i << " - expected device ";
ss << metadata.device() << " but got " << grad_device;
AT_ERROR(format_error(ss.str()));
}
// We should not build graph for Tensors that are not differentiable
TORCH_INTERNAL_ASSERT(isDifferentiableType(grad.scalar_type()));
}
}
4.3 VS 普通 engine
我們和普通引擎進行對比一下校驗部分。
普通Engine 之中只調用了 validate_outputs。
auto Engine::execute(const edge_list& roots,
const variable_list& inputs,
bool keep_graph,
bool create_graph,
bool accumulate_grad,
const edge_list& outputs) -> variable_list {
validate_outputs(roots, const_cast<variable_list&>(inputs), [](const std::string& msg) {
return msg;
});
// 省略其他后續代碼
因此,對於校驗部分,DistEngine 可以總結為:
- 做校驗。
- 根據 roots 來計算root對應的邊和生成對應梯度。
- 再用validate_outputs驗證輸出。
0x05 計算依賴
我們回憶一下設計文檔中的 FAST模式算法。該算法的關鍵假設是:當我們運行反向傳播時,每個send
函數的依賴為 1。換句話說,我們假設我們會從另一個節點通過 RPC 接收梯度。算法如下:
- 我們從具有反向傳播根的worker開始(所有根都必須是本地的)。
- 查找當前Distributed Autograd Context 的所有
send
函數 。 - 從提供的根和我們檢索到的所有
send
函數開始,我們在本地計算依賴項 。 - 計算依賴項后,使用提供的根來啟動本地 autograd 引擎。
- 當 autograd 引擎執行該
recv
函數時,該recv
函數通過 RPC 將輸入梯度發送到適當的worker。每個recv
函數都知道目標 worker id,因為它被記錄為前向傳播的一部分。通過autograd_context_id
和autograd_message_id
該recv
函數被發送到遠程主機。 - 當遠程主機收到這個請求時,我們使用
autograd_context_id
和autograd_message_id
來查找適當的send
函數。 - 如果這是worker第一次收到對給定
autograd_context_id
的請求,它將按照上面的第 1-3 點所述在本地計算依賴項。 - 然后將在第6點接受到的
send
方法插入隊列,以便在該worker的本地 autograd 引擎上執行。 - 最后,我們不是在 Tensor的
.grad
之上累積梯度,而是在每個Distributed Autograd Context之上分別累積梯度 。梯度存儲在Dict[Tensor, Tensor]
之中 ,Dict[Tensor, Tensor]
基本上是從 Tensor 到其關聯梯度的映射,並且可以使用 get_gradients() API檢索該映射 。
本章就是對應了算法的前三項,這部分是和普通引擎最大區別之一。
5.1 總體過程
計算依賴分為兩大部分,第一部分是做准備工作,第二部分是計算依賴關系,第三部分是根據依賴關系來得到需要計算哪些函數。
我們先給出總體代碼和注釋,后續會仔細分析。
void DistEngine::computeDependencies(
const ContextPtr& autogradContext,
const edge_list& rootEdges,
const variable_list& grads,
const std::shared_ptr<Node>& graphRoot,
edge_list& outputEdges,
bool retainGraph) {
TORCH_INTERNAL_ASSERT(graphRoot, "graphRoot is null!");
// 第一部分,准備工作
// 1. 生成一個GraphTask
// Build the graph task and graph root.
// NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask
// as we use execute_graph_task_until_ready_queue_empty, which will build
// a separate ReadyQueue for each call.
// 不需要給 GraphTask 傳一個cpu_ready_queue,因為我們后面使用execute_graph_task_until_ready_queue_empty,在那里會給每一個調用建立一個獨立的ReadyQueue
auto graphTask = std::make_shared<GraphTask>(
/* keep_graph */ retainGraph,
/* create_graph */ false,
/* depth */ 0,
/* cpu_ready_queue */ global_cpu_ready_queue_,
/* exit_on_error */ true);
// Run BFS to traverse the graph locally. The roots of the graph are
// GraphRoot and all send functions for this autograd context.
std::unordered_set<Node*> seen; // 記錄已經訪問過的節點
std::queue<Node*> queue; // 一個 Node 類型的 queue
queue.push(static_cast<Node*>(graphRoot.get())); // 插入根對應的Node
auto sendFunctions = autogradContext->sendFunctions(); // 為了獲取出邊
// 2. 獲取出邊列表
// Add all the send functions to the queue as roots.
// 普通狀態下,root節點內在反向傳播時候,已經有了next edges,但是分布式模式下,出邊是在sendFunctions之中
for (const auto& mapEntry : sendFunctions) { // sendFunctions就是出邊,之前在 addSendFunction之中被添加
// Increment 'outstanding_tasks_' for GraphTask for each send_function
// since we want the local autograd engine to wait for all of them.
graphTask->outstanding_tasks_++; // 出邊增加
queue.push(mapEntry.second.get()); // 后續用queue來處理,插入的是 SendRpcBackward
}
// 第二部分,遍歷圖,計算依賴關系,此時 queue 里面是 root 和 若干 SendRpcBackward
edge_list recvBackwardEdges;
// Traverse the graph.
auto& dependencies = graphTask->dependencies_; // 獲取依賴關系
while (!queue.empty()) { // 遍歷所有發送邊
auto fn = queue.front(); // 得到發送邊
queue.pop();
for (const auto& edge : fn->next_edges()) { // 遍歷Node(根節點或者SendRpcBackward)的next_edges
if (auto nextFn = edge.function.get()) { // 得到一個邊
dependencies[nextFn] += 1; // 對應的節點依賴度加一
const bool wasInserted = seen.insert(nextFn).second; // 是否已經訪問過
if (wasInserted) { // 如果true,是插入了,就說明之前沒有訪問過,否則插不進去,是false
// Seeing this function for the first time.
queue.push(nextFn); // 既然之前沒有訪問過,就插入到queue
if (nextFn->next_edges().empty()) { // 如果這個邊本身沒有輸出邊,說明是葉子節點
TORCH_INTERNAL_ASSERT(
dynamic_cast<AccumulateGrad*>(nextFn) ||
dynamic_cast<RecvRpcBackward*>(nextFn)); // 葉子節點有兩種
// We have found a leaf node which should be either AccumulateGrad
// or RecvRpcBackward. Record the function
// to ensure we don't execute it and instead accumulate the grads on
// the autograd context. These functions would be passed in as the
// 'outputs' parameter of the vanilla autograd engine.
// We don't accumulate any grads in the context for RecvRpcBackward.
// RecvRpcBackward is added as an output edge to indicate it is a
// leaf node and this helps in properly computing dependencies for
// the local autograd graph. Putting RecvRpcBackward in
// 'outputEdges' means that this function needs to be executed
// (inline with our assumption for FAST mode that all send/recv
// functions are valid in the backward pass), and as a result all of
// its ancestors need to be executed as well.
if (dynamic_cast<RecvRpcBackward*>(nextFn)) {
recvBackwardEdges.emplace_back(edge); // 特殊處理
}
outputEdges.emplace_back(edge); // 最終輸出邊
}
}
}
}
}
// 此時,recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad
// 以下是第三部分,根據依賴關系找到需要計算那些functions
// Now lets compute which functions need to be executed. The algorithm is as
// follows:
// 1. Create a dummy GraphRoot which points to all 'send' functions for this
// context and the original graphRoot. Run 'init_to_execute' with the
// outputEdges and the dummy GraphRoot. This ensures we mark
// appropriate functions as needed if they are reachable only from a
// specific 'send' function locally and not necessarily from the provided
// roots.
// 2. For all edges in 'outputEdges' which point to 'RecvRpcBackward', mark
// those functions as needed for execution. The reason for this is that
// 'init_to_execute', will mark these as not needed. But 'RecvRpcBackward'
// is unique in the sense that we use it as a leaf node in graph to compute
// needed execution accurately, but unlike AccumulateGrad, we do need to
// execute this function.
if (!outputEdges.empty()) {
// Compute 'needed execution' starting from all 'send' functions and the
// original graphRoot.
edge_list edges;
// Create some dummy edges (input_nr not important for init_to_execute).
for (const auto& mapEntry : sendFunctions) { // 遍歷
edges.emplace_back(mapEntry.second, 0); // 得到出邊列表
}
// Add the original graphRoot as an edge.
edges.emplace_back(graphRoot, 0); // root也加入出邊列表
// Create a dummy GraphRoot and run init_to_execute with it.
GraphRoot dummyRoot(edges, {}); // 建立一個虛擬Root
// 如果出邊不為空,則會調用 init_to_execute 對GraphTask進行初始化
graphTask->init_to_execute(dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0);
// exec_info_ 的數據結構是std::unordered_map<Node*, ExecInfo>
for (auto& mapEntry : graphTask->exec_info_) {
auto& execInfo = mapEntry.second;
if (!execInfo.captures_) { // 看看此張量是否在所求梯度的張量路徑上
continue;// 如果不在路徑之上,就跳到下一個張量
}
auto fn = mapEntry.first; // 拿到 Node
// There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward,
// to be captured.
if (auto accumulateGradFn = dynamic_cast<AccumulateGrad*>(fn)) {
// 如果是葉子節點
for (auto& capture : *execInfo.captures_) { // 遍歷張量路徑上的節點
capture.hooks_.push_back(
std::make_unique<DistAccumulateGradCaptureHook>( // 給張量插入Hook
std::dynamic_pointer_cast<AccumulateGrad>(
accumulateGradFn->shared_from_this()),
autogradContext));
}
}
}
// Mark all 'RecvRPCBackward' as needing execution.
// RecvRPCBackward需要執行
for (const auto& recvBackwardEdge : recvBackwardEdges) {
graphTask->exec_info_[recvBackwardEdge.function.get()].needed_ = true;
}
}
// Let autograd context take ownership of the GraphTask.
// 設定在上下文之中
autogradContext->setGraphTask(std::move(graphTask));
}
5.2 第一部分 准備工作
5.2.1 實現
因為這里是計算本地的依賴關系,所以遍歷需要從 root 和 本地的 SendRpcBackward 開始計算。我們先要先做一些准備工作:
- 首先生成一個GraphTask,但是不需要給 GraphTask 傳一個cpu_ready_queue,因為我們后面使用execute_graph_task_until_ready_queue_empty,在那里會給每一個調用 建立一個獨立的ReadyQueue。
- 其次用 seen 來記錄已經訪問過的節點。
- 構建一個 Node 類型的 queue,把根節點插入到queue。
- 然后從上下文之中拿到出邊Functions,放入到 sendFunctions 之中。
- sendFunctions就是出邊,之前在 addSendFunction之中被添加。
- 普通狀態下,root節點內在反向傳播時候,已經有了next edges,但是分布式模式下,出邊是在sendFunctions之中。
- 遍歷出邊 sendFunctions,構建出邊列表,對於 sendFunctions 中的每一項:
- GraphTask 出邊數目增加 graphTask->outstanding_tasks_++。
- 在 queue 之中插入 sendFunctions 中的 SendRpcBackward。
- 最后,queue 里面是 root 和 若干 SendRpcBackward。
5.2.2 相關
實現之中,使用了部分函數或者成員變量,我們選取重點進行介紹。
5.2.2.1 sendFunctions
sendFunctions 是獲取了上下文的sendAutogradFunctions_,這是一個 std::unordered_map<int64_t, std::shared_ptr
std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>>
DistAutogradContext::sendFunctions() const {
std::lock_guard<std::mutex> guard(lock_);
return sendAutogradFunctions_;
}
sendFunctions就是出邊,之前在 addSendFunction之中被添加,addSendRpcBackward 會調用 addSendFunction。
5.2.2.2 outstanding_tasks_
利用 graphTask->outstanding_tasks_++ 把GraphTask 出邊數目增加。
GraphTask
outstanding_tasks_ 是 GraphTask 的成員變量。
- outstanding_tasks_ :用來記錄當前任務數目,如果數目為0,則說明任務結束了。 如果這個數量不為0,則此GraphTask依然需要運行。
vania engine
在 vania engine 之中就有 outstanding_tasks_。
是待處理 NodeTask的數量,用來判斷該GrapTask是否還需要執行,如果數目為0,則說明任務結束了。
- 當 GraphTask 被創建出來時候,此數值為0。
- 如果有一個NodeTask被送入到 ReadyQueue,則outstanding_tasks_ 增加 1。
- 如果在工作線程作執行一次 evaluate_function(task)后,outstanding_tasks的值減1。
- 如果這個數量不為0,則此GraphTask依然需要運行。
bool GraphTask::completed() {
return outstanding_tasks_.load() == 0 ||
(exit_on_error_ && has_error_.load());
}
NodeTask任務增加時候 outstanding_tasks_ 就加一。
dist engine
在計算依賴時候,遍歷 sendFunctions,上下文有幾個SendRpcBackward,就把 outstanding_tasks_ 就加幾,每多一條出邊,就意味着多了一個計算過程。
std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>>
DistAutogradContext::sendFunctions() const {
std::lock_guard<std::mutex> guard(lock_);
return sendAutogradFunctions_;
}
而執行時候,void DistEngine::execute_graph_task_until_ready_queue_empty 和 Engine::thread_main 都會減少 outstanding_tasks_。
5.3 第二部分 計算依賴
第二部分是遍歷圖,計算依賴關系。
5.3.1 實現
此時 queue 里面是 root 和 若干 SendRpcBackward,所以接下來就是從 queue 之中不停彈出Node 進行計算。具體邏輯是:
- 遍歷所有發送邊(從 queue 之中不停彈出Node ),對於每個Node,遍歷Node(根節點或者SendRpcBackward)的next_edges:
- 如果可以得到一個邊,則:
- 對應的節點依賴度加一。
- 如果之前沒有訪問過,就插入到queue。
- 如果這個邊本身沒有輸出邊,說明是葉子節點,葉子節點有兩種:AccumulateGrad 或者 RecvRpcBackward。
- 對於 recvBackwardEdges.emplace_back(edge) 做特殊處理。
- 插入到最終輸出邊 outputEdges,注意,RecvRpcBackward 也插入到這里。
- 如果可以得到一個邊,則:
這之后,局部變量 recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward。
5.3.2 葉子節點的種類
有兩種葉子節點,所以需要分開處理。
- AccumulateGrad : 普通葉子節點,就是本地葉子節點。
- RecvRpcBackward : 在正向圖中,是RPC接收節點。
從設計文檔之中,有如下對應:"
我們發現了一個葉節點,它應該是AccumulateGrad或RecvRpcBackward。我們記錄函數以確保我們不執行它,而是在autograd上下文中累積梯度。這些函數將作為"輸出"參數傳入到vanilla autograd引擎。
我們沒有在RecvRpcBackward上下文積累任何梯度。RecvRpcBackward被添加為輸出邊,以指示它是葉節點,這有助於正確計算本地autograd graph的依賴關系。將RecvRpcBackward放在"outputEdges"中意味着需要執行此函數(與我們對快速模式的假設一致,即所有send/recv函數在向后傳播中都有效),因此也需要執行其所有祖先函數。
比如,對於 work 1, recv 就是葉子節點,是一個RecvRpcBackward,它需要把梯度傳遞給 worker 0。對於 worker 0,上面的子圖,t1, t2 也是葉子節點,都是AccumulateGrad。
5.4 第三部分 得到Functions
這部分根據依賴關系找到需要計算那些functions。
5.4.1 算法
現在讓我們計算需要執行哪些函數。算法如下:
-
- 創建一個虛擬GraphRoot,它指向此上下文和原始GraphRoot的所有"發送"函數。使用outputEdges和虛擬GraphRoot來運行"init_to_execute"。這確保我們根據需要標記適當的函數:如果它們只能從本地特定的"發送"函數訪問,而不需要從提供的根訪問。
-
- 對於"outputEdges"中指向"RecvRpcBackward"的所有邊,根據執行需要標記這些函數。原因是"init_to_execute"會將這些標記為不需要。但"RecvRpcBackward"的獨特之處在於,我們將其用作圖中的葉節點來准確計算所需的執行操作,但與AccumageGrad不同,我們確實需要執行此函數。
具體就是:
- RecvRpcBackward 需要執行。
- AccumulateGrad 需要累積梯度。
5.4.2 實現
此時,recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward。我們需要根據這些信息來標識后續如何執行。具體實現是:
-
先計算 AccumulateGrad,如果 outputEdges 不為空,則把 outputEdges 的信息插入到 GraphTask.exec_info_ 之中:
- 構建一個 edge_list edges,就是出邊列表。
- 遍歷 sendFunctions,得到輸出列表,加入到 edges。
- root也加入出邊列表。
- 建立一個虛擬Root。
- 如果出邊不為空,則會調用 init_to_execute 對GraphTask進行初始化。
- 遍歷 GraphTask 的 exec_info,exec_info_ 的數據結構是std::unordered_map<Node*, ExecInfo> 。
- 看看此張量是否在所求梯度的張量路徑上。
- 如果不在路徑之上,就跳到下一個張量。
- 拿到 exec_info_ 的 Node。
- 如果 Node 是葉子節點。
- 遍歷張量路徑上的節點。
- 給張量插入Hook。這里是關鍵,就是 AccumulateGrad 對應的張量加上了 Hook,用來后續累積梯度。
-
遍歷 recvBackwardEdges,對於每個 recvBackward,在 GraphTask.exec_info_ 之中對應項之上設止為 "需要執行"。
至此,依賴項處理完畢,所有需要計算的函數信息都位於 GraphTask.exec_info_ 之上,我們在下一篇來看看如何執行。
5.5 小結
我們總結一下計算依賴的邏輯:
- computeDependencies 開始計算依賴。
- 從 DistAutogradContext 之中獲取 sendAutogradFunctions_,把 SendRpcBackward 都放入到 sendFunctions。普通狀態下,root節點內在反向傳播時候,已經有了next edges,但是分布式模式下,出邊是在sendFunctions之中,所以要提取出來,放入下面的 queue。
- 遍歷 sendFunctions,把 Node 加入到 queue,此時 queue 之中是 root 和 一些 SendRpcBackward。
- 遍歷 Queue 進行處理,處理結果是兩個局部變量 edge_list。 recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward,我們需要根據這些信息來標識后續如何執行。
- 遍歷 recvBackwardEdges 和 outputEdges,把相關信息加入到
GraphTask.exec_info_
,至此,依賴項處理完畢,所有需要計算的函數信息都位於 GraphTask.exec_info_ 之上。- AccumulateGrad 被加入了 Hook,用來后續累積梯度。
- RecvRpcBackward 被設置了需要執行。
computeDependencies
+
+---------------------------+ | 1
| DistAutogradContext | |
| | v
| | 2
| sendAutogradFunctions_ +-------> map<int,SendRpcBackward> > sendFunctions
| |
+---------------------------+ +
|
| 3
v
queue<Node*> queue
+
| 4
|
|
v
recvBackwardEdges = [RecvRpcBackward 1, RecvRpcBackward 2, ...]
outputEdges = [RecvRpcBackward 1, RecvRpcBackward 2,
AccumulateGrad 1, AccumulateGrad 2, ...]
+
|
| 5
v
GraphTask.exec_info_
0xFF 參考
https://pytorch.org/docs/stable/distributed.html
https://pytorch.apachecn.org/docs/1.7/59.html
https://pytorch.org/docs/stable/distributed.html#module-torch.distributed
https://pytorch.org/docs/master/notes/autograd.html
https://pytorch.org/docs/master/rpc/distributed_autograd.html
https://pytorch.org/docs/master/rpc/rpc.html
https://www.w3cschool.cn/pytorch/pytorch-cdva3buf.html
Getting started with Distributed RPC Framework
Implementing a Parameter Server using Distributed RPC Framework
Combining Distributed DataParallel with Distributed RPC Framework