高性能分布式執行框架——Ray


Ray是UC Berkeley RISELab新推出的高性能分布式執行框架,它使用了和傳統分布式計算系統不一樣的架構和對分布式計算的抽象方式,具有比Spark更優異的計算性能。

Ray目前還處於實驗室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應用的分布式計算框架,但是它的架構具有通用的分布式計算抽象。本文對Ray進行簡單的介紹,幫助大家更快地了解Ray是什么,如有描述不當的地方,歡迎不吝指正。

一、簡單開始

首先來看一下最簡單的Ray程序是如何編寫的。

# 導入ray,並初始化執行環境
import ray
ray.init()

# 定義ray remote函數
@ray.remote
def hello():
    return "Hello world !"

# 異步執行remote函數,返回結果id
object_id = hello.remote()

# 同步獲取計算結果
hello = ray.get(object_id)

# 輸出計算結果
print hello

在Ray里,通過Python注解@ray.remote定義remote函數。使用此注解聲明的函數都會自帶一個默認的方法remote,通過此方法發起的函數調用都是以提交分布式任務的方式異步執行的,函數的返回值是一個對象id,使用ray.get內置操作可以同步獲取該id對應的對象。熟悉Java里的Future機制的話對此應該並不陌生,或許會有人疑惑這和普通的異步函數調用沒什么大的區別,但是這里最大的差異是,函數hello是分布式異步執行的。

remote函數是Ray分布式計算抽象中的核心概念,通過它開發者擁有了動態定制計算依賴(任務DAG)的能力。比如:

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

例子代碼中,對函數A、B的調用是完全並行執行的,但是對函數C的調用依賴於A、B函數的返回結果。Ray可以保證函數C需要等待A、B函數的結果真正計算出來后才會執行。如果將函數A、B、C類比為DAG的節點的話,那么DAG的邊就是函數C參數對函數A、B計算結果的依賴,自由的函數調用方式允許Ray可以自由地定制DAG的結構和計算依賴關系。另外,提及一點的是Python的函數可以定義函數具有多個返回值,這也使得Python的函數更天然具備了DAG節點多入和多出的特點。

二、系統架構

Ray是使用什么樣的架構對分布式計算做出如上抽象的呢,一下給出了Ray的系統架構(來自Ray論文,參考文獻1)。

作為分布式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全局協調和狀態維護,Slave執行分布式計算任務。不過和傳統的分布式計算系統不同的是,Ray使用了混合任務調度的思路。在集群部署模式下,Ray啟動了以下關鍵組件:

  1. GlobalScheduler:Master上啟動了一個全局調度器,用於接收本地調度器提交的任務,並將任務分發給合適的本地任務調度器執行。
  2. RedisServer:Master上啟動了一到多個RedisServer用於保存分布式任務的狀態信息(ControlState),包括對象機器的映射、任務描述、任務debug信息等。
  3. LocalScheduler:每個Slave上啟動了一個本地調度器,用於提交任務到全局調度器,以及分配任務給當前機器的Worker進程。
  4. Worker:每個Slave上可以啟動多個Worker進程執行分布式任務,並將計算結果存儲到ObjectStore。
  5. ObjectStore:每個Slave上啟動了一個ObjectStore存儲只讀數據對象,Worker可以通過共享內存的方式訪問這些對象數據,這樣可以有效地減少內存拷貝和對象序列化成本。ObjectStore底層由Apache Arrow實現。
  6. Plasma:每個Slave上的ObjectStore都由一個名為Plasma的對象管理器進行管理,它可以在Worker訪問本地ObjectStore上不存在的遠程數據對象時,主動拉取其它Slave上的對象數據到當前機器。

需要說明的是,Ray的論文中提及,全局調度器可以啟動一到多個,而目前Ray的實現文檔里討論的內容都是基於一個全局調度器的情況。我猜測可能是Ray尚在建設中,一些機制還未完善,后續讀者可以留意此處的細節變化。

Ray的任務也是通過類似Spark中Driver的概念的方式進行提交的,有所不同的是:

  1. Spark的Driver提交的是任務DAG,一旦提交則不可更改。
  2. 而Ray提交的是更細粒度的remote function,任務DAG依賴關系由函數依賴關系自由定制。

論文給出的架構圖里並未畫出Driver的概念,因此我在其基礎上做了一些修改和擴充。

Ray的Driver節點和和Slave節點啟動的組件幾乎相同,不過卻有以下區別:

  1. Driver上的工作進程DriverProcess一般只有一個,即用戶啟動的PythonShell。Slave可以根據需要創建多個WorkerProcess。
  2. Driver只能提交任務,卻不能接收來自全局調度器分配的任務。Slave可以提交任務,也可以接收全局調度器分配的任務。
  3. Driver可以主動繞過全局調度器給Slave發送Actor調用任務(此處設計是否合理尚不討論)。Slave只能接收全局調度器分配的計算任務。

三、核心操作

基於以上架構,我們簡單討論一下Ray中關鍵的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地啟動ray,包括Driver、HeadNode(Master)和若干Slave。

import ray
ray.init()

如果是直連已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="<redis-address>")

本地啟動Ray得到的輸出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>> 

本地啟動Ray時,可以看到Ray的WebUI的訪問地址。

2. ray.put()

使用ray.put()可以將Python對象存入本地ObjectStore,並且異步返回一個唯一的ObjectID。通過該ID,Ray可以訪問集群中任一個節點上的對象(遠程對象通過查閱Master的對象表獲得)。

對象一旦存入ObjectStore便不可更改,Ray的remote函數可以將直接將該對象的ID作為參數傳入。使用ObjectID作為remote函數參數,可以有效地減少函數參數的寫ObjectStore的次數。

@ray.remote
def f(x):
    pass

x = "hello"

# 對象x往ObjectStore拷貝里10次
[f.remote(x) for _ in range(10)]

# 對象x僅往ObjectStore拷貝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通過ObjectID獲取ObjectStore內的對象並將之轉換為Python對象。對於數組類型的對象,Ray使用共享內存機制減少數據的拷貝成本。而對於其它對象則需要將數據從ObjectStore拷貝到進程的堆內存中。

如果調用ray.get()操作時,對象尚未創建好,則get操作會阻塞,直到對象創建完成后返回。get操作的關鍵流程如下:

  1. Driver或者Worker進程首先到ObjectStore內請求ObjectID對應的對象數據。
  2. 如果本地ObjectStore沒有對應的對象數據,本地對象管理器Plasma會檢查Master上的對象表查看對象是否存儲其它節點的ObjectStore。
  3. 如果對象數據在其它節點的ObjectStore內,Plasma會發送網絡請求將對象數據拉到本地ObjectStore。
  4. 如果對象數據還沒有創建好,Master會在對象創建完成后通知請求的Plasma讀取。
  5. 如果對象數據已經被所有的ObjectStore移除(被LRU策略刪除),本地調度器會根據任務血緣關系執行對象的重新創建工作。
  6. 一旦對象數據在本地ObjectStore可用,Driver或者Worker進程會通過共享內存的方式直接將對象內存區域映射到自己的進程地址空間中,並反序列化為Python對象。

另外,ray.get()可以一次性讀取多個對象的數據:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用注解@ray.remote可以聲明一個remote function。remote函數時Ray的基本任務調度單元,remote函數定義后會立即被序列化存儲到RedisServer中,並且分配了一個唯一的ID,這樣就保證了集群的所有節點都可以看到這個函數的定義。

不過,這樣對remote函數定義有了一個潛在的要求,即remote函數內如果調用了其它的用戶函數,則必須提前定義,否則remote函數無法找到對應的函數定義內容。

remote函數內也可以調用其它的remote函數,Driver和Slave每次調用remote函數時,其實都是向集群提交了一個計算任務,從這里也可以看到Ray的分布式計算的自由性。

Ray中調用remote函數的關鍵流程如下:

  1. 調用remote函數時,首先會創建一個任務對象,它包含了函數的ID、參數的ID或者值(Python的基本對象直接傳值,復雜對象會先通過ray.put()操作存入ObjectStore然后返回ObjectID)、函數返回值對象的ID。
  2. 任務對象被發送到本地調度器。
  3. 本地調度器決定任務對象是在本地調度還是發送給全局調度器。如果任務對象的依賴(參數)在本地的ObejctStore已經存在且本地的CPU和GPU計算資源充足,那么本地調度器將任務分配給本地的WorkerProcess執行。否則,任務對象被發送給全局調度器並存儲到任務表(TaskTable)中,全局調度器根據當前的任務狀態信息決定將任務發給集群中的某一個本地調度器。
  4. 本地調度器收到任務對象后(來自本地的任務或者全局調度分配的任務),會將其放入一個任務隊列中,等待計算資源和本地依賴滿足后分配給WorkerProcess執行。
  5. Worker收到任務對象后執行該任務,並將函數返回值存入ObjectStore,並更新Master的對象表(ObjectTable)信息。

@ray.remote注解有一個參數num_return_vals用於聲明remote函數的返回值個數,基於此實現remote函數的多返回值機制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

@ray.remote注解的另一個參數num_gpus可以為任務指定GPU的資源。使用內置函數ray.get_gpu_ids()可以獲取當前任務可以使用的GPU信息。

@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支持批量的任務等待,基於此可以實現一次性獲取多個ObjectID對應的數據。

# 啟動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,超時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個ObjectID,使用ray.wait操作可以一直等待有4個任務完成后返回,並將完成的數據對象放在第一個list類型返回值內,未完成的ObjectID放在第二個list返回值內。如果設置了超時時間,那么在超時時間結束后仍未等到預期的返回值個數,則已超時完成時的返回值為准。

6. ray.error_info()

使用ray.error_info()可以獲取任務執行時產生的錯誤信息。

>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函數只能處理無狀態的計算需求,有狀態的計算需求需要使用Ray的Actor實現。在Python的class定義前使用@ray.remote可以聲明Actor。

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

使用如下方式創建Actor對象。

a1 = Counter.remote()
a2 = Counter.remote()

Ray創建Actor的流程為:

  1. Master選取一個Slave,並將Actor創建任務分發給它的本地調度器。
  2. 創建Actor對象,並執行它的構造函數。

從流程可以看出,Actor對象的創建時並行的。

通過調用Actor對象的方法使用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

調用Actor對象的方法的流程為:

  1. 首先創建一個任務。
  2. 該任務被Driver直接分配到創建該Actor對應的本地執行器執行,這個操作繞開了全局調度器(Worker是否也可以使用Actor直接分配任務尚存疑問)。
  3. 返回Actor方法調用結果的ObjectID。

為了保證Actor狀態的一致性,對同一個Actor的方法調用是串行執行的。

四、安裝Ray

如果只是使用Ray,可以使用如下命令直接安裝。

pip intall ray

如果需要編譯Ray的最新源碼進行安裝,按照如下步驟進行(MaxOS):

# 更新編譯依賴包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下載源碼編譯安裝
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 測試
python test/runtest.py

# 安裝WebUI需要的庫[可選]
pip install jupyter ipywidgets bokeh

# 編譯Ray文檔[可選]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html

我在MacOS上安裝jupyter時,遇到了Python的setuptools庫無法升級的情況,原因是MacOS的安全性設置問題,可以使用如下方式解決:

  1. 重啟電腦,啟動時按住Command+R進入Mac保護模式。
  2. 打開命令行,輸入命令csrutils disable關閉系統安全策略。
  3. 重啟電腦,繼續安裝jupyter。
  4. 安裝完成后,重復如上的方式執行csrutils enable,再次重啟即可。

進入PythonShell,輸入代碼本地啟動Ray:

import ray
ray.init()

瀏覽器內打開WebUI界面如下:

參考資料

  1. Ray論文:Real-Time Machine Learning: The Missing Pieces
  2. Ray開發手冊:http://ray.readthedocs.io/en/latest/index.html
  3. Ray源代碼:https://github.com/ray-project/ray


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM