hbase之RPC詳解


Hbase的RPC主要由HBaseRPC、RpcEngine、HBaseClient、HBaseServer、VersionedProtocol 5個概念組成。

1、HBaseRPC是hbase RPC的實現類,核心方法:

  1)、RpcEngine getProtocolEngine():返回RpcEngine對象

  2)、<T extends VersionedProtocol> T waitForProxy():調用RpcEngine的getProxy()方法,返回一個遠程代理對象,比如:第一次訪問HRegionServer時需要執行該方法,設置代理后,會緩存該對象到HConnectionImplementation中。

2、RpcEngine接口,其實現類:WritableRpcEngine,核心方法:

  1)、VersionedProtocol getProxy():返回代理對象,HRegionServer和HMaster均是VersionedProtocol的實現類,即返回的對象可代理執行HRegionServer和HMaster的方法;

  2)、Object[] call():調用程序接口,最終是經過HBaseClient的內部類Connection通過socket方式完成;

  3)、RpcServer getServer():返回RpcServer的實現類,有一個抽象實現: HBaseServer和HBaseServer的子類:WritableRpcEngine.Server。

  4)、stopProxy()

3、HBaseClient:RPC的client端實現,最核心的方法是call(),通過該方法可執行服務端的方法,該類中有一個重要的內部類:HBaseClient.Connection,該類封裝了socket,具體原理就是把要執行的方法通過socket告訴服務端,服務端通過HBaseServer類從socket中讀出client端的調用方法,然后執行相關類的相應方法,結果再通過socket傳回。

4、HBaseServer:RPC的server端實現,HBaseServer有兩個重要的內部類,一個是HBaseServer.Connection,另一個是Handler類,這里的Connection從socket中讀出call方法並放入callQueue隊列中,Handler類從該隊列中取出call方法(比如:scan查詢時執行的一次next(),該方法會執行到服務端HRegionServer的next(),這里next就是call方法)並執行,結果通過socket輸出給client端,Handler是Thread的子類,在RS啟動時就會創建所有的Handle線程,然后一直執行,具體的handler線程數可以通過配置項hbase.regionserver.handler.count配置,默認是10。

5、VersionedProtocol,該接口的類圖如下:

 

可進行RPC調用的類必須是該接口的實現類,hbase client、 RS、Master相互之間的訪問總結為:

HBase Client 通過HMasterInterface接口訪問HMasterServer,通過HRegionInterface接口訪問HRegionServer;
HRegionServer通過HMasterRegionInterface接口訪問HMasterServer;
HMaster通過HRegionInterface接口訪問HRegionServer,在訪問RS時Master就是HBase Client的角色。
以scan查詢為例畫時序圖,通過時序圖詳細理解HBase的RPC調用過程

關於HBase的RPC一些知識補充如下

1、HBaseClient緩存了HBaseClient.Connection,默認一個client應用與每個RS均只有一個socket鏈接,可以通過以下兩個配置項修改:

1)、hbase.client.ipc.pool.type:默認為RoundRobin,共三種,如下:

// PoolMap類的createPool方法,在HBaseClient緩存connection時會調用,從pool中獲取connection時,如果緩存的數量沒有達到poolMaxSize,
//則會返回null,從而創建新的connection對象
protected Pool<V> createPool() {     switch (poolType) {     case Reusable: return new ReusablePool<V>(poolMaxSize); //復用的池     case RoundRobin: return new RoundRobinPool<V>(poolMaxSize); //輪詢     case ThreadLocal:  return new ThreadLocalPool<V>(); //本地線程     }     return null;   } //PoolMap.RoundRobinPool類的get方法,緩存的數量沒有達到poolMaxSize,則會返回null,這時會創建新的connection對象,不同的poolType有不同的實現,
//比如:ReusablePool的get方法是:return poll();
public R get() {       if (size() < maxSize) {         return null;       }       nextResource %= size();       R resource = get(nextResource++);       return resource;     }

 

2)、hbase.client.ipc.pool.size:socket鏈接池大小,默認為1

 

2、詳解HBaseClient的getConnection()方法:

//HBaseClient的getConnection方法,默認一個regionserver和一個master均只會建立一個socket鏈接,
//可以通過修改hbase.client.ipc.pool.size(默認值為1)增加socket鏈接數,可參看本文檔上面幾行的內容
protected Connection getConnection(InetSocketAddress addr,                                    Class<? extends VersionedProtocol> protocol,                                    User ticket,                                    int rpcTimeout,                                    Call call)                                    throws IOException, InterruptedException {     if (!running.get()) {       throw new IOException("The client is stopped");     } Connection connection; //一個regionserver對應一個ConnectionId     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); synchronized (connections) { //connections是PoolMap類的實例,如果connections中remoteId對應的鏈接數量小於hbase.client.ipc.pool.size的配置值則會返回null       connection = connections.get(remoteId);       if (connection == null) {         connection = createConnection(remoteId); //一個regionserver對應一個ConnectionId         connections.put(remoteId, connection);       }     }     connection.addCall(call);       //如果沒有socket鏈接則建立socket鏈接     connection.setupIOstreams();     return connection;   }   //ConnectionId的equals方法 public boolean equals(Object obj) {      if (obj instanceof ConnectionId) {        ConnectionId id = (ConnectionId) obj;        return address.equals(id.address) && protocol == id.protocol &&               ((ticket != null && ticket.equals(id.ticket)) ||                (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;      }      return false; }   //HBaseClient內部類:Connection的setupIOstreams方法,在getConnection()中被調用 protected synchronized void setupIOstreams()         throws IOException, InterruptedException { //如果有可用的socket對象則直接返回       if (socket != null || shouldCloseConnection.get()) {         return;       }         if (failedServers.isFailedServer(remoteId.getAddress())) {         IOException e = new FailedServerException(             "This server is in the failed servers list: " + remoteId.getAddress());         markClosed(e);         close();         throw e;       }         try {         setupConnection();         this.in = new DataInputStream(new BufferedInputStream             (new PingInputStream(NetUtils.getInputStream(socket))));         this.out = new DataOutputStream             (new BufferedOutputStream(NetUtils.getOutputStream(socket)));         writeHeader();           // update last activity time         touch();           // start the receiver thread after the socket connection has been set up         start();       } catch (Throwable t) {         failedServers.addToFailedServers(remoteId.address);         IOException e;         if (t instanceof IOException) {           e = (IOException)t;         } else {           e = new IOException("Could not set up IO Streams", t);         }         markClosed(e);         close();           throw e;       } } //HBaseClient內部類:Connection的setupConnection方法,在setupIOstreams ()中被調用 //在這里創建socket對象 protected synchronized void setupConnection() throws IOException {       short ioFailures = 0;       short timeoutFailures = 0;       while (true) {         try {           this.socket = socketFactory.createSocket();           this.socket.setTcpNoDelay(tcpNoDelay);           this.socket.setKeepAlive(tcpKeepAlive);           // connection time out is 20s           NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf));           if (remoteId.rpcTimeout > 0) {             pingInterval = remoteId.rpcTimeout; // overwrite pingInterval           }           this.socket.setSoTimeout(pingInterval);           return;         } catch (SocketTimeoutException toe) {           /* The max number of retries is 45,            * which amounts to 20s*45 = 15 minutes retries.            */           handleConnectionFailure(timeoutFailures++, maxRetries, toe);         } catch (IOException ie) {           handleConnectionFailure(ioFailures++, maxRetries, ie);         }       }     }

 

3、Hbase有三個鏈接類:

org.apache.hadoop.hbase.client.HConnection

org.apache.hadoop.hbase.ipc.HBaseClient.Connection

org.apache.hadoop.hbase.ipc.HBaseServer.Connection

 l  HConnection接口

實現類:HConnectionImplementation,該鏈接是client與hbase集群這個層面的鏈接對象,一個集群的一個client就一個該鏈接對象,在該對象持有

   1)  RpcEngine對象

   2)   ZooKeeperWatcher 對象

   3)    master的RPC代理對象:HMasterInterface

   4)    regionserver的RPC代理對象:HRegionInterface(一個rs對應一個HRegionInterface代理對象)

   5)     緩存的region的location信息

   6)    線程池batchPool,batchPool用於HTable的如下方法:

 

 

其中批量get查詢api:get(List<Get> gets)會調用batch方法,而單條查詢get不會,只要是可能涉及多個regionserver的操作均會使用多線程處理,像批量get、批量delete、put,scan一次只能查詢一個RS,因此雖然功能上是批量查詢數據,但是不需要線程池。

有兩個地方均可能創建該批量操作的線程池(注意是線程池不是連接池),分別是HTablePool和HConnectionImplementation,如果通過HTablePool獲取HTable對象則采用HTablePool的線程池,如果采用HConnectionImplementation獲取HTable對象,則采用HConnectionImplementation的線程池,這個取決於hbase client程序的用法,HTablePool已經是不推薦的方式,0.94.12的版本推薦通過HConnection獲取HTable。

// HTable的默認線程池,HTablePool每次創建HTable時均會創建一個直接提交的線程池(采用SynchronousQueue隊列),該線程池的特點是不會緩存任務,
//有任務會直接執行,缺點是:如果並發大會導致同時存活大量的線程,優點是不會緩存任務,從而不會存在任務堆積過多導致jvm內存暴漲,不過開啟過多的線程也會導致大量的消耗內存
private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);     if (maxThreads == 0) {       maxThreads = 1; // is there a better default?     }     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,         keepAliveTime, TimeUnit.SECONDS,         new SynchronousQueue<Runnable>(),         Threads.newDaemonThreadFactory("hbase-table"));     pool.allowCoreThreadTimeOut(true);     return pool;   }   // HConnectionImplementation的默認線程池 private ExecutorService getBatchPool() {       if (batchPool == null) {         // shared HTable thread executor not yet initialized         synchronized (this) {           if (batchPool == null) {             int maxThreads = conf.getInt("hbase.hconnection.threads.max", Integer.MAX_VALUE);             if (maxThreads == 0) {               maxThreads = Runtime.getRuntime().availableProcessors();             }             long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);             this.batchPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                 maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),                 Threads.newDaemonThreadFactory("hbase-connection-shared-executor"));             ((ThreadPoolExecutor) this.batchPool).allowCoreThreadTimeOut(true);           }           this.cleanupPool = true;         }       }       return this.batchPool;     }

 

由源碼可以看出:

兩個線程池均是采用的直接提交線程池,唯一區別是創建ThreadPoolExecutor對象時指定的corePoolSize不一樣,HTablePool是1,HConnection是Runtime.getRuntime().availableProcessors()。

 

  l  HBaseClient.Connection和HBaseServer.Connection

這兩個Connection鏈接類是對socket的封裝,HBaseClient.Connection類的實例數默認等於1個Master+ RS個數,可以通過hbase.client.ipc.pool.size配置,默認為1,如果是2則client與Master和每個RS均有兩個Connection類的實例,也可理解為有2個socket鏈接,也可配置hbase.client.ipc.pool.type(默認為輪詢)修改socket連接池的類型。

總之:HConnection緩存HMasterInterface和HRegionInterface的RPC代理對象,HMasterInterface和HRegionInterface的RPC代理對象最終均是通過Connection類建立的socket與服務端通信, master和每個RS均只對應一個RPC代理對象,每個RPC代理對象默認對應一個Connection對象,一個Connection對象持有一個socket鏈接。

 

4、關於參數hbase.regionserver.lease.period:RS的租憑期,RS的租憑設計用於當client端持有RS資源的場景,主要用於scan操作,RS會為client端保留scanner對象,以便多次交互,默認60秒,客戶端必須在該時間內向RS發送心跳信息,否則RS認為client是deaded,超過該時間請求RS時,RS會拋出異常,對於scan操作一次next讀取相當於一次心跳(參看:Leases類),在client端用該時間作為scan查詢時每次next()的超時時間。

 

5、關於hbase.rpc.timeout配置:每次RPC的超時時間,默認為60000,如果沒有超時則等待1s后再重試,直到超時或者重試成功,起三個作用:

  1)     Socket讀數據的超時時間。如果超過超時值,將引發 java.net.SocketTimeoutException具體解釋請參考:java.net.Socket.setSoTimeout()的API

  2)       控制整個HBaseRPC.waitForProxy()方法的超時時間,在該方法中RPC遠程執行HRegionServer的getProtocolVersion()方法,檢查client和server端的協議版本,這個過程的總時間不能超過該配置時間,在RPC的過程中涉及建立socket鏈接和socket通信,因此該時間應該大於或者等於建立鏈接(ipc.socket.timeout)的時間(socket 通信時間=整體時間),如果小於建立鏈接的時間,則多導致無用的建立鏈接等待,就算建立成功但是也會因為整體超時后拋出異常。

  3)        在定位root表所在的regionserver地址時,會從zk中獲取root表的regionserver信息,該過程的超時時間由該參數控制,當zk不可用等原因時,會返回null,這時就會阻塞直到超時,這個設計應該是有問題,阻塞沒有任何意義。

HBaseRPC有setRpcTimeout()方式,在一些場景會通過該方法修改rpc超時時間,通過HBaseRPC的如下方法可以看出,hbase最終采用的時間是編程方式制定的超時時間與hbase.rpc.timeout配置的時間中的最小時間:

//HBaseRPC

public static void setRpcTimeout(int rpcTimeout) {

    HBaseRPC.rpcTimeout.set(rpcTimeout);

  }

//HBaseRPC

public static int getRpcTimeout(int defaultTimeout) {// defaultTimeout即hbase.rpc.timeout配置的時間

    return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get());

  }

//WritableRpcEngine

public <T extends VersionedProtocol> T getProxy(

      Class<T> protocol, long clientVersion,

      InetSocketAddress addr, Configuration conf, int rpcTimeout)

    throws IOException {

    T proxy =

          (T) Proxy.newProxyInstance(

              protocol.getClassLoader(), new Class[] { protocol },

              new Invoker(client, protocol, addr, User.getCurrent(), conf,

                  HBaseRPC.getRpcTimeout(rpcTimeout)));

}

 

通過編程方式設置rpc超時時間的操作只來自ServerCallable類的beforeCall()方法:

public void beforeCall() {

    this.startTime = EnvironmentEdgeManager.currentTimeMillis();

    int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));

    if (remaining < MIN_RPC_TIMEOUT) {

      remaining = MIN_RPC_TIMEOUT;

    }

    HBaseRPC.setRpcTimeout(remaining);

  }

 

scan、get、delete等上十個與server交互的操作類均是通過ServerCallable實現。

 

6、HBase中client與server的socket連接是通過hadoop的org.apache.hadoop.net.NetUtils類實現,在hadoop的1.2.1版本中NetUtils類是基於nio實現socket通信的。

 

7、關於ipc.socket.timeout:默認20s,該參數控制java.nio.channels.Selector的select(timeout)超時時間,如果20s通道沒有就緒則拋出超時異常,也即是socket建立連接的超時時間,讀數據的超時時間由hbase.rpc.timeout配置。

 

8、HBaseClient的call方法

public Writable call(Writable param, InetSocketAddress addr,

                       Class<? extends VersionedProtocol> protocol,

                       User ticket, int rpcTimeout)  {

    Call call = new Call(param);

    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);

    connection.sendParam(call);                 // send the parameter

      while (!call.done) {

        try {

//如果遠端調用沒有執行完,則會每隔1s鍾檢查一次,可見是比較低效的。

          call.wait(1000);        

        } catch (InterruptedException ignored) {

          interrupted = true;

        }

      }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM