spark 源碼分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析


RpcEndpoint

文檔對RpcEndpoint的解釋:
An end point for the RPC that defines what functions to trigger given a message. It is guaranteed that onStart, receive and onStop will be called in sequence. The life-cycle of an endpoint is: constructor -> onStart -> receive* -> onStop Note: receive can be called concurrently. If you want receive to be thread-safe, please use ThreadSafeRpcEndpoint If any error is thrown from one of RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.

其子類繼承關系如下:

其下面還有一個抽象子接口:ThreadSafeRpcEndpoint

文檔對ThreadSafeRpcEndpoint的解釋如下:
需要RpcEnv線程安全地向其發送消息的trait。線程安全意味着在通過相同的ThreadSafeRpcEndpoint處理一條消息完成后再處理下一個消息。換句話說,在處理下一條消息時,可以看到對ThreadSafeRpcEndpoint的內部字段的更改,並且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保證同一個線程將為不同的消息執行相同的ThreadSafeRpcEndpoint。
即順序處理消息,不能同時並發處理。traint RpcEndpoint的方法如下:

對其變量和方法解釋如下:

1. rpcEnv:RpcEndpoint 注冊的那個 RpcEnv 對象

2. self : RpcEndpoint 對應的 RpcEndpointRef。onStart 方法被調用的時候,RpcEndpointRef有效,onStop 調用后,self會是null,注意由於在onStart之前,RpcEndpoint 還沒有被注冊,還沒有有效的RpcEndpointRef,所以不要在onStart之前調用 self 方法

3. receive :處理從RpcEndpointRef.send 或 RpcCallContext.reply 過來的消息,如果接收到一個未匹配的消息,會拋出 SparkException 並且發送給onError 方法

4. receiveAndReply:處理從RpcEndpointRef.ask發過來的消息,如果接收到一個未匹配的消息,會拋出 SparkException 並且發送給onError 方法

5. onError: 在消息處理過程中,如果有異常都會調用此方法

6. onConnected:當remoteAddress 連接上當前節點時被調用

7. onDisconnected: 當當前節點丟失掉 remoteAddress 后被調用

8. onNetworkError:當連接當前節點和remoteAddress時,有網絡錯誤發生時被調用

9. onStart:在RpcEndpoint開始處理其他消息之前被調用

10. onStop:當RpcEndpoint停止時被調用,self 將會是null,不能用於發送消息

11. stop: 停止RpcEndpoint

RpcEndPointRef

RpcEndPointRef:遠程的RpcEndpoint引用,RpcEndpointRef是線程安全的。

有一個跟RpcEndPoint 很像的類 -- RpcEndPointRef。先來看 RpcEndpointRef抽象類。下面我們重點來看一下它內部構造。

首先看它的繼承結構:

它的父類是 RpcEndpointRef。先來剖析它的內部變量和方法的解釋:

有三個成員變量:

1. maxRetries: 最大嘗試連接次數。可以通過 spark.rpc.numRetries 參數來指定,默認是 3 次。 該變量暫時沒有使用。

2. retryWaitMs:每次嘗試連接最大等待毫秒值。可以通過 spark.rpc.retry.wait 參數來指定,默認是 3s。該變量暫時沒有使用。

3. defaultAskTimeout: spark 默認 ask 請求操作超時時間。 可以通過 spark.rpc.askTimeout 或 spark.network.timeout參數來指定,默認是120s。

成員方法:

1. address : 抽象方法,返回 RpcEndpointRef的RpcAddress

2. name:抽象方法,返回 endpoint 的name

3. send: 抽象方法,Sends a one-way asynchronous message. Fire-and-forget semantics. 發送單向的異步消息,滿足 即發即忘 語義。

4. ask:抽象方法。發送消息到相應的 RpcEndpoint.receiveAndReply , 並返回 Future 以在默認超時內接收返回值。它有兩個重載方法:其中沒有RpcTimeOut 的ask方法添加一個 defaultAskTimeout 參數繼續調用 有RpcTimeOut 的ask方法。

5. askSync:調用抽象方法ask。跟ask類似,有兩個重載方法:其中沒有RpcTimeOut 的askSync方法添加一個 defaultAskTimeout 參數繼續調用 有RpcTimeOut 的askSync方法。有RpcTimeOut 的askSync方法 會調用 ask 方法生成一個Future 對象,然后等待任務執行完畢后返回。
注意,這里面其實就涉及到了模板方法模式。ask跟askSync都是設定好了,ask 要返回一個Future 對象,askSync則是 調用 ask 返回的Future 對象,然后等待 future 的 result 方法返回。

 

下面看RpcEndpointRef 的唯一實現類 - NettyRpcEndpointRef

RpcEndpointRef的NettyRpcEnv版本。此類的行為取決於它的創建位置。在“擁有”RpcEndpoint的節點上,它是RpcEndpointAddress實例的簡單包裝器。在接收序列化版本引用的其他計算機上,行為會發生變化。實例將跟蹤發送引用的TransportClient,以便通過客戶端連接發送到端點的消息,而不需要打開新連接。此ref的RpcAddress可以為null;這意味着ref只能通過客戶端連接使用,因為托管端點的進程不會偵聽傳入連接。不應與第三方共享這些引用,因為它們將無法向端點發送消息。

先來看 成員變量:

1. conf : 是一個SparkConf 實例

2. endpointAddress:是一個RpcEndpointAddress 實例,主要包含了 RpcAddress (host和port) 和 rpc endpoint name的信息

3. nettyEnv:是一個NettyRpcEnv實例

4. client: 是一個TransportClient實例,這個client 是不參與序列化的。

成員方法:

1. 實現並重寫了繼承自超類的ask方法, 如下:

 

2. 實現並重寫了繼承自超類的send方法,如下:

3. 關於序列化和反序列化的兩個方法:writeObject(序列化方法)和 readObject(反序列化方法),如下:

RequestMessage

順便,我們來看RequestMessage對象,代碼如下:

RequestMessage里面的消息是sender 發給 receiver 的,RequestMessage主要負責sender RpcAddress, receiver RpcAddress,receiver rpcendpoint name以及 消息 content 的序列化。

總結: 本文主要剖析了 RpcEndpoint和RpcEntpointRef兩個類,順便,也介紹了支持序列化的 RequestMessage 類。

注:到目前為止,Spark RPC組件還沒有全部剖析完畢,預計還有三到四篇文章才能完全剖析完, be patient 😊

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM