如何使用modelarts訓練海量數據


在modelarts上使用notebook上使用evs空間默認大小是5G,能滿足大部分文本和圖片訓練模型的需求。如果訓練數據稍微超過這個限額,可以適當的擴增下空間。但如果訓練對象是視頻,或是實際生成過程中的海量數據,這個空間就顯得小了,這時候擴增evs空間就顯得很不經濟了。

最近老山便碰到這樣的案例,客戶的訓練數據大約在1T的量級,在obs上存儲的數據結構大概如下圖所示。

your-obs-name
└── ...
   └── video
       ├── folder1
       │   ├── text.txt
       │   └── video.mp4
       ├── folder2
       │   ├── text.txt
       │   └── video.mp4
       ├── folder3
       │   ├── text.txt
       │   └── video.mp4
       ├── folder4
       │   ├── text.txt
       │   └── video.mp4
       ├── folder5
       │   ├── text.txt
       │   └── video.mp4
       ├── ...

雖然使用華為雲自帶的moxing模塊可以直接讀取obs的數據,但由於實質是通過http實時讀取數據,這個速度比從evs的ssd硬盤上讀取數據要慢得多。而解決方案也比較直接,在evs上開辟一個固定大小的空間作為緩存區,一方面不斷把obs數據讀入緩存區,如果緩存區滿了,就等待其騰出空間,另一方面訓練任務消費evs數據,當消費完后便刪除數據。

程序上也自然選用生產者-消費者模型。程序定義了管道類Pipeline,有生產者線程producer用於將obs數據保存到evs;同時輸出evs數據用於外部模型的消費。由於每個視頻文件都單獨放在一個文件夾下,所以程序的輸出對象也是這個文件夾在evs上保存的地址,如folder1,folder2等。至於讀取文件夾內部文件信息等消費工作,由用戶自行定義。

不多說,直接上代碼。

import moxing as mox
mox.file.shift('os', 'mox')
import os, shutil
from queue import Queue
from time import sleep
import threading
import logging
logging.basicConfig(level=logging.INFO,
                   format="%(asctime)s %(name)s %(levelname)s %(message)s",)

class ObsClient:
   def __init__(self, root):
       '''獲取obs路徑上需要讀取的文件夾的相關信息'''
       self.root = root
       self.directory = self.list_directory()
       self.maxSize = self.getMaxSize()

   def getMaxSize(self):
       '''最大的文件夾的大小'''
       return max([size for *_, size in self.directory])

   def list_directory(self):
       '''輸出用於訓練的文件夾的路徑,輸出directory:
       [(文件夾相對路徑,文件夾絕對路徑,文件夾大小), ...]
       '''
       directory = []
       folders = mox.file.list_directory(self.root)
       for folder in folders:
           folderPath = os.path.join(self.root, folder)
           if mox.file.is_directory(folderPath):
               size = self.get_size(folderPath)
               directory.append((folder, folderPath, size))
       return directory

   def get_size(self, path):
       '''獲取文件(夾)的大小'''
       if mox.file.is_directory(path):
           return self.get_size_folder(path)
       return self.get_size_file(path)

   def get_size_file(self, path):
       '''獲取文件的大小'''
       return mox.file.get_size(path)

   def get_size_folder(self, path):
       '''獲取文件夾的大小'''
       size = 0
       for filename in mox.file.list_directory(path, recursive=True):
           filepath = os.path.join(path, filename)
           if not mox.file.is_directory(filepath):
               size+= self.get_size_file(filepath)
       return size
   
class EvsClient:
   def __init__(self, root, memory, queue, directory, interval = 0.1):
       self.root = root # evs緩存區根目錄
       self.directory = directory # obs文件夾信息
       self.size = 0 # evs緩存區已使用的空間
       self.memory = memory # evs上用於緩存的空間大小
       self.queue = queue # 隊列,存儲了evs緩存區文件夾的信息
       self.interval = interval # 如果緩存區滿后,查詢緩存大小的間隔時間

   def remove(self, folder, size):
       '''刪除evs文件夾,在文件夾被消費后調用'''
       logging.info(f"consumer: start removing folder {folder} with size {size}|{self.size}")
       shutil.rmtree(folder, True)
       self.size -= size
       logging.info(f"consumer: end removing folder {folder} with size -{size}|{self.size}")
   
   def work(self):
       '''生成者主程序,用於從obs中copy文件夾到evs'''
       for relObsFolder, absObsFolder, size in self.directory:
           while True:
               # 緩存區沒滿,就copy文件
               if not self.waitOrDo(size):
                   self.copy(relObsFolder, absObsFolder, size)
                   break
               # 如果緩存區滿了,就等待
               sleep(self.interval)
       # 當所有文件都拷貝后,置入結束符(None, None)
       self.queue.put((None, None))
               
   def waitOrDo(self, size):
       '''返回True時等待,返回False時工作'''
       return self.size + size > self.memory

   def copy(self, relObsFolder, absObsFolder, size):
       '''從obs中copy文件夾到evs'''
       evsFolder = os.path.join(self.root, relObsFolder)
       logging.info(f"producer: start copying folder {relObsFolder} with size {size}|{self.size}")
       mox.file.copy_parallel(absObsFolder, evsFolder)
       self.queue.put((evsFolder, size))
       self.size += size
       logging.info(f"producer: end copying folder {relObsFolder} with size +{size}|{self.size}")

class Pipeline:
   def __init__(self, evsRoot, obsRoot, memory = '1g', timeout = 300, interval = 0.1):
       self.memory = self.rescript(memory) # evs上用於緩存的空間大小
       self.timeout = timeout # 消費者獲取evs緩存區文件夾的最長等待時間
       self.queue = Queue() # 隊列,存儲了evs緩存區文件夾的信息
       self.obsClient = ObsClient(obsRoot) # 存儲obs上的文件夾信息
       # evs上的操作
       self.evsClient = EvsClient(evsRoot, self.memory, self.queue, self.obsClient.directory, interval)
       self.checkMemory() # 驗證evs上用於緩存的空間大小是否足夠大       

   def checkMemory(self):
       '''evs上用於緩存的空間大小不能小於obs上最大文件夾大小'''
       if self.memory<self.obsClient.maxSize:
           raise Exception("memory should bigger than maxFolderSize!")

   def rescript(self, memory):
       '''將文本或數值類型的memory轉寫成數值'''
       try:
           if isinstance(memory, str):
               if memory[-1].lower()=='g':
                   return int(float(memory[:-1])*1024*1024*1024)
               elif memory[-1].lower()=='m':
                   return int(float(memory[:-1])*1024*1024)
               elif memory[-1].lower()=='k':
                   return int(float(memory[:-1])*1024)
               else:
                   return int(float(memory))
           else:
               return int(float(memory))
       except:
           raise Exception("Error when rescripting memory!")

   def __iter__(self):
       '''生成器,yield輸出evs文件夾路徑和大小'''
       # 生產者線程
       producer = threading.Thread(target = self.evsClient.work)
       producer.start()
       # 主程序提供生成器用於消費,輸出evs文件夾路徑和大小
       while True:
           logging.info(f"consumer: start to get the queue")
           path, size = self.queue.get(timeout=self.timeout)
           logging.info(f"consumer: get the queue {path}, {size} ")
           if path is None and size is None:
               break
           yield path, size
           self.evsClient.remove(path, size)
       # 主程序等待
       producer.join()

if __name__ == '__main__':
   # 使用示例
   for path, size in Pipeline('./video', 's3://your-obs-name/.../video'):
       do_job(path, size)

如果你覺得老山的文章不錯,不妨點擊下關注。

作者::山找海味


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM