使用horovod構建分布式深度學習框架


  最近一直在嘗試着分布式深度學習的架構,主要的原因一方面是幾台機子全是1060卡,利用深度網絡在較大數據樣本上訓練的效率極其低下,所以嘗試着將幾台機子做成分布式,看看能否提高訓練效率;第二方面是有人習慣使用tensorflow,有人習慣使用keras,也有人喜歡使用pytorch等,雖然這些框架各自都有分布式的實現,但總的來說不能統一到一個平台上,造成使用上有不好的體驗。在查資料的時候正好看到了horovod這個框架,它是集成了多個深度框架的一個統一平台,搭建和使用起來都比較方便,所以打算嘗試基於horovod搭建一個分布式環境,供后期使用。可惜沒有使用docker去部署,其中配置的過程中遇到不少坑,還好都解決了。

  一.分布式中的ps架構和ring-allreduce架構

  1.ps架構

    在Parameter server架構(PS架構)中,集群中的節點被分為parameter serverworker兩類。其中parameter server收集、更新和存放模型的參數,而worker負責計算模型參  數的梯度。在每個迭代過程ps聚合所有worker傳回的梯度,然后更新參數,並將新的參數廣播給worker缺點是集群通信不均衡問題,類似木桶效應。

  ps架構同步隨機梯度下降SGD如下圖:

  

   

  2.ring-allreduce架構

    在Ring-allreduce架構中,各個設備都是worker,並且形成一個環,如上圖所示,沒有中心節點來聚合所有worker計算的梯度。在一個迭代過程,每個worker完成自己的mini-  batch訓練,計算出梯度,並將梯度傳遞給環中的下一個worker,同時它也接收從上一個worker的梯度。對於一個包含Nworker的環,各個worker需要收到其它N-1worker的梯度  后就可以更新模型參數。其實這個過程需要兩個部分:scatter-reduceallgather,百度開發了自己的allreduce框架,並將其用在了深度學習的分布式訓練中。如下圖。

  相比PS架構,Ring-allreduce架構有如下優點:

  a. 帶寬優化,因為集群中每個節點的帶寬都被充分利用。而PS架構,所有的worker計算節點都需要聚合給parameter server,這會造成一種通信瓶頸。parameter server的帶寬瓶頸  會影響整個系統性能,隨着worker數量的增加,其加速比會迅速的惡化。  

   b.此外,在深度學習訓練過程中,計算梯度采用BP算法,其特點是后面層的梯度先被計算,而前面層的梯度慢於前面層,Ring-allreduce架構可以充分利用這個特點,在前面層梯  度計算的同時進行后面層梯度的傳遞,從而進一步減少訓練時間。在百度的實驗中,他們發現訓練速度基本上線性正比於GPUs數目(worker數)。

   

  

 

  二.准備工作:

       1.一開始是利用虛擬機虛擬了三台機子進行了cpu版的成功測試,可惜GPU無法用在虛擬環境上。

          2.后來弄了三兩台真實的物理機,將原windows都改為linux系統。

        a.安裝nvidia驅動

        b.安裝cuda

        c.安裝cudnn

        d.安裝pytorch和tf

        e.兩台機子ssh相互免密碼登錄

        f.nfs共享文件系統(將腳本和樣本放在這里),共它機器掛載在此目錄下

        g.openmpi的安裝配置

        h.nccl的安裝配置

        i.horovod的安裝配置

 (我這里使用的版本是:ubuntu16.0.4、nvidia384.130、cuda9.0、cudnn7.6.4、pytorch1.1.0、tf1.12、nccl2(nccl2.x版本為多機多卡)、openmpi4.0.0...)

  注意:在安裝這些工具的時候,需要注意各自匹配的版本以及環境的配置,否則安裝不成功,需要耐心。  

  環境在配置好后,進行了測試。有個問題是在指定命令運行的時候,多機多gpu會出現bash: orted: command not found,單機可以運行。這里解決的辦法有2種:第一      是在安裝 openmpi時指定--prefix=openmpi安裝目錄;第二種是在運行時指定命令--prefix openmpi安裝目錄。

 

  三. 利用mnist數字識別進行分布式測試,將測試腳本放在nfs共享目錄里,mnist會自動下載訓練樣本

    

import argparse
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets,transforms
import torch.utils.data.distributed
import horovod.torch as hvd

# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
                    help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                    help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42, metavar='S',
                    help='random seed (default: 42)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Horovod: initialize library.
hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)
#用來記錄訓練的loss情況,利用tensorboard可視化loss
if hvd.rank() == 0:
    writer = SummaryWriter('/home/user/share/log/mnist_test')

# horovod: limit # of cpu threads to be used per worker.
torch.set_num_threads(1)
kwargs = {'num_workers' : 1, 'pin_memory' : True} if args.cuda else {}
train_dataset = datasets.MNIST('/home/user/share/data/', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ]))
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, 
                                                                num_replicas=hvd.size(),
                                                                rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset,
                                          batch_size=args.batch_size,
                                          sampler=train_sampler, **kwargs)

test_dataset = datasets.MNIST('/home/user/share/data/', train=False, download=True,transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
# Horovod: use DistributedSampler to partition the test data.
test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset,
                                                                num_replicas=hvd.size(),
                                                                rank=hvd.rank())
test_loader = torch.utils.data.DataLoader(test_dataset,
                                         batch_size=args.test_batch_size,
                                         sampler=test_sampler, **kwargs)

class Net(nn.Module):
    def __init__(self):
        super(Net,self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)
    
    def forward(self,x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)),2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
    
model = Net()

if args.cuda:
    #move model to GPU
    model.cuda()

# Horovod: scale learning rate by the number of GPUs.由於是多機運行,batch較大,所以這里將學習率增大
optimizer = optim.SGD(model.parameters(),
                     lr=args.lr*hvd.size(),
                     momentum=args.momentum)

# Horovod: 廣播初始化參數和優化器狀態
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer
optimizer = hvd.DistributedOptimizer(optimizer,
                                    named_parameters=model.named_parameters(),
                                    compression=compression)

def train(epoch):
    model.train()
    # Horovod: set epoch to sampler for shuffling.
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if (batch_idx % args.log_interval == 0) and (hvd.rank() == 0):
            # Horovod: use train_sampler to determine the number of examples in
            # this worker's partition.
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_sampler),
                100. * batch_idx / len(train_loader), loss.item()))
            niter = epoch * len(train_loader) + batch_idx
            writer.add_scalar('Train/Loss',loss.item(),niter)


def metric_average(val, name):
    tensor = torch.tensor(val)
    avg_tensor = hvd.allreduce(tensor, name=name)
    return avg_tensor.item()


def test():
    model.eval()
    test_loss = 0.
    test_accuracy = 0.
    for data, target in test_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        output = model(data)
        # sum up batch loss
        test_loss += F.nll_loss(output, target, size_average=False).item()
        # get the index of the max log-probability
        pred = output.data.max(1, keepdim=True)[1]
        test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()

    # Horovod: use test_sampler to determine the number of examples in
    # this worker's partition.
    test_loss /= len(test_sampler)
    test_accuracy /= len(test_sampler)

    # Horovod: average metric values across workers.
    test_loss = metric_average(test_loss, 'avg_loss')
    test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

    # Horovod: 在first rank上打印輸出
    if hvd.rank() == 0:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
            test_loss, 100. * test_accuracy))


for epoch in range(1, args.epochs + 1):
    train(epoch)
    test()
    
if hvd.rank() == 0:
    print('hvd.rank:{}'.format(hvd.rank()))
    save_path = '/home/user/model/mnist_model.pkl'
    # 將模型保存在rank為0的機子上
    torch.save(model.state_dict(), save_path)
    print('model save success! path = {}'.format(save_path))

在linux中執行以上腳本的命令是:

    nohup mpirun -np 2 -H work_1:1,work_2:1 -bind-to none -map-by slot -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH -mca pml ob1 -mca btl ^openib -prefix /home/user/openmpi python -u /home/user/share/torch/test_mnist.py  > /home/user/share/log/test_mnist.log 2>&1 &

其中兩台機器為work_1和work_2,分別使用一個GPU;腳本名為test_mnist.py;test_mnist.log為日志。

 

參考:

1.https://blog.csdn.net/zwqjoy/article/details/81013527

2.https://www.cnblogs.com/goya/p/11790387.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM