【深度學習 有效煉丹】多GPU使用教程, DP與DDP對比, ray多線程並行處理等 [GPU利用率低的分析]


⬅️ 前言


更新日志:

20220404:新增一個DDP 加載模型時顯存分布不均問題,見目錄遇到的問題及解決處


主要是上次server12 被自己一個train 直接線程全部拉滿了(沒錯 ... server8 也被拉滿過 emm我一開始還沒發現 原來是我拉滿的)

現場實況

后面劉所就跟我說讓我看看是不是dataset里面的處理太多了,這樣下來GPU占着 使用率也不高,建議先處理完了再直接由load進來 直接訓練;因為server上的cpu都不是很好,一開始那樣玩會拉慢訓練速度,兩種選擇:

  1. 把cpu的操作全部放到gpu上去做
  2. 先把cpu操作做了保存成數據,這樣不用每次epoch都做一次操作了

下面主要就是針對前言里面說的兩條進行分析與方案確認

✔️ 1. 預處理


📍 最終方案 ( 並發多線程, dataset→pkl )

參考:

線程並發 並行 多機並行等一系列操作:

Ray v1.10.0

tips:

Programming in Ray: Tips for first-time users - RISE Lab

詳情代碼見:暫時還沒開源

  1. 多CPU線程並行 → ray庫
  2. 一個進程運行較多sample → ray.append

相關代碼與解釋

# 記得改一下 num cpus
sample_in_pre_run = 50
@ray.remote(num_cpus=48)
def write_pkl(save_dir, cur_num, dataset, run_num, pba: ActorHandle):
    # print(cur_num, cur_num + run_num)
    for i in range(cur_num, min(cur_num + run_num, len(dataset))):
        with open(f'{save_dir}/%d.pkl'%(i), 'wb') as fd:
            pickle.dump(dataset[i], fd)
    pba.update.remote(1)
  1. num_cpus 是需要根據自己的線程數設置,不是越多越好!比如服務器上12, 8左右就行,因為每開一個線程就需要很多時間,如果開線程的耗費>>進程的耗費,那還不如單進程自己跑跑跑
  2. 根據上面提示,也就引入了sample_in_pre_run 也就是一個進程里干多少個工作,如果太多 可能進程拖慢了,太少 開線程又浪費了

里面有些操作是進度條可忽略,注意並發的ray 原tqdm進度條會失效!

詳情見官方:(也可見上述gitlab中有copy下面進度條)

Ray v1.10.0

ray_train_set = ray.put(train_set)
for cur_num in range(0, len(train_set), sample_in_pre_run):
      tasks_pre.append(write_pkl.remote(train_dir, cur_num, ray_train_set, sample_in_pre_run))
ray_val_set = ray.put(val_set)
for cur_num in range(0, len(val_set), sample_in_pre_run):
    tasks_pre.append(write_pkl.remote(val_dir, cur_num, val_set, sample_in_pre_run))

pb.print_until_done()
tasks = ray.get(tasks_pre)
# 進度條相關
tasks == list(range(num_ticks))
num_ticks == ray.get(actor.get_counter.remote())
  1. 可以看到每個for里面的進程加上了remote也就是ray所需要的函數,可以看一下上面的walkin

  2. 把每一個任務都append到tasks_pre后,再統一 ray.get(tasks_pre) 即可

  3. 注意新加了一個ray.put 是因為上服務器大數據的時候發現append操作巨慢,然后查了一下和傳入的參數有關,所以把參直接放到put共享下了

  4. 不要進度條的話 大概簡潔版是這樣的:

    from torch.utils.data import DataLoader
    from datasets.config import GlobalConfig
    
    # 一開始的數據操作都在dataset的get_item里面做了
    from datasets.dataloader import CARLA_Data
    # 一開始的數據操作都在dataset的get_item里面做了
    
    import ray
    ray.init()
    sample_in_pre_run = 50
    @ray.remote(num_cpus=48)
    def write_pkl(save_dir, cur_num, dataset, run_num):
        # print(cur_num, cur_num + run_num)
        for i in range(cur_num, min(cur_num + run_num, len(dataset))):
            with open(f'{save_dir}/%d.pkl'%(i), 'wb') as fd:
                pickle.dump(dataset[i], fd)
    
    ray_train_set = ray.put(train_set)
    for cur_num in range(0, len(train_set), sample_in_pre_run):
          tasks_pre.append(write_pkl.remote(train_dir, cur_num, ray_train_set, sample_in_pre_run))
    ray_val_set = ray.put(val_set)
    for cur_num in range(0, len(val_set), sample_in_pre_run):
        tasks_pre.append(write_pkl.remote(val_dir, cur_num, val_set, sample_in_pre_run))
    
    # 一起並發處理
    ray.get(tasks_pre)
    

速度效果對比

最終效果速度對比:

大概是2.5倍的速度 處理速度

  • 上面提到了append操作 每次傳進dataset太慢 放到put里面 共享了就快了

這之后要是還是很慢的話,就不是CPU的鍋了,建議檢查一下io的讀取速度 如下:

記一次服務端 IO 瓶頸問題定位 · TesterHome

使用率對比

主要就是處理完數據后通過pkl等保存的文件直接再讀取一次dataset 比如上面處理完CARLA_Data后再寫一個直接load pkl的dataset即可

class PRE_Data(Dataset):
    def __init__(self, root, config, data_use='train'):

        self.preload_dict = []
        preload_file = os.path.join(root, 'rg_mmfn_diag_pl_'+str(self.seq_len)+'_'+str(self.pred_len)+ '_' + data_use +'.npy')
        preload_dict = []
        if not os.path.exists(preload_file):
            # list sub-directories in root
            for pkl_file in os.listdir(root):
                if pkl_file.split('.')[-1]=='pkl':
                    pkl_file = str(root) + '/' + pkl_file
                    preload_dict.append(pkl_file)
            np.save(preload_file, preload_dict)

        # load from npy if available
        preload_dict = np.load(preload_file, allow_pickle=True)
        self.preload_dict = preload_dict
        print("Preloading sequences from " + preload_file)

    def __len__(self):
        """Returns the length of the dataset. """
        return len(self.preload_dict)

    def __getitem__(self, index):
        """Returns the item at index idx. """

        with open(self.preload_dict[index], 'rb') as fd:
            data = pickle.load(fd)
        return data

