[源碼解析] 深度學習流水線並行 PipeDream(4)--- 運行時引擎
0x00 摘要
在前文中,我們介紹了PipeDream的總體架構,Profile階段,計算分區階段和模型轉換階段,本文我們介紹運行時執行引擎,這是一個統一基礎設施層。
流水線並行其他文章鏈接如下:
[源碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現
[源碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積
[源碼解析] 深度學習流水線並行 GPipe(3) ----重計算
[源碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段
[源碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分區
[源碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型
0x01 前言
1.1 前文回顧
在前文,我們經歷了三個階段:profile,計算分區,模型轉換,目前就得到了若干python文件&配置文件。PipeDream 加載這些文件之后,就可以進行訓練。
所以從本文開始,我們介紹訓練所需要的各種支撐系統,比如運行時執行引擎。主要是看看一個深度學習訓練運行時應該包括什么功能。
1.2 運行時系統
結合之前的分析和我們先思考為何要實現一個運行時,以及針對深度學習(流水線並行)需要實現什么功能。
1.2.1 PyTorch 的特點
首先看看PyTorch 的特點:
- PyTorch 本身實現了 autograd 功能,這樣反向傳播就做到了自動微分。
- 在分布式數據並行實現上,PyTorch 實現了 DDP 功能。
- 在分布式模型並行等方面,PyTorch 也提供了 RPC 功能作為支撐基礎。但是,RPC功能是在 PyTorch 1.5 版本中引入的,時間是2020-06-12。
- 針對 DDP 和 RPC,PyTorch 也相應實現了 distributed.autograd 功能,對用戶屏蔽了大量分布式細節,讓用戶對分布式訓練盡量無感(我們后文會有專門系列來分析PyTorch的分布式)。
1.2.2 PyTorch RPC
RPC 功能是在 PyTorch 1.5 版本中正式引入的,時間是2020-06-12,具體如下。
Distributed RPC framework APIs [Now Stable]
The
torch.distributed.rpc
package aims at supporting a wide range of distributed training paradigms that do not fit intoDistributedDataParallel
. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in thetorch.distributed.rpc
package can be categorized into four main sets of APIs.
- The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
- The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
- With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
- The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g.,
SGD
,Adagrad
, etc.) and a list of parameter RRefs, and itsstep()
function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.
但是 PipeDream 論文是在 2019 年發布,這就意味着 PipeDream無法精准利用 PyTorch RPC,只能自己實現通信邏輯,即對計算圖的支撐。
1.2.3 PipeDream的特性
其次看看PipeDream的特性:
- PipeDream是把模型並行,數據並行結合在一起,實現了流水線並行。
- PipeDream實際上是把一個完整的深度訓練模型拆分開來,各個子模型(子圖)分別放在不同節點之上。
1.2.4 結合起來
綜合以上兩點,這就意味着,對於PipeDream來說,單純的 DDP,模型並行和 autograd 功能無法滿足我們的需求,必須結合起來使用。
PipeDream需要自己實現至少:
- 如何在多個階段(節點)之間進行通信,這可能會使用到 PyTorch RPC 功能,但是因為開始時候沒有穩定版本,只能自己實現一個分布式計算圖,這樣就用到了 PyTorch distributed 的 P2P 功能。
- 因為通信需要,所以自己管理每個階段(節點)的發送、接受rank,就是配置和管理各個階段(節點)的生產者,消費者。這樣也意味着需要找到每個階段(節點)的輸入,輸出。
- 因為 P2P 通信功能需要,所以需要給每個張量配置一個唯一的標識(對應下文的tag)。
- 如何在單個階段(若干節點)上進行數據並行,這應該會用到 PyTorch DDP 功能。
- 因為用到數據並行,所以需要自己管理每個階段的並行數目。
- 因為需要結合模型並行和數據並行,所以需要自己管理進程工作組。
- 因為在不同節點(機器)上運行,所以每個機器獨立運行訓練腳本時候,需要對自己訓練job進行獨立配置。
所以,下面我們結合這些功能點,做具體分析。
0x02 使用
2.1 如何調用
通過源碼中的樣例我們可以看到,可以在多個節點上分別運行 main_with_runtime.py 腳本,每個腳本啟動參數不同,比如 rank 就不同,這樣各個節點之上就運行了不同的階段所對應的模型。
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 0 --local_rank 0 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 1 --local_rank 1 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 2 --local_rank 2 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 3 --local_rank 3 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
2.2 總體邏輯
使用 runtime 的總體邏輯以如下文件為例 :runtime/translation/main_with_runtime.py。主要邏輯是:
- 解析輸入參數
- 加載,生成模型
- 依據模塊來構建模型
- 依據參數進行配置比如輸入大小,batch size等
- 遍歷模型的每個層(跳過最后loss層)
- 遍歷每層的輸入,構建輸入張量
- 通過調用stage對應的forward函數,構建出輸出
- 遍歷每層的輸出,設置其類型和形狀
- 構建輸出值張量類型
- 加載配置文件
- 構建一個 StageRuntime
- 建立 optimizer
- 加載 dataset
- 進行訓練,保存checkpoint
總體代碼如下:
def main():
# 解析輸入參數
global args, best_prec1
args = parser.parse_args()
# Special case handling for GNMT model
l2_promote()
torch.cuda.set_device(args.local_rank)
# build tokenizer
tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))
# define loss function
criterion = build_gnmt_criterion(
vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)
# create stages of the model
# 加載,生成模型
module = importlib.import_module(args.module)
args.arch = module.arch()
# 依據模塊來構建模型
model = module.model(criterion)
# 依據參數進行配置比如輸入大小,batch size等
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
"input2": input_size, "target": [args.max_length_train * args.batch_size],
"target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
"target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}
# 遍歷模型的每個層(跳過最后loss層)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]): # Skip last layer (loss).
input_tensors = []
# 遍歷每層的輸入,構建輸入張量
for module_input in inputs:
if module_input in inputs_module_destinations:
inputs_module_destinations[module_input] = module_id
input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
dtype=dtypes[module_input])#.cuda()
input_tensors.append(input_tensor)
#stage.cuda()
# PyTorch should not maintain metadata for a backward pass on
# synthetic inputs. Without the following line, the runtime is
# as much as 1.5x slower in a full DP configuration.
with torch.no_grad():
# 通過調用stage對應的forward函數,構建出輸出
output_tensors = stage(*tuple(input_tensors))
if not type(output_tensors) is tuple:
output_tensors = [output_tensors]
# 遍歷每層的輸出,設置其類型和形狀
for output, output_tensor in zip(outputs,
list(output_tensors)):
# output 是 ['out2', 'out1']
training_tensor_shapes[output] = list(output_tensor.size())
dtypes[output] = output_tensor.dtype
# 構建輸出值張量類型
eval_tensor_shapes = {}
for key in training_tensor_shapes:
eval_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
training_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
# 加載配置文件
configuration_maps = {
'module_to_stage_map': None,
'stage_to_rank_map': None,
'stage_to_depth_map': None
}
if args.config_path is not None:
json_config_file = json.load(open(args.config_path, 'r'))
configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
configuration_maps['stage_to_rank_map'] = {
int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
# 構建一個 StageRuntime
r = runtime.StageRuntime(
model=model, distributed_backend=args.distributed_backend,
fp16=args.fp16, loss_scale=args.loss_scale,
training_tensor_shapes=training_tensor_shapes,
eval_tensor_shapes=eval_tensor_shapes,
training_tensor_dtypes=dtypes,
inputs_module_destinations=inputs_module_destinations,
target_tensor_names=target_tensor_names,
configuration_maps=configuration_maps,
master_addr=args.master_addr,
rank=args.rank, local_rank=args.local_rank,
num_ranks_in_server=args.num_ranks_in_server,
verbose_freq=args.verbose_frequency,
model_type=runtime.TRANSLATION,
enable_recompute=args.recompute)
# stage needed to determine if current stage is the first stage
# num_stages needed to determine if current stage is the last stage
# num_ranks needed to determine number of warmup_minibatches in case of pipelining
args.stage = r.stage
args.num_stages = r.num_stages
args.num_ranks = r.num_ranks
if not is_first_stage():
args.synthetic_data = True
# define optimizer
if args.no_input_pipelining:
num_versions = 1
else:
# number of versions is the total number of machines following the current
# stage, shared amongst all replicas in this stage
num_versions = r.num_warmup_minibatches + 1
# if specified, resume from checkpoint
if args.resume:
checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
assert os.path.isfile(checkpoint_file_path)
print("=> loading checkpoint '{}'".format(checkpoint_file_path))
checkpoint = torch.load(checkpoint_file_path)
args.start_epoch = checkpoint['epoch']
best_prec1 = checkpoint['best_prec1']
r.load_state_dict(checkpoint['state_dict'])
print("=> loaded checkpoint '{}' (epoch {})"
.format(checkpoint_file_path, checkpoint['epoch']))
# TODO: make this configurable by args
# 建立 optimizer
use_adam_optimizer = True
if use_adam_optimizer:
optimizer = adam.AdamWithWeightStashing(
modules=r.modules(), master_parameters=r.master_parameters,
model_parameters=r.model_parameters, loss_scale=args.loss_scale,
num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
macrobatch=args.macrobatch)
else:
optimizer = sgd.SGDWithWeightStashing(
modules=r.modules(), master_parameters=r.master_parameters,
model_parameters=r.model_parameters, loss_scale=args.loss_scale,
num_versions=num_versions, lr=args.lr, momentum=args.momentum,
weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
if args.resume:
optimizer.load_state_dict(checkpoint['optimizer'])
cudnn.benchmark = True
# 加載 dataset
train_dataset = LazyParallelDataset(
src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
tokenizer=tokenizer,
min_len=args.min_length_train,
max_len=args.max_length_train,
sort=False,
max_size=None)
val_dataset = ParallelDataset(
src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
tokenizer=tokenizer,
min_len=args.min_length_train,
max_len=args.max_length_train,
sort=True)
distributed_sampler = False
if configuration_maps['stage_to_rank_map'] is not None:
num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
if num_ranks_in_first_stage > 1:
distributed_sampler = True
# TODO: fix random seeds
train_loader = train_dataset.get_loader(
batch_size=args.batch_size, seeds=range(args.epochs),
batch_first=False, shuffle=True,
bucketing=not args.no_bucketing, num_workers=args.workers,
world_size=r.num_ranks_in_first_stage,
rank=r.rank_in_stage if r.stage == 0 else 0
)
val_loader = val_dataset.get_loader(
batch_size=args.batch_size, batch_first=False,
shuffle=True, num_workers=args.workers,
world_size=r.num_ranks_in_first_stage,
seeds=range(args.epochs),
rank=r.rank_in_stage if r.stage == 0 else 0
)
# if checkpoint is loaded, start by running validation
if args.resume:
assert args.start_epoch > 0
validate(val_loader, r, args.start_epoch-1)
# 進行訓練,保存checkpoint
for epoch in range(args.start_epoch, args.epochs):
if distributed_sampler:
train_loader.sampler.set_epoch(epoch)
adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)
# train or run forward pass only for one epoch
if args.forward_only:
validate(val_loader, r, epoch)
else:
train(train_loader, r, optimizer, epoch)
# evaluate on validation set
prec1 = validate(val_loader, r, epoch)
if r.stage != r.num_stages: prec1 = 0
# remember best prec@1 and save checkpoint
best_prec1 = max(prec1, best_prec1)
should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
if args.checkpoint_dir and should_save_checkpoint:
save_checkpoint({
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': r.state_dict(),
'best_prec1': best_prec1,
'optimizer' : optimizer.state_dict(),
'tokenizer': tokenizer.get_state()
}, args.checkpoint_dir, r.stage, epoch)
0x03 加載模型
我們先來看看如何加載模型。
3.1 模型文件
模型文件在上文中生成,所以這里加載模型文件,我們以 ../translation/models/gnmt/gpus=4/ 下的模型文件為例。
這里的__init__
文件如下:
from .gnmt import GNMTSplit
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
def arch():
return "gnmt"
def model(criterion):
return [
(Stage0(), ["input0", "input1"], ["out2", "out1"]),
(Stage1(), ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
(Stage2(), ["out3", "out7"], ["out8", "out9", "out10"]),
(Stage3(), ["out8", "out9", "out10"], ["out12"]),
(criterion, ["out12"], ["loss"])
]
def full_model():
return GNMTSplit()
具體每個 item 的格式如下:
(stage, inputs, outputs)
所以就需要按照這個格式來加載。
3.2 加載
具體加載方法如下:
# create stages of the model
module = importlib.import_module(args.module)
args.arch = module.arch()
得到module如下:
module = {module} <module 'translation.models.gnmt.gpus=4' from '../translation/models/gnmt/gpus=4/__init__.py'>
GNMTSplit = {type} <class 'translation.models.gnmt.gpus=4.gnmt.GNMTSplit'>
Stage0 = {type} <class 'translation.models.gnmt.gpus=4.stage0.Stage0'>
Stage1 = {type} <class 'translation.models.gnmt.gpus=4.stage1.Stage1'>
Stage2 = {type} <class 'translation.models.gnmt.gpus=4.stage2.Stage2'>
Stage3 = {type} <class 'translation.models.gnmt.gpus=4.stage3.Stage3'>
gnmt = {module} <module 'translation.models.gnmt.gpus=4.gnmt' from '../translation/models/gnmt/gpus=4/gnmt.py'>
stage0 = {module} <module 'translation.models.gnmt.gpus=4.stage0' from '../translation/models/gnmt/gpus=4/stage0.py'>
stage1 = {module} <module 'translation.models.gnmt.gpus=4.stage1' from '../translation/models/gnmt/gpus=4/stage1.py'>
stage2 = {module} <module 'translation.models.gnmt.gpus=4.stage2' from '../translation/models/gnmt/gpus=4/stage2.py'>
stage3 = {module} <module 'translation.models.gnmt.gpus=4.stage3' from '../translation/models/gnmt/gpus=4/stage3.py'>
3.3 構建模型
接下來會依據模塊來構建模型。
model = module.model(criterion)
這里 criterion 是 LabelSmoothing() 。
在 model(criterion) 調用之中,會逐一調用 Stage0() ~ Stage3(),構建每個層。
比如Stage3 會調用到 __init__
函數。
class Stage3(torch.nn.Module):
def __init__(self):
super(Stage3, self).__init__()
self.layer5 = torch.nn.LSTM(2048, 1024)
self.layer8 = Classifier(1024, 32320)
得到了model,具體如下。
model = {list: 5}
0 = {tuple: 3}
0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LSTM(2048, 1024)\n (layer9): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['input0', 'input1']
2 = {list: 2} ['out2', 'out1']
__len__ = {int} 3
1 = {tuple: 3}
0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
2 = {list: 2} ['out3', 'out7']
__len__ = {int} 3
2 = {tuple: 3}
0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['out3', 'out7']
2 = {list: 3} ['out8', 'out9', 'out10']
__len__ = {int} 3
3 = {tuple: 3}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {list: 3} ['out8', 'out9', 'out10']
2 = {list: 1} ['out12']
__len__ = {int} 3
4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
0 = {LabelSmoothing} LabelSmoothing()
1 = {list: 1} ['out12']
2 = {list: 1} ['loss']
__len__ = {int} 3
__len__ = {int} 5
3.4 輸入輸出
模型加載完之后,開始設置輸入和輸出,具體邏輯是:
- 依據參數進行配置
- 遍歷模型的每個層(跳過最后loss層)做如下操作:
- 遍歷每層的輸入,構建輸入張量。
- 通過調用stage對應的forward函數,構建出輸出。
- 遍歷每層的輸出,設置類型。
- 構建張量形狀。
需要注意的是每個子模塊的格式如下:
(
Stage0(),
["input0", "input1"], # 輸入
["out2", "out1"] # 輸出
)
代碼注釋如下:
# 依據參數進行配置比如輸入大小,batch size等
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
"input2": input_size, "target": [args.max_length_train * args.batch_size],
"target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
"target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}
# 遍歷模型的每個層(跳過最后loss層)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]): # Skip last layer (loss).
input_tensors = []
# 遍歷每層的輸入,構建輸入張量
for module_input in inputs:
if module_input in inputs_module_destinations:
inputs_module_destinations[module_input] = module_id
input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
dtype=dtypes[module_input]).cuda()
input_tensors.append(input_tensor)
stage.cuda()
# PyTorch should not maintain metadata for a backward pass on
# synthetic inputs. Without the following line, the runtime is
# as much as 1.5x slower in a full DP configuration.
with torch.no_grad():
# 通過調用stage對應的forward函數,構建出輸出
output_tensors = stage(*tuple(input_tensors))
if not type(output_tensors) is tuple:
output_tensors = [output_tensors]
# 遍歷每層的輸出,設置其類型和形狀
for output, output_tensor in zip(outputs,
list(output_tensors)):
# output 是 ['out2', 'out1']
training_tensor_shapes[output] = list(output_tensor.size())
dtypes[output] = output_tensor.dtype
# 構建輸出值張量類型
eval_tensor_shapes = {}
for key in training_tensor_shapes:
eval_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
training_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
得到了輸出的形狀和類型。
dtypes = {dict: 13}
'input0' = {dtype} torch.int64
'input1' = {dtype} torch.int64
'input2' = {dtype} torch.int64
'target' = {dtype} torch.int64
'target_length' = {dtype} torch.int32
'out2' = {dtype} torch.float32
'out1' = {dtype} torch.float32
'out3' = {dtype} torch.float32
'out7' = {dtype} torch.float32
'out8' = {dtype} torch.float32
'out9' = {dtype} torch.float32
'out10' = {dtype} torch.float32
'out12' = {dtype} torch.float32
__len__ = {int} 13
training_tensor_shapes = {dict: 13}
'input0' = {tuple: 2} (50, 128)
'input1' = {tuple: 1} 128
'input2' = {tuple: 2} (50, 128)
'target' = {tuple: 1} 6400
'target_length' = {tuple: 1} 128
'out2' = {tuple: 3} (50, 128, 1024)
'out1' = {tuple: 3} (50, 128, 1024)
'out3' = {tuple: 3} (50, 128, 1024)
'out7' = {tuple: 3} (50, 128, 1024)
'out8' = {tuple: 3} (50, 128, 1024)
'out9' = {tuple: 3} (50, 128, 1024)
'out10' = {tuple: 3} (50, 128, 1024)
'out12' = {tuple: 3} (50, 128, 32320)
__len__ = {int} 13
eval_tensor_shapes = {dict: 13} {
'input0' = {tuple: 2} (50, 128)
'input1' = {tuple: 1} 128
'input2' = {tuple: 2} (50, 128)
'target' = {tuple: 1} 6400
'target_length' = {tuple: 1} 128
'out2' = {tuple: 3} (50, 128, 1024)
'out1' = {tuple: 3} (50, 128, 1024)
'out3' = {tuple: 3} (50, 128, 1024)
'out7' = {tuple: 3} (50, 128, 1024)
'out8' = {tuple: 3} (50, 128, 1024)
'out9' = {tuple: 3} (50, 128, 1024)
'out10' = {tuple: 3} (50, 128, 1024)
'out12' = {tuple: 3} (50, 128, 32320)
__len__ = {int} 13
3.5 配置
加載上文生成的配置文件。
configuration_maps = {
'module_to_stage_map': None,
'stage_to_rank_map': None,
'stage_to_depth_map': None
}
if args.config_path is not None:
json_config_file = json.load(open(args.config_path, 'r'))
configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
configuration_maps['stage_to_rank_map'] = {
int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
對應的文件是:
{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
得到:
configuration_maps = {dict: 3}
'module_to_stage_map' = {list: 5} [0, 1, 2, 3, 3]
'stage_to_rank_map' = {dict: 4} {0: [0], 1: [1], 2: [2], 3: [3]}
'stage_to_depth_map' = {NoneType} None
__len__ = {int} 3
0x04 runtime
為了演示,我們這里用如下參數進行啟動 main_with_runtime.py。
--module translation.models.gnmt.gpus=4 --data_dir=wmt16_ende_data_bpe_clean --config_path pipedream-pipedream/runtime/translation/models/gnmt/gpus=4/mp_conf.json --local_rank 3 --rank 3 --master_addr 127.0.0.1
在main函數中用如下辦法構建了Runtime。Runtime是執行引擎,提供一個統一的、可擴展的基礎設施層。
r = runtime.StageRuntime(
model=model, distributed_backend=args.distributed_backend,
fp16=args.fp16, loss_scale=args.loss_scale,
training_tensor_shapes=training_tensor_shapes,
eval_tensor_shapes=eval_tensor_shapes,
training_tensor_dtypes=dtypes,
inputs_module_destinations=inputs_module_destinations,
target_tensor_names=target_tensor_names,
configuration_maps=configuration_maps,
master_addr=args.master_addr,
rank=args.rank, local_rank=args.local_rank,
num_ranks_in_server=args.num_ranks_in_server,
verbose_freq=args.verbose_frequency,
model_type=runtime.TRANSLATION,
enable_recompute=args.recompute)
4.1 StageRuntime
StageRuntime定義如下,可以看到其主要成員變量為在此stage內部進行前向后向操作所需要的元數據,比如:
張量,梯度,分布式后端,loss scale,訓練數據的張量類型,輸出值張量形狀等等。
class StageRuntime:
def __init__(self, model, distributed_backend, fp16, loss_scale,
training_tensor_shapes, eval_tensor_shapes,
training_tensor_dtypes, inputs_module_destinations,
target_tensor_names, configuration_maps, master_addr,
rank, local_rank, num_ranks_in_server, verbose_freq,
model_type, enable_recompute=False):
# Metadata needed for forward and backward pass within this stage.
self.tensors = []
self.gradients = {}
self.distributed_backend = distributed_backend
self.fp16 = fp16
self.loss_scale = loss_scale
self.training_tensor_shapes = training_tensor_shapes
self.eval_tensor_shapes = eval_tensor_shapes
self.training_tensor_dtypes = training_tensor_dtypes
self.model_type = model_type
self.target_tensor_names = target_tensor_names
self.initialize(model, inputs_module_destinations, configuration_maps,
master_addr, rank, local_rank, num_ranks_in_server)
self.verbose_freq = verbose_freq
self.forward_only = False
self.forward_stats = runtime_utilities.RuntimeStats(forward=True)
self.backward_stats = runtime_utilities.RuntimeStats(forward=False)
# Enable recomputation to prevent the need to save activations
# computed from the forward pass for the backward pass.
self.enable_recompute = enable_recompute
# Disable recomputation for the last stage.
if rank == num_ranks_in_server - 1:
self.enable_recompute = False
4.2 初始化
初始化函數代碼很長,我們逐段進行分析。
4.2.1 設置tag
在函數開始,遍歷模型每一層的輸入和輸出,設置 tensor_tag,就是給每一個tensor一個獨立唯一的tag,tag經過層層傳遞,最終會用到 distributed_c10d.py 中的 recv 函數。tensor_tag 會在通信過程中被使用,被p2p用作確定標識。
def recv(tensor,
src=None,
group=None,
tag=0):
"""
Receives a tensor synchronously.
Args:
tensor (Tensor): Tensor to fill with received data.
src (int, optional): Source rank. Will receive from any
process if unspecified.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
tag (int, optional): Tag to match recv with remote send
Returns:
Sender rank
-1, if not part of the group
"""
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
return -1
if group is None:
pg = _get_default_group()
else:
pg = group
if src is None:
work = pg.recv_anysource([tensor], tag)
work.wait()
src_rank = work._source_rank()
if group is None or group is GroupMember.WORLD:
return src_rank
else:
return _get_global_rank(pg, src_rank)
else:
if group is None or group is GroupMember.WORLD:
pg.recv([tensor], src, tag).wait()
else:
group_src_rank = _get_group_rank(pg, src)
pg.recv([tensor], group_src_rank, tag).wait()
return src
具體設置 tag 的代碼如下:
def initialize(self, model, inputs_module_destinations,
configuration_maps, master_addr, rank,
local_rank, num_ranks_in_server):
self.send_ranks = {}
self.receive_ranks = {}
self.rank = rank
self.local_rank = local_rank
self.stage = None
self.tensor_tags = {}
self.forward_minibatch_id = 0
self.backward_minibatch_id = 0
self.criterion_input_name = str(model[-1][1][0])
tensor_tag = 1
# 遍歷模型中每一層,每一層的格式是 (_, input_tensors, output_tensors)
for (_, input_tensors, output_tensors) in model:
# 遍歷輸入
for input_tensor in input_tensors:
if input_tensor not in self.tensor_tags:
self.tensor_tags[input_tensor] = tensor_tag
tensor_tag += 1 # 設置 tag
# 遍歷輸出
for output_tensor in output_tensors:
if output_tensor not in self.tensor_tags:
self.tensor_tags[output_tensor] = tensor_tag
tensor_tag += 1 # 設置 tag
for target_tensor_name in sorted(self.target_tensor_names):
self.tensor_tags[target_tensor_name] = tensor_tag
tensor_tag += 1 # 設置 tag
self.tensor_tags["ack"] = tensor_tag
tensor_tag += 1 # 設置 tag
輸入是:
target_tensor_names = {set: 2} {'target_length', 'target'}
{str} 'target_length'
{str} 'target'
__len__ = {int} 2
model = {list: 5}
0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LS
1 = {list: 2} ['input0', 'input1']
2 = {list: 2} ['out2', 'out1']
__len__ = {int} 3
1 = {tuple: 3}
0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
2 = {list: 2} ['out3', 'out7']
__len__ = {int} 3
2 = {tuple: 3}
0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['out3', 'out7']
2 = {list: 3} ['out8', 'out9', 'out10']
__len__ = {int} 3
3 = {tuple: 3}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {list: 3} ['out8', 'out9', 'out10']
2 = {list: 1} ['out12']
__len__ = {int} 3
4 = {tuple: 3}
0 = {LabelSmoothing} LabelSmoothing()
1 = {list: 1} ['out12']
2 = {list: 1} ['loss']
__len__ = {int} 3
__len__ = {int} 5
得到:
tensor_tags = {dict: 15}
'input0' = {int} 1
'input1' = {int} 2
'out2' = {int} 3
'out1' = {int} 4
'input2' = {int} 5
'out3' = {int} 6
'out7' = {int} 7
'out8' = {int} 8
'out9' = {int} 9
'out10' = {int} 10
'out12' = {int} 11
'loss' = {int} 12
'target' = {int} 13
'target_length' = {int} 14
'ack' = {int} 15
__len__ = {int} 15
4.2.2 配置map
回憶一下配置文件中的部分定義:
- module_to_stage_map 就是 :本模型被划分為哪些stage。
- stage_to_rank_map 就是 :每個stage對應了哪些rank,rank 就代表了具體的 worker 進程,比如本stage被幾個rank進行數據並行。
我們給出一個樣例,對應的文件內容如下:
{
"module_to_stage_map": [0, 1, 2, 2],
"stage_to_rank_map": {"0": [0, 1, 4, 5, 8, 9, 12, 13], "1": [2, 6, 10, 14], "2": [3, 7, 11, 15]}
}
針對我們本文的模型,配置文件如下:
{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
加載到內存中為:
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
rank_to_stage_map = {dict: 4} {0: 0, 1: 1, 2: 2, 3: 3}
因為有時候也需要反過來查找,所以程序接下來進行反向配置,得到如下。
stage_to_module_map = {defaultdict: 4}
default_factory = {type} <class 'list'>
0 = {list: 1} [0]
1 = {list: 1} [1]
2 = {list: 1} [2]
3 = {list: 2} [3, 4]
__len__ = {int} 4
stage_to_rank_map = {dict: 4}
0 = {list: 1} [0]
1 = {list: 1} [1]
2 = {list: 1} [2]
3 = {list: 1} [3]
__len__ = {int} 4
4.2.3 找到自己的配置
因為在命令行設置了本地的 local_rank 和 rank,所以接下來runtime從配置文件中依據rank找到自己的東西,對自己進一步做配置。
stage_to_module_map = collections.defaultdict(list)
for module in range(len(module_to_stage_map)):
# 這里配置了哪個stage擁有哪些module
stage_to_module_map[module_to_stage_map[module]].append(module)
rank_to_stage_map = {}
for stage in stage_to_rank_map:
for rank in stage_to_rank_map[stage]:
# 配置了哪個 rank 擁有哪些 stage
rank_to_stage_map[rank] = stage
# Now, use this mapping to determine the modules contained in
# each stage.
assert 0 <= self.rank < len(rank_to_stage_map)
self.num_ranks = len(rank_to_stage_map) # 就是得到了world_size,因為有多少個rank,就是有多少個訓練進程,就是world size
self.num_stages = len(stage_to_module_map) # 多少個階段
self.stage = rank_to_stage_map[self.rank] # 通過自己的rank得到自己的stage
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank) # 本rank在stage之中排第幾個
self.num_ranks_in_stage = len(stage_to_rank_map[self.stage])#得到自己stage的rank數目,就是數據並行數目,可以得到本層的數據並行次數
self.num_ranks_in_first_stage = len(stage_to_rank_map[0])
self.num_ranks_in_previous_stage = 0
self.ranks_in_previous_stage = []
if self.stage > 0:
self.num_ranks_in_previous_stage = len(
stage_to_rank_map[self.stage - 1])
self.ranks_in_previous_stage = stage_to_rank_map[self.stage - 1]
self.num_ranks_in_next_stage = 0
self.ranks_in_next_stage = []
if self.stage < self.num_stages - 1:
self.num_ranks_in_next_stage = len(
stage_to_rank_map[self.stage + 1])
self.ranks_in_next_stage = stage_to_rank_map[self.stage + 1]
modules = stage_to_module_map[self.stage] # 這里得到 [3,4],后續會用到。
self.modules_with_dependencies = ModulesWithDependencies(
[model[module] for module in modules])
self.is_criterion = self.stage == (self.num_stages - 1)
if stage_to_depth_map is not None:
self.num_warmup_minibatches = stage_to_depth_map[
str(self.stage)]
else:
self.num_warmup_minibatches = self.num_ranks - 1
for i in range(self.stage):
self.num_warmup_minibatches -= len(
stage_to_rank_map[i])
self.num_warmup_minibatches = self.num_warmup_minibatches // \
self.num_ranks_in_stage
變量為:
self = {StageRuntime}
backward_minibatch_id = {int} 0
criterion_input_name = {str} 'out12'
distributed_backend = {NoneType} None
eval_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
forward_minibatch_id = {int} 0
fp16 = {bool} False
gradients = {dict: 0} {}
is_criterion = {bool} True
local_rank = {int} 3
loss_scale = {int} 1
model_type = {str} 'translation'
modules_with_dependencies = {ModulesWithDependencies}
_all_input_names = {list: 2} [['out8', 'out9', 'out10'], ['out12']]
_all_output_names = {list: 2} [['out12'], ['loss']]
_modules = {list: 2}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {LabelSmoothing} LabelSmoothing()
__len__ = {int} 2
num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 0} {}
send_ranks = {dict: 0} {}
stage = {int} 3
target = {str} 'python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 178, in _getPyDictionary\n attr = getattr(var, n)\n File "../runtime.py", line 295, in target\n r
target_tensor_names = {set: 2} {'target', 'target_length'}
tensor_tags = {dict: 15} {'input0': 1, 'input1': 2, 'out2': 3, 'out1': 4, 'input2': 5, 'out3': 6, 'out7': 7, 'out8': 8, 'out9': 9, 'out10': 10, 'out12': 11, 'loss': 12, 'target': 13, 'target_length': 14, 'ack': 15}
tensors = {list: 0} []
training_tensor_dtypes = {dict: 13} {'input0': torch.int64, 'input1': torch.int64, 'input2': torch.int64, 'target': torch.int64, 'target_length': torch.int32, 'out2': torch.float32, 'out1': torch.float32, 'out3': torch.float32, 'out7': torch.float32, 'out8': torch.float32, 'out9': torch.floa
training_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
我們看看幾個變量如何使用。
4.2.3.1 num_ranks
首先,看看 num_ranks 如何使用。在后續代碼中有使用,比如:
world_size=self.num_ranks # 依據 num_ranks 得到 world_size
self.num_warmup_minibatches = self.num_ranks - 1 # 依據 num_ranks 得到熱身batch數目
4.2.3.2 rank_in_stage
其次,再看看 rank_in_stage 如何使用?
前面有
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank) # 本rank在stage之中排第幾個
rank_in_stage 會傳遞給 Comm 模塊。
self.comm_handler.initialize(
self.receive_ranks,
self.send_ranks,
self.tensor_tags,
self.target_tensor_names,
self.training_tensor_dtypes,
self.rank_in_stage, # 在這里作為參數傳入,在函數里面代表本節點,后續會詳細介紹
self.num_ranks_in_stage,
self.ranks_in_previous_stage,
self.ranks_in_next_stage)
4.2.4 設置通信模塊
接下來對通信模塊進行配置。
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
master_addr=master_addr,
master_port=master_port,
rank=self.rank,
local_rank=self.local_rank,
num_ranks_in_server=num_ranks_in_server,
world_size=self.num_ranks,
fp16=self.fp16,
backend=self.distributed_backend)
配置代碼如下,構建了 CommunicationHandler,這個模塊是為后續“設置生產者和消費者”服務的,所以我們暫時把后續代碼一起放在這里。
else:
......
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
master_addr=master_addr,
master_port=master_port,
rank=self.rank,
local_rank=self.local_rank,
num_ranks_in_server=num_ranks_in_server,
world_size=self.num_ranks,
fp16=self.fp16,
backend=self.distributed_backend)
# 設置生產者和消費者部分,我們下面會詳細分析
# 設置接受ranks
for i in range(len(model)): # 遍歷層
for j in range(i+1, len(model)): # 遍歷 i 層之后的若干層
for tensor_name in model[i][2]: # 找出前面層 output 的tensor
if tensor_name in model[j][1]: # 看看 output 在不在input之中
if module_to_stage_map[i] == \
module_to_stage_map[j]:
continue
# For now, assume that each stage is served by only
# a single machine.
if module_to_stage_map[j] == self.stage:
self.receive_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[i]]
if module_to_stage_map[i] == self.stage:
self.send_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[j]]
# 設置發送ranks
for model_inputs in inputs_module_destinations.keys():
destination_stage = module_to_stage_map[
inputs_module_destinations[model_inputs]]
if destination_stage > self.stage:
self.send_ranks[model_inputs] = \
self.ranks_in_next_stage
if 0 < self.stage <= destination_stage:
self.receive_ranks[model_inputs] = \
self.ranks_in_previous_stage
if destination_stage > 0:
if model_inputs not in self.tensor_tags:
self.tensor_tags[model_inputs] = tensor_tag
tensor_tag += 1
4.2.5 設置生產者和消費者
接下來對發送,接受的rank進行配置,receive_ranks 和 send_ranks 就是在本階段各個張量對應的發送,接收目標 rank。
前面已經提到,在 PipeDream開發時候,PyTorch 並沒有發布穩定的RPC,所以 PipeDream (2019年發布論文)只能自己實現一套通信邏輯關系,或者說是分布式計算圖。生產者和消費者就是分布式計算圖的重要組成部分。
邏輯抽象如下:
- 遍歷模型的 model,假定是 model [i],注意,這里的 model[i] 是具體的 layer。一個stage可以包括多個layer,比如 [layer1, layer 2, layer3],這個stage又可以在多個rank上進行數據並行,比如 rank 1 和 rank 2 都會運行 [layer1, layer 2, layer3]。
- 對於每個model [i],遍歷model [i] 之后的model,假定是 model [j]。
- 對於model [i] 的輸出進行遍歷,假定是 tensor_name。
- 如果 tensor_name 也在 modle[j] 的輸入之中,即 tensor_name即在 model[i] 的輸出,也在 module[j]的輸入,就說明他們之間可以建立聯系。因為如果一個 張量只有輸入或者只有輸出,就不需要為這個張量建立任何 通信機制。
- 如果 model[i] 和 modle[j] 在同一個stage 之中,就是同一個節點 或者 若干節點但是用 DDP 控制,這樣就用不到 通信機制。
- 如果 tensor_name 是 modle[j]的輸入,且module[j] 位於本節點上,說明本節點的 receive_ranks 就包括 module[j] 的輸入(當然也可能包括其他model的輸入)。
- 所以tensor_name的輸入rank包括model[j] 對應的rank。
- tensor_name 是module[i] 的輸出,且 module[i] 位於本節點上,說明 本節點的 send_ranks 就包括 module[i] 的輸出(當然也可能包括其他model的輸出)。
- 所以tensor_name的輸出rank包括 model[i] 對應的rank。
- 如果 tensor_name 也在 modle[j] 的輸入之中,即 tensor_name即在 model[i] 的輸出,也在 module[j]的輸入,就說明他們之間可以建立聯系。因為如果一個 張量只有輸入或者只有輸出,就不需要為這個張量建立任何 通信機制。
具體代碼如下:
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
for i in range(len(model)): # 遍歷層
for j in range(i+1, len(model)): # 遍歷 i 層之后的若干層
for tensor_name in model[i][2]: # 找出前面層 output 的tensor
if tensor_name in model[j][1]: # 看看 tensor_name 在不在input之中,即tensor_name 是不是 modle[j]的輸入
# tensor_name即在 model[i] 的輸出,也在 module[j]的輸入,就說明他們之間可以建立聯系
if module_to_stage_map[i] == \
module_to_stage_map[j]: # 兩個module在一個node上,不用通信機制
continue
# For now, assume that each stage is served by only
# a single machine.
# tensor_name 是 modle[j]的輸入,且module[j]位於本節點上,說明可以和本節點的 receive_ranks 建立聯系
if module_to_stage_map[j] == self.stage:
# 所以tensor_name的輸入rank包括rank i
self.receive_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[i]]
# tensor_name 是module[i]的輸出,且module[i]位於本節點上,說明可以和本節點的 send_ranks 建立聯系
if module_to_stage_map[i] == self.stage:
# 所以tensor_name的輸出rank包括rank j
self.send_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[j]]
for model_inputs in inputs_module_destinations.keys():
destination_stage = module_to_stage_map[
inputs_module_destinations[model_inputs]]
if destination_stage > self.stage:
self.send_ranks[model_inputs] = \
self.ranks_in_next_stage
if 0 < self.stage <= destination_stage:
self.receive_ranks[model_inputs] = \
self.ranks_in_previous_stage
if destination_stage > 0:
if model_inputs not in self.tensor_tags:
self.tensor_tags[model_inputs] = tensor_tag
tensor_tag += 1
得到變量如下:
num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 3} # 這里就是每個tensor對應的接收目標rank
'out8' = {list: 1} [2]
'out9' = {list: 1} [2]
'out10' = {list: 1} [2]
__len__ = {int} 3
send_ranks = {dict: 0} {} # 這里就是每個tensor對應的發送目標rank
__len__ = {int} 0
stage = {int} 3
4.2.6 設置module
接下來會處理module相關操作,這里具體會:
- 首先使用 ModulesWithDependencies 對模型進行繼續處理,把輸入,輸出配置出來。
- 然后調用 cuda 把模型和參數移動到 GPU。
- 如果需要進行處理,針對 fp16 進行轉換。
關於 ModulesWithDependencies 部分,我們重點說明。
之前我們代碼中有如下,就是得倒本stage對應的modules index。
modules = stage_to_module_map[self.stage] # 這里得到 [3,4],后續會用到。
stage_to_module_map 就是設置 stage 到 modules 的關系,目的是為了得到本stage所對應的modules。
回憶一下配置文件,本stage(數值為 3)對應的是 index 為 3,4 的兩個 module,就是下面的 3 ,3
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
接下來要通過如下代碼拿到本stage具體的modules,包括每個module的輸入,輸出。
modules = self.modules_with_dependencies.modules()
for i in range(len(modules)):
modules[i] = modules[i].cuda()
if self.fp16:
import apex.fp16_utils as fp16_utils
modules[i] = fp16_utils.BN_convert_float(modules[i].half())
運行之后如下
modules = {list: 2}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {LabelSmoothing} LabelSmoothing()
__len__ = {int} 2
具體 ModulesWithDependencies 如下:
class ModulesWithDependencies:
def __init__(self, modules_with_dependencies):
self._modules = []
self._all_input_names = []
self._all_output_names = []
for (module, input_names, output_names) in modules_with_dependencies:
self._modules.append(module)
self._all_input_names.append(input_names)
self._all_output_names.append(output_names)
def modules(self):
return self._modules
def all_input_names(self):
return self._all_input_names
def all_output_names(self):
return self._all_output_names
def is_input_tensor(self, tensor_name):
for module_input_names in self._all_input_names:
if tensor_name in module_input_names:
return True
return False
4.2.7 設置group
接下來針對每個stage的並行數目,建立group。
ranks就是每個stage的並行 rank,比如 stage 0 對應的就是 [0, 1, 2]。
{
"module_to_stage_map": [0, 1, 1],
"stage_to_rank_map": {"0": [0, 1, 2], "1": [3]} # 每個stage的rank,這里目的是得到並行的機器
}
遍歷stage,針對每個stage,調用new_group()
建立進程組。new_group()
函數使用所有進程的任意子集來創建新的進程組,該方法返回一個分組句柄,可作為 collectives
(用於特定編程模式中的信息交換)相關分布式函數的 group
參數 。
這里就是最開始問題中提到的:為了數據並行,每個stage都需要建立並且管理自己的進程組。
# Initialize all groups in the same order on every worker.
if stage_to_rank_map is not None:
groups = []
for stage in range(self.num_stages): # 遍歷stage
ranks = stage_to_rank_map[stage] # 與stage的數據並行對應,比如得到 [0, 1, 2]
if len(ranks) > 1: # 與后面的 ddp 相對應
groups.append(dist.new_group(ranks=ranks))
else:
groups.append(None)
group = groups[self.stage]
else:
group = None
4.2.8 設置數據並行
最后調用 DistributedDataParallel 進行處理。這里參數 process_group=group
就是前面 “設定group” 返回的。
就是針對每一個group建立一套 DistributedDataParallel。
# self.modules_with_dependencies contains a list of PyTorch
# modules, along with a list of user-defined input and output
# tensor names. We use our module_executor.ModuleExecutor
# class to wrap these dependencies, and use run_forward and
# run_backward methods downstream.
num_parameters = 0
for i in range(len(modules)):
if group is not None:
if ((i < (len(modules)-1) and self.is_criterion)
or not self.is_criterion):
num_parameters += \
sum(x.size()[0] * x.size()[1]
if len(x.size()) > 1 else x.size()[0]
for x in modules[i].parameters() if x.size())
# 建立分布式數據並行
modules[i] = torch.nn.parallel.DistributedDataParallel(
modules[i],
process_group=group,
device_ids=[local_rank],
output_device=local_rank)
if self.num_ranks_in_stage > 1:
module_size = 4. * num_parameters
print("Replicating stage: ranks=%d, module_size=%.3f" % (
self.num_ranks_in_stage, module_size))
關於 DistributedDataParallel,我們以后有專門系列會進行分析。
4.2.9 初始化通信函數
最后,針對這個通信模塊,進行初始化。
if self.comm_handler is not None:
self.comm_handler.initialize(
self.receive_ranks,
self.send_ranks,
self.tensor_tags,
self.target_tensor_names,
self.training_tensor_dtypes,
self.rank_in_stage,
self.num_ranks_in_stage,
self.ranks_in_previous_stage,
self.ranks_in_next_stage)
我們還是使用論文中的圖片為例來看看運行時引擎初始化之后的結果:
如果針對本文再細化,則是:
+----------------------------------------+
| Stage 2 StageRuntime |
| |
| CommunicationHandler |
| |
| +----------------------------+ |
| | +------------------------+ | |
| | |Rank 2 | | |
| | | | | |
| | | | | |
+-----------------------------+ | | | Layer 3 +---> Layer 4 | | |
| Stage 1 StageRuntime | | | | | | | +---------------------------+
| | | | | | | | | Stage 3 StageRuntime |
| | | | +------------------------+ | | | |
| CommunicationHandler | | | +------------------------+ | | | CommunicationHandler |
| | | | |Rank 3 | | | | |
| +-----------------------+ | | DDP | | | | | | +-----------------------+ |
| |Rank 1 | +---------------->+ | | +----------> | | Rank 4 | |
| | | | | | | Layer 3 +---> Layer 4 | | | | | | |
| | Layer 1 +---> Layer 2 | | | | | | | | | | Layer 5 +---> Layer 6 | |
| | | | | | | | | | | | | |
| | | | | | +------------------------+ | | | | | |
| +-----------------------+ | | | +------------------------+ | | | +-----------------------+ |
| | | | |Rank 4 | | | | |
| | | | | | | | | |
+-----------------------------+ | | | | | | +---------------------------+
| | | Layer 3 +---> Layer 4 | | |
| | | | | |
| | | | | |
| | +------------------------+ | |
| +----------------------------+ |
+----------------------------------------+
手機如下:
4.3 功能函數
我們這里只是介紹基礎功能函數。另外有幾個業務功能函數,比如 run_forward 會在1F1B文章中一並介紹。
以下這幾個功能函數都是調用通訊模塊完成功能。
4.3.1 receive_tensors_forward
receive_tensors_forward 就是在前向傳播中,從前面層獲取張量。
前向傳播中,張量記錄在本實例的 self.tensors 之中。
def receive_tensors_forward(self):
if self.forward_only and len(self.tensors) > 0:
self.tensors.pop(0) # 彈出以前
self.tensors.append({})
if self.loader_iter is not None: # 前向傳播第一層,需要加載數據
input = next(self.loader_iter) # 加載新的
if self.model_type == TRANSLATION:
(input, target) = input
src, src_length = input
tgt, tgt_length = target
self.tensors[-1]["input0"] = src.cuda(non_blocking=True)
self.tensors[-1]["input1"] = torch.LongTensor(src_length).cuda(
non_blocking=True)
self.tensors[-1]["input2"] = tgt[:-1].cuda(non_blocking=True)
self.tensors[-1]["target"] = tgt[1:].cuda().contiguous().view(-1)
self.tensors[-1]["target_length"] = \
torch.tensor([int(sum(torch.LongTensor(tgt_length) - 1))],
dtype=torch.int).cuda()
elif self.model_type == IMAGE_CLASSIFICATION:
(input, target) = input
if self.fp16:
input = input.half()
self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
self.tensors[-1]["target"] = target.cuda(non_blocking=True)
elif self.model_type == SPEECH_TO_TEXT:
input, target, input_percentages, target_sizes = input
input_sizes = input_percentages.mul_(int(input.size(3))).int()
self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
self.tensors[-1]["input1"] = input_sizes.cuda(non_blocking=True)
self.tensors[-1]["target"] = target.cuda(non_blocking=True)
self.tensors[-1]["target_length"] = target_sizes.cuda(
non_blocking=True)
else:
# Receive all required tensors from upstream machines.
for input_name in self.receive_ranks: # 遍歷本stage對應的接受rank,從前面層獲取
if input_name == "ack":
continue
self.tensors[-1][input_name] = \
self.comm_handler.recv(
input_name,
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=False)
self.forward_stats.stats['receive_tensors_size'] += \
(self.tensors[-1][input_name].element_size() *
self.tensors[-1][input_name].nelement())
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(
sending=False)
4.3.2 send_tensors_forward
send_tensors_forward就是在前向傳播中,向后面層發送張量。
def send_tensors_forward(self):
# Send all required tensors downstream.
for output_name in self.send_ranks: # 遍歷本stage對應的發送rank,進行發送
if output_name == "ack":
continue
self.comm_handler.send(
output_name,
self.tensors[-1][output_name],
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=False)
self.forward_stats.stats['send_tensors_size'] += \
(self.tensors[-1][output_name].element_size() *
self.tensors[-1][output_name].nelement())
4.3.3 receive_tensors_backward
后向傳播中,梯度保存在 self.gradients。
receive_tensors_backward 就是在后向傳播中,從前面層獲取張量。
注意,這里對應的是self.send_ranks,就是前向過程中的發送rank,它們在反向過程中就是接受rank
def receive_tensors_backward(self):
# Receive all required gradients from downstream
# machines.
for output_name in self.send_ranks: # 遍歷本stage對應的發送rank(前向),進行接受
if output_name in self.target_tensor_names:
continue
# 獲取梯度
self.gradients[output_name] = \
self.comm_handler.recv(
output_name,
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['receive_tensors_size'] += \
(self.gradients[output_name].element_size() *
self.gradients[output_name].nelement())
4.3.4 send_tensors_backward
后向傳播中,梯度保存在 self.gradients。
send_tensors_forward就是在后向傳播中,向后面層發送梯度張量。
注意,這里對應的是self.receive_ranks,就是前向過程中的接受rank,它們在反向過程中就是發送rank
def send_tensors_backward(self):
# Send all required gradients upstream.
for input_name in self.receive_ranks: # 遍歷本stage對應的接受rank,進行發送
if input_name in self.target_tensor_names:
continue
self.comm_handler.send(
input_name,
self.gradients[input_name],
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['send_tensors_size'] += \
(self.gradients[input_name].element_size() *
self.gradients[input_name].nelement())
if self.num_ranks_in_previous_stage > 0:
# Used to track where to send tensors in the
# backward pass.
self.comm_handler.increment_messaging_index(
sending=True)
4.3.5 run_ack
run_ack就是在傳播中,給前面層,后面層回應一個確認。
def run_ack(self):
# No need for ack if running on a single worker.
if self.rank is None:
return
# Receive ack from next stage. Send ack to previous stage.
if self.stage < (self.num_stages-1):
self.comm_handler.recv(
"ack",
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
if self.stage > 0:
self.comm_handler.send(
"ack",
torch.zeros(self.tensor_shapes["ack"],
dtype=torch.int64).cuda(),
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(sending=True)
self.backward_minibatch_id += 1
至此,運行時引擎我們介紹完畢其靜態信息和初始化,下一篇我們介紹通信模塊。