前段時間看spark,看着迷迷糊糊的。最近終於有點頭緒,先梳理了一下spark rpc相關的東西,先記錄下來。
1,概述
個人認為,如果把分布式系統(HDFS, HBASE,SPARK等)比作一個人,那么RPC可以認為是人體的血液循環系統。它將系統中各個不同的組件(如Hbase中的master, Regionserver, client)聯系了起來。同樣,在spark中,不同組件像driver,executor,worker,master(stanalone模式)之間的通信也是基於RPC來實現的。
Spark 1.6之前,spark的RPC是基於Akaa來實現的。Akka是一個基於scala語言的異步的消息框架。Spark1.6后,spark借鑒Akka的設計自己實現了一個基於Netty的rpc框架。大概的原因是1.6之前,RPC通過Akka來實現,而大文件是基於netty來實現的,加之akka版本兼容性問題,所以1.6之后把Akka改掉了,具體jira見(https://issues.apache.org/jira/browse/SPARK-5293)。
本文主要對spark1.6之后基於netty新開發的rpc框架做一個較為深入的分析。
2,整體架構
spark 基於netty新的rpc框架借鑒了Akka的中的設計,它是基於Actor模型,各個組件可以認為是一個個獨立的實體,各個實體之間通過消息來進行通信。具體各個組件之間的關系圖如下(圖片來自[1]):
2.1 RpcEndpoint
表示一個個需要通信的個體(如master,worker,driver),主要根據接收的消息來進行對應的處理。一個RpcEndpoint經歷的過程依次是:構建->onStart→receive→onStop。其中onStart在接收任務消息前調用,receive和receiveAndReply分別用來接收另一個RpcEndpoint(也可以是本身)send和ask過來的消息。
2.2 RpcEndpointRef
RpcEndpointRef是對遠程RpcEndpoint的一個引用。當我們需要向一個具體的RpcEndpoint發送消息時,一般我們需要獲取到該RpcEndpoint的引用,然后通過該應用發送消息。
2.3 RpcAddress
表示遠程的RpcEndpointRef的地址,Host + Port。
2.4 RpcEnv
RpcEnv為RpcEndpoint提供處理消息的環境。RpcEnv負責RpcEndpoint整個生命周期的管理,包括:注冊endpoint,endpoint之間消息的路由,以及停止endpoint。
3,實現
Rpc實現相關類之間的關系圖如下(圖片來自[2]):
核心要點如下:
1,核心的RpcEnv是一個特質(trait),它主要提供了停止,注冊,獲取endpoint等方法的定義,而NettyRpcEnv提供了該特質的一個具體實現。
2,通過工廠RpcEnvFactory來產生一個RpcEnv,而NettyRpcEnvFactory用來生成NettyRpcEnv的一個對象。
3,當我們調用RpcEnv中的setupEndpoint來注冊一個endpoint到rpcEnv的時候,在NettyRpcEnv內部,會將該endpoint的名稱與其本省的映射關系,rpcEndpoint與rpcEndpointRef之間映射關系保存在dispatcher對應的成員變量中。
接下來,我們看一個具體的案例:在standalone模式中,worker會定時發心跳消息(SendHeartbeat)給master,那心跳消息是怎么從worker發送到master的呢,master又是怎么接收消息的?
1,在worker中,forwordMessageScheduler線程會定時每隔心跳超時時間的四分之一時間向自己發送SendHeartbeat消息,在worker的receive函數中,我們看到一旦接收到SendHeartbeat消息,worker會調用sendToMaster函數,將Heartbeat消息(包含worker Id和當前worker的rpcEndpoint引用)發送給master。
2,在worker的sendToMaster函數中,通過masterRef.send(message)將消息發送出去。那這個調用背后又做了什么事情呢?NettryRpcEnv中send的實現如下:
可以看到,當前發送地址(nettyEnv.address),目標的master地址(this)和發送的消息(SendHeartbeat)被封裝成了RequestMessage消息,如果是遠程rpc調用的話,最終send將調用postToOutbox函數,並且此時消息會被序列化成Byte流。
3,在postToOutbox函數中,消息將經過OutboxMessage中的sendWith方法(client.send(content)),最終通過TransportClient的send方法(client.send(content)),而在TransportClient中將消息進一步封裝,然后發送給master。
4, 在master端TransportRequestHandler的handle方法中,由於心跳信息在worker端被分裝成了OneWayMessage,所以在該handle方法中,將調用processOneWayMessage進行處理。
5,processOneWayMessage函數將調用rpcHandler的實現類NettyRpcEnv中的receive方法。在該方法中,首先通過internalRecieve將消息解包成RequestMessage。然后該消息通過dispatcher的分發給對應的endpoint。
6,那消息是怎么分發的呢?在Dispatcher的postMessage方法中,可以看到,首先根據對應的endpoint的EndpointData信息(主要是該endpoint及其應用以及其信箱(inbox)),然后將消息塞到給endpoint(此例中的master)的信箱中,最后將消息塞到recievers的阻塞隊列中。
7,那隊列中的消息是怎么被消費的呢?在Dispatcher中有一個線程池threadpool在MessageLoop類的run方法中,將receivers中的對象取出來,交由信箱的process方法去處理。如果沒有收到任何消息,將會阻塞在take處。
8,在inbox的proces方法中,首先取出消息,然后根據消息的類型(此例中是oneWayMessage),最終將調用endpoint的receiver方法進行處理(也就是master中的receive方法)。至此,整個一次rpc調用的流程結束。
4,小結
本文主要對rpc的歷史,初始實現思想以及一次rpc的具體流程做了一個較為深入的分析。此外,對spark rpc實現涉及的一部分類也做了一個概括性說明。這也是一個開始,解下來,會陸續對spark的一些內部原理做較為深入的分析。
[1] https://wongxingjun.github.io/2016/12/08/Spark-RPC%E8%A7%A3%E8%AF%BB/
[2] http://shiyanjun.cn/archives/1545.html