Hbase的RPC主要由HBaseRPC、RpcEngine、HBaseClient、HBaseServer、VersionedProtocol 5個概念組成。
1、HBaseRPC是hbase RPC的實現類,核心方法:
1)、RpcEngine getProtocolEngine():返回RpcEngine對象
2)、<T extends VersionedProtocol> T waitForProxy():調用RpcEngine的getProxy()方法,返回一個遠程代理對象,比如:第一次訪問HRegionServer時需要執行該方法,設置代理后,會緩存該對象到HConnectionImplementation中。
2、RpcEngine接口,其實現類:WritableRpcEngine,核心方法:
1)、VersionedProtocol getProxy():返回代理對象,HRegionServer和HMaster均是VersionedProtocol的實現類,即返回的對象可代理執行HRegionServer和HMaster的方法;
2)、Object[] call():調用程序接口,最終是經過HBaseClient的內部類Connection通過socket方式完成;
3)、RpcServer getServer():返回RpcServer的實現類,有一個抽象實現: HBaseServer和HBaseServer的子類:WritableRpcEngine.Server。
4)、stopProxy()
3、HBaseClient:RPC的client端實現,最核心的方法是call(),通過該方法可執行服務端的方法,該類中有一個重要的內部類:HBaseClient.Connection,該類封裝了socket,具體原理就是把要執行的方法通過socket告訴服務端,服務端通過HBaseServer類從socket中讀出client端的調用方法,然后執行相關類的相應方法,結果再通過socket傳回。
4、HBaseServer:RPC的server端實現,HBaseServer有兩個重要的內部類,一個是HBaseServer.Connection,另一個是Handler類,這里的Connection從socket中讀出call方法並放入callQueue隊列中,Handler類從該隊列中取出call方法(比如:scan查詢時執行的一次next(),該方法會執行到服務端HRegionServer的next(),這里next就是call方法)並執行,結果通過socket輸出給client端,Handler是Thread的子類,在RS啟動時就會創建所有的Handle線程,然后一直執行,具體的handler線程數可以通過配置項hbase.regionserver.handler.count配置,默認是10。
5、VersionedProtocol,該接口的類圖如下:

可進行RPC調用的類必須是該接口的實現類,hbase client、 RS、Master相互之間的訪問總結為:
HBase Client 通過HMasterInterface接口訪問HMasterServer,通過HRegionInterface接口訪問HRegionServer;
HRegionServer通過HMasterRegionInterface接口訪問HMasterServer;
HMaster通過HRegionInterface接口訪問HRegionServer,在訪問RS時Master就是HBase Client的角色。
以scan查詢為例畫時序圖,通過時序圖詳細理解HBase的RPC調用過程

關於HBase的RPC一些知識補充如下:
1、HBaseClient緩存了HBaseClient.Connection,默認一個client應用與每個RS均只有一個socket鏈接,可以通過以下兩個配置項修改:
1)、hbase.client.ipc.pool.type:默認為RoundRobin,共三種,如下:
// PoolMap類的createPool方法,在HBaseClient緩存connection時會調用,從pool中獲取connection時,如果緩存的數量沒有達到poolMaxSize,
//則會返回null,從而創建新的connection對象 protected Pool<V> createPool() { switch (poolType) { case Reusable: return new ReusablePool<V>(poolMaxSize); //復用的池 case RoundRobin: return new RoundRobinPool<V>(poolMaxSize); //輪詢 case ThreadLocal: return new ThreadLocalPool<V>(); //本地線程 } return null; } //PoolMap.RoundRobinPool類的get方法,緩存的數量沒有達到poolMaxSize,則會返回null,這時會創建新的connection對象,不同的poolType有不同的實現,
//比如:ReusablePool的get方法是:return poll(); public R get() { if (size() < maxSize) { return null; } nextResource %= size(); R resource = get(nextResource++); return resource; }
2)、hbase.client.ipc.pool.size:socket鏈接池大小,默認為1
2、詳解HBaseClient的getConnection()方法:
//HBaseClient的getConnection方法,默認一個regionserver和一個master均只會建立一個socket鏈接,
//可以通過修改hbase.client.ipc.pool.size(默認值為1)增加socket鏈接數,可參看本文檔上面幾行的內容 protected Connection getConnection(InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout, Call call) throws IOException, InterruptedException { if (!running.get()) { throw new IOException("The client is stopped"); } Connection connection; //一個regionserver對應一個ConnectionId ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); synchronized (connections) { //connections是PoolMap類的實例,如果connections中remoteId對應的鏈接數量小於hbase.client.ipc.pool.size的配置值則會返回null connection = connections.get(remoteId); if (connection == null) { connection = createConnection(remoteId); //一個regionserver對應一個ConnectionId connections.put(remoteId, connection); } } connection.addCall(call); //如果沒有socket鏈接則建立socket鏈接 connection.setupIOstreams(); return connection; } //ConnectionId的equals方法 public boolean equals(Object obj) { if (obj instanceof ConnectionId) { ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && protocol == id.protocol && ((ticket != null && ticket.equals(id.ticket)) || (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; } return false; } //HBaseClient內部類:Connection的setupIOstreams方法,在getConnection()中被調用 protected synchronized void setupIOstreams() throws IOException, InterruptedException { //如果有可用的socket對象則直接返回 if (socket != null || shouldCloseConnection.get()) { return; } if (failedServers.isFailedServer(remoteId.getAddress())) { IOException e = new FailedServerException( "This server is in the failed servers list: " + remoteId.getAddress()); markClosed(e); close(); throw e; } try { setupConnection(); this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(NetUtils.getInputStream(socket)))); this.out = new DataOutputStream (new BufferedOutputStream(NetUtils.getOutputStream(socket))); writeHeader(); // update last activity time touch(); // start the receiver thread after the socket connection has been set up start(); } catch (Throwable t) { failedServers.addToFailedServers(remoteId.address); IOException e; if (t instanceof IOException) { e = (IOException)t; } else { e = new IOException("Could not set up IO Streams", t); } markClosed(e); close(); throw e; } } //HBaseClient內部類:Connection的setupConnection方法,在setupIOstreams ()中被調用 //在這里創建socket對象 protected synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(tcpKeepAlive); // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf)); if (remoteId.rpcTimeout > 0) { pingInterval = remoteId.rpcTimeout; // overwrite pingInterval } this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* The max number of retries is 45, * which amounts to 20s*45 = 15 minutes retries. */ handleConnectionFailure(timeoutFailures++, maxRetries, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
3、Hbase有三個鏈接類:
org.apache.hadoop.hbase.client.HConnection
org.apache.hadoop.hbase.ipc.HBaseClient.Connection
org.apache.hadoop.hbase.ipc.HBaseServer.Connection
l HConnection接口
實現類:HConnectionImplementation,該鏈接是client與hbase集群這個層面的鏈接對象,一個集群的一個client就一個該鏈接對象,在該對象持有
1) RpcEngine對象
2) ZooKeeperWatcher 對象
3) master的RPC代理對象:HMasterInterface
4) regionserver的RPC代理對象:HRegionInterface(一個rs對應一個HRegionInterface代理對象)
5) 緩存的region的location信息
6) 線程池batchPool,batchPool用於HTable的如下方法:

其中批量get查詢api:get(List<Get> gets)會調用batch方法,而單條查詢get不會,只要是可能涉及多個regionserver的操作均會使用多線程處理,像批量get、批量delete、put,scan一次只能查詢一個RS,因此雖然功能上是批量查詢數據,但是不需要線程池。
有兩個地方均可能創建該批量操作的線程池(注意是線程池不是連接池),分別是HTablePool和HConnectionImplementation,如果通過HTablePool獲取HTable對象則采用HTablePool的線程池,如果采用HConnectionImplementation獲取HTable對象,則采用HConnectionImplementation的線程池,這個取決於hbase client程序的用法,HTablePool已經是不推薦的方式,0.94.12的版本推薦通過HConnection獲取HTable。
// HTable的默認線程池,HTablePool每次創建HTable時均會創建一個直接提交的線程池(采用SynchronousQueue隊列),該線程池的特點是不會緩存任務,
//有任務會直接執行,缺點是:如果並發大會導致同時存活大量的線程,優點是不會緩存任務,從而不會存在任務堆積過多導致jvm內存暴漲,不過開啟過多的線程也會導致大量的消耗內存 private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table")); pool.allowCoreThreadTimeOut(true); return pool; } // HConnectionImplementation的默認線程池 private ExecutorService getBatchPool() { if (batchPool == null) { // shared HTable thread executor not yet initialized synchronized (this) { if (batchPool == null) { int maxThreads = conf.getInt("hbase.hconnection.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors(); } long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); this.batchPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-connection-shared-executor")); ((ThreadPoolExecutor) this.batchPool).allowCoreThreadTimeOut(true); } this.cleanupPool = true; } } return this.batchPool; }
由源碼可以看出:
兩個線程池均是采用的直接提交線程池,唯一區別是創建ThreadPoolExecutor對象時指定的corePoolSize不一樣,HTablePool是1,HConnection是Runtime.getRuntime().availableProcessors()。
l HBaseClient.Connection和HBaseServer.Connection
這兩個Connection鏈接類是對socket的封裝,HBaseClient.Connection類的實例數默認等於1個Master+ RS個數,可以通過hbase.client.ipc.pool.size配置,默認為1,如果是2則client與Master和每個RS均有兩個Connection類的實例,也可理解為有2個socket鏈接,也可配置hbase.client.ipc.pool.type(默認為輪詢)修改socket連接池的類型。
總之:HConnection緩存HMasterInterface和HRegionInterface的RPC代理對象,HMasterInterface和HRegionInterface的RPC代理對象最終均是通過Connection類建立的socket與服務端通信, master和每個RS均只對應一個RPC代理對象,每個RPC代理對象默認對應一個Connection對象,一個Connection對象持有一個socket鏈接。
4、關於參數hbase.regionserver.lease.period:RS的租憑期,RS的租憑設計用於當client端持有RS資源的場景,主要用於scan操作,RS會為client端保留scanner對象,以便多次交互,默認60秒,客戶端必須在該時間內向RS發送心跳信息,否則RS認為client是deaded,超過該時間請求RS時,RS會拋出異常,對於scan操作一次next讀取相當於一次心跳(參看:Leases類),在client端用該時間作為scan查詢時每次next()的超時時間。
5、關於hbase.rpc.timeout配置:每次RPC的超時時間,默認為60000,如果沒有超時則等待1s后再重試,直到超時或者重試成功,起三個作用:
1) Socket讀數據的超時時間。如果超過超時值,將引發 java.net.SocketTimeoutException具體解釋請參考:java.net.Socket.setSoTimeout()的API
2) 控制整個HBaseRPC.waitForProxy()方法的超時時間,在該方法中RPC遠程執行HRegionServer的getProtocolVersion()方法,檢查client和server端的協議版本,這個過程的總時間不能超過該配置時間,在RPC的過程中涉及建立socket鏈接和socket通信,因此該時間應該大於或者等於建立鏈接(ipc.socket.timeout)的時間(socket 通信時間=整體時間),如果小於建立鏈接的時間,則多導致無用的建立鏈接等待,就算建立成功但是也會因為整體超時后拋出異常。
3) 在定位root表所在的regionserver地址時,會從zk中獲取root表的regionserver信息,該過程的超時時間由該參數控制,當zk不可用等原因時,會返回null,這時就會阻塞直到超時,這個設計應該是有問題,阻塞沒有任何意義。
HBaseRPC有setRpcTimeout()方式,在一些場景會通過該方法修改rpc超時時間,通過HBaseRPC的如下方法可以看出,hbase最終采用的時間是編程方式制定的超時時間與hbase.rpc.timeout配置的時間中的最小時間:
//HBaseRPC public static void setRpcTimeout(int rpcTimeout) { HBaseRPC.rpcTimeout.set(rpcTimeout); } //HBaseRPC public static int getRpcTimeout(int defaultTimeout) {// defaultTimeout即hbase.rpc.timeout配置的時間 return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get()); } //WritableRpcEngine public <T extends VersionedProtocol> T getProxy( Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout) throws IOException { T proxy = (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(client, protocol, addr, User.getCurrent(), conf, HBaseRPC.getRpcTimeout(rpcTimeout))); }
通過編程方式設置rpc超時時間的操作只來自ServerCallable類的beforeCall()方法:
public void beforeCall() { this.startTime = EnvironmentEdgeManager.currentTimeMillis(); int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime)); if (remaining < MIN_RPC_TIMEOUT) { remaining = MIN_RPC_TIMEOUT; } HBaseRPC.setRpcTimeout(remaining); }
scan、get、delete等上十個與server交互的操作類均是通過ServerCallable實現。
6、HBase中client與server的socket連接是通過hadoop的org.apache.hadoop.net.NetUtils類實現,在hadoop的1.2.1版本中NetUtils類是基於nio實現socket通信的。
7、關於ipc.socket.timeout:默認20s,該參數控制java.nio.channels.Selector的select(timeout)超時時間,如果20s通道沒有就緒則拋出超時異常,也即是socket建立連接的超時時間,讀數據的超時時間由hbase.rpc.timeout配置。
8、HBaseClient的call方法
public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout) { Call call = new Call(param); Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); connection.sendParam(call); // send the parameter while (!call.done) { try { //如果遠端調用沒有執行完,則會每隔1s鍾檢查一次,可見是比較低效的。 call.wait(1000); } catch (InterruptedException ignored) { interrupted = true; } }
