rpc是Hadoop分布式底層通信的基礎,無論是client和namenode,namenode和datanode,以及yarn新框架之間的通信模式等等都是采用的rpc方式。
下面我們來概要分析一下Hadoop2的rpc。
Hadoop通信模式主要是C/S方式,及客戶端和服務端的模式。
客戶端采用傳統的socket通信方式向服務端發送信息,並等待服務端的返回。
服務端采用reactor的模式(Java nio)的方式來處理客戶端的請求並給予響應。
一、客戶端到服務端的通信
下面我們先分析客戶端到服務端的通信。
要先通信,就要建立連接,建立連接就要發頭消息。
客戶端代碼在Hadoop common中的ipc包里,主要類為client.java。負責通信的內部類是Client.Connection,Connection中包括以下幾個屬性
private InetSocketAddress server;// 連接服務端的地址
private final ConnectionId remoteId;//connection復用,此類是為了復用連接而創建的,在client類中有一個連接池屬性Hashtable<ConnectionId, Connection> connections,此屬性表示如果多個客戶端來自同一個remoteID連接,如果connection沒有關閉,那么就復用這個connection。那么如何判斷是來自同一個ConnectionId呢,見下面的代碼。
/** *ConnectionId類重寫了equals方法 * **/ @Override public boolean equals(Object obj) { if (obj == this) { return true; } if (obj instanceof ConnectionId) { ConnectionId that = (ConnectionId) obj; //同一個遠端服務地址,即要連接同一個服務端 return isEqual(this.address, that.address) && this.doPing == that.doPing && this.maxIdleTime == that.maxIdleTime && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy) && this.pingInterval == that.pingInterval //同一個遠程協議,像datanode與namenode,client與 //namenode等之間通信的時候都各自有自己的協議, //如果不是同一個協議則使用不同的連接 && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout && this.tcpNoDelay == that.tcpNoDelay && isEqual(this.ticket, that.ticket); } return false; }
private DataInputStream in;//輸入
private DataOutputStream out;//輸出
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//Call類是client的內部類,將客戶端的請求,服務端的響應等信息封裝成一個call類,在后面我們會詳細分析此類。而calls屬性是建立連接后進行的多次消息傳送,也就是我們每次建立連接可能會在連接有效期間發送了多次請求。
說了這些屬性的含義,那么是怎么和服務端建立連接的呢。看下面的代碼解析
private Connection getConnection(ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { //running是client的一個屬性,表示客戶端現在是否向服務端進行請求,如果沒有running(running是一個AtomicBollean原子布爾類的對象)就是返回false if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; do { synchronized (connections) { //判斷是否存在對應的連接沒有則新建 connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId, serviceClass); connections.put(remoteId, connection); } } //addCall中判斷當獲取取得接應該關閉了,則不能將call放到這個關閉的連接中 } while (!connection.addCall(call)); //進行輸入輸出對象初始化 connection.setupIOstreams(fallbackToSimpleAuth); return connection; } private synchronized boolean addCall(Call call) { //shouldCloseConnection也是connection類的屬性,當連接異常,或者客戶端要斷開連接是,它返回false,說明這個連接正在回收中,不能繼續使用。 if (shouldCloseConnection.get()) return false; calls.put(call.id, call); notify(); return true; }
getConnection方法只是初始化了connection對象,並將要發送的請求call對象放入連接connection中,其實還並沒有與客戶端進行通信。開始通信的方法是setupIOstreams方法,此方法不僅建立與服務端通信的輸入輸出對象,還進行消息頭的發送,判斷能否與服務端進行連接,由於Hadoop有很多個版本,而且並不是每個版本之間都能進行完美通信的。所以不同版本是不能通信的,消息頭就是負責這個任務的,消息頭中也附帶了,通信的協議,說明到底是誰和誰之間通信(是client和namenode還是datanode和namenode,還是yarn中的resourceManage 和nodemanage等等)。
//省略了部分代碼 private synchronized void setupIOstreams( AtomicBoolean fallbackToSimpleAuth) {
//在socket不為空的情況下就不用再初始化下面的內容了,這說明了,目前正在重用已有的connection,而shouldCloseConnection為true則表示當前的連接正要關閉狀態,不可用因此下面的初始化也沒有意義,要獲取一個新 //的連接才可以 if (socket != null || shouldCloseConnection.get()) { return; } try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connecting to " + server); } short numRetries = 0; Random rand = null; while (true) { //connection一些初始化信息,建立socket,初始socket等等操作 setupConnection(); //初始輸入 InputStream inStream = NetUtils.getInputStream(socket); //初始輸出 OutputStream outStream = NetUtils.getOutputStream(socket); //向服務端寫消息頭信息 writeConnectionHeader(outStream); . . . . . . //向服務端寫連接上下文,詳見下面代碼解析 writeConnectionContext(remoteId, authMethod); //connection連接有一定的超時限制,touch方法進行時間更新將連接最新時間更新到現在。 touch(); if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connected to " + server); } // connection類繼承自thread類,在其run方法中開始接收服務端的返回消息,詳見下面run方法 start(); return; } } catch (Throwable t) { if (t instanceof IOException) { markClosed((IOException)t); } else { markClosed(new IOException("Couldn't set up IO streams", t)); } //如果出現錯誤就關閉連接, close(); } }
先來看一下client端發送的頭消息以及連接上下文中都是什么。
在writeConnectionHeader(OutputStream outStream)方法中主要發送的信息是Hadoop魔數(hrpc),當前版本version,所應用的通信協議的類名和協議的callid。詳見下面代碼,內容很簡單不做過多解釋。
/** * Write the connection header - this is sent when connection is established * +----------------------------------+ * | "hrpc" 4 bytes | * +----------------------------------+ * | Version (1 byte) | * +----------------------------------+ * | Service Class (1 byte) | * +----------------------------------+ * | AuthProtocol (1 byte) | * +----------------------------------+ */ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method out.write(RpcConstants.HEADER.array()); out.write(RpcConstants.CURRENT_VERSION); out.write(serviceClass); out.write(authProtocol.callId); out.flush(); }
傳完頭信息就要繼續傳連接上下文,上下文信息主要是確定當前連接來自於那個客戶端,正在處理的是當前客戶端的那個call調用,等等信息以確保服務端能夠准確的將應答消息發送給正確的客戶端。詳見代碼解析
private void writeConnectionContext(ConnectionId remoteId, AuthMethod authMethod) throws IOException { // 建立上下文,依據協議名稱,connectionId所屬用戶組 IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext( RPC.getProtocolName(remoteId.getProtocol()), remoteId.getTicket(), authMethod); //建立上下文頭信息,包括RpcKind.RPC_PROTOCOL_BUFFER說明消息采用的序列化方式,CONNECTION_CONTEXT_CALL_ID應用的那個call,這里采用一個特殊的callId,CONNECTION_CONTEXT_CALL_ID=-3,表示是一個上下文信息,沒有請求需要處理,RpcConstants.INVALID_RETRY_COUNT表示call的重試次數,遠程調用肯定會出現調用失敗,而失敗可能是網絡等問題,所以重試幾次以確保最終能夠獲得返回結果,這里的RpcConstants.INVALID_RETRY_COUNT=-1,並不需要重試,因為沒有請求需要處理,clientId顧名思義當前發出請求的客戶端 RpcRequestHeaderProto connectionContextHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); RpcRequestMessageWrapper request = new RpcRequestMessageWrapper(connectionContextHeader, message); // 寫出消息到服務端,先寫消息長度,然后是內容,這是固定的方式。 out.writeInt(request.getLength()); request.write(out); }
消息發送完畢就要等待回應,run方法不僅僅是對消息頭發送出的信息的響應,他是對當前連接在有效期間所有請求的響應的一個接收端。
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { //waitForWork方法判斷當前連接是否處於工作狀態, while (waitForWork()) {//wait here for work - read or close connection //接受消息 receiveRpcResponse(); } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } //connection已經關閉,進行連接回收,包括輸入輸出的回收將連接從連接池中清除等 close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
//接收服務端返回的信息 private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { //對返回消息的處理,分布式消息的處理方式有很多種,一種是定長格式,一種是不定長,定長方式很容易理解,不定長中包含了消息的長度,在消息頭處,則可以容易的讀出消息准確長度,並進行處理。 int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); checkResponse(header); int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); //每個連接中有很多個call,call類中有一個callId的屬性,類似於mac地址在對應的集群中是唯一的,從而能讓客戶端和服務端能夠准去的處理請求。 int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); //獲取正在處理的call Call call = calls.get(callId); //處理狀態,RpcStatusProto是一個枚舉類,有三種狀態成功,錯誤,連接關閉。 RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { //通過反射方式獲取返回的消息值 Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value //處理完成后將call從calls中刪除掉 calls.remove(callId); //將返回值放到client的結果值中 call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc Request failed // Verify that length was correct if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { calls.remove(callId); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection markClosed(re); } } } catch (IOException e) { markClosed(e); } } //此方法是call中的 public synchronized void setRpcResponse(Writable rpcResponse) { //將結果值放到返回值中 this.rpcResponse = rpcResponse; //當前call已處理完畢, callComplete(); } //此方法是call中的 protected synchronized void callComplete() { //done=true表示此call已經處理完成 this.done = true; //在處理call的時候采用的是同步處理方案,所有處理完后要喚醒wait端, notify(); // notify caller }
下面我們來講一下Client.Call這個類
Call是對消息的一個封裝。包括以下屬性
final int id; // call id
final int retry; // call重試次數
final Writable rpcRequest; // 序列化的rpc請求
Writable rpcResponse; // 序列化的返回響應,如果有錯誤則是null,即Nullwritable
IOException error; // 處理中的異常
final RPC.RpcKind rpcKind; // rpc引擎采用的種類,主要有writable引擎方式,和protocolbuffer引擎方式,兩種的序列化和rpc消息處理各不相同,writable是Hadoop創建之初自帶的一種處理方式,protocolbuffer是google公司所采用的一種方式,目前Hadoop默認的采用方式是protocolbuffer方式,主要是平台化和速度上都要勝於writalble。
boolean done; // true表示call已完成,判斷call完成與否的依據
Call類的主要方法在上面已經提到過,可以返回上面回顧一下。
上面分析了client端是如何處理連接,那么我們什么時候會建立client端對象,以及如何發送正式的消息內容呢?那我們就接下來繼續分析。
其實客戶端和服務端之間的通信依賴於Java內部的動態代理方式。
主要代理的就是協議代理,Hadoop內的所有協議都實現自VesionedProtocol接口,主要有兩個方法,getProtocolVersion判斷協議的版本,getProtocolSignature對協議的認證,認證就是判斷客戶端發送的協議服務端有沒有對應的實現等等信息。
client端通過協議發送的請求都要經過代理對象,代理對象invoke方法會在發送請求是建立一個invocation類的對象(在writable引擎中是這樣,protocolbuffer引擎中則比較復雜),所有的請求都要經過這個對象打包發送到server端,server端接收到請求后將消息轉化成對應的invocation對象處理。詳細解析看下面代碼。
//客戶端或者datanode等在開始發送請求通信時,會調用RPC類中的getProxy方法,這個方法用很多個重載方法,最終會調用下面的方法 public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } //最終獲取對應RPC引擎的代理對象。 return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); } //在protobufRpcEngine中的實現如下 public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { //Invoker實現了InvocationHandler 最后的invoke方法就在此類中 final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); //這是我們非常熟悉的動態代理的創建方式 return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false); }
在invoker中的invoke方法中處理client端的請求
@Override public Object invoke(Object proxy, Method method, Object[] args) throws ServiceException {
... //請求頭信息 RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); ...
//請求包裹在參數中 Message theRequest = (Message) args[1]; final RpcResponseWrapper val; try { //調用C/S中的client端的call方法處理請求 val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId, fallbackToSimpleAuth); } catch (Throwable e) { if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Exception <- " + remoteId + ": " + method.getName() + " {" + e + "}"); } if (Trace.isTracing()) { traceScope.getSpan().addTimelineAnnotation( "Call got exception: " + e.getMessage()); } throw new ServiceException(e); } finally { if (traceScope != null) traceScope.close(); } if (LOG.isDebugEnabled()) { long callTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); } Message prototype = null; try {
//獲取協議類型, prototype = getReturnProtoType(method); } catch (Exception e) { throw new ServiceException(e); } Message returnMessage; try { //通過client call返回的結果構造最終的返回值。 returnMessage = prototype.newBuilderForType() .mergeFrom(val.theResponseRead).build(); if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Response <- " + remoteId + ": " + method.getName() + " {" + TextFormat.shortDebugString(returnMessage) + "}"); } } catch (Throwable e) { throw new ServiceException(e); } return returnMessage; }
下面就看一下client 中的call方法做了些什么
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { //根據引擎種類,請求消息建立call對象 final Call call = createCall(rpcKind, rpcRequest); //上面分析過的getConnection方法,獲取一個連接 Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); try { //通過連接開始發送消息給服務端,詳見下面代碼解析 connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { throw new IOException("connection has been closed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("interrupted waiting to send rpc request to server", e); throw new IOException(e); } boolean interrupted = false; //采用同步阻塞方式,直到此call得到了對應的應答,然后對應答消息進行處理。 synchronized (call) { while (!call.done) { try { //對應callComplete方法中的notify調用。 call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception InetSocketAddress address = connection.getRemoteAddress(); throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error); } } else { //處理正確將應答消息返回上面的invoke方法中 return call.getRpcResponse(); } } }
下面看看是如何發送請求消息的
public void sendRpcRequest(final Call call) throws InterruptedException, IOException { //判斷連接是否關閉 if (shouldCloseConnection.get()) { return; } // Serialize the call to be sent. This is done from the actual // caller thread, rather than the sendParamsExecutor thread, // so that if the serialization throws an error, it is reported // properly. This also parallelizes the serialization. // // Format of a call on the wire: // 0) Length of rest below (1 + 2) // 1) RpcRequestHeader - is serialized Delimited hence contains length // 2) RpcRequest // // Items '1' and '2' are prepared here.
//創建輸出緩沖准備將請求信息輸出到服務端 final DataOutputBuffer d = new DataOutputBuffer();
//拼裝請求消息的頭信息 RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, clientId);
//將頭消息放入緩沖區 header.writeDelimitedTo(d);
//將請求正文放入緩沖區 call.rpcRequest.write(d); //采用同步方式發送消息,不然消息之間交叉重疊無法讀取 synchronized (sendRpcRequestLock) {
//啟動發送線程發送消息,sederFuture等待響應 Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { synchronized (Connection.this.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); byte[] data = d.getData(); int totalLength = d.getLength(); //先寫出消息的長度,在寫出消息的內容。
out.writeInt(totalLength); // Total Length out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest out.flush(); } } catch (IOException e) { // exception at this point would leave the connection in an // unrecoverable state (eg half a call left on the wire). // So, close the connection, killing any outstanding calls markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early 關閉流和緩沖區 IOUtils.closeStream(d); } } }); try {
//等待返回結果,真正返回結果是放到call中的RPCResponse屬性值,是通過connection的run(方法上面有詳解)獲取的run方法一直處於輪詢狀態,直到連接關閉或出現異常等現象才結束,這里的get只是阻塞等待消息成 //功發送為止。 senderFuture.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); // cause should only be a RuntimeException as the Runnable above // catches IOException if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new RuntimeException("unexpected checked exception", cause); } } } }
以上就是client端到服務端rpc連接及發送消息的全部內容。下一節將會分析server端到client端的rpc連接方式節消息接受處理和發送方式。