[hadoop源碼閱讀][6]-org.apache.hadoop.ipc-ipc.server


1.      nioreactor模式

1334842640_7902 

具體的處理方式:

·     1.一個線程來處理所有連接(使用一個Selector

·     2.一組線程來讀取已經建立連接的數據(多個Selector,這里的線程數一般和cpu的核數相當);

·     3.一個線程池(這個線程池大小可以根據業務需求進行設置)

·     4.一個線程處理所有的連接的數據的寫操作(一個Selector

 

2.      簡明流程圖

e23d9744-85f4-3c22-aafb-33d09a36ab34

 

3.      RPC Server主要流程

RPC Server作為服務提供者由兩個部分組成:接收Call調用和處理Call調用。

  hadoop_rpc

接收Call調用負責接收來自RPC Client的調用請求,編碼成Call對象后放入到Call隊列中。這一過程由Listener線程完成。具體步驟:

l      Listener線程監視RPC Client發送過來的數據。

l      當有數據可以接收時,調用ConnectionreadAndProcess方法。

l      Connection邊接收邊對數據進行處理,如果接收到一個完整的Call包,則構建一個Call對象PUSHCall隊列中,由Handler線程來處理Call隊列中的所有Call

 

處理Call調用負責處理Call隊列中的每個調用請求,由Handler線程完成:

l      Handler線程監聽Call隊列,如果Call隊列非空,按FIFO規則從Call隊列取出Call

l      Call交給RPC.Server處理。

l      借助JDK提供的Method,完成對目標方法的調用,目標方法由具體的業務邏輯實現。

l      返回響應。Server.Handler按照異步非阻塞的方式向RPC Client發送響應,如果有未發送出的數據,則交由Server.Responder來完成。

 

4. server類的結構

0)抽象類

這里的Server類是個抽象類,唯一抽象的地方,就是

public abstract Writable call(Writable param, long receiveTime) throws IOException;

RPC.server來實現

1Call

用以存儲客戶端發來的請求,這個請求會放入一個BlockQueue中;

2Listener

監聽類,用以監聽客戶端發來的請求。同時Listener下面還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便用讓Reader讀取用戶請求。

Listener主要負責Socket的監聽以及Connection的建立,同時監控ClientSocket的數據可讀事件,通知Connection進行processData,收到完成請求包以后,封裝為一個Call對象(包含Connection對象,從網絡流中讀取的參數信息,調用方法信息),將其放入隊列

3Responder

響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。

它不斷地檢查響應隊列中是否有調用信息,如果有的話,就把調用的結果返回給客戶端

4Connection

連接類,真正的客戶端請求讀取邏輯在這個類中。

Connection,代表與Client端的連接,讀取客戶端的call並放到一個阻塞隊列中,Handler負責從這個隊列中讀取數據並處理

5Handler

請求(blockQueueCall)處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操作。

真正做事的實體。它從調用隊列中獲取調用信息,然后反射調用真正的對象,得到結果,然后再把此次調用放到響應隊列(response queue) 

 

5.      Server的啟動,運行

5.1.Namenodegetserver實例使用

  
  
  
          
private void initialize(Configuration conf) throws IOException { this .serviceRpcServer = RPC.getServer( this , dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false , conf, namesystem.getDelegationTokenSecretManager()); serviceRpcServer.start(); }

 

5.2.Start服務啟動

  
  
  
          
public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for ( int i = 0 ; i < handlerCount; i ++ ) { handlers[i] = new Handler(i); handlers[i].start(); } }

responderlistenerhandlers三個對象的線程均阻塞了,前兩個阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客戶端請求。Responder設置了超時時間,為15分鍾。而listener還開啟了Reader線程,該線程也阻塞了。

 

5.3. Listener線程做的工作

Listener監聽到請求,獲得所有請求的SelectionKey,執行doAccept(key)方法,該方法將所有的連接對象放入list中,並將connection對象與key綁定,以供reader使用。初始化玩所有的conne對象之后,就可以激活Reader線程了.

Readerrun方法和Listener基本一致,也是獲得所有的SelectionKey,再執行doRead(key)方法。該方法獲得key中綁定的connection,並執行conectionreadAndProcess()方法

簡明調用函數過程:

Listener:: run-> Listener:: doAccept( 激活Reader線程)->>Reader:: doRead->>connection:: readAndProcess->> connection::processOneRpc->>connection:: processData

 
  
  
  
          
private void processData( byte [] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream( new ByteArrayInputStream(buf)); int id = dis.readInt(); // 嘗試讀取id Writable param = ReflectionUtils.newInstance(paramClass, conf); // 讀取參數 param.readFields(dis); // 這個就是client傳遞過來的Invocation,包含了函數名和參數 Call call = new Call(id, param, this ); // 封裝成call callQueue.put(call); // 將call存入callQueue incRpcCount(); // 增加rpc請求的計數 }

 

5.4. Handler線程做的工作

Handler線程的run函數

  
  
  
          
while (running) { try { final Call call = callQueue.take(); // 彈出call,可能會阻塞 // 調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實現在RPC.Server類中 value = call(call.connection.protocol, call.param, call.timestamp); synchronized (call.connection.responseQueue) { setupResponse(buf, call, (error == null ) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); // 給客戶端響應請求 responder.doRespond(call); } } }

關於call函數的調用,稍后分析

 

5.5.Responder線程做的工作

  
  
  
          
void doRespond(Call call) throws IOException { synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); // 放到隊列里面去 if (call.connection.responseQueue.size() == 1 ) { processResponse(call.connection.responseQueue, true ); } } }

簡明調用結構為:

Responder::run->>doAsyncWrite->>processResponse

 

5.6.最后來看server.call函數是怎么執行的

  
  
  
          
public Writable call(Class <?> protocol, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation) param; Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); // 獲取client端調用的函數 Object value = method.invoke(instance, call.getParameters()); // instance即啟動服務的對象,也即實現protocol的對象 return new ObjectWritable(method.getReturnType(), value); // 將結果序列化 } catch (InvocationTargetException e) { } catch (Throwable e) { } }

6.      用戶可以做的操作

1. Reader數量

      正常情況下,一個客戶端關聯一個Reader,如果有很多客戶端(clientDataNode),那么就可以相應增加這個配置

      參數:ipc.server.read.threadpool.size,默認是1,需要注意的是,這個配置參數是0.21版本的,不同版本的參數可能不一樣

2. Handler數量

      對於這種做事的線程,不好把握度,到底多少才是合適。

      參數:dfs.namenode.handler.count, 這里是以NameNode舉例

3. 客戶端重試次數

      客戶端在調用時發生異常,重試是無可厚非。但如果對實時性有要求,那么這里的重試就有考量。Fackbook在做的Realtime分析就有提到RPC的重試是需要修改的

      參數:ipc.client.connect.max.retries,默認是10

4. tcp no delay

      不建議對它有什么設置。如果我們對整個調用的過程中數據量大小及網絡環境不清楚的話,就是設置了也不知道它是否有作用。

      參數:ipc.client.tcpnodelay,默認是false

 

7. 時序圖

4

 

8. 類圖

09a22207-eec0-321c-a990-d8aa248c1609

 

9.參考

http://blog.csdn.net/sxf_824/article/details/4842153

http://www.wikieno.com/2012/02/hadoop-ipc-server/

http://caibinbupt.iteye.com/blog/281281

http://www.tbdata.org/archives/1413

http://blog.csdn.net/shirdrn/article/details/4598295

http://lidejiasw.wordpress.com/2011/05/07/hadoop-rpc%E5%88%86%E6%9E%90/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM