Some basic concepts of distributed systems
- group: the process group. By default there is only one group; one job is one group, which is also one world
- world size: the total number of processes
- rank: the index of a process, used for inter-process communication. The host with rank=0 is the master node
- local rank: the GPU index inside a process; it is not an explicit argument but is assigned internally by torch.distributed.launch. rank=3, local_rank=0 means the first GPU of the process with rank 3 (a short sketch of how to query these values follows this list).
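For reference, a minimal sketch (my own addition, assuming the default process group has already been initialized, e.g. by the init_dist() helper shown later) of how these values can be read at runtime; note that the classic torch.distributed.launch passes the local rank as the --local_rank argument, while newer launchers expose it through the LOCAL_RANK environment variable:

import os
import torch.distributed as dist

world_size = dist.get_world_size()  # total number of processes in the default group
rank = dist.get_rank()              # index of the current process in the default group
local_rank = int(os.environ.get("LOCAL_RANK", 0))  # per-node GPU index, if the launcher exported it
print(f"rank {rank}/{world_size}, local_rank {local_rank}")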
Concrete steps
First, a few command-line arguments need to be set up:
import argparse

parser = argparse.ArgumentParser(description='PyTorch distributed training')
# filled in automatically by torch.distributed.launch
parser.add_argument("--local_rank", type=int, default=0)
# whether to use distributed training (argparse's type=bool cannot parse "False"; the flag is only used with its default here)
parser.add_argument("--dist", type=bool, default=True)
# ids of the GPUs to use, e.g. "--gpu_ids 0 1 2 3"
parser.add_argument("--gpu_ids", type=int, nargs="+", default=[0, 1, 2, 3])
args = parser.parse_args()
- local_rank is supplied internally by torch.distributed.launch (see the sketch right after this list)
- dist toggles whether distributed training is used
- gpu_ids specifies the ids of the GPUs to use
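As a point of reference (this usage is my own illustrative assumption, not part of the snippet above), the --local_rank value handed to each spawned process is commonly used to pin that process to its GPU:

import torch

# hypothetical usage: each launched process binds to the GPU matching its local rank
torch.cuda.set_device(args.local_rank)
device = torch.device("cuda", args.local_rank)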
Initializing the distributed backend
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def init_dist(backend="nccl", **kwargs):
    """Initialization for distributed training."""
    if mp.get_start_method(allow_none=True) != "spawn":
        # 'spawn' is required for CUDA in subprocesses (and is the default on Windows)
        mp.set_start_method("spawn", force=True)
    rank = int(os.environ["RANK"])  # global rank, exported by the launcher
    num_gpus = torch.cuda.device_count()  # number of GPUs available on this host
    torch.cuda.set_device(rank % num_gpus)  # bind this process to one GPU
    dist.init_process_group(backend=backend, **kwargs)  # initialize the default process group


if args.dist:
    init_dist()
    world_size = dist.get_world_size()  # number of processes in the default process group
    rank = dist.get_rank()  # rank of the current process within the group
else:
    rank = -1

torch.backends.cudnn.benchmark = True
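To confirm the setup works, a small sanity check (an illustrative addition of mine, not part of the original code) is to all-reduce a tensor across the processes; every rank should then print the total number of processes:

import torch
import torch.distributed as dist

if args.dist:
    t = torch.ones(1, device=torch.cuda.current_device())  # each process contributes 1.0
    dist.all_reduce(t, op=dist.ReduceOp.SUM)                # sum across all ranks
    print(f"rank {dist.get_rank()} sees world_size = {int(t.item())}")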
Places that need to be modified
- Wherever logs are written, only the master process (rank <= 0) should log:
if rank <= 0:
    logger.info('Something that needs to be logged')
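A minimal sketch of setting up such a logger (the logger name and format here are my own assumptions) so that only rank 0 writes to the console:

import logging

logger = logging.getLogger("train")
logger.setLevel(logging.INFO if rank <= 0 else logging.ERROR)  # silence non-master ranks
if rank <= 0:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)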
- Data loading:
import math
from torch.utils.data import DataLoader

dataset_ratio = 200  # enlarge the dataset for iteration-oriented training (see DistIterSampler below)

if train:
    train_set = define_Dataset(train_dataset)
    train_size = int(math.ceil(len(train_set) / batch_size))  # iterations per epoch
    total_epochs = int(math.ceil(total_iters / train_size))
    if args.dist:
        world_size = torch.distributed.get_world_size()
        assert batch_size % world_size == 0
        batch_size = batch_size // world_size  # per-GPU batch size
        train_sampler = DistIterSampler(train_set, world_size, rank, dataset_ratio)
        total_epochs = int(math.ceil(total_iters / (train_size * dataset_ratio)))
        train_loader = DataLoader(train_set,
                                  batch_size=batch_size,
                                  shuffle=False,  # the sampler already shuffles
                                  num_workers=num_workers,
                                  drop_last=True,
                                  pin_memory=True,
                                  sampler=train_sampler)
    else:
        train_loader = DataLoader(train_set,
                                  batch_size=batch_size,
                                  shuffle=True,
                                  num_workers=num_workers,
                                  drop_last=True,
                                  pin_memory=True)
else:
    test_set = define_Dataset(test_dataset)
    test_loader = DataLoader(test_set, batch_size=1,
                             shuffle=False, num_workers=1,
                             drop_last=False, pin_memory=True)
Note that:
♠ world_size can be thought of as the number of GPUs; batch_size must be divisible by world_size, i.e. one original batch is split across the GPUs (for example, batch_size=32 with world_size=4 gives each GPU a batch of 8)
♣ when training with the distributed sampler, shuffle in the DataLoader must be False (the sampler does the shuffling)
♥ testing is done on a single GPU
♦ distributed training requires a sampler to be specified explicitly
The code of DistIterSampler is as follows:
"""
Modified from torch.utils.data.distributed.DistributedSampler
Support enlarging the dataset for *iter-oriented* training, for saving time when restart the
dataloader after each epoch
"""
import math
import torch
import torch.distributed as dist
from torch.utils.data.sampler import Sampler
class DistIterSampler(Sampler):
"""Sampler that restricts data loading to a subset of the dataset.
It is especially useful in conjunction with
:class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
process can pass a DistributedSampler instance as a DataLoader sampler,
and load a subset of the original dataset that is exclusive to it.
.. note::
Dataset is assumed to be of constant size.
Arguments:
dataset: Dataset used for sampling.
num_replicas (optional): Number of processes participating in
distributed training.
rank (optional): Rank of the current process within num_replicas.
"""
def __init__(self, dataset, num_replicas=None, rank=None, ratio=100):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * ratio / self.num_replicas))
self.total_size = self.num_samples * self.num_replicas
def __iter__(self):
# deterministically shuffle based on epoch
g = torch.Generator()
g.manual_seed(self.epoch)
indices = torch.randperm(
self.total_size, generator=g
).tolist() # Returns a random permutation of integers from 0 to n - 1
dsize = len(self.dataset)
indices = [v % dsize for v in indices]
# subsample
indices = indices[self.rank : self.total_size : self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
self.epoch = epoch
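Because the shuffling seed is derived from self.epoch, the training loop should call set_epoch once per epoch so that every epoch sees a different permutation. A minimal loop skeleton (it reuses total_epochs, train_sampler and train_loader from the data-loading snippet above; the body is left as a placeholder):

for epoch in range(total_epochs):
    if args.dist:
        train_sampler.set_epoch(epoch)  # refresh the shuffling seed on every rank
    for batch in train_loader:
        pass  # forward / backward / optimizer step goes here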
- Model setup:
import torch
import torch.nn as nn
from torch.nn.parallel import DataParallel, DistributedDataParallel

device = torch.device('cuda' if opt['gpu_ids'] is not None else 'cpu')
net = define_net().to(device)
if args.dist:
    rank = torch.distributed.get_rank()
    net = DistributedDataParallel(net, device_ids=[torch.cuda.current_device()])
else:
    rank = -1  # non-distributed training
    net = DataParallel(net)

# move the input onto the GPU that the wrapper is driving
input = input.to(f'cuda:{net.device_ids[0]}')

# saving: unwrap the DataParallel / DistributedDataParallel wrapper first
network = net.module if isinstance(net, (nn.DataParallel, DistributedDataParallel)) else net
state_dict = network.state_dict()
for key, param in state_dict.items():
    state_dict[key] = param.cpu()  # move parameters to CPU before saving
torch.save(state_dict, save_path)
So the model definition, loading, saving, and moving of inputs onto the chosen GPU all need to be adapted.
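The snippet above covers definition, saving and inputs. For loading, one possible counterpart (a sketch under my own assumptions; load_path is a placeholder) is to map the checkpoint onto this process's GPU and strip a possible 'module.' prefix before handing it to the unwrapped network:

# illustrative loading counterpart to the saving code above
state_dict = torch.load(load_path, map_location=f'cuda:{torch.cuda.current_device()}')
# drop the 'module.' prefix that a wrapped model may have added when it was saved
state_dict = {(k[len('module.'):] if k.startswith('module.') else k): v for k, v in state_dict.items()}
network.load_state_dict(state_dict, strict=True)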
- Launch command:
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nproc_per_node=4 --master_port=3210 train.py
Explanation of the arguments:
♠ CUDA_VISIBLE_DEVICES selects which GPUs are visible to the job
♣ nproc_per_node sets the number of processes created on the current host, usually equal to the number of GPUs on that host
♥ master_port specifies the port of the master node (together with the master address it forms the rendezvous ip:port)
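For context (this describes the launcher's behaviour as I understand it, not something shown in the command above): torch.distributed.launch spawns nproc_per_node processes and exports MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK into each process's environment, which is why init_dist() can simply read os.environ['RANK']. A quick way to inspect this from inside train.py:

import os

# illustrative check: print the environment prepared by the launcher for this process
for key in ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK"):
    print(key, "=", os.environ.get(key))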
Beyond that, it is just a matter of debugging whatever goes wrong.