多機多卡訓練基本原理
在工業實踐中,許多較復雜的任務需要使用更強大的模型。強大模型加上海量的訓練數據,經常導致模型訓練耗時嚴重。比如在計算機視覺分類任務中,訓練一個在ImageNet數據集上精度表現良好的模型,大概需要一周的時間,需要不斷嘗試各種優化的思路和方案。如果每次訓練均要耗時1周,這會大大降低模型迭代的速度。在機器資源充沛的情況下,可以采用分布式訓練,大部分模型的訓練時間可壓縮到小時級別。
飛槳paddle有便利的數據並行訓練方式,僅改動幾行代碼即可實現多GPU訓練,如何將一個單機程序通過簡單的改造,變成多機多卡程序。
單機訓練改多機多卡訓練
將單機單卡訓練模式轉換為多機多卡訓練模式是非常簡單便捷的。在不修改原來的單機單卡程序的基礎上,只需在該代碼的指定位置上添加相應的函數,即可以實現多機多卡的轉換。
首先來看一個簡單的單機單卡程序。
單機單卡代碼示例
# 下面是一個簡單的單機單卡程序
import numpy as np
import paddle.fluid as fluid
import os
# 定義網絡
def mlp(input_x, input_y, hid_dim=1280, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
# 生成數據集
def gen_data():
return {"x": np.random.random(size=(128, 32)).astype('float32'),
"y": np.random.randint(2, size=(128, 1)).astype('int64')}
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
# 定義損失
cost = mlp(input_x, input_y)
# 定義優化器
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(cost)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 100
# 進行訓練
for i in range(step):
cost_val = exe.run(feed=gen_data(),
fetch_list=[cost.name])
print("step%d cost=%f" % (i, cost_val[0]))
# 模型保存
model_path = "./"
if os.path.exists(model_path):
fluid.io.save_persistables(exe, model_path)
單機單卡改多機多卡操作流程
將單機單卡訓練模式改成多機多卡訓練模式的操作流程如下:
1、導入分布式訓練庫。
2、定義訓練策略和集群環境定義。
3、對optimizer封裝,並調用封裝后的minimize方法。
4、保存模型。主要用於保存分布式訓練的模型。
單機單卡改多機多卡操作步驟
將單機單卡程序改成多機多卡的具體處理步驟如下所述。
1、導入分布式訓練庫
這里主要引入分布式Fleet API。 Fleet的設計在易於使用和算法可擴展性之間進行了權衡,並且非常高效。首先,用戶可以在十行代碼中,從本地機器槳式代碼轉換為分布式代碼。其次,可以通過Fleet API設置分布式策略,從而輕松定義不同的算法。
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker
2、定義訓練策略和集群環境定義
這里需要定義分布式訓練的相應策略,用於控制選取何種分布式方式以及相應的參數。詳細的訓練策略設定詳見多機性能調優。集群環境這里推薦使用PaddleCloudRoleMaker並使用paddle.distributed.launch啟動程序;這樣,PaddleCloudRoleMaker可自動獲取訓練集群相關信息。情見如何運行多機多卡程序。
dist_strategy = DistributedStrategy()
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
3、對optimizer封裝,並調用封裝后的minimize方法
這里主要將單機單卡的minimize方法轉換為多機多卡的minimize方法。調用第二步封裝后的optimizer的minimize方法。這里將optimizer轉換為distributed_optimizer,其主要是對單機的optimizer增加了_transpile()的實現,對main_program進行轉換(比如插入一些分布式的op等),主要是基於NCCL通信協議,實現梯度同步。
optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
optimizer.minimize(cost, fluid.default_startup_program())
4、保存模型
使用fleet.save_persistables 或fleet.inference_model保存模型。
if os.path.exists(model_path):
fleet.save_persistables(exe, model_path)
單機單卡改多機多卡代碼示例
將單機單卡訓練改成多機多卡訓練的代碼train_with_fleet.py示例如下。
# -*- coding: utf-8 -*-
import os
import numpy as np
import paddle.fluid as fluid
# 區別1: 導入分布式訓練庫
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker
# 定義網絡
def mlp(input_x, input_y, hid_dim=1280, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
# 生成數據集
def gen_data():
return {"x": np.random.random(size=(128, 32)).astype('float32'),
"y": np.random.randint(2, size=(128, 1)).astype('int64')}
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
# 定義損失
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
# 區別2: 定義訓練策略和集群環境定義
dist_strategy = DistributedStrategy()
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
# 區別3: 對optimizer封裝,並調用封裝后的minimize方法
optimizer = fleet.distributed_optimizer(optimizer, strategy=DistributedStrategy())
optimizer.minimize(cost, fluid.default_startup_program())
train_prog = fleet.main_program
# 獲得當前gpu的id號
gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
print(gpu_id)
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 100
for i in range(step):
cost_val = exe.run(program=train_prog, feed=gen_data(), fetch_list=[cost.name])
print("step%d cost=%f" % (i, cost_val[0]))
# 區別4: 模型保存
model_path = "./"
if os.path.exists(model_path):
fleet.save_persistables(exe, model_path)
運行多機多卡程序與單卡程序略有不同,需要通過特定的腳本在每一張卡上啟動分布式程序,詳情可看下一節內容。以單機八卡環境為例,采用默認的參數配置即可成功運行。方式如下:
config="--selected_gpus=0,1,2,3,4,5,6,7 --log_dir mylog"
python -m paddle.distributed.launch ${config} train.py
運行結果會在mylog文件夾中得到8個log日志,分別對應8張卡的運行結果。
以workerlog.0日志為例,會打印出該卡的運行結果。
如何運行多機多卡程序
多機多卡程序的運行一般依賴於相應的集群,不同集群環境,相應的運行方法有所不同。下面主要針對用戶自定義集群和PaddleCloud集群來說明程序的運行方式。
說明
更多API使用實例以及更復雜的模型,可以參考 https://github.com/PaddlePaddle/Fleet
方式一:使用用戶自定義集群
注意:
對於其它集群,用戶需要知道集群中所有節點的IP地址。
需要使用paddle.distributed.launch模塊啟動訓練任務,可以通過如下命令查看paddle.distributed.launch模塊的使用方法。
python -m paddle.distributed.launch --help
用戶只需要配置以下參數:
--cluster_node_ips: 集群中所有節點的IP地址列表,以','分隔,例如:192.168.1.2,192.168.1.3。
--node_ip: 當前節點的IP地址。
--started_port:起始端口號,假設起始端口號為51340,並且節點上使用的GPU卡數為4,那么GPU卡上對應訓練進程的端口號分別為51340、51341和51342。務必確保端口號可用。
--selected_gpus:使用的GPU卡。
假設用戶使用的訓練集群包含兩個節點(機器),IP地址分別為192.168.1.2和192.168.1.3,並且每個節點上使用的GPU卡數為4,那么在兩個節點的終端上分別運行如下任務。
192.168.1.2節點
python -m paddle.distributed.launch \
--cluster_node_ips=192.168.1.2,192.168.1.3 \
--node_ip=192.168.1.2 \
--started_port=6170 \
--selected_gpus=0,1,2,3 \
train_with_fleet.py
192.168.1.3節點
python -m paddle.distributed.launch \
--cluster_node_ips=192.168.1.2,192.168.1.3 \
--node_ip=192.168.1.3 \
--started_port=6170 \
--selected_gpus=0,1,2,3 \
train_with_fleet.py
注意:
對於想在單機多卡運行程序的用戶,可以直接采用默認參數運行多卡程序。命令是:
config="--selected_gpus=0,1,2,3,4,5,6,7 --log_dir mylog"
python -m paddle.distributed.launch ${config} train.py
方式二:使用PaddleCloud集群
針對百度內部用戶,可以使用PaddleCloud集群運行多機多卡程序。關於如何使用PaddleCloud,請參考PaddleCloud官網:PaddleCloud官網。
注意:
對於PaddleCloud分布式訓練,訓練方式需要選擇“分布式訓練”,任務模式需要選擇“NCCL2模式”,如下圖所示。
當采用客戶端提交任務的方式時,需要通過以下命令行指定運行模式為“NCCL2模式”。
paddlecloud job \
... \
--is-standalone 0 \
--distribute-job-type NCCL2
需要將運行命令配置為如下命令:
start_cmd="python -m paddle.distributed.launch --use_paddlecloud --seletected_gpus='0,1,2,3,4,5,6,7' train_with_fleet.py --model=ResNet50 --data_dir=./ImageNet"
paddlecloud job \
--start-cmd "${start_cmd}" \
... \
--is-standalone 0 \
--distribute-job-type NCCL2
多機多卡模式下的性能調優
針對單機單卡改造成多機多卡第二步中的訓練策略,使用默認的設置一般情況下可能無法達到最優的計算性能。
常見的性能調優設置
介紹一些提高分布式性能的調優設置。
設置環境變量
說明:
- FLAGS_sync_nccl_allreduce:配置 FLAGS_sync_nccl_allreduce=1,讓每次allreduce操作都等待完成,可以提升性能,詳細原因和分析可以參考:https://github.com/PaddlePaddle/Paddle/issues/15049
- FLAGS_fraction_of_gpu_memory_to_use:設置的范圍是0.0~1.0。比如,配置FLAGS_fraction_of_gpu_memory_to_use=0.95 ,0.95是指95%的顯存會預先分配。注意,設置成0.0會讓每次顯存分配都調用cudaMalloc,這樣會極大的降低訓練性能。
- NCCL_IB_DISABLE:在使用NCCL2模式訓練時,會默認嘗試開啟RDMA通信。如果系統不支持,則會自動降級為使用TCP通信。可以通過打開環境變量NCCL_DEBUG=INFO查看NCCL是否選擇了開啟RDMA通信。如果需要強制使用TCP方式通信,可以設置 NCCL_IB_DISABLE=1 。
設置訓練策略
訓練參數設置表
說明:
- 設置合適的CPU線程數num_threads和nccl通信器數量 nccl_comm_num :飛槳paddle使用“線程池”模型調度並執行OP,OP在啟動GPU計算之前,通常需要CPU的協助,然而如果OP本身占用時間很小,“線程池”模型下又會帶來額外的調度開銷。使用多進程模式時,如果神經網絡的計算圖節點間有較高的並發度,即使每個進程只在一個GPU上運行,使用多個線程可以更大限度的提升GPU利用率。NCCL通信器數量 nccl_comm_num 可以加快GPU之間的通信效率。
- AllReduce融合fuse_all_reduce_ops:默認情況下會將同一layer中參數的梯度的AllReduce操作合並成一個。比如,對於fluid.layers.fc中有Weight和Bias兩個參數,打開該選項之后,原本需要兩次AllReduce操作,現在只用一次AllReduce 操作。此外,為支持更大粒度的參數梯度融合,Paddle提供了 FLAGS_fuse_parameter_memory_size 和 FLAGS_fuse_parameter_groups_size 兩個環境變量選項。用戶可以指定融合AllReduce操作之后,每個AllReduce操作的梯度字節數。比如,希望每次AllReduce調用傳輸16MB的梯度,export FLAGS_fuse_parameter_memory_size=16 ,經驗值為總通信量的十分之一。可以指定每次AllReduce操作的最大層數,即到達該層數就進行AllReduce,如指定50層 export FLAGS_fuse_parameter_groups_size=50 。注意:目前不支持sparse參數梯度。
- 使用分級式reduce use_hierarchical_allreduce:對於多機模式,針對小數據量的通信,Ring AllReduce通信效率低,采用Hierarchical AllReduce可以解決該問題。
- 降低scope drop頻率 num_iteration_per_drop_scope 和fetch頻率 fetch_frequency:減少scope drop和fetch頻率,可以減少頻繁的變量內存申請、釋放和拷貝,從而提升性能。
設置訓練方式
GPU多機多卡同步訓練過程中存在慢trainer現象,即每步中訓練快的trainer的同步通信需要等待訓練慢的trainer。 由於每步中慢trainer的rank具有隨機性, 因此使用局部異步訓練的方式——LocalSGD, 通過多步異步訓練(無通信阻塞)實現慢trainer時間均攤, 從而提升同步訓練性能,如下圖所示:
Local SGD訓練方式主要有三個參數,分別是:
說明
- Local SGD的warmup步長local_sgd_is_warm_steps影響最終模型的泛化能力,一般需要等到模型參數穩定之后在進行Local SGD訓練,經驗值可以將學習率第一次下降時的epoch作為warmup步長,之后再進行Local SGD訓練。
- Local SGD步長local_sgd_steps ,一般該值越大,通信次數越少,訓練速度越快,但隨之而來的時模型精度下降。經驗值設置為2或者4。具體的Local SGD的訓練代碼可以參考: https://github.com/PaddlePaddle/Fleet/tree/develop/examples/local_sgd/resnet
- Local SGD訓練方式在Imagenet數據集,ResetNet50網絡上,4機32卡,有8%~10%的速度提升。
性能調優代碼示例
使用相應的分布式策略之后的多機多卡的代碼示例如下。
# -*- coding: utf-8 -*-
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker
# 調優策略的部分參數設置
# 建議多機設置為2,單機設置為1
nccl_comm_num = 2
# 建議多機設置為nccl_comm_num+1,單機設置為1
num_threads = 3
# scope drop頻率
num_iteration_per_drop_scope = 30
#AllReduce是否融合
fuse_all_reduce_ops = True
# 刷新頻率
fetch_frequency = 2
def mlp(input_x, input_y, hid_dim=1280, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
def gen_data():
return {"x": np.random.random(size=(128, 32)).astype('float32'),
"y": np.random.randint(2, size=(128, 1)).astype('int64')}
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
dist_strategy = DistributedStrategy()
dist_strategy.nccl_comm_num = nccl_comm_num
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = num_threads
exec_strategy.num_iteration_per_drop_scope = num_iteration_per_drop_scope
dist_strategy.exec_strategy = exec_strategy
dist_strategy.fuse_all_reduce_ops = fuse_all_reduce_ops
optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
optimizer.minimize(cost, fluid.default_startup_program())
train_prog = fleet.main_program
gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 100
for i in range(step):
# fetch頻率
if i % fetch_frequency == 0:
cost_val = exe.run(program=train_prog, feed=gen_data(), fetch_list=[cost.name])
print("step%d cost=%f" % (i, cost_val[0]))
else:
cost_val = exe.run(program=train_prog, feed=gen_data())