可以看到提前通過處理后,CPU的使用率基本不會在拉滿到100%,甚至等同於無(因為同時還有其他人使用這個server

同時GPU使用率也提高了兩倍,主要是之前的使用率一直在20%不動,CPU都跑滿了都不動

✔️ 2. 單機多卡並行


參考:

官方的DDP教程:

Getting Started with Distributed Data Parallel - PyTorch Tutorials 1.10.1+cu102 documentation

github 211star 中文:

https://github.com/jia-zhuang/pytorch-multi-gpu-training

主要就是調研情況里面的 那個參考鏈接 的做法,首先根據官方文檔 我們看一下 選擇 DistributedDataParallelDataParallel 之間的官方給出的區別及效率,官方推薦前者 DistributedDataParallel :Before we dive in, let’s clarify why, despite the added complexity, you would consider using DistributedDataParallel over DataParallel:

  1. 首先如果不是那么急切的話 其實 DataParallel 也行,主要是因為實現起來真的很方便加一行 真就一行
  2. DistributedDataParallel 就不一樣了 要加很多行(還行還行),但是呢 是高效的

兩者之間的效率對比我並沒有做,所以我們就根據官方文檔而的來這個結論了

兩種方案 理論知識


以下為 官方對比鏈接人工翻譯

  • DataParallel 是單進程多線程的,而且只能在一個機器上進行 也就是單機多卡
  • DistributedDataParallel 多進程 既可以用於單機訓練,也可以用於多機訓練

再者因為 DataParallel 跨線程的操作使得 線程之間的 GIL 競爭 、每次迭代間復制模型同步 以及 分散輸入 和 集成輸出,這些都會導致額外開銷,即使在單台機器上,DataParallel 通常也比 DistributedDataParallel 慢。

這是其一,其二呢如果 你的模型太太太 大以至於一個小GPU都裝不下,那么 DataParallel 就失效了,因為必須使用 模型並行 將其拆分到多個 GPU 上。 DistributedDataParallel 是與模型並行工作的,而DataParallel 目前沒有實現

第三點是小提醒 與對比無關:DDP 與模型並行相結合時,每個 DDP 進程將使用模型並行,所有進程共同使用數據並行。如果模型需要跨越多台機器,或者 模型方案等 不適合數據並行范式,請參閱 RPC API 以獲得更通用的分布式訓練支持。


理論知識學習完了 進入代碼實踐部分

代碼部分 修改

https://github.com/jia-zhuang/pytorch-multi-gpu-training

正如參考的github中那樣 (其實那個寫的挺不錯的 hhhh 以下為部分重復及補充 因為遇到一些意想不到的情況 emmm 一言難盡)

DataParallel


之所以簡單是因為... 只需要一行,只需要把自己的model放進去就行網絡

注意參考所說的這點,更為詳情 點擊參考鏈接查看

為方便說明,我們假設模型輸入為(32, input_dim),這里的 32 表示batch_size,模型輸出為(32, output_dim),使用 4 個GPU訓練。nn.DataParallel 起到的作用是將這 32 個樣本拆成 4 份,發送給 4 個GPU 分別做 forward,然后生成 4 個大小為(8, output_dim)的輸出,然后再將這 4 個輸出都收集到cuda:0 上並合並成(32, output_dim)
可以看出,nn.DataParallel沒有改變模型的輸入輸出,因此其他部分的代碼不需要做任何更改,非常方便。但弊端是,后續的loss計算只會在cuda:0上進行,沒法並行,因此會導致負載不均衡的問題;如果把loss放在模型里計算的話,則可以緩解上述負載不均衡的問題

# Model
model = TransFuser(config, args.device)
if args.is_multi_gpu:
    print(bcolors.OKGREEN + "Multi GPU USE"+ bcolors.ENDC)
    model = nn.DataParallel(model)

DistributedDataParallel


實現起來更為麻煩”一“點,因為是多進程

從一開始就會啟動多個進程(進程數等於GPU數),每個進程獨享一個GPU,每個進程都會獨立地執行代碼。這意味着每個進程都獨立地初始化模型、訓練,當然,在每次迭代過程中會通過進程間通信共享梯度,整合梯度,然后獨立地更新參數。

CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 train.py

所以你運行了一句話 但是類似於一個腳本運行了四個python,所以呢 所有打印和保存都會運行四個,辦法就是判斷這是args.local_rank是0 的時候再進行這些保存打印操作

CUDA_VISIBLE_DEVICES 為運行時腳本能看到的GPU id,nproc_per_node 為GPU個數,nnodes 為主機個數 單機就是1,train.py 就是你的正常訓練代碼,注意需要經過以下幾點修改:

  1. 要有local_rank的傳入 因為運行的時候 他會出入一個local rank指定

    from torch.utils.data.distributed import DistributedSampler
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=-1)
    args = parser.parse_args()
    
    • 碎碎念,也可以 走 自己在 程序里指定 的方式, 比如官方所示這樣

      """run.py:"""
      #!/usr/bin/env python
      import os
      import torch
      import torch.distributed as dist
      import torch.multiprocessing as mp
      
      def run(rank, size):
          """ Distributed function to be implemented later. """
          pass
      
      def init_process(rank, size, fn, backend='gloo'):
          """ Initialize the distributed environment. """
          os.environ['MASTER_ADDR'] = '127.0.0.1'
          os.environ['MASTER_PORT'] = '29500'
          dist.init_process_group(backend, rank=rank, world_size=size)
          fn(rank, size)
      
      if __name__ == "__main__":
          size = 2
          processes = []
          mp.set_start_method("spawn")
          for rank in range(size):
              p = mp.Process(target=init_process, args=(rank, size, run))
              p.start()
              processes.append(p)
      
          for p in processes:
              p.join()
      
  2. 因為每個進程都會初始化一份模型,為保證模型初始化過程中生成的隨機權重相同,需要設置隨機種子。方法如下:

    def set_seed(seed):
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    
  3. train_set給一下DistributedSampler

    # Data
    train_set = PRE_Data(root=config.train_data, config=config, data_use='train')
    val_set = PRE_Data(root=config.val_data, config=config, data_use='val')
    
    # 多GPU訓練
    train_sampler = DistributedSampler(train_set)
    val_sampler = DistributedSampler(val_set)
    
    dataloader_train = DataLoader(train_set, batch_size=args.batch_size, sampler=train_sampler, num_workers=8, pin_memory=True)
    dataloader_val = DataLoader(val_set, batch_size=args.batch_size,  sampler=val_sampler, num_workers=4, pin_memory=True)
    
  4. 保存和eval都只需要進行一次即可 → wandb記錄和打印等等都只需要一次哈

    if epoch % args.val_every == 0 and args.local_rank == 0: 
        trainer.validate(model, dataloader_val, config)
        if epoch % args.save_every == 0:
            trainer.save(model, optimizer)
    
    # 官方保存
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
    

    保存也只需要一次是因為(注釋也有):所有進程都應該看到相同的參數,因為它們都從相同的隨機參數開始,並且梯度在反向傳遞中是同步的。 因此,將其保存在一個進程中就足夠了。

  5. 保存模型時應注意只需要保存一次,而且必須在GPU上,cpu會有問題 見后問題欄有提到

    torch.save(model.module.state_dict(), os.path.join(self.logdir, 'best_model.pth'))
    

    后續導入的時候 一定要注意 1. map到cpu上 2. 映射一下所有layer的東西,見后問題欄也有提示

    state_dict = torch.load(os.path.join(self.config_path.model_path, 'best_model.pth'), map_location=torch.device('cpu'))
    pretrained_dict = {key.replace("module.", ""): value for key, value in state_dict.items()}
    self.net.load_state_dict(pretrained_dict)
    
  6. 為保證所有gpu分配均勻顯存,請早模型前執行,詳情見問題欄

    # 就是這兩句話
    torch.cuda.set_device(args.local_rank)
    torch.cuda.empty_cache()
    # 就是這兩句話
    
    # Model
    model = TransFuser(config, args.device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)
    

