1.通信雙方遵循的契約
2.Hadoop中RPC通信原理

this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { return RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID, jobTrackAddr, fConf); } });
它是通過調用RPC類中的靜態方法waitForProxy()方法而得到了InterTrackerProtocol的一個代理,借助於這個代理對象,TaskTracker就可以與JobTracker進行通信了。

VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
跟蹤Hadoop的源代碼,我們可以發現PRC.waitForProxy()最終是調用的Proxy.newProxyInstance()來創建一個代理對象,第一個參數是類加載器(代理類在運行的過程中動態生成),第二個參數是要實現的代理類的接口,第三個參數是InvokercationHandler接口的子類,最終調用的也就是InvokercationHandler實現類的的invoker()方法。

private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; ..... public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } .... }
我們可以看到,InvocationHandler的實現類Invoker中主要包含兩個成員變量即remoteId(唯一標識RPC的服務器端)、Client(通過工廠模式得到的客戶端),invoke()方法中最重要的就是下面的語句:
ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
其中call方法的第一個參數封裝調用方法和參數並實現Writable接口的對象,以便於在分布式環境中傳輸,第二個參數勿需多言,它就用於唯一標識RPC Server,也就是與指定的Server進行通信。call方法的核心代碼如下:

public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call);//請看下面的說明 connection.sendParam(call); // 將參數封裝成一個call對象發送給Server boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待Server發送的內容 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } ... return call.value; }
其中竟然出現了一個Call對象,我們看到此方法返回的結果是call對象的一個成員變量,也就是說Call封裝了Client的請求以及Server的響應,synchronized的使用會同步Client的請求以及Server的響應。通Connection對象的sendParam方法可以將請求發送給Server,那么Connection又是什么呢?

private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException { do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); ... connection.setupIOstreams(); return connection; }
其實Connection是擴展Thread而得到的一個線程,最終把所有的connection對象都放入到一個Hashtable中,同一個ConnectionId的Connection可以復用,降低了創建線程的開銷。connection.setupIOstreams()用於在真正的建立連接,並將RPC的header寫入到輸出流中,通過start方法啟動線程,其核心代碼如下所示:
public void run() { while (waitForWork()) {//等到可以讀響應時返回true receiveResponse();
}
receiveResponse方法主要是從輸入流反序列化出value,並將其封裝在call對象中,這樣client端就得到了server的響應,核心代碼如下:

private void receiveResponse() { try { int id = in.readInt(); // 讀取連接id,以便從calls中取出相應的call對象 Call call = calls.get(id); int state = in.readInt(); // 讀取輸入流的狀態 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setValue(value); calls.remove(id); } ... }
才疏學淺,錯誤之處在所難免,懇請各位予以指正。。