Hadoop RPC源碼分析


Hadoop RPC源碼分析

上一篇文章http://www.cnblogs.com/dycg/p/rpc.html

講了Hadoop RPC的使用方法,這一次我們從demo中一層層進行分析。

 

RPC說白了,就3個核心,交互協議、服務端、客戶端。

在Hadoop RPC(hadoop-common-2.4.jar)中也是這樣

 

交互協議

org.apache.hadoop.ipc.VersionedProtocol ,所有協議的父類

其實就2個方法,版本與簽名。不同版本與簽名的協議,就算同一個類名也無法通信。

 

服務端:

RPC.Server 處理客戶端的連接請求,並處理相關業務,最后返回結果

 

客戶端:

Client,封裝請求數據,並接收Response

好,正式開始分析源碼吧。

協議部分,我就不說了,就是實現VersionedProtocol接口並添加一些業務方法即可。

 

我們從客戶端程序入口點開始分析,先看看客戶端是如何取得協議對象的。

想要與服務端通信就先要得到協議對象,RPC.getProxy就是得到協議對象的方法,沿着代碼進入最底層,你會發現,它默認先得到一個RpcEngine(默認實現是WritableRpcEngine),它是什么呢?簡單點說就是,它相當於我們啟動服務器,獲取協議的類。有了WritableRpcEngine后,調用它的getProxy方法,得到我們的協議代理對象(采用java的動態代理機制實現),對應我們的例子就是ClientProtocol的代理對象。

最關鍵就是這個Invoker對象,我們調用ClientProtocol.echo()方法時候,會先觸發這個Invoker.invoke()方法。

Invoker對象如何構造的呢

其實就創建了2個成員變量:

ConnectionID:

保存目標地址(remoteAddress,protocol)和用戶ticket,這三者可以唯一確定一個Connection

Client:

主要完成的功能是發送遠程調用信息並接收返回結果。圖中的factory,是SocketFactory

 

接着,當我們調用ClientProtocol.echo()方法的時候,觸發Invoker.invoke,讓我們看看這一步又干了什么事

封裝一個Invocation對象,這個對象持有目標方法和參數。

進入client.call()方法看看

這下切入正題了

  1. 首先創建一個Call對象,封裝了RPC請求,成員變量有唯一標識id、請求數據、返回數據、是否完成等
  2. 創建Connection對象(它是個線程),並與服務器連接,即Client與Server之間的一個通信連接。保存未完成的Call對象至哈希表,唯一標識ID,Server通信的Socket,網絡輸入輸出流。
  3. 調用connection.sendRpcRequest(call);將Call對象發送給Server
  4. 等待Server端處理Call請求。服務端處理完成后,通過網絡返回給Client端。這部分代碼不在call方法里,還記得1中Connection是個線程嗎?去run方法看看

    線程一直循環,直到Server返回結果,然后調用receiveRpcResponse方法返回數據。

  5. 再次回到call方法,它也有個循環,一直在等待結果返回。結果返回后,檢查下成功失敗后,就將Call從哈希表中移除了。

經歷了上面5步,String result = proxy.echo("123"); 的結果是result = hello 123

 

最后,我們再看看服務端是怎么工作的。

如何啟動服務?

啟動服務器很簡單,通過RPC.Builder().build()構造Server后就能start啟動了。我們進入build()方法內部看看。

return這里,想想看,它其實就是調用WritableRpcEngine.getServer()方法。

看看代碼非常多,其實最關鍵的就一個Server.java Line 2176行

responder = new Responder();

仔細看這個類,它作用是啟動一個線程,從reponseQueue中一個個處理要返回給客戶端的數據,有些數據可能比較大,一次無法完全返回,則將剩下的數據重新加入隊列等待下一次返回。

再進入Server.start()方法看看:

簡單明了。 Responder就是剛剛創建的用於返回數據給客戶端的線程,啟動它。

Listener是什么?

繼續看代碼,用了JAVA NIO, 它是負責監聽客戶端連接請求的,它內部又有

private Reader[] readers = null;

每個Reader是一個線程,負責讀取連接請求發來的數據,也用了NIO。

那它把數據讀來放哪?

processRpcRequest()跟蹤到這個方法,發現它把讀取完成的數據創建到一個新的Call對象,然后放入callQueue

那什么時候處理呢?

別急,上上圖還有個Handler還沒看呢。

它也是個線程,啟動了N個。一直在循環處理callQueue中的call,如果隊列中沒call就block waiting。

讀到Call后,依然是調用call方法,一層層進去看,最后還是回到了WritableRpcEngine.call() Line 417行,

Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters());

得到結果后,就開始返回給Client了,如果沒發一次性全部返回,剩下部分就交給Reponder線程去完成。

 

至此,整體流程全部完成。 我們來個全家福。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM