Hadoop RPC源碼分析
上一篇文章http://www.cnblogs.com/dycg/p/rpc.html
講了Hadoop RPC的使用方法,這一次我們從demo中一層層進行分析。
RPC說白了,就3個核心,交互協議、服務端、客戶端。
在Hadoop RPC(hadoop-common-2.4.jar)中也是這樣
交互協議
org.apache.hadoop.ipc.VersionedProtocol ,所有協議的父類
其實就2個方法,版本與簽名。不同版本與簽名的協議,就算同一個類名也無法通信。
服務端:
RPC.Server 處理客戶端的連接請求,並處理相關業務,最后返回結果
客戶端:
Client,封裝請求數據,並接收Response
好,正式開始分析源碼吧。
協議部分,我就不說了,就是實現VersionedProtocol接口並添加一些業務方法即可。
我們從客戶端程序入口點開始分析,先看看客戶端是如何取得協議對象的。
想要與服務端通信就先要得到協議對象,RPC.getProxy就是得到協議對象的方法,沿着代碼進入最底層,你會發現,它默認先得到一個RpcEngine(默認實現是WritableRpcEngine),它是什么呢?簡單點說就是,它相當於我們啟動服務器,獲取協議的類。有了WritableRpcEngine后,調用它的getProxy方法,得到我們的協議代理對象(采用java的動態代理機制實現),對應我們的例子就是ClientProtocol的代理對象。
最關鍵就是這個Invoker對象,我們調用ClientProtocol.echo()方法時候,會先觸發這個Invoker.invoke()方法。
Invoker對象如何構造的呢
其實就創建了2個成員變量:
ConnectionID:
保存目標地址(remoteAddress,protocol)和用戶ticket,這三者可以唯一確定一個Connection
Client:
主要完成的功能是發送遠程調用信息並接收返回結果。圖中的factory,是SocketFactory
接着,當我們調用ClientProtocol.echo()方法的時候,觸發Invoker.invoke,讓我們看看這一步又干了什么事
封裝一個Invocation對象,這個對象持有目標方法和參數。
進入client.call()方法看看
這下切入正題了
-
首先創建一個Call對象,封裝了RPC請求,成員變量有唯一標識id、請求數據、返回數據、是否完成等
-
創建Connection對象(它是個線程),並與服務器連接,即Client與Server之間的一個通信連接。保存未完成的Call對象至哈希表,唯一標識ID,Server通信的Socket,網絡輸入輸出流。
-
調用connection.sendRpcRequest(call);將Call對象發送給Server
-
等待Server端處理Call請求。服務端處理完成后,通過網絡返回給Client端。這部分代碼不在call方法里,還記得1中Connection是個線程嗎?去run方法看看
線程一直循環,直到Server返回結果,然后調用receiveRpcResponse方法返回數據。
-
再次回到call方法,它也有個循環,一直在等待結果返回。結果返回后,檢查下成功失敗后,就將Call從哈希表中移除了。
經歷了上面5步,String result = proxy.echo("123"); 的結果是result = hello 123
最后,我們再看看服務端是怎么工作的。
如何啟動服務?
啟動服務器很簡單,通過RPC.Builder().build()構造Server后就能start啟動了。我們進入build()方法內部看看。
return這里,想想看,它其實就是調用WritableRpcEngine.getServer()方法。
看看代碼非常多,其實最關鍵的就一個Server.java Line 2176行
responder = new Responder();
仔細看這個類,它作用是啟動一個線程,從reponseQueue中一個個處理要返回給客戶端的數據,有些數據可能比較大,一次無法完全返回,則將剩下的數據重新加入隊列等待下一次返回。
再進入Server.start()方法看看:
簡單明了。 Responder就是剛剛創建的用於返回數據給客戶端的線程,啟動它。
Listener是什么?
繼續看代碼,用了JAVA NIO, 它是負責監聽客戶端連接請求的,它內部又有
private Reader[] readers = null;
每個Reader是一個線程,負責讀取連接請求發來的數據,也用了NIO。
那它把數據讀來放哪?
processRpcRequest()跟蹤到這個方法,發現它把讀取完成的數據創建到一個新的Call對象,然后放入callQueue
那什么時候處理呢?
別急,上上圖還有個Handler還沒看呢。
它也是個線程,啟動了N個。一直在循環處理callQueue中的call,如果隊列中沒call就block waiting。
讀到Call后,依然是調用call方法,一層層進去看,最后還是回到了WritableRpcEngine.call() Line 417行,
Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters());
得到結果后,就開始返回給Client了,如果沒發一次性全部返回,剩下部分就交給Reponder線程去完成。
至此,整體流程全部完成。 我們來個全家福。