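The server side of HBase RPC consists of the master and the RegionServer. This post walks through how RPC is created, started, and handled in the master and the region server.
1. Server RPC initialization
First, recall the overall flow diagram of the HBase RPC server side from the previous post, the RPC overview.
Because HMaster extends HRegionServer, the RPC-related member variables of both the master and the region server live in HRegionServer, mainly rpcServices and rpcClient. This post covers rpcServices; RpcClient will be discussed separately.
The RPC initialization and start-up steps during master and region server start-up are as follows:
1) The HRegionServer constructor calls createRpcServices to create the RSRpcServices object. Because HRegionServer is the parent class of HMaster, this function is also called when the master starts.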
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServices(this);
    }
2) The RSRpcServices constructor creates the RpcServer object.
    rpcServer = new RpcServer(rs, name, getServices(),
        bindAddress, // use final bindAddress for this server.
        rs.conf,
        rpcSchedulerFactory.create(rs.conf, this, rs));
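While the RpcServer object is being constructed, HMaster and HRegionServer each supply their own implementation of getServices(), so that the master and the region server respond to different RPC services.
3) The RpcServer constructor creates the Listener and the Responder, and stores and initializes the scheduler that was passed in. These are the key objects of the server.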
    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
    this.delayedCalls = new AtomicInteger(0);
    this.ipcUtil = new IPCUtil(conf);

    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
4) The Listener constructor also creates readThreads Reader instances, which read incoming requests.
    readers = new Reader[readThreads];
    readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
            "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
            ",port=" + port).setDaemon(true).build());
    for (int i = 0; i < readThreads; ++i) {
      Reader reader = new Reader();
      readers[i] = reader;
      readPool.execute(reader);
    }
Once all these objects are constructed, the HRegionServer constructor calls rpcServices.start(), which in turn calls rpcServer.start(). RpcServer's start function starts the responder, the listener, and the scheduler, in that order.
    public synchronized void start() {
      if (started) return;
      authTokenSecretMgr = createSecretManager();
      if (authTokenSecretMgr != null) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
      this.authManager = new ServiceAuthorizationManager();
      HBasePolicyProvider.init(conf, authManager);
      responder.start();
      listener.start();
      scheduler.start();
      started = true;
    }
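The construction and start-up order described in the steps above can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions, not HBase code: `BaseServer`, `MasterServer`, and `RpcEndpoint` are hypothetical stand-ins for HRegionServer, HMaster, and RpcServer, and the service names are used purely as labels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RpcServer: it only records the order in which parts start.
class RpcEndpoint {
    final List<String> services;
    final List<String> startLog = new ArrayList<>();
    private boolean started;

    RpcEndpoint(List<String> services) {
        this.services = services;
    }

    synchronized void start() {
        if (started) return;          // calling start() twice is harmless
        startLog.add("responder");
        startLog.add("listener");
        startLog.add("scheduler");
        started = true;
    }
}

// Hypothetical stand-in for HRegionServer: the constructor builds and starts the endpoint.
class BaseServer {
    final RpcEndpoint rpc;

    BaseServer() {
        // The overridable hook decides which services this server exposes.
        // It is safe here only because the overrides do not touch subclass fields.
        this.rpc = new RpcEndpoint(getServices());
        this.rpc.start();
    }

    protected List<String> getServices() {
        return Arrays.asList("ClientService", "AdminService");
    }
}

// Hypothetical stand-in for HMaster: same constructor path, extra service.
class MasterServer extends BaseServer {
    @Override
    protected List<String> getServices() {
        List<String> all = new ArrayList<>(super.getServices());
        all.add("MasterService");
        return all;
    }
}
```

Constructing `new MasterServer()` runs the `BaseServer` constructor, so the master goes through the same creation and start path while exposing one more service than the plain server.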
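2. Server RPC request handling
The RPC server's listening, reading, and request handling are based on the Reactor pattern. The flow diagram comes from the cited reference.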
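For readers new to the Reactor pattern, the following is a minimal sketch of a selector-driven read loop using `java.nio`. It is an assumption-laden illustration rather than HBase's implementation: `MiniReactor` is a hypothetical class, and a `Pipe` replaces real sockets so the example runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-threaded reactor: one selector, read events only.
class MiniReactor {
    private final Selector selector;
    final List<String> received = new ArrayList<>();

    MiniReactor() throws IOException {
        this.selector = Selector.open();
    }

    // Register a non-blocking channel for read-readiness events.
    void register(Pipe.SourceChannel source) throws IOException {
        source.configureBlocking(false);
        source.register(selector, SelectionKey.OP_READ);
    }

    // One pass of the event loop: wait for ready channels, then handle each of them.
    int runOnce(long timeoutMillis) throws IOException {
        int ready = selector.select(timeoutMillis);
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();                       // selected keys are not cleared automatically
            if (key.isValid() && key.isReadable()) {
                handleRead(key);
            }
        }
        return ready;
    }

    private void handleRead(SelectionKey key) throws IOException {
        Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(256);
        int n = channel.read(buf);
        if (n < 0) {                             // peer closed the channel
            key.cancel();
            channel.close();
            return;
        }
        buf.flip();
        received.add(StandardCharsets.UTF_8.decode(buf).toString());
    }

    void close() throws IOException {
        selector.close();
    }

    // Usage example: one pipe, one message, one pass of the loop.
    static List<String> demo() {
        try {
            Pipe pipe = Pipe.open();
            MiniReactor reactor = new MiniReactor();
            reactor.register(pipe.source());
            pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
            reactor.runOnce(2000);
            List<String> out = new ArrayList<>(reactor.received);
            reactor.close();
            pipe.sink().close();
            pipe.source().close();
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The key point is that a single thread waits on many channels at once and only touches a channel when the selector reports it as ready, which is the same shape as the select-and-iterate loops in the server code.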
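2.1 Listener
The Listener has a ServerSocketChannel named acceptChannel, which is registered with a selector for the OP_ACCEPT event. The Listener also holds readThreads Reader threads managed by a thread pool. The main processing flow on the read side is in the doRunLoop function (the version shown selects on readSelector and logs as "Reader", so it is the Reader's loop):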
    private synchronized void doRunLoop() {
      while (running) {
        try {
          readSelector.select();
          while (adding) {
            this.wait(1000);
          }

          Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            if (key.isValid()) {
              if (key.isReadable()) {
                doRead(key);
              }
            }
          }
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping");
          return;
        } catch (IOException ex) {
          LOG.info(getName() + ": IOException in Reader", ex);
        }
      }
    }
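When there are no requests, the thread blocks in the readSelector.select() call. When a request arrives and its key is valid and readable, doRead reads the data on that connection (the actual logic is in the readAndProcess function). After the data has been read, it is processed in the processOneRpc function.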
    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
      if (connectionHeaderRead) {
        processRequest(buf);
      } else {
        processConnectionHeader(buf);
        this.connectionHeaderRead = true;
        if (!authorizeConnection()) {
          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
          // down the connection instead of trying to read non-existent return.
          throw new AccessDeniedException("Connection from " + this + " for service " +
            connectionHeader.getServiceName() + " is unauthorized for user: " + user);
        }
      }
    }
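The behaviour depends on whether the connection header has already been read. If it has not, processConnectionHeader reads it. Once the header has been read, later requests are parsed and wrapped in a uniform structure, CallRunner, which is added to the scheduler's task queue and handled according to the scheduling policy (FifoRpcScheduler or SimpleRpcScheduler). The core code of processRequest is as follows: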
    Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
        totalRequestSize, traceInfo, RpcServer.getRemoteIp());
    scheduler.dispatch(new CallRunner(RpcServer.this, call));
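The "header first, requests afterwards" rule in processOneRpc is a small per-connection state machine. The sketch below shows that idea in isolation. Everything in it is hypothetical: `MiniConnection`, the `Predicate` used as an authorizer, and the plain `Queue` standing in for `scheduler.dispatch(...)` are illustration devices, and the "header" is reduced to a user name.

```java
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical connection: the first frame is the header, later frames are requests.
class MiniConnection {
    private final Predicate<String> authorizer;  // stands in for authorizeConnection()
    private final Queue<String> callQueue;       // stands in for the scheduler's queue
    private boolean headerRead = false;
    private String user;

    MiniConnection(Predicate<String> authorizer, Queue<String> callQueue) {
        this.authorizer = authorizer;
        this.callQueue = callQueue;
    }

    void processOneFrame(String frame) {
        if (headerRead) {
            // Normal path: wrap the request and hand it to the scheduler.
            callQueue.add(user + ":" + frame);
        } else {
            // First frame: remember who is calling, then check authorization once.
            user = frame;
            headerRead = true;
            if (!authorizer.test(user)) {
                throw new SecurityException("Connection is unauthorized for user: " + user);
            }
        }
    }
}
```

With an authorizer that only accepts "alice", the first frame `"alice"` is consumed as the header and queues nothing, while a following frame `"get row1"` is queued as `"alice:get row1"`. A connection whose header fails the check throws before any request is dispatched.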
2.2 Scheduler
By default the scheduler is SimpleRpcScheduler, which contains three RpcExecutors: callExecutor, priorityExecutor, and replicationExecutor. Most requests on user tables are executed by callExecutor, which takes requests from the task queue filled earlier and hands them to the corresponding handler. The logic is in RpcExecutor's consumerLoop, shown below:
    protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
      boolean interrupted = false;
      double handlerFailureThreshhold =
          conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
      try {
        while (running) {
          try {
            MonitoredRPCHandler status = RpcServer.getStatus();
            CallRunner task = myQueue.take();
            task.setStatus(status);
            try {
              activeHandlerCount.incrementAndGet();
              task.run();
            }
            // ... (the excerpt ends here; the rest of the method is not shown)
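Because myQueue is a blocking queue, the handler thread blocks in myQueue.take() when there are no requests. Otherwise it executes the run function of the CallRunner, which in turn calls the call function of rpcServer.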
    // make the call
    resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
      call.timestamp, this.status);
In rpcServer's call function, the request is first dispatched to the corresponding local implementation through a blocking method call, which returns the result.
    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
    Message result = service.callBlockingMethod(md, controller, param);
    ...
    return new Pair<Message, CellScanner>(result, controller.cellScanner());
Back in CallRunner's run function, setResponse builds the response from the result, and sendResponseIfReady then hands it to the responder, which returns it to the client.
    // Set the response for undelayed calls and delayed calls with
    // undelayed responses.
    if (!call.isDelayed() || !call.isReturnValueDelayed()) {
      Message param = resultPair != null ? resultPair.getFirst() : null;
      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
      call.setResponse(param, cells, errorThrowable, error);
    }
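The queue-and-handler arrangement described in this section is a classic producer/consumer setup. The following sketch shows the idea with the standard library only. It makes several assumptions for brevity: `MiniExecutor` and `Task` are hypothetical names, a `Function<String, String>` plays the role of the service's blocking method, and a `CompletableFuture` plays the role of setResponse. Priorities and the three separate executors are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical executor: handler threads drain one shared blocking queue.
class MiniExecutor {
    // A queued call: the request plus a future that receives the response.
    static final class Task {
        final String request;
        final CompletableFuture<String> response = new CompletableFuture<>();
        Task(String request) { this.request = request; }
    }

    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger activeHandlers = new AtomicInteger();
    private final Function<String, String> service;
    private final Thread[] handlers;
    private volatile boolean running = true;

    MiniExecutor(int handlerCount, Function<String, String> service) {
        this.service = service;
        this.handlers = new Thread[handlerCount];
    }

    void start() {
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(this::consumerLoop, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    // Producer side: called by the reader once a request has been parsed.
    CompletableFuture<String> dispatch(String request) {
        Task task = new Task(request);
        queue.add(task);
        return task.response;
    }

    // Consumer side: each handler blocks in take() until work arrives.
    private void consumerLoop() {
        while (running) {
            try {
                Task task = queue.take();
                activeHandlers.incrementAndGet();
                try {
                    task.response.complete(service.apply(task.request));
                } catch (Throwable t) {
                    task.response.completeExceptionally(t);
                } finally {
                    activeHandlers.decrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
        for (Thread t : handlers) {
            if (t != null) t.interrupt();
        }
    }
}
```

A caller would create `new MiniExecutor(2, s -> s.toUpperCase())`, call `start()`, and then `dispatch("get").join()` to wait for the handler's result. Decoupling the reader from the handlers this way means a slow request ties up one handler thread but never the thread that reads from the network.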
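2.3 Responder
The responder writes results back to the client. Like the Listener, it is implemented with the Reactor pattern. The results produced after the scheduler runs a call are added to the connection's response queue by the doRespond function. In this function, if a single channelWrite can complete the operation, the response is written immediately. Otherwise the call's connection is registered with the selector for OP_WRITE.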
    void doRespond(Call call) throws IOException {
      boolean added = false;

      // If there is already a write in progress, we don't wait. This allows to free the handlers
      // immediately for other tasks.
      if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
        try {
          if (call.connection.responseQueue.isEmpty()) {
            // If we're alone, we can try to do a direct call to the socket. It's
            // an optimisation to save on context switches and data transfer between cores..
            if (processResponse(call)) {
              return; // we're done.
            }
            // Too big to fit, putting ahead.
            call.connection.responseQueue.addFirst(call);
            added = true; // We will register to the selector later, outside of the lock.
          }
        } finally {
          call.connection.responseWriteLock.unlock();
        }
      }

      if (!added) {
        call.connection.responseQueue.addLast(call);
      }
      call.responder.registerForWrite(call.connection);

      // set the serve time when the response has to be sent later
      call.timestamp = System.currentTimeMillis();
    }
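registerForWrite wakes up writeSelector, so that once the connection's channel becomes writable, the responder handles the pending responses through doAsyncWrite and then processAllResponses. From this point the processing is similar to the Listener's.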
    registerWrites();
    int keyCt = writeSelector.select(purgeTimeout);
    if (keyCt == 0) {
      continue;
    }

    Set<SelectionKey> keys = writeSelector.selectedKeys();
    Iterator<SelectionKey> iter = keys.iterator();
    while (iter.hasNext()) {
      SelectionKey key = iter.next();
      iter.remove();
      try {
        if (key.isValid() && key.isWritable()) {
          doAsyncWrite(key);
        }
        // ... (the excerpt ends here; the rest of the loop is not shown)
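The "write directly if possible, otherwise queue and wait for writability" strategy can be illustrated without any real sockets. The sketch below is a single-threaded simplification under stated assumptions: `MiniResponder` and `FakeChannel` are hypothetical, the channel is simulated by a byte budget, the `registeredForWrite` flag stands in for OP_WRITE interest on a selector, and the per-connection write lock from the real code is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical responder for one connection.
class MiniResponder {
    // Simulated non-blocking channel: accepts characters only while it has capacity.
    static final class FakeChannel {
        private int capacity;
        final StringBuilder wire = new StringBuilder();

        FakeChannel(int capacity) { this.capacity = capacity; }

        // Writes as much as currently fits and returns how many characters were written.
        int write(String data) {
            int n = Math.min(capacity, data.length());
            wire.append(data, 0, n);
            capacity -= n;
            return n;
        }

        // Simulates the socket buffer draining, which makes the channel writable again.
        void makeWritable(int more) { capacity += more; }
    }

    private final FakeChannel channel;
    private final Deque<String> responseQueue = new ArrayDeque<>();
    boolean registeredForWrite = false;

    MiniResponder(FakeChannel channel) { this.channel = channel; }

    // Called by a handler thread in the real design: try the fast path first.
    void doRespond(String response) {
        if (responseQueue.isEmpty()) {
            int n = channel.write(response);
            if (n == response.length()) {
                return;                                   // fully written, nothing to queue
            }
            responseQueue.addFirst(response.substring(n)); // keep the unwritten remainder
        } else {
            responseQueue.addLast(response);               // keep ordering behind pending data
        }
        registeredForWrite = true;
    }

    // Called by the responder loop when the channel is reported writable.
    void onWritable() {
        while (!responseQueue.isEmpty()) {
            String head = responseQueue.pollFirst();
            int n = channel.write(head);
            if (n < head.length()) {
                responseQueue.addFirst(head.substring(n));
                return;                                    // still blocked, stay registered
            }
        }
        registeredForWrite = false;                        // queue drained
    }
}
```

The fast path saves a hand-off to the responder thread for small responses, while the queue guarantees that responses on one connection are never interleaved or reordered when the channel cannot take everything at once.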
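3. Summary
Following the code, this post walked through how the RPC server's listener, readers, scheduler, and responder handle an RPC request, which gives a reasonably clear picture of server-side RPC processing. The next post will go through the client-side RPC request logic. Onward!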