Hadoop之RPC


       Hadoop的RPC主要是通過Java的動態代理(Dynamic Proxy)與反射(Reflect)實現,代理類是由java.lang.reflect.Proxy類在運行期時根據接口,采用Java反射功能動態生成的,並且結合java.lang.reflect.InvocationHandler來處理客戶端的請求,當用戶調用這個動態生成的實現類時,實際上是調用了InvocationHandler實現類的invoke方法。RPC源代碼在org.apache.hadoop.ipc下,有以下幾個主要類: 
    Client: 客戶端,連接服務器、傳遞函數名和相應的參數、等待結果;
    Server:服務器端,主要接受Client的請求、執行相應的函數、返回結果;
    VersionedProtocol:通信雙方所遵循契約的父接口;
    RPC:RPC通信機制,主要是為通信的服務方提供代理。

  1.通信雙方遵循的契約

    要通過RPC服務進行通信,服務的提供方必須實現某個接口,而這個即可是VersionedProtocol的子類,諸如:
InterTrackerProtocol,它是TaskTracker與JobTracker進行通信所遵循的契約,JobTracker是一個Server,它必須實現這個接口;
JobSubmissionProtocol,它是JobTracker與JobClient通訊所遵循的契約,JobClient利用契約中的方法可以提交作業去執行, 並且得到當前系統的狀態;
DatanodeProtocol,利用此契約,DataNode可以向NameNode匯報自己的塊狀態以及負載情況。
InterDatanodeProtocol,DataNode之間利用此契約可以更新數據塊。
其它的接口在此不再一一贅述。

    2.Hadoop中RPC通信原理 

  我們通過TaskTracker與JobTracker的通信來剖析其通信過程,JobTracker的代理是通過下面的方法得到的,
 this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
View Code

  它是通過調用RPC類中的靜態方法waitForProxy()方法而得到了InterTrackerProtocol的一個代理,借助於這個代理對象,TaskTracker就可以與JobTracker進行通信了。

  VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
View Code

  跟蹤Hadoop的源代碼,我們可以發現PRC.waitForProxy()最終是調用的Proxy.newProxyInstance()來創建一個代理對象,第一個參數是類加載器(代理類在運行的過程中動態生成),第二個參數是要實現的代理類的接口,第三個參數是InvokercationHandler接口的子類,最終調用的也就是InvokercationHandler實現類的的invoker()方法。

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
.....

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
    
....
  }
View Code

  我們可以看到,InvocationHandler的實現類Invoker中主要包含兩個成員變量即remoteId(唯一標識RPC的服務器端)、Client(通過工廠模式得到的客戶端),invoke()方法中最重要的就是下面的語句:

ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);

  其中call方法的第一個參數封裝調用方法和參數並實現Writable接口的對象,以便於在分布式環境中傳輸,第二個參數勿需多言,它就用於唯一標識RPC Server,也就是與指定的Server進行通信。call方法的核心代碼如下:

  public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);//請看下面的說明
    connection.sendParam(call);                 // 將參數封裝成一個call對象發送給Server
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // 等待Server發送的內容
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
...
        return call.value;
  }
View Code

  其中竟然出現了一個Call對象,我們看到此方法返回的結果是call對象的一個成員變量,也就是說Call封裝了Client的請求以及Server的響應,synchronized的使用會同步Client的請求以及Server的響應。通Connection對象的sendParam方法可以將請求發送給Server,那么Connection又是什么呢?

   private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException {
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));
    
...
    connection.setupIOstreams();
    return connection;
  }
View Code

  其實Connection是擴展Thread而得到的一個線程,最終把所有的connection對象都放入到一個Hashtable中,同一個ConnectionId的Connection可以復用,降低了創建線程的開銷。connection.setupIOstreams()用於在真正的建立連接,並將RPC的header寫入到輸出流中,通過start方法啟動線程,其核心代碼如下所示:

  public void run() {
      while (waitForWork()) {//等到可以讀響應時返回true
        receiveResponse();
}   

  receiveResponse方法主要是從輸入流反序列化出value,並將其封裝在call對象中,這樣client端就得到了server的響應,核心代碼如下:

private void receiveResponse() {
          try {
        int id = in.readInt();                  // 讀取連接id,以便從calls中取出相應的call對象
        Call call = calls.get(id);
        int state = in.readInt();     // 讀取輸入流的狀態
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          call.setValue(value);
          calls.remove(id);
        } 
...
    }
View Code

才疏學淺,錯誤之處在所難免,懇請各位予以指正。。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM