```python
import torch

# Import the BytePS module
import byteps.torch as bps

# Initialize BytePS
bps.init()

# Bind this training process to the GPU matching its local rank
torch.cuda.set_device(bps.local_rank())
```
local_rank:
"""A function that returns the local BytePS rank of the calling process, within the
node that it is running on. For example, if there are seven processes running on a
node, their local ranks will be zero through six, inclusive.

Returns:
    An integer scalar with the local BytePS rank of the calling process.
"""
```python
# During push and pull, compress 32-bit gradients down to 16 bits.
# (Note: how is the resulting precision loss handled?)
compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none
```

compression: """Optional gradient compression algorithm used during push_pull."""
"""Compress all floating point gradients to 16-bit.""" Note: so is casting to fp16 all that "gradient compression" means here?
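To make that question concrete, here is a minimal sketch of what the fp16 compressor amounts to if it simply casts floating-point gradients to float16 before communication and back afterwards; the class and method names are illustrative, not the actual BytePS API:

```python
import torch

class FP16CompressionSketch:
    """Illustrative stand-in for an fp16 gradient compressor."""

    @staticmethod
    def compress(tensor):
        # Remember the original dtype so the tensor can be restored after the pull.
        ctx = tensor.dtype
        if tensor.is_floating_point():
            tensor = tensor.to(torch.float16)   # halves the bytes on the wire
        return tensor, ctx

    @staticmethod
    def decompress(tensor, ctx):
        # Cast back to the original dtype; the precision lost in the cast stays lost.
        return tensor.to(ctx)

grad = torch.randn(1024)
compressed, ctx = FP16CompressionSketch.compress(grad)
restored = FP16CompressionSketch.decompress(compressed, ctx)
```

Under that reading, the cast is simply lossy and nothing is recovered, which is presumably why fp16 push_pull is opt-in via `args.fp16_pushpull`.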
```python
# Wrap the optimizer so that gradients are synchronized via push_pull
optimizer = bps.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     compression=compression)
```
```python
# Broadcast the model parameters from the root process to all other processes
bps.broadcast_parameters(model.state_dict(), root_rank=0)
```
root_rank: The rank of the process from which parameters will be broadcast to all other processes. Note: is this rank local or global? It is the global rank; process 0 is normally used.
push_pull_async:
"""
A function that performs asynchronous averaging or summation of the input tensor
over all the BytePS processes. The input tensor is not modified.

The reduction operation is keyed by the name. If name is not provided, an
incremented auto-generated name is used. The tensor type and shape must be the
same on all BytePS processes for a given name. The reduction will not start until
all processes are ready to send and receive the tensor.

Arguments:
    tensor: A tensor to average or sum.
    average: A flag indicating whether to compute average or summation,
             defaults to average.
    name: A name of the reduction operation.

Returns:
    A handle to the push_pull operation that can be used with `poll()` or
    `synchronize()`.
"""
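A hedged usage sketch of this API, assuming `push_pull_async` and `synchronize` are exposed at the `byteps.torch` top level as the docstring suggests (the tensor and the name are arbitrary):

```python
import torch
import byteps.torch as bps

bps.init()
torch.cuda.set_device(bps.local_rank())

grad = torch.ones(16).cuda()                       # stand-in for a gradient tensor
handle = bps.push_pull_async(grad, average=True, name="example.grad")

# ... computation can overlap with the communication here ...

avg = bps.synchronize(handle)                      # block until the push_pull finishes
```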
```python
# Broadcast the optimizer state (e.g. momentum buffers) from the root process
bps.broadcast_optimizer_state(optimizer, root_rank=0)
```
Python calls into the C/C++ core through the ctypes library (a minimal sketch follows after this list).
Communication between nodes uses a key-value format.
Within a node, only process 0 takes part in network communication.
The scheduler and the servers run MXNet code directly; they do not use BytePS code.
Workers do not communicate with each other, and neither do servers. (Note: the parameter servers in Mu Li's paper do talk to each other, but only for replication and fault tolerance.)
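To illustrate the ctypes point from the first line above, here is a minimal sketch of loading a shared library and calling an exported C function; it loads the C standard library so that it actually runs, whereas BytePS loads its own compiled core library in the same way:

```python
import ctypes
import ctypes.util

# Load a shared library (BytePS would load its compiled C++ core here).
libc = ctypes.CDLL(ctypes.util.find_library("c") or None)

# Declare the return type of an exported C function and call it from Python.
libc.getpid.restype = ctypes.c_int
print("pid from C:", libc.getpid())
```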
rank: # A function that returns the BytePS rank of the calling process. Note: this is the global process rank, typically used to gate log printing.
size: # A function that returns the number of BytePS processes.
local_size: # A function that returns the number of BytePS processes within the node the current process is running on.
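A small sketch of how these query functions are typically combined, pinning the GPU by local rank and gating log output on the global rank (standard usage, not taken from the BytePS examples verbatim):

```python
import torch
import byteps.torch as bps

bps.init()
torch.cuda.set_device(bps.local_rank())   # one GPU per local process

# Print only from the globally first process so logs are not duplicated N times.
if bps.rank() == 0:
    print("world size:", bps.size(), "| processes on this node:", bps.local_size())
```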
""" An optimizer that wraps another torch.optim.Optimizer, using an push_pull to average gradient values before applying gradients to model weights. push_pull operations are executed after each gradient is computed by `loss.backward()` in parallel with each other. The `step()` method ensures that all push_pull operations are finished before applying gradients to the model. DistributedOptimizer exposes the `synchronize()` method, which forces push_pull operations to finish before continuing the execution. It's useful in conjunction with gradient clipping, or other operations that modify gradients in place before `step()` is executed. Example of gradient clipping: ``` output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.synchronize() torch.nn.utils.clip_grad_norm(model.parameters(), args.clip) optimizer.step() ``` Arguments: optimizer: Optimizer to use for computing gradients and applying updates. named_parameters: A mapping between parameter names and values. Used for naming of push_pull operations. Typically just `model.named_parameters()`. compression: Compression algorithm used during push_pull to reduce the amount of data sent during the each parameter update step. Defaults to not using compression. backward_passes_per_step: Number of expected backward passes to perform before calling step()/synchronize(). This allows accumulating gradients over multiple mini-batches before executing averaging and applying them. """ # We dynamically create a new class that inherits from the optimizer that was passed in. # The goal is to override the `step()` method with an push_pull implementation.
common/__init__.py: Python wrappers around the basic C++ API, e.g. BytePSBasics local_rank.
communicator.h and communicator.cc: auxiliary socket-based signalling used for synchronization, e.g. BytePSCommSocket.
global.h and global.cc: e.g. the global initialization BytePSGlobal::Init and the worker's ps-lite singleton BytePSGlobal::GetPS.
core_loops.h and core_loops.cc: endless loops such as PushLoop and PullLoop that process the tasks (TensorTableEntry) taken from the queues. (Note: rewriting these as event loops could in theory reduce CPU usage; see the sketch after this list.)
logging.h and logging.cc: the logging component. (Note: unrelated to the core logic; skip for now.)
nccl_manager.h and nccl_manager.cc: manages NCCL, e.g. IsCrossPcieSwitch.
operations.h and operations.cc: e.g. GetPullQueueList, GetPushQueueList, ...
ready_table.h and ready_table.cc: tracks the ready state of the task for each key; split by purpose into PUSH, COPY, PCIE_REDUCE, NCCL_REDUCE, and so on.
schedule_queue.h and schedule_queue.cc: task scheduling queues that feed tasks to the event loops.
shared_memory.h and shared_memory.cc: shared memory used to hold tensors on the CPU. (Note: it uses the POSIX API, i.e. a shared-memory file via shm_open combined with mmap, which is more portable than the System V API.)
ops.h and ops.cc: e.g. DoPushPull.
adapter.h and adapter.cc: adapts tensor data types between C++ and Python.
cpu_reducer.h and cpu_reducer.cpp:
common.h and common.cc:
ops.py: e.g. _push_pull_function_factory.
torch/__init__.py: e.g. DistributedOptimizer.
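As referenced from the core_loops entry above, the processing pattern is a long-running loop that drains tasks from a scheduling queue. The sketch below is a generic Python rendering of that pattern with made-up names (it blocks on the queue instead of busy-polling; the real loops are C++ and the tasks are TensorTableEntry objects):

```python
import queue
import threading

task_queue = queue.Queue()          # stands in for a BytePS scheduling queue

def push_loop():
    # Long-running loop, analogous in spirit to PushLoop: take the next task and run it.
    while True:
        task = task_queue.get()     # blocks until a task is available
        if task is None:            # sentinel used here to stop the loop
            break
        task()                      # e.g. push one tensor partition to the servers

worker = threading.Thread(target=push_loop, daemon=True)
worker.start()

task_queue.put(lambda: print("pushed partition 0"))
task_queue.put(None)                # shut the loop down
worker.join()
```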
// Total key space is 0 to 2^64 - 1
// It will be divided to N PS servers, for now we assume N <= 2^16
ps::KVWorker: inherits from SimpleApp; used to Push key-value data to the servers and Pull it back, and it also provides a Wait function. (A mock of this calling pattern is sketched after this list.)
ps is_recovery: whether the node is a recovered one. (Note: perhaps it went down and rejoined part-way through?)
ps::Postoffice: the global management singleton.
ps::StartAsync: initializes the node asynchronously.
ZPush/ZPull: zero-copy Push/Pull. This function is similar to Push except that the data will not be copied into the system, for better performance. It is the caller's responsibility to keep the content unchanged until the operation has actually finished.
ADD_NODE
BARRIER
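As noted in the ps::KVWorker item above, the interface boils down to asynchronous key-value Push/Pull plus Wait. The Python mock below only imitates that calling pattern (each call returns a handle immediately and Wait blocks on it); it is not ps-lite and ignores zero-copy entirely:

```python
from concurrent.futures import ThreadPoolExecutor

class MockKVWorker:
    """Toy imitation of the Push/Pull/Wait calling pattern of ps::KVWorker."""

    def __init__(self):
        self._store = {}                       # stands in for the remote servers
        self._pool = ThreadPoolExecutor(max_workers=4)

    def push(self, keys, vals):
        # Returns a handle immediately, like Push/ZPush returning a timestamp.
        return self._pool.submit(lambda: self._store.update(zip(keys, vals)))

    def pull(self, keys):
        return self._pool.submit(lambda: [self._store.get(k) for k in keys])

    def wait(self, handle):
        # Blocks until the operation completes, like KVWorker::Wait.
        return handle.result()

kv = MockKVWorker()
kv.wait(kv.push([1, 2], [10.0, 20.0]))
print(kv.wait(kv.pull([1, 2])))                # -> [10.0, 20.0]
```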
Tensor Partition
Tensor partitioning lets multiple servers share the computation and the network bandwidth in parallel, and it also benefits the asynchronous pipeline.
_name_to_cxt: hash table holding the tensors that have been initialized (and can therefore be used for PS communication).
declared_key: the id of an initialized tensor, incremented from 0.
GetPartitionBound: the size in bytes of each partition slice of a tensor.
key = declared_key * 2^16 + part_num.
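A worked sketch of the key layout just described: the number of partitions follows from the partition bound, and each partition's key combines the declared key with the partition index. The concrete numbers (a 4 MB bound, an 8 MB tensor, declared_key 3) are only examples:

```python
def make_key(declared_key, part_num):
    # key = declared_key * 2^16 + part_num
    return (declared_key << 16) + part_num

def num_partitions(tensor_bytes, partition_bound):
    # ceil(tensor_bytes / partition_bound)
    return (tensor_bytes + partition_bound - 1) // partition_bound

partition_bound = 4 * 1024 * 1024                # e.g. a 4 MB slice size
tensor_bytes = 8 * 1024 * 1024                   # an 8 MB tensor -> 2 partitions
declared_key = 3

keys = [make_key(declared_key, p)
        for p in range(num_partitions(tensor_bytes, partition_bound))]
print(keys)                                      # [196608, 196609]
```

Since different keys can be served by different servers, the partitions of one large tensor spread its load across several servers.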
Shared memory
_key_shm_addr: hash table with the starting address of each shared-memory block.
_key_shm_size: hash table with the size of each shared-memory block.
cudaHostRegister: registers host memory as pinned memory for CUDA, so a CPU->GPU transfer only needs a single copy. (Note: pinned memory is page-locked, non-pageable memory backed directly by physical pages; it is never swapped out to disk, so it never triggers page faults.)
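For a Python-level illustration of the same POSIX pattern, the standard library's multiprocessing.shared_memory is itself built on shm_open plus mmap; BytePS does this in C++ and additionally pins the resulting host buffer with cudaHostRegister. The block name and sizes below are arbitrary:

```python
import numpy as np
from multiprocessing import shared_memory

# Create a named shared-memory block (shm_open + mmap under the hood).
shm = shared_memory.SharedMemory(name="byteps_sketch_key0", create=True,
                                 size=4 * 1024 * 1024)

# View it as a float32 tensor; another process could attach to the same name
# with SharedMemory(name="byteps_sketch_key0") and see the same data.
cpu_tensor = np.ndarray((1024 * 1024,), dtype=np.float32, buffer=shm.buf)
cpu_tensor[:] = 0.0

shm.close()
shm.unlink()
```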
numa_max_node(): returns the highest node number available on the current system. (See the node numbers in /sys/devices/system/node/.) Also see numa_num_configured_nodes().
numa_set_preferred(): sets the preferred node for the current task to node. The system will attempt to allocate memory from the preferred node, but will fall back to other nodes if no memory is available on the preferred node. Passing a node of -1 specifies local allocation and is equivalent to calling numa_set_localalloc().
numa_set_interleave_mask(): sets the memory interleave mask for the current task to nodemask. All new memory allocations are page interleaved over all nodes in the interleave mask. Interleaving can be turned off again by passing an empty mask (numa_no_nodes). The page interleaving only occurs on the actual page fault that puts a new page into the current address space. It is also only a hint: the kernel will fall back to other nodes if no memory is available on the interleave target.
Note: what is priority for? Its value is always 0.
Note: what is named_parameters for?
Note: what is param_groups?
Note: where are the gradient reduce and the weight update performed, respectively?
References
https://github.com/bytedance/byteps
https://pytorch.org/docs/stable/distributed.html
https://www.cs.cmu.edu/~muli/file/parameter_server_nips14.pdf
https://mxnet.incubator.apache.org/versions/master/faq/distributed_training.html