lz程序猿一枚,在大數據的道路上一騎絕塵,最近對源碼分析饒有興趣,so寫下此文共享給給位碼農們,實力有限如有錯誤的地方希望大家予以指正。話不多說上文章。
RPC 實現一共有3個最重要的類,Client 客戶端、Server 服務端、RPC 三類,RPC實現主要是通過java NIO 、java 動態代理、java 反射的方式實現。
本文只分析client 和RPC當前這兩部分,后續會加入Server端的部分。
RPC
RPC是在Client和Server的基礎上實現了Hadoop的IPC,共分兩部分功能
與客戶端相關的RPCInvoker,與服務端相關的Server(是RPC的內部類而不是上面的Server服務端類)。RPC中還有一個跟RPC引擎相關的類,RPCKind 枚舉類,內容如下:
public enum RpcKind { RPC_BUILTIN ((short) 1), // 測試用 RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size public final short value; //TODO make it private RpcKind(short val) { this.value = val; } }
可以看出 Hadoop自從yarn的引入,Hadoop的序列化引擎已經不單單是writable了,新引入了google的protocol方式,因此引入了RPCEngine接口和對應的實現類ProtoBufRPCEngine和WritableRPCEngine。RPCEngine 是客戶端和服務端統一獲取IPC連接的地方(RPC類中也包含相關部分,最終通過RPCKind類選擇適當的引擎的實現類),客戶端通過getProxy獲取客戶端連接,服務端通過getServer獲取連接。

先從getProxy開始分析,這也是客戶端的IPC入口。
getProxy采用java動態代理的方式,每次對協議接口方法的調用都會被攔截下來,通過invoke方法將客戶端的請求交給Client類處理。
RPCEngine中的getProxy <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException
分析一下各個參數的含義(只分析重要參數,安全相關略過)
Class<T> protocol Hadoop各個角色之間的協議(2.0之后Hadoop協議接口都已經protocol化,不在采用writable方式)如客戶端和namenode之間的協議,namenode和datanode之間的協議都要接口化,各個接口中都相關的可用方法,IPC遠程調用其實就是調用這些接口的實現類中的方法。下面是客戶端和datanode之間的協議接口(下面的是為了說明協議接口的應用,有一定了解的可以略過):
--------------------------------------------------------協議接口-------------------------------------------------------
public interface ClientDatanodeProtocol { public static final long versionID = 9L; /**返回一個副本的可見長度. */ long getReplicaVisibleLength(ExtendedBlock b) throws IOException; /** * 刷新聯合namenode名單,由於configuration中的namenode節點的增加和停止已經 *刪除的namenode節點(2.x開始引入了聯合namenode的方式,namenode不再是單一 *節點,分布在多個節點上,每個節點管理不同的目錄,如namenode1管理*/application1 ,namenode2管理/application2,每個目錄互不干擾,其中某個namenode掛 *掉了,只是其管理的目錄下的*應用不可用,不會影響其他的節點,datanode不變,任*何一個namenode都可以控制所有的*datanode ) * * @throws IOException on error **/ void refreshNamenodes() throws IOException; /** *刪除塊池目錄。如果“force”是false只有塊池目錄為空時刪除,否則塊池與它的內容 *一並刪除。(此方法和新hdfs datanode數據管理相關,下章會講解) * * @param bpid Blockpool id to be deleted. * @param force If false blockpool directory is deleted only if it is empty * i.e. if it doesn't contain any block files, otherwise it is * deleted along with its contents. * @throws IOException */ void deleteBlockPool(String bpid, boolean force) throws IOException; /** * 檢索存儲在本地文件系統上的塊文件和元數據文件的路徑名。 * 為了使此方法有效,下列情況之一應滿足 * 客戶端用戶必須在數據節點被配置成能夠使用這一方法 * * 當啟用安全,Kerberos身份驗證必須能夠連接到這個Datanode * * @param block * the specified block on the local datanode * @param token * the block access token. * @return the BlockLocalPathInfo of a block * @throws IOException * on error */ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException; /** *檢索Datanode上有關一個list塊上卷位置信息。 *這是在一個不透明的形式{@link org.apache.hadoop.fs.VolumeId} *為配置的每個數據目錄,這是不能保證橫跨DN重新啟動一樣的。 * * @param blockPoolId the pool to query * @param blockIds * list of blocks on the local datanode * @param tokens * block access tokens corresponding to the requested blocks * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with * data directories * @throws IOException * if datanode is unreachable, or replica is not found on datanode */ HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, long []blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException; /** * 關閉一個datanode節點. * * @param forUpgrade If true, data node does extra prep work before shutting * down. The work includes advising clients to wait and saving * certain states for quick restart. This should only be used when * the stored data will remain the same during upgrade/restart. * @throws IOException */ void shutdownDatanode(boolean forUpgrade) throws IOException; /** * 獲取datanode元數據信息 * * @return software/config version and uptime of the datanode */ DatanodeLocalInfo getDatanodeInfo() throws IOException; /** * Asynchronously reload configuration on disk and apply changes. */ void startReconfiguration() throws IOException; /** *獲取之前發出的重新配置任務的狀態. * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}. */ ReconfigurationTaskStatus getReconfigurationStatus() throws IOException; /** * 觸發一個新block report */ void triggerBlockReport(BlockReportOptions options) throws IOException; }
---------------------------------------------協議接口---------------------------------------------------
long clientVersion client標識
InetSocketAddress addr 訪問的服務端地址
UserGroupInformation ticket 用戶組信息
Configuration conf configuration配置信息
SocketFactory factory socket工廠用來生成socket連接(IPC通信采用socket的TCP方式)
int rpcTimeout 超時時間
RetryPolicy connectionRetryPolicy 連接重試策略(直接失敗,重試和切換到另一台機器重試詳細見RetryPolicy類)
AtomicBoolean fallbackToSimpleAuth 是否退到一般用戶
此方法最終會調用相關子類的對應的方法,以ProtoBuRPCEngine為例,
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { //Invoker 類實現了InvocationHandler final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); //生成代理對象(此部分不熟悉看一下java的動態代理) return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false); }
Invoker
Invoker類圖如

isClosed 與連接關閉有關
remoteId Client端到Server端的連接id,Client會繼續分析
client Client對象
clientProtocolVersion 不同Hadoop版本之間的協議版本是不一致的,所以不能用2.1的版本與2.5的通信
protocolName 協議名
returnTypes 緩存每個協議接口中方法的返回類型(Message封裝Message是google protocolBuffer的消息序列化類)
invoker構造方法
private Invoker(Class<?> protocol, Client.ConnectionId connId, Configuration conf, SocketFactory factory) { this.remoteId = connId; // CLIENTS 是ClientCache類型的對象,其中緩存着所有訪問過的客戶端對象信息,如果是新的客戶端則構造新的client對象並將其緩存。 this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class); this.protocolName = RPC.getProtocolName(protocol); this.clientProtocolVersion = RPC .getProtocolVersion(protocol); }
Invoke
下面看看關鍵的invoke方法,當調用協議接口中的某個方法時,就會觸發此方法。
@Override public Object invoke(Object proxy, Method method, Object[] args) throws ServiceException { long startTime = 0; if (LOG.isDebugEnabled()) { startTime = Time.now();//當前時間毫秒數 } if (args.length != 2) { // 參數必須是2個RpcController + Message throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + args.length); } if (args[1] == null) { throw new ServiceException("null param while calling Method: [" + method.getName() + "]"); } //追述信息相關, TraceScope traceScope = null; // if Tracing is on then start a new span for this rpc. // guard it in the if statement to make sure there isn't // any extra string manipulation. if (Trace.isTracing()) { traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method)); } //RPC請求頭信息,類似http中的請求頭一樣,客戶端和服務端都要先發送頭信息,然后在發送內容。注意,構造頭信息是將method放入了請求中,在服務端接受時就會知道調用哪個方法。 RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Call -> " + remoteId + ": " + method.getName() + " {" + TextFormat.shortDebugString((Message) args[1]) + "}"); } //method的參數信息,method反射是用到。 Message theRequest = (Message) args[1]; // server端返回的結果 final RpcResponseWrapper val; try { // 調用client(client已經在構造方法里生成了對應的對象)類中的call方法(client類中會具體分析該方法)返回server端的返回結果 val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId, fallbackToSimpleAuth); } catch (Throwable e) { if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Exception <- " + remoteId + ": " + method.getName() + " {" + e + "}"); } if (Trace.isTracing()) { traceScope.getSpan().addTimelineAnnotation( "Call got exception: " + e.getMessage()); } throw new ServiceException(e); } finally { if (traceScope != null) traceScope.close(); } if (LOG.isDebugEnabled()) { long callTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); } Message prototype = null; try { //獲取method的返回類型 prototype = getReturnProtoType(method); } catch (Exception e) { throw new ServiceException(e); } Message returnMessage; try { //將返回值message序列化 returnMessage = prototype.newBuilderForType() .mergeFrom(val.theResponseRead).build(); if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Response <- " + remoteId + ": " + method.getName() + " {" + TextFormat.shortDebugString(returnMessage) + "}"); } } catch (Throwable e) { throw new ServiceException(e); } return returnMessage; } 獲取方法的返回類型(message序列化后的結果) private Message getReturnProtoType(Method method) throws Exception { if (returnTypes.containsKey(method.getName())) { return returnTypes.get(method.getName()); } Class<?> returnType = method.getReturnType(); Method newInstMethod = returnType.getMethod("getDefaultInstance"); newInstMethod.setAccessible(true); Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null); returnTypes.put(method.getName(), prototype); return prototype; } 關閉客戶端的IPC連接 public void close() throws IOException { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } }
總之,invoker 類通過client call方法攔截了協議接口方法的調用,並將處理方式發送到Client.call方法中,由call方法處理如何將調用信息發送到服務端並獲取返回結果,封裝成message返回最終的調用的結果。
RPCInvoker接口
此接口與上面的Invoker沒有任何關系,此類只有一個call方法由server端調用,用於處理最終請求處理的地方,就是調用協議接口實現類對應方法的地方。主要采用反射的方式實現。在WritableRPCEngine和ProtoBufRPCEngine中都有對應的實現類。之所以會多出這一步驟,而不是直接在Server里直接實現call方法,是因為當前Hadoop版本序列化的方式存在兩種,Hadoop實現者將這兩個序列化的解析處理方法分開實現,供其他類調用,怎加了代碼的重用性。
ProtoBufRpcInvoker.Call
下面以ProtoBufRPCEngine. ProtoBufRpcInvoker為例講解call方法的具體處理步驟。
public Writable call(RPC.Server server, String protocol, Writable writableRequest, long receiveTime) throws Exception { RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; RequestHeaderProto rpcRequest = request.requestHeader; //獲取調用的方法名 String methodName = rpcRequest.getMethodName(); //獲取協議接口名 String protoName = rpcRequest.getDeclaringClassProtocolName(); //獲取客戶端版本 long clientVersion = rpcRequest.getClientProtocolVersion(); if (server.verbose) LOG.info("Call: protocol=" + protocol + ", method=" + methodName); //獲取接口實現類 ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, clientVersion); BlockingService service = (BlockingService) protocolImpl.protocolImpl; //根據方法名獲取方法描述信息 MethodDescriptor methodDescriptor = service.getDescriptorForType() .findMethodByName(methodName); if (methodDescriptor == null) { String msg = "Unknown method " + methodName + " called on " + protocol + " protocol."; LOG.warn(msg); throw new RpcNoSuchMethodException(msg); } //根據方法描述信息獲取客戶端發送的message信息(protocol方式采用message類序列化信息)。 Message prototype = service.getRequestPrototype(methodDescriptor); //獲取方法參數 Message param = prototype.newBuilderForType() .mergeFrom(request.theRequestRead).build(); Message result; long startTime = Time.now(); int qTime = (int) (startTime - receiveTime); Exception exception = null; try { server.rpcDetailedMetrics.init(protocolImpl.protocolClass); //調用方法返回結果,內部是protocol方式實現調用協議接口中的方法。 result = service.callBlockingMethod(methodDescriptor, null, param); } catch (ServiceException e) { exception = (Exception) e.getCause(); throw (Exception) e.getCause(); } catch (Exception e) { exception = e; throw e; } finally { int processingTime = (int) (Time.now() - startTime); if (LOG.isDebugEnabled()) { String msg = "Served: " + methodName + " queueTime= " + qTime + " procesingTime= " + processingTime; if (exception != null) { msg += " exception= " + exception.getClass().getSimpleName(); } LOG.debug(msg); } String detailedMetricsName = (exception == null) ? methodName : exception.getClass().getSimpleName(); server.rpcMetrics.addRpcQueueTime(qTime); server.rpcMetrics.addRpcProcessingTime(processingTime); server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, processingTime); } //返回最終的結果 return new RpcResponseWrapper(result); }
Client
Client中包含很多內部類,大致可歸納為兩部分,一部分是與IPC連接相關的類 connection、connectionId等,另一部分與遠程接口調用相關的 Call、ParallelCall等
Client大致類圖如下(不包含內部類,最終總結會包含所有類)

callIDCounter 一個生成Client.Call 類中唯一id的一個生成器。
callId 當前線程對應的call對象的id
retryCount 重試次數,連接失敗或者返回結果錯誤或者超時
connections 當前client所有的正在處理的連接
running client是否處於運行狀態
conf configuration配置類
socketFactory 創建socket的工廠
clientId 當前client的唯一id
CONNECTION_CONTEXT_CALL_ID 特殊的一種callId 用於傳遞connection上下文信息的callId
valueClass :Class<? extends Writable> Call服務端返回結果類型
sendParamsExecutor 多線程方式處理connection
Client構造方法
先看Client構造方法,上面Invoker調用過
public Client(Class<? extends Writable> valueClass, Configuration conf, SocketFactory factory) { this.valueClass = valueClass; this.conf = conf; this.socketFactory = factory; //獲取超時時間 this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); //通過uuid方式生成clientId this.clientId = ClientId.getClientId(); //生成一個cache類型的executorService 稍后分析 this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); }
call
下面就看一下,Invoker類中的invoke方法調用的call方法是怎么把方法發送到服務端的。
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { //生成一個Call類型的對象,上面曾說過,client中包含很多內部類,Call就是其中之一,負責遠程接口調用。下面會細化此類 final Call call = createCall(rpcKind, rpcRequest); //生成一個connection對象,Hadoop在此處進行了一些優化措施,如果當前連接在過去的曾經應用過,並且當前仍然是活躍的,那么就復用此連接。這會減少內存的開銷和遠程socket通信的開銷,后面會細化此類 Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); try { //call對象已經把調用信息進行了封裝,然后通過connection對象將call封裝的信息發送到server端。 connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { throw new IOException("connection has been closed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("interrupted waiting to send rpc request to server", e); throw new IOException(e); } boolean interrupted = false; synchronized (call) { while (!call.done) { try { //在此處會堵塞當前線程,直道call有返回結果。由notify喚醒。 call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } //線程中斷異常處理 if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } //call 返回錯誤處理 if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception InetSocketAddress address = connection.getRemoteAddress(); throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error); } } else { //將正確信息返回到invoker中。 return call.getRpcResponse(); } } }
此方法主要步驟,先創建call遠程調用對象將調用信息封裝,在生成遠程連接對象connection,然后將call通過connection發送到服務端等待返回結果,期間可能出現各種錯誤信息(超時、連接錯誤,線程中斷等等),最后將正確的結果返回到invoker中。
getConnection
獲取連接connection方法getConnection
private Connection getConnection(ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { //確保當前client處於運行狀態 if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ do { //加上同步鎖會有多個線程同時獲取連接,避免相同連接生成多次 synchronized (connections) { connection = connections.get(remoteId); //如果連接池中不包含想要的連接則創建新連接 if (connection == null) { connection = new Connection(remoteId, serviceClass); connections.put(remoteId, connection); } } } while (!connection.addCall(call));//將剛剛創建的call添加到次connection中,一個connection可以處理多個調用。 //connection初始IOstream,其中包含創建請求頭消息並發送信息。 //此段代碼並沒有放到同步代碼塊中,原因是如果服務端很慢的話,它會花費很長的時間創建一個連接,這會使整個系統宕掉(同步代碼使得每次只能處理一個線程,其他的connection都要等待,這會使系統處於死等狀態)。 connection.setupIOstreams(fallbackToSimpleAuth); return connection; }
createCall
創建Call 方法很簡單直接調用call的構造方法。
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) { return new Call(rpcKind, rpcRequest); }
Connection
下面講一下Client的內部類:
在說connection之前,說一下Hadoop IPC消息傳遞的方式,其實是采用變長消息格式,所以每次發送消息之前要發送消息的總長度包含消息頭信息,一般用dataLength表示消息長度,Hadoop用4個字節的來存儲消息的大小。
Hadoop在connection初始建立連接的時候,會發送connection消息頭和消息上下文(后面會有兩個方法處理這兩段信息),那么Hadoop是如何判斷發送過來的信息是connection過來的,
類似java,Hadoop也有一個魔數 ‘hrpc’ 這個魔數存儲在connection發送的消息頭中,正好占的是dataLength的4個字節,這是Hadoop精心設置的一種方式。如果dataLength字段是hrpc則說明是集群中某個client發送過來的信息,而頭信息並不需要數據內容,只包含頭信息,這使得在處理頭信息時,不用關心信息長度。因為他的長度就是頭信息那么大。
Connection類圖大致如下(只包含重要信息,安全和權限相關去掉)

Server 對應服務端的地址和端口
remoteId connectionId 是connection的唯一id屬性
socket 與服務端的socket連接
in 輸入,從連接中獲取服務端返回的結果用
out 輸出,發送數據到服務端用
lastActivity 最近一次進行I/O的時間用於判斷超時
rpcTimeout 超時時間范圍
calls 當前connection處理的所有call
maxIdleTime 最大空閑時間,如果超過這個時間,connection將會從client對象中的connections map對象中剔除掉,將剩余的空間留給比較忙的connection。
connectionRetryPolicy 連接失敗的重試策略。
maxRetriesOnSocketTimeouts 在socket中最大的重試超時時間范圍。
shouldCloseConnection 是否應該關閉當前connection,true關閉
sendRpcRequestLock 同步鎖用對象。
TcpNoDelay 是否采用Nagle算法(與tcp數據包相關)
closeException 關閉connection可能是因為某種錯誤,記錄錯誤信息
doping 每隔一段時間發送的ping信息,防止服務端誤認為客戶端死掉。
pingInterval ping的時間間隔
pingRequest ping發送的內容
在上面的getConnection中,如果當前沒有對應的Connection對象,那么就生成新的
//Connection中的很多屬性在ConnectionId類中都已經存在了。構造方法主要是初始化上面的屬性
public Connection(ConnectionId remoteId, int serviceClass) throws IOException { this.remoteId = remoteId; this.server = remoteId.getAddress(); if (server.isUnresolved()) { throw NetUtils.wrapException(server.getHostName(), server.getPort(), null, 0, new UnknownHostException()); } this.rpcTimeout = remoteId.getRpcTimeout(); this.maxIdleTime = remoteId.getMaxIdleTime(); this.connectionRetryPolicy = remoteId.connectionRetryPolicy; this.maxRetriesOnSasl = remoteId.getMaxRetriesOnSasl(); this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts(); this.tcpNoDelay = remoteId.getTcpNoDelay(); this.doPing = remoteId.getDoPing(); if (doPing) { // construct a RPC header with the callId as the ping callId pingRequest = new ByteArrayOutputStream(); RpcRequestHeaderProto pingHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, PING_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); pingHeader.writeDelimitedTo(pingRequest); } this.pingInterval = remoteId.getPingInterval(); this.serviceClass = serviceClass; if (LOG.isDebugEnabled()) { LOG.debug("The ping interval is " + this.pingInterval + " ms."); } UserGroupInformation ticket = remoteId.getTicket(); // try SASL if security is enabled or if the ugi contains tokens. // this causes a SIMPLE client with tokens to attempt SASL boolean trySasl = UserGroupInformation.isSecurityEnabled() || (ticket != null && !ticket.getTokens().isEmpty()); this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE; this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + server.toString() + " from " + ((ticket==null)?"an unknown user":ticket.getUserName())); this.setDaemon(true); }
setupIOstreams
下面分析一下在getConnection中的setupIOstreams,這是Connection初始IO和發送頭信息的方法 ,注意此處的同步鎖synchronized和上面的getConnection 的同步代碼塊意義不一樣,代碼塊鎖住了所有的Connection,而這里的同步鎖只是在Connection重用的時候同步鎖。
private synchronized void setupIOstreams( AtomicBoolean fallbackToSimpleAuth) { //如果是已經存在的連接,或者這個連接應該關閉了,直接返回。兩種情況都已不需要初始化Connection了。 if (socket != null || shouldCloseConnection.get()) { return; } try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connecting to " + server); } short numRetries = 0; Random rand = null; while (true) { //connection初始化 setupConnection(); //生成socket的IO InputStream inStream = NetUtils.getInputStream(socket); OutputStream outStream = NetUtils.getOutputStream(socket); //發送請求頭信息 writeConnectionHeader(outStream); ----------------------------------------安全、權限相關--------------------------------------------- if (authProtocol == AuthProtocol.SASL) { final InputStream in2 = inStream; final OutputStream out2 = outStream; UserGroupInformation ticket = remoteId.getTicket(); if (ticket.getRealUser() != null) { ticket = ticket.getRealUser(); } try { authMethod = ticket .doAs(new PrivilegedExceptionAction<AuthMethod>() { @Override public AuthMethod run() throws IOException, InterruptedException { return setupSaslConnection(in2, out2); } }); } catch (Exception ex) { authMethod = saslRpcClient.getAuthMethod(); if (rand == null) { rand = new Random(); } handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex, rand, ticket); continue; } if (authMethod != AuthMethod.SIMPLE) { // Sasl connect is successful. Let's set up Sasl i/o streams. inStream = saslRpcClient.getInputStream(inStream); outStream = saslRpcClient.getOutputStream(outStream); // for testing remoteId.saslQop = (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP); LOG.debug("Negotiated QOP is :" + remoteId.saslQop); if (fallbackToSimpleAuth != null) { fallbackToSimpleAuth.set(false); } } else if (UserGroupInformation.isSecurityEnabled()) { if (!fallbackAllowed) { throw new IOException("Server asks us to fall back to SIMPLE " + "auth, but this client is configured to only allow secure " + "connections."); } if (fallbackToSimpleAuth != null) { fallbackToSimpleAuth.set(true); } } } ----------------------------------------安全、權限相關--------------------------------------------- //是否到了發送ping的時間 if (doPing) { //將ping內容讀入 inStream = new PingInputStream(inStream); } this.in = new DataInputStream(new BufferedInputStream(inStream)); // SASL may have already buffered the stream if (!(outStream instanceof BufferedOutputStream)) { outStream = new BufferedOutputStream(outStream); } this.out = new DataOutputStream(outStream); //發送Connection上下文 writeConnectionContext(remoteId, authMethod); // 更新活躍時間 touch(); if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connected to " + server); } // 開啟run方法,其中包含接受server返回信息。 start(); return; } } catch (Throwable t) { //異常關閉連接 if (t instanceof IOException) { //此方法會是shouldCloseConnection 變為true, markClosed((IOException)t); } else { markClosed(new IOException("Couldn't set up IO streams", t)); } close(); } }
此方法主要是初始化Connection,建立連接頭信息,並發送請求頭和請求上下文,更新活躍時間。代碼最后開啟線程開始接受server端返回的結果。markClosed方法會使shouldCloseConnection變為true,標記表示Connection應該關閉了,其他方法遇到這個屬性時將會直接跳過不處理任何事情,最終到run(Connection繼承自Thread)方法時,通過waitForWork判斷關閉連接,調用Connection的close方法。
markClosed
private synchronized void markClosed(IOException e) { //通過cas方式設置為true if (shouldCloseConnection.compareAndSet(false, true)) { closeException = e; //喚醒所有阻塞在此連接的線程。 notifyAll(); } }
setupConnection
下面看一下如何初始化Connection
private synchronized void setupConnection() throws IOException { //io錯誤次數 short ioFailures = 0; //超時次數 short timeoutFailures = 0; //循環直道成功創建socket連接 while (true) { try { //創建socket this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(true); ---------------------------權限、安全相關--------------------------------------- /* * Bind the socket to the host specified in the principal name of the * client, to ensure Server matching address of the client connection * to host name in principal passed. */ UserGroupInformation ticket = remoteId.getTicket(); if (ticket != null && ticket.hasKerberosCredentials()) { KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class); if (krbInfo != null && krbInfo.clientPrincipal() != null) { String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName()); // If host name is a valid local address then bind socket to it InetAddress localAddr = NetUtils.getLocalInetAddress(host); if (localAddr != null) { this.socket.bind(new InetSocketAddress(localAddr, 0)); } } } ---------------------------權限、安全相關--------------------------------------- //將socket綁定到server端 NetUtils.connect(this.socket, server, connectionTimeout); //超時時間和ping間隔相同。 if (rpcTimeout > 0) { pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval } //設置socket超時 this.socket.setSoTimeout(pingInterval); return; } catch (ConnectTimeoutException toe) { /* 連接超時可能是連接地址發生了改變,調用updateAdress方法,如果返回true *說明連接地址確實改變了,重新建立連接。 */ if (updateAddress()) { //更新超時次數和io錯誤次數為0 timeoutFailures = ioFailures = 0; } //此方法會關閉socket連接, handleConnectionTimeout(timeoutFailures++, maxRetriesOnSocketTimeouts, toe); } catch (IOException ie) { if (updateAddress()) { timeoutFailures = ioFailures = 0; } handleConnectionFailure(ioFailures++, ie); } } }
updateAddress
更新server端
private synchronized boolean updateAddress() throws IOException { // Do a fresh lookup with the old host name. InetSocketAddress currentAddr = NetUtils.createSocketAddrForHost( server.getHostName(), server.getPort()); //如果地址與以前的不同則更新 if (!server.equals(currentAddr)) { LOG.warn("Address change detected. Old: " + server.toString() + " New: " + currentAddr.toString()); //更新為新的地址 server = currentAddr; return true; } return false; }
writeConnectionHeader
發送請求頭,相對簡單,不解釋
/** * Write the connection header - this is sent when connection is established * +----------------------------------+ * | "hrpc" 4 bytes | * +----------------------------------+ * | Version (1 byte) | * +----------------------------------+ * | Service Class (1 byte) | * +----------------------------------+ * | AuthProtocol (1 byte) | * +----------------------------------+ */ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method out.write(RpcConstants.HEADER.array()); out.write(RpcConstants.CURRENT_VERSION); out.write(serviceClass); out.write(authProtocol.callId); out.flush(); }
writeConnectionContext
發送請求上下文
/* 此方法和上面的方法都不是同步的,原因是他們只在初始化的時候調用一次。
*/
private void writeConnectionContext(ConnectionId remoteId, AuthMethod authMethod) throws IOException { // Write out the ConnectionHeader IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext( RPC.getProtocolName(remoteId.getProtocol()), remoteId.getTicket(), authMethod); //構造上下文信息,只有上下文內容,沒有信系, RpcRequestHeaderProto connectionContextHeader = ProtoUtil //rpc引擎類型,rpc打包方式,context的callId默認-3,重試次數-1表示一直重試,客戶端id .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); RpcRequestMessageWrapper request = new RpcRequestMessageWrapper(connectionContextHeader, message); // Write out the packet length out.writeInt(request.getLength()); request.write(out); }
sendRpcRequest
下面是client call方法中通過Connection sendRPCRequest發送遠程調用
/** Initiates a rpc call by sending the rpc request to the remote server. */ public void sendRpcRequest(final Call call) throws InterruptedException, IOException { //如果應該關閉連接,返回 if (shouldCloseConnection.get()) { return; } // 序列化的call將會被發送到服務端,這是在call線程中處理 // 而不是sendParamsExecutor 線程 // 因此如果序列化出現了問題,也能准確的報告 // 這也是一種並發序列化的方式. // // Format of a call on the wire: // 0) Length of rest below (1 + 2) // 1) RpcRequestHeader - is serialized Delimited hence contains length // 2) RpcRequest // // Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); //構造請求頭信息,與連接剛建立時候類似。 RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, clientId); //將請求信息和頭信息寫到一個輸入流的buffer中 header.writeDelimitedTo(d); call.rpcRequest.write(d); // synchronized (sendRpcRequestLock) { //多線程方式發送請求 Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { //out加同步鎖,以免多個消息寫亂輸出流 synchronized (Connection.this.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //通過Connection的out輸出流將請求信息發送到服務端 byte[] data = d.getData(); //計算信息總長度 int totalLength = d.getLength(); //寫出長度信息 out.writeInt(totalLength); // Total Length //寫出內容信息 out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest out.flush(); } } catch (IOException e) { // exception at this point would leave the connection in an // unrecoverable state (eg half a call left on the wire). // So, close the connection, killing any outstanding calls markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } } }); try { //阻塞等待結果,真正的返回結果是在call 中。 senderFuture.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); // cause should only be a RuntimeException as the Runnable above // catches IOException if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new RuntimeException("unexpected checked exception", cause); } } } }
Connection.run
Connection是thread的子類,每個Connection都會有一個自己的線程,這樣能夠加快請求的處理速度。在setupIOStream方法中最后的地方調用的Connection開啟線程的方法,start,這樣Connection就能夠等待返回的結果。
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { //等待是否有可用的call,直到Connection可關閉時,結束循環 while (waitForWork()) {//wait here for work - read or close connection //接受返回結果 receiveRpcResponse(); } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } //while循環判斷shouldCloseConnection為true,關閉Connection close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
此方法中如果有待處理的call並且當前Connection可用,client客戶端尚在運行中,則停留在while循環中處理call。直到shouldCloseConnection為true,關閉連接。下面是waitForWork方法
waitForWork
private synchronized boolean waitForWork() { //在連接可用,尚未有可處理的call時,掛起當前線程直到達到最大空閑時間。 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { long timeout = maxIdleTime- (Time.now()-lastActivity.get()); if (timeout>0) { try { wait(timeout); } catch (InterruptedException e) {} } } //在有處理的call且連接可用,client尚在運行,返回true if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { return true; //其他狀況則返回false,並標記shouldCloseConnection為true } else if (shouldCloseConnection.get()) { return false; } else if (calls.isEmpty()) { // idle connection closed or stopped markClosed(null); return false; } else { // get stopped but there are still pending requests markClosed((IOException)new IOException().initCause( new InterruptedException())); return false; } }
waitForWork方法主要作用就是判斷當前在所有情況都正常時,有沒有可處理的call,有返回true,沒有等待到最大空閑時間(這段時間內會被addCalls中的notify喚醒,由於有了新的call要處理所有要喚醒),如果這段時間當中扔沒有要處理的call則返回false,其他情況均返回false,並標記shouldCloseConnection為true。
addCall
private synchronized boolean addCall(Call call) { //如果當前連接不可用則返回false。 if (shouldCloseConnection.get()) return false; //將call對象放入Connection正在處理的call隊列里。 calls.put(call.id, call); //喚醒在waitForWork中被wait的連接,如果沒有這略過 notify(); return true; }
Addcall 方法是在上面client解析中getConnection的方法中調用。因為連接會復用,所以方法中會判斷連接是否可用。
receiveRpcResponse
下面看一下Connection接受返回結果的receiveRpcResponse方法。HadoopIPC連接采用的是變長格式的消息,所以每次發送消息是先發送消息的長度,讓后是消息的內容。
private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { //獲取消息長度 int totalLen = in.readInt(); 讀取消息內容 RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); //結果校驗 checkResponse(header); int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); //獲取對應處理的call int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); //找到對應的call並將結果放到call對象的RpcResponse中 Call call = calls.get(callId); //查看處理結果的狀態,是否為success RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { //狀態success將返回值放入call的rpcresponse中 Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value //此請求已處理完成,從calls中移除call calls.remove(callId); call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily //如果是ProtoBuffEngine則用protocol方式將結果包裹一次,用於protocol的方式處理 if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc 返回錯誤 // Verify that length was correct if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } //獲取錯誤信息 final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { //error時,將錯誤信息填充到call中,並將call從calls中移除 calls.remove(callId); call.setException(re); } else if (status == RpcStatusProto.FATAL) { //如果是致命錯誤則關閉連接,可能是連接異常引起的錯誤 // Close the connection markClosed(re); } } } catch (IOException e) { //如果發生IO錯誤則關閉連接。 markClosed(e); } }
Call
下面看一下client中最后一個內部類call,大概的類圖如下

Id call的唯一id 來自於client的callId
Retry 重試次數,來自於client的retryCount
rpcRequest 請求內容序列化后的
rpcResponese 返回結果序列化后的
error 錯誤信息
rpcKind rpc引擎
done 此請求是否完成
setRpcResponse
下面看一下Connection中receiveRpcResponse方法里所調用的setRPCResponse方法。看看結果是如何設置並返回到client中的call方法里的(前面有記載)。
//其實方法很簡單只是將receiveRpcResponse中序列化好的結果放到了call的RPCResponse中。並調用了callComplete。 public synchronized void setRpcResponse(Writable rpcResponse) { this.rpcResponse = rpcResponse; callComplete(); }
callComplete
那么看看callComplete中又做了什么。
protected synchronized void callComplete() { //標記此次請求已完成 this.done = true; notify(); // notify caller }
還記得在client的call方法中,有一段判斷call的done字段是否為true么,如下
如果當前正在處理的call沒有做完,就wait等待,直到完成notify喚醒,或者是線程被中斷。
while (!call.done) { try { call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } }
Client圖解
以上所有就是client端的全部內容。下面一個整體的client端的一個圖解。

