HBase-0.98.3: How to Adjust the Number of RPC Connections


I. Background

Because we migrated HBase from 0.94.6 to 0.98.3 while keeping the old HBase configuration, we found that no matter how we adjusted the hbase.regionserver.handler.count parameter, the number of RPC handler tasks never changed.

Only after reading the source code did we discover that the HBase RPC implementation has been rewritten and the parameters no longer mean what they used to: the number of RPC reader threads (what we had been calling "RPC handlers") is now controlled by ipc.server.read.threadpool.size.

hbase.regionserver.handler.count actually sets the number of server-side threads that process requests; the RPC readers and the request handlers form a producer-consumer relationship.

Checking the HBase documentation, the parameter description has never been updated:

hbase.regionserver.handler.count

Count of RPC Listener instances spun up on RegionServers. Same property is used by the Master for count of master handlers.

Default. 30

And ipc.server.read.threadpool.size does not appear in the documentation at all.
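To make the fix concrete, here is a minimal sketch (not from the original post) showing both knobs side by side. The keys and defaults match the 0.98.3 sources walked through below; in a real deployment you would set them in hbase-site.xml rather than programmatically.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RpcTuningSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Producer side: Reader threads that accept and parse requests (default 10).
    conf.setInt("ipc.server.read.threadpool.size", 20);
    // Consumer side: handler threads that execute the parsed calls (default 30).
    conf.setInt("hbase.regionserver.handler.count", 60);
    System.out.println("readers=" + conf.getInt("ipc.server.read.threadpool.size", 10)
        + " handlers=" + conf.getInt("hbase.regionserver.handler.count", 30));
  }
}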

II. Source Code Analysis

HRegionServer

HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There are many HRegionServers in a single HBase deployment.

HRegionServer holds an RpcServer instance, which is used to respond to the various client requests. The RpcServer javadoc summarizes the design:

An RPC server that hosts protobuf described Services. An RpcServer instance has a Listener that hosts the socket. Listener has fixed number of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does total read off the channel and the parse from which it makes a Call. The call is wrapped in a CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done and loops till done.

Scheduler can be variously implemented but default simple scheduler has handlers to which it has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run taking from the queue. They run the CallRunner#run method on each item gotten from queue and keep taking while the server is up. CallRunner#run executes the call. When done, asks the included Call to put itself on new queue for Responder to pull from and return result to client.

An RpcServer instance holds a Listener, and the Listener owns the server socket. Each Listener has a fixed number of Readers, 10 by default. The Listener loops accepting connections and picks a Reader to read the request off the SocketChannel; every Reader is registered with a selector. A Reader parses each request into a Call, wraps it in a CallRunner, and puts it on the queue maintained by the scheduler, where it waits to be run. Handlers take CallRunners from the scheduler's queue. In short, the scheduler acts as a producer-consumer queue.
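Condensed into one line (each stage is examined in the sections below), the request path is: client connection → Listener (accept) → Reader (read and parse → Call → CallRunner) → scheduler queue → Handler (CallRunner.run) → Responder → client.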

1. The RpcServer constructor

/**
   * Constructs a server listening on the named port and address.
   * @param serverInstance hosting instance of {@link Server}. We will do authentications if an
   * instance else pass null for no authentication check.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param isa Where to listen
   * @param conf
   * @throws IOException
   */
  public RpcServer(final Server serverInstance, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress isa, Configuration conf,
      RpcScheduler scheduler)
  throws IOException {
    this.serverInstance = serverInstance;
    this.services = services;
    this.isa = isa;
    this.conf = conf;
    this.socketSendBufferSize = 0;
    this.maxQueueSize =
      this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
    this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
    this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);

    this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
    this.delayedCalls = new AtomicInteger(0);
    this.ipcUtil = new IPCUtil(conf);


    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
  }
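For reference, these are the tunables the constructor reads, with the fallbacks visible above: ipc.server.max.callqueue.size (DEFAULT_MAX_CALLQUEUE_SIZE), ipc.server.read.threadpool.size (10), ipc.client.connection.maxidletime (1000 ms, doubled), ipc.client.kill.max (10), ipc.client.idlethreshold (4000), ipc.server.tcpnodelay (true), and ipc.server.tcpkeepalive (true). Note that readThreads is read once at construction time, so changing ipc.server.read.threadpool.size takes effect only after a RegionServer restart.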

2. The Listener constructor

public Listener(final String name) throws IOException {
      super(name);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), isa, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector = Selector.open();

      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
          "RpcServer.reader=%d,port=" + port).setDaemon(true).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s).");

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("RpcServer.listener,port=" + port);
      this.setDaemon(true);
    }

The Listener owns the ServerSocketChannel (see Java NIO) and builds a Reader array whose length is set by ipc.server.read.threadpool.size. To understand the relationship between the Listener and the Readers, you need to understand the relationship between ServerSocketChannel and SocketChannel.
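As a refresher, here is a minimal standalone NIO sketch (plain Java, not HBase code) of that relationship: one ServerSocketChannel accepts connections, and every accept() yields a per-connection SocketChannel that is then watched by a separate read selector, exactly the way the Listener hands channels off to a Reader. The port number is arbitrary.

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NioAcceptSketch {
  public static void main(String[] args) throws Exception {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.bind(new InetSocketAddress(9999));

    Selector acceptSelector = Selector.open(); // plays the Listener's selector
    Selector readSelector = Selector.open();   // plays one Reader's selector
    server.register(acceptSelector, SelectionKey.OP_ACCEPT);

    while (true) {
      acceptSelector.select(1000);
      for (SelectionKey key : acceptSelector.selectedKeys()) {
        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
        if (client != null) {
          client.configureBlocking(false);
          // Hand the per-connection channel to the "Reader"; a Reader thread
          // would loop on readSelector.select() and drain readable channels.
          client.register(readSelector, SelectionKey.OP_READ);
        }
      }
      acceptSelector.selectedKeys().clear();
    }
  }
}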

In Java NIO, a ServerSocketChannel is a channel that listens for incoming TCP connections. The code uses the observer pattern: the accept event is registered via acceptChannel.register(selector, SelectionKey.OP_ACCEPT), so when a new connection arrives the Listener is notified and runs doAccept():

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
      Connection c;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();

      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        try {
          channel.configureBlocking(false);
          channel.socket().setTcpNoDelay(tcpNoDelay);
          channel.socket().setKeepAlive(tcpKeepAlive);
        } catch (IOException ioe) {
          channel.close();
          throw ioe;
        }

        Reader reader = getReader(); // pick a Reader from the Reader array (see the getReader() sketch after this block)
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel); // observer pattern again: register the channel with the Reader's selector
          c = getConnection(channel, System.currentTimeMillis());
          readKey.attach(c);                                      // attach the Connection to the SelectionKey so the channel can be identified later
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": connection from " + c.toString() +
                "; # active connections: " + numConnections);
        } finally {
          reader.finishAdd();
        }
      }
    }
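getReader() is called above but not shown. In the 0.98 code, which follows the Hadoop ipc.Server design, it is essentially a round-robin over the fixed Reader array, so new connections are spread evenly across Readers (a sketch, not a verbatim copy):

    // Round-robin selection of the next Reader for a freshly accepted channel.
    private int currentReader = 0;

    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }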

After accepting a new connection, the Listener registers the SocketChannel with one Reader's selector, so a single Reader may end up serving many SocketChannels. The Reader's selector watches for read events (channel.register(readSelector, SelectionKey.OP_READ)); when it reports OP_READ, the observer pattern kicks in again and the Reader's run loop picks the key up:

public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
          }
        }
      }

      private synchronized void doRunLoop() { // the Reader loops here forever, waiting for notifications from readSelector
        while (running) {
          SelectionKey key = null;
          try {
            readSelector.select();
            while (adding) {
              this.wait(1000);
            }

            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) {                      // unexpected -- log it
              LOG.info(getName() + ": unexpectedly interrupted: " +
                StringUtils.stringifyException(e));
            }
          } catch (IOException ex) {
            LOG.error(getName() + ": error in Reader", ex);
          }
        }
      }
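The adding flag and the wait(1000) in doRunLoop() pair with startAdd()/finishAdd(), which are not shown above. Here is a sketch of how they cooperate (an assumption based on the Hadoop ipc.Server code this implementation derives from):

      // Called by the Listener before registering a new channel: pause the
      // Reader's select loop so registration cannot race with select().
      public void startAdd() {
        adding = true;
        readSelector.wakeup(); // break out of select() so the loop sees 'adding'
      }

      // Called once registration is done: let doRunLoop() resume selecting.
      public synchronized void finishAdd() {
        adding = false;
        this.notify();
      }

When a key becomes readable, doRead() retrieves the Connection that doAccept() attached to the key and asks it to read and process the bytes: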
void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        throw ieo;
      } catch (Exception e) {
        LOG.warn(getName() + ": count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
            " because read count=" + count +
            ". Number of active connections: " + numConnections);
        }
        closeConnection(c);
        // c = null;
      } else {
        c.setLastContact(System.currentTimeMillis());
      }
    }
/**
     * Read off the wire.
     * @return Returns -1 if failure (and caller will close connection) else return how many
     * bytes were read and processed
     * @throws IOException
     * @throws InterruptedException
     */
    public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
        // does, read in the rest of the connection preamble, the version and the auth method.
        // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
        // length into the 4-byte this.dataLengthBuffer.
        int count;
        if (this.dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, this.dataLengthBuffer);
          if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
            return count;
          }
        }
        // If we have not read the connection setup preamble, look to see if that is on the wire.
        if (!connectionPreambleRead) {
          // Check for 'HBas' magic.
          this.dataLengthBuffer.flip();
          if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
            return doBadPreambleHandling("Expected HEADER=" +
              Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
              " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
              " from " + toString());
          }
          // Now read the next two bytes, the version and the auth to use.
          ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
          count = channelRead(channel, versionAndAuthBytes);
          if (count < 0 || versionAndAuthBytes.remaining() > 0) {
            return count;
          }
          int version = versionAndAuthBytes.get(0);
          byte authbyte = versionAndAuthBytes.get(1);
          this.authMethod = AuthMethod.valueOf(authbyte);
          if (version != CURRENT_VERSION) {
            String msg = getFatalConnectionString(version, authbyte);
            return doBadPreambleHandling(msg, new WrongVersionException(msg));
          }
          if (authMethod == null) {
            String msg = getFatalConnectionString(version, authbyte);
            return doBadPreambleHandling(msg, new BadAuthException(msg));
          }
          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
            AccessControlException ae = new AccessControlException("Authentication is required");
            setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
            responder.doRespond(authFailedCall);
            throw ae;
          }
          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
            doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
                SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
            authMethod = AuthMethod.SIMPLE;
            // client has already sent the initial Sasl message and we
            // should ignore it. Both client and server should fall back
            // to simple auth from now on.
            skipInitialSaslHandshake = true;
          }
          if (authMethod != AuthMethod.SIMPLE) {
            useSasl = true;
          }
          connectionPreambleRead = true;
          // Preamble checks out. Go around again to read actual connection header.
          dataLengthBuffer.clear();
          continue;
        }
        // We have read a length and we have read the preamble.  It is either the connection header
        // or it is a request.
        if (data == null) {
          dataLengthBuffer.flip();
          int dataLength = dataLengthBuffer.getInt();
          if (dataLength == RpcClient.PING_CALL_ID) {
            if (!useWrap) { //covers the !useSasl too
              dataLengthBuffer.clear();
              return 0;  //ping message
            }
          }
          if (dataLength < 0) {
            throw new IllegalArgumentException("Unexpected data length "
                + dataLength + "!! from " + getHostAddress());
          }
          data = ByteBuffer.allocate(dataLength);
          incRpcCount();  // Increment the rpc count
        }
        count = channelRead(channel, data);
        if (count < 0) {
          return count;
        } else if (data.remaining() == 0) {
          dataLengthBuffer.clear();
          data.flip();
          if (skipInitialSaslHandshake) {
            data = null;
            skipInitialSaslHandshake = false;
            continue;
          }
          boolean headerRead = connectionHeaderRead;
          if (useSasl) {
            saslReadAndProcess(data.array());
          } else {
            processOneRpc(data.array());
          }
          this.data = null;
          if (!headerRead) {
            continue;
          }
        } else if (count > 0) {
          // We got some data and there is more to read still; go around again.
          if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
          continue;
        }
        return count;
      }
    }

processOneRpc(data.array()) then processes the data just read off the SocketChannel.
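For reference, the six-byte connection preamble that readAndProcess() validates can be built like this (a sketch; the concrete byte values, CURRENT_VERSION = 0 and AuthMethod.SIMPLE = 80, are read off the 0.98 sources and should be treated as assumptions):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PreambleSketch {
  static ByteBuffer buildPreamble() {
    ByteBuffer preamble = ByteBuffer.allocate(6);
    preamble.put("HBas".getBytes(StandardCharsets.UTF_8)); // HConstants.RPC_HEADER magic
    preamble.put((byte) 0);  // connection version; must equal CURRENT_VERSION
    preamble.put((byte) 80); // auth method code; 80 = AuthMethod.SIMPLE
    preamble.flip();
    return preamble; // after this, every request arrives as a length-prefixed frame
  }
}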

Next, let's see how that data is wrapped into a CallRunner and added to the scheduler's queue:

/**
     * @param buf Has the request header and the request param and optionally encoded data buffer
     * all in this one array.
     * @throws IOException
     * @throws InterruptedException
     */
    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
      long totalRequestSize = buf.length;
      int offset = 0;
      // Here we read in the header.  We avoid having pb
      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
      CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
      int headerSize = cis.readRawVarint32();
      offset = cis.getTotalBytesRead();
      RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
      offset += headerSize;
      int id = header.getCallId();
      if (LOG.isTraceEnabled()) {
        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
          " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the total request.
      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
        final Call callTooBig =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
          "Call queue is full, is ipc.server.max.callqueue.size too small?");
        responder.doRespond(callTooBig);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      CellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) throw new UnsupportedOperationException(header.getMethodName());
          Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
          // To read the varint, I need an inputstream; might as well be a CIS.
          cis = CodedInputStream.newInstance(buf, offset, buf.length);
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            param = builder.mergeFrom(buf, offset, paramSize).build();
          }
          offset += paramSize;
        }
        if (header.hasCellBlockMeta()) {
          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
            buf, offset, buf.length);
        }
      } catch (Throwable t) {
        String msg = "Unable to read call parameter from client " + getHostAddress();
        LOG.warn(msg, t);

        // probably the hbase hadoop version does not match the running hadoop version
        if (t instanceof LinkageError) {
          t = new DoNotRetryIOException(t);
        }
        // If the method is not present on the server, do not retry.
        if (t instanceof UnsupportedOperationException) {
          t = new DoNotRetryIOException(t);
        }

        final Call readParamsFailedCall =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, readParamsFailedCall, t,
          msg + "; " + t.getMessage());
        responder.doRespond(readParamsFailedCall);
        return;
      }

      TraceInfo traceInfo = header.hasTraceInfo()
          ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
          : null;
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo);
      scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
    }
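Putting the parsing together, the byte[] handed to processRequest() is laid out as follows (a reconstruction from the code above, not an official wire-format document):

  varint  headerSize
  bytes   RequestHeader        (protobuf, headerSize bytes)
  varint  paramSize            (only if header.hasRequestParam())
  bytes   request param        (protobuf, paramSize bytes)
  bytes   optional cell block  (if header.hasCellBlockMeta())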

3. SimpleRpcScheduler

Finally, let's look at how SimpleRpcScheduler manages its task queues:

public SimpleRpcScheduler(
      Configuration conf,
      int handlerCount, // set from hbase.regionserver.handler.count
      int priorityHandlerCount,
      int replicationHandlerCount,
      PriorityFunction priority,
      int highPriorityLevel) {
    int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
    this.handlerCount = handlerCount;
    this.priorityHandlerCount = priorityHandlerCount;
    this.replicationHandlerCount = replicationHandlerCount;
    this.priority = priority;
    this.highPriorityLevel = highPriorityLevel;
    this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
    this.priorityCallQueue = priorityHandlerCount > 0
        ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
        : null;
    this.replicationQueue = replicationHandlerCount > 0
        ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
        : null;
  }
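As a worked example: with the default hbase.regionserver.handler.count of 30 and a per-handler default of 10 (RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER in the 0.98 sources), each LinkedBlockingQueue is bounded at 30 × 10 = 300 CallRunner entries unless ipc.server.max.callqueue.length is set explicitly. startHandlers() then spins up the consumer threads: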
private void startHandlers(
      int handlerCount,
      final BlockingQueue<CallRunner> callQueue,
      String threadNamePrefix) {
    for (int i = 0; i < handlerCount; i++) { // so hbase.regionserver.handler.count is really the number of call-processing threads and no longer controls the RPC reader count; readers and handlers form a producer-consumer pair
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          consumerLoop(callQueue);
        }
      });
      t.setDaemon(true);
      t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
      t.start();
      handlers.add(t);
    }
  }
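consumerLoop() is referenced above but not shown: each handler thread simply blocks on its queue and runs whatever the Readers dispatched. A simplified sketch (an approximation of the 0.98 SimpleRpcScheduler, not a verbatim copy):

  private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
    while (running) {
      try {
        CallRunner task = myQueue.take(); // block until a Reader dispatches a call
        task.run();                       // execute the RPC; the Responder returns the result
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

dispatch() is the producer side, routing each CallRunner to the appropriate queue by priority: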
@Override
  public void dispatch(CallRunner callTask) throws InterruptedException {
    RpcServer.Call call = callTask.getCall();
    int level = priority.getPriority(call.header, call.param);
    if (priorityCallQueue != null && level > highPriorityLevel) {
      priorityCallQueue.put(callTask);
    } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
      replicationQueue.put(callTask);
    } else {
      callQueue.put(callTask); // queue the call; maybe blocked here
    }
  }