剩余問題 見后續問題部分,請提前進行查看,了解相關會遇到的問題

效果對比

多GPU 速度

相同數據量,兩塊2080Ti和一塊2080Ti的對比,使用 DistributedDataParallel 最終效果速度對比:

DP 和 DDP 的使用率對比

DP弊端還有就是... GPU利用率很低,不知道是不是因為樣本拆成分配的問題,利用率直接掉半,如下圖對比所示 同樣的數據集和訓練 DDP 和 DP(隨着GPU數量越多 利用率越低) 的使用率對比

❓ 3. 遇到的問題及解決

DistributedDataParallel


  1. RuntimeError: NCCL communicator was aborted

    這個問題是我隨機遇到的同樣模型下 小數據集沒啥報錯,但是全的時候發現emmm 挺隨機的;相關pytorch 討論區鏈接如下:

    RuntimeError: NCCL communicator was aborted

    好像找到原因了,不應該在保存是.cpu 模型文件 因為即使.to 回去 也是會報錯的,所以直接保存args.local_rank為0的那個模型就行,因為官方里面也是這么干的...

    所有進程都應該看到相同的參數,因為它們都從相同的隨機參數開始,並且梯度在反向傳遞中是同步的。 因此,將其保存在一個進程中就足夠了。

  2. Pytorch distributed RuntimeError: Address already in use

    因為緊急kill掉了 留下了后患,雖然通過命令行kill了所屬pid 但是好像還是占着那個默認口,比如通過htop里面的命令行進行選擇所有 kill

    kill -9 $(pgrep -f "/opt/conda/envs/python37/bin/python -u mmfn" | xargs echo)
    

    然后好像口還是沒能釋放 換一下端口號就行:

    python -m torch.distributed.launch --nproc_per_node=4 --master_port 12120 --nnodes=1 mmfn/phase2_train_multipgpu.py
    
  3. 注意 使用了DDP 不知道是需要多勻一些顯存的,比如一下,用了一塊3090,顯存是24G,正常單獨GPU訓練時 batch_size設置了64,占用了20G顯存,那么到DDP這個方案的時候,雖然是多進程運行按理來說應該也設置64,畢竟其他也是3090嘛 但是呢!實際運行的時候 發現第一塊需要占用更多的東西,如果GPU越多 他越需要占用(2G/塊) → 但是我看華哥的好像... 沒有這種現象產生 估計哪里我沒注意到

    • 現場實況:

    傑哥太強了!啊!找到原因啦!他喵... 竟然要在model前說明一下

    Extra 10GB memory on GPU 0 in DDP tutorial

    # 就是這兩句話
    torch.cuda.set_device(args.local_rank)
    torch.cuda.empty_cache()
    # 就是這兩句話
    
    # Model
    model = TransFuser(config, args.device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)
    
  4. 上面這點 還有可能是需要加載模型再進行下一個模型的訓練,那么也有可能導致GPU顯存分布不均勻

    需要提前load的時候專門map_location

    state_dict = torch.load(model_name, map_location=self.args.device)
    self.load_state_dict(state_dict)
    

    相關鏈接:DDP taking up too much memory on rank 0

  5. 如果resume的話,需要重新組織一下讀取的layer id,相關鏈接:

    https://github.com/bearpaw/pytorch-classification/issues/27

    Missing keys & unexpected keys in state_dict when loading self trained model

    # for DDP model load use
    state_dict = torch.load(os.path.join(args.logdir, 'best_model.pth'))
    optimizer.load_state_dict(torch.load(os.path.join(args.logdir, 'best_optim.pth')))
    from collections import OrderedDict
    new_state_dict = OrderedDict()
    
    for k, v in state_dict.items():
        if 'module' not in k:
            k = 'module.'+k
        else:
            k = k.replace('features.module.', 'module.features.')
        new_state_dict[k]=v
    
    model.load_state_dict(new_state_dict)
    

DataParallel


這個主要是注意那個batch_size 是分配的,也就是會除GPU個數,如果你的處理里有涉及到這樣的情況,則會出現相關數據的size對不上

比如在dataloader里 進行了 對batch里的數據取最大,作為一個長度;然后再到模型的forward里繼續處理一次數據 作為長度,那么;四個分散的batch就會有四個不同的size長度,也就會發生一定的問題,前方實況:

建議措施,所有對數據size的確定在loader部分進行完成


以下為一開始調研情況

相關原因分析

  1. GPU利用率低:https://zhuanlan.zhihu.com/p/410244780

  2. https://github.com/jia-zhuang/pytorch-multi-gpu-training

​ 附加:cpu htop參數相關解釋:Understanding and using htop to monitor system resources

這里也提到了loader 一個個小的太小了 最好處理成大的再讀取:https://github.com/Lyken17/Efficient-PyTorch#data-loader

pkl

已經寫了但是直接在loader出來的 也就是batch_size 從這兒就定下來了 → 錯誤做法

lmdb:https://github.com/dotchen/WorldOnRails/blob/release/docs/DATASET.md

看一下是否能攜程lmdb式,loader式有點過於暴力了


贈人點贊 手有余香 😆;正向回饋 才能更好開放記錄 hhh


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM