HBase Source Code Analysis: Server-Side RPC


Server-side RPC covers both the master and the RegionServer. This article walks through how RPC is created, started, and handled in the master and the region server.

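1. Server RPC initialization

First, recall the overall flow diagram of the HBase RPC server side from the previous RPC overview article.

Because HMaster extends HRegionServer, the RPC-related member variables of both the master and the region server live mainly in HRegionServer: rpcServices and rpcClient. This article focuses on rpcServices; RpcClient will be discussed separately.

The RPC-related initialization and startup steps during master and region server startup are as follows:

1. The HRegionServer constructor calls createRpcServices to create the RSRpcServices object. Because HRegionServer is the parent class of HMaster, this function is also called when a master starts.

    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServices(this);
    }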

2. The RSRpcServices constructor creates the RpcServer object.

    rpcServer = new RpcServer(rs, name, getServices(),
      bindAddress, // use final bindAddress for this server.
      rs.conf,
      rpcSchedulerFactory.create(rs.conf, this, rs));

While the RpcServer object is being constructed, the getServices() function, implemented separately for HMaster and HRegionServer, lets the two respond to different sets of RPC services.
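The override mechanism described above can be sketched in a few lines. This is a minimal illustration, not HBase code: the classes RegionServer, Master, RegionRpcServices, and MasterRpcServices and the service-name strings are hypothetical stand-ins, and real services are protobuf objects rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

public class RpcServicesSketch {
    /** Hypothetical stand-in for the region server's RPC services object. */
    static class RegionRpcServices {
        List<String> getServices() {
            List<String> services = new ArrayList<>();
            services.add("ClientService"); // illustrative names only
            services.add("AdminService");
            return services;
        }
    }

    /** Hypothetical stand-in for the master's variant: adds master-only services. */
    static class MasterRpcServices extends RegionRpcServices {
        @Override
        List<String> getServices() {
            List<String> services = new ArrayList<>(super.getServices());
            services.add("MasterService");
            return services;
        }
    }

    static class RegionServer {
        final RegionRpcServices rpcServices;

        RegionServer() {
            // The base constructor calls the factory method; a subclass
            // override decides which services object is actually built.
            this.rpcServices = createRpcServices();
        }

        RegionRpcServices createRpcServices() {
            return new RegionRpcServices();
        }
    }

    static class Master extends RegionServer {
        @Override
        RegionRpcServices createRpcServices() {
            return new MasterRpcServices();
        }
    }

    public static void main(String[] args) {
        System.out.println(new RegionServer().rpcServices.getServices());
        System.out.println(new Master().rpcServices.getServices());
    }
}
```

The same constructor code path runs for both server types; only the overridden factory method and service list differ.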

3. The RpcServer constructor sets up several important objects: the Listener, the Responder, and the scheduler.

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
    this.delayedCalls = new AtomicInteger(0);
    this.ipcUtil = new IPCUtil(conf);


    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));

4. The Listener constructor also creates readThreads Reader instances to read requests.

    readers = new Reader[readThreads];
    readPool = Executors.newFixedThreadPool(readThreads,
      new ThreadFactoryBuilder().setNameFormat(
        "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
        ",port=" + port).setDaemon(true).build());
    for (int i = 0; i < readThreads; ++i) {
      Reader reader = new Reader();
      readers[i] = reader;
      readPool.execute(reader);
    }

Once these objects have been constructed, the HRegionServer constructor calls rpcServices.start(), which in turn calls rpcServer.start(). The RpcServer start function starts the responder, the listener, and the scheduler, in that order.

    public synchronized void start() {
      if (started) return;
      authTokenSecretMgr = createSecretManager();
      if (authTokenSecretMgr != null) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
      this.authManager = new ServiceAuthorizationManager();
      HBasePolicyProvider.init(conf, authManager);
      responder.start();
      listener.start();
      scheduler.start();
      started = true;
    }

 

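2. Server RPC request handling

The RpcServer accepts, reads, and handles requests based on the Reactor pattern. The flow diagram (taken from the cited reference) is as follows.

2.1 Listener

The Listener owns a ServerSocketChannel named acceptChannel, which registers the OP_ACCEPT event with a selector. The Listener also holds readThreads Reader threads managed by a thread pool. The main read loop, run by each Reader, is in the doRunLoop function: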

    private synchronized void doRunLoop() {
      while (running) {
        try {
          readSelector.select();
          while (adding) {
            this.wait(1000);
          }

          Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            if (key.isValid()) {
              if (key.isReadable()) {
                doRead(key);
              }
            }
          }
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping");
          return;
        } catch (IOException ex) {
          LOG.info(getName() + ": IOException in Reader", ex);
        }
      }
    }

When there are no requests, the thread blocks in the readSelector.select() call. When a request arrives and its key is valid and readable, doRead reads the data on that connection (the actual logic is in the readAndProcess function). After the data is read, it is processed in the processOneRpc function.
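The select-and-dispatch loop can be tried outside HBase with plain java.nio. The sketch below is an assumption-laden miniature: it uses a java.nio.channels.Pipe instead of a client socket so it runs without a network, handles a single readiness event instead of looping, and the names ReaderLoopSketch, readOnce, and demo are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ReaderLoopSketch {
    /** One pass of a reader loop: block in select(), then read every readable key. */
    static String readOnce(Selector selector) throws IOException {
        selector.select(); // blocks until at least one registered channel is ready
        StringBuilder out = new StringBuilder();
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove(); // the selected-key set must be cleared by hand
            if (key.isValid() && key.isReadable()) {
                Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel();
                ByteBuffer buf = ByteBuffer.allocate(64);
                if (channel.read(buf) > 0) {
                    buf.flip();
                    out.append(StandardCharsets.UTF_8.decode(buf));
                }
            }
        }
        return out.toString();
    }

    /** Registers a pipe for OP_READ, writes a small payload, and reads it back. */
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open()) {
                pipe.source().configureBlocking(false); // required before register()
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                return readOnce(selector);
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real server each accepted SocketChannel is registered with a reader's selector in the same way, and the loop keeps running while the server is up.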

    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
      if (connectionHeaderRead) {
        processRequest(buf);
      } else {
        processConnectionHeader(buf);
        this.connectionHeaderRead = true;
        if (!authorizeConnection()) {
          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
          // down the connection instead of trying to read non-existent return.
          throw new AccessDeniedException("Connection from " + this + " for service " +
            connectionHeader.getServiceName() + " is unauthorized for user: " + user);
        }
      }
    }

The branch depends on whether the connection header has been read. If it has not, processConnectionHeader reads it. Once the header has been read, each subsequent request is parsed and wrapped in a uniform structure, CallRunner, which is added to the scheduler's task queue and handled according to the scheduling policy (FifoRpcScheduler or SimpleRpcScheduler). The core code of processRequest is:

    Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
            totalRequestSize, traceInfo, RpcServer.getRemoteIp());
    scheduler.dispatch(new CallRunner(RpcServer.this, call));
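The header-first behaviour is a small per-connection state machine. The following sketch shows only that idea, under heavy simplification: frames are plain strings rather than protobuf bytes, authorization is omitted, and ConnectionSketch with its dispatched list is a hypothetical stand-in for the connection and the scheduler queue.

```java
import java.util.ArrayList;
import java.util.List;

public class ConnectionSketch {
    private boolean connectionHeaderRead = false;
    private String serviceName;

    /** Stand-in for the scheduler queue: records what would be dispatched. */
    final List<String> dispatched = new ArrayList<>();

    /** The first frame on a connection is its header; every later frame is a request. */
    void processOneRpc(String frame) {
        if (connectionHeaderRead) {
            // Request path: wrap the request with connection context and dispatch it.
            dispatched.add(serviceName + ":" + frame);
        } else {
            // Header path: remember which service this connection talks to.
            serviceName = frame;
            connectionHeaderRead = true;
        }
    }

    public static void main(String[] args) {
        ConnectionSketch connection = new ConnectionSketch();
        connection.processOneRpc("ClientService"); // header
        connection.processOneRpc("get");           // request
        connection.processOneRpc("put");           // request
        System.out.println(connection.dispatched);
    }
}
```

Keeping the flag on the connection object means the header is parsed once per connection, not once per request.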

2.2 Scheduler

The scheduler defaults to SimpleRpcScheduler, which contains three RpcExecutor instances: callExecutor, priorityExecutor, and replicationExecutor. Most requests against user tables are executed by callExecutor, which takes requests from the task queue populated earlier and hands them to a handler for processing. The logic is in RpcExecutor's consumerLoop:

    protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
      boolean interrupted = false;
      double handlerFailureThreshhold =
          conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
      try {
        while (running) {
          try {
            MonitoredRPCHandler status = RpcServer.getStatus();
            CallRunner task = myQueue.take();
            task.setStatus(status);
            try {
              activeHandlerCount.incrementAndGet();
              task.run();
            }
            ...

Because myQueue is a blocking queue, a handler blocks in myQueue.take() when there are no requests. Otherwise it executes the run function of the CallRunner, which in turn calls the call function of rpcServer.
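The handler side is a classic blocking-queue consumer. Here is a minimal sketch of that pattern, assuming Runnable tasks in place of CallRunner and omitting status tracking and error thresholds; HandlerLoopSketch and runTasks are illustrative names, not HBase APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerLoopSketch {
    /** Starts handler threads, feeds them tasks, and returns how many tasks ran. */
    static int runTasks(int handlers, int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);

        Thread[] threads = new Thread[handlers];
        for (int i = 0; i < handlers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take(); // blocks while the queue is empty
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted: leave the loop
                }
            }, "handler-" + i);
            threads[i].setDaemon(true);
            threads[i].start();
        }

        // Producer side: this is the role dispatch() plays for the reader threads.
        for (int i = 0; i < tasks; i++) {
            queue.add(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Thread t : threads) {
            t.interrupt(); // unblock handlers parked in take()
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3, 10));
    }
}
```

Separating readers from handlers this way lets slow requests occupy handler threads without stalling the threads that read from sockets.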

    // make the call
    resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
      call.timestamp, this.status);

In rpcServer's call function, the request is dispatched to the corresponding local implementation through a blocking method call, which returns the result.

    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
    Message result = service.callBlockingMethod(md, controller, param);
    ...
    return new Pair<Message, CellScanner>(result, controller.cellScanner());

Back in CallRunner's run function, setResponse builds the response from the result, and sendResponseIfReady then sends it back to the client through the responder.

      // Set the response for undelayed calls and delayed calls with
      // undelayed responses.
      if (!call.isDelayed() || !call.isReturnValueDelayed()) {
        Message param = resultPair != null ? resultPair.getFirst() : null;
        CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
        call.setResponse(param, cells, errorThrowable, error);
      }

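2.3 Responder

The responder writes results back to the client. Like the Listener, it is implemented with the Reactor pattern. The results produced by the scheduler are added to the connection's response queue through the doRespond function. In that function, if a single channelWrite can complete the operation, the response is written directly and the call is done. Otherwise, the call's connection is registered with the selector for OP_WRITE.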

    void doRespond(Call call) throws IOException {
      boolean added = false;

      // If there is already a write in progress, we don't wait. This allows to free the handlers
      //  immediately for other tasks.
      if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
        try {
          if (call.connection.responseQueue.isEmpty()) {
            // If we're alone, we can try to do a direct call to the socket. It's
            //  an optimisation to save on context switches and data transfer between cores..
            if (processResponse(call)) {
              return; // we're done.
            }
            // Too big to fit, putting ahead.
            call.connection.responseQueue.addFirst(call);
            added = true; // We will register to the selector later, outside of the lock.
          }
        } finally {
          call.connection.responseWriteLock.unlock();
        }
      }

      if (!added) {
        call.connection.responseQueue.addLast(call);
      }
      call.responder.registerForWrite(call.connection);

      // set the serve time when the response has to be sent later
      call.timestamp = System.currentTimeMillis();
    }
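The "write directly if possible, otherwise queue" decision can be isolated from the networking. The sketch below is a simplified model, not the HBase implementation: the socket is replaced by a capacity counter and a StringBuilder, writes are all-or-nothing instead of partial, selector registration is left out, and RespondSketch, tryWrite, and capacity are hypothetical names.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReentrantLock;

public class RespondSketch {
    final Deque<String> responseQueue = new ConcurrentLinkedDeque<>();
    final ReentrantLock responseWriteLock = new ReentrantLock();

    /** Stand-in for the socket: bytes "sent" so far and how many more it accepts. */
    final StringBuilder wire = new StringBuilder();
    int capacity;

    RespondSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Simplified all-or-nothing write; only called while holding the write lock. */
    private boolean tryWrite(String response) {
        if (response.length() > capacity) {
            return false;
        }
        wire.append(response);
        capacity -= response.length();
        return true;
    }

    /** Returns true if the response was written directly, false if it was queued. */
    boolean doRespond(String response) {
        boolean added = false;
        // Never wait for the lock: a busy writer means we simply queue and move on.
        if (responseQueue.isEmpty() && responseWriteLock.tryLock()) {
            try {
                if (responseQueue.isEmpty()) {
                    if (tryWrite(response)) {
                        return true; // fast path, no selector involved
                    }
                    responseQueue.addFirst(response); // did not fit, keep it at the head
                    added = true;
                }
            } finally {
                responseWriteLock.unlock();
            }
        }
        if (!added) {
            responseQueue.addLast(response); // preserve ordering behind pending responses
        }
        return false; // the real server would now register the connection for OP_WRITE
    }

    public static void main(String[] args) {
        RespondSketch connection = new RespondSketch(5);
        System.out.println(connection.doRespond("abc"));   // fits: written directly
        System.out.println(connection.doRespond("defgh")); // too big: queued
        System.out.println(connection.doRespond("x"));     // queue not empty: queued behind
        System.out.println(connection.wire + " " + connection.responseQueue);
    }
}
```

Checking the queue before and after taking the lock keeps responses on one connection in order: once anything is pending, later responses always go behind it.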

registerForWrite wakes up writeSelector, so that once the connection's channel becomes writable the responder handles the pending responses through doAsyncWrite, which calls processAllResponses. From there the processing is similar to that of the Listener.

    registerWrites();
    int keyCt = writeSelector.select(purgeTimeout);
    if (keyCt == 0) {
      continue;
    }

    Set<SelectionKey> keys = writeSelector.selectedKeys();
    Iterator<SelectionKey> iter = keys.iterator();
    while (iter.hasNext()) {
      SelectionKey key = iter.next();
      iter.remove();
      try {
        if (key.isValid() && key.isWritable()) {
          doAsyncWrite(key);
        }
        ...

 

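3. Summary

This article walked through the code to show how the RpcServer's listener, readers, scheduler, and responder handle RPC requests, giving a reasonably clear picture of server-side RPC processing. The next article will go through the client-side RPC request logic.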

