Netty最大連接數限制


如果在工作當中需要限制每個服務要求有個最大連接限制,比如最大連接限制為1000,當前連接數超過1000則超出的部分直接拒絕。

如何通過netty實現呢?可以先理一下思路。

首先Netty的線程模型是基於主從 Reactors 多線程模型,其中主從 Reactor 多線程模型有多個 Reactor各司其職,如下:

  1. MainReactor 負責客戶端的連接請求,並將請求轉交給 SubReactor
  2. SubReactor 負責相應通道的 IO 讀寫請求
  3. 非 IO 請求(具體邏輯處理)的任務則會直接寫入隊列,等待 worker threads 進行處理

那這里就可以通過自定義一個ConnectionLimitHandler實現連接數限制,然后將其注冊到MainReactor上,每次處理連接請求的時候,即可判斷是否超過設置的最大連接數,超過則拒絕訪問並記錄日志。

那么需求明確了,我們可以先設計一下,代碼大概需要做什么,需要哪些邏輯

  1. 首先需要繼承ChannelInboundHandlerAdapter,並且重寫channelRead方法
  2. 需要傳入一個最大連接數maxConnectionNum用於設置最大連接數配置
  3. 通過一個Set 保存MainReactor處理連接請求的Channel
  4. 通過numConnections來計數總連接數
  5. 通過numDroppedConnections來計數拒絕的連接數
  6. 通過loggingScheduled來判斷是否需要記錄連接拒絕的日志

代碼如下:

/**
 * @author i1619kHz
 */
@Sharable
final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
  private static final Logger log = LoggerFactory.getLogger(ConnectionLimitHandler.class);
  private final int maxConnectionNum;
  private final AtomicLong numConnections = new AtomicLong(0);
  private final LongAdder numDroppedConnections = new LongAdder();
  private final AtomicBoolean loggingScheduled = new AtomicBoolean(false);

  private final Set<Channel> childChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
  ConnectionLimitHandler(int maxConnectionNums) {
    this.maxConnectionNum = maxConnectionNums;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = (Channel) msg;
    long conn = numConnections.incrementAndGet();
    if (conn > 0 && conn <= maxConnectionNum) {
      this.childChannels.add(channel);
      channel.closeFuture().addListener(future -> {
        childChannels.remove(channel);
        numConnections.decrementAndGet();
      });
      super.channelRead(ctx, msg);
    } else {
      numConnections.decrementAndGet();
      // Set linger option to 0 so that the server doesn't get too many TIME_WAIT states.
      channel.config().setOption(ChannelOption.SO_LINGER, 0);
      channel.unsafe().closeForcibly();
      numDroppedConnections.increment();
      if (loggingScheduled.compareAndSet(false, true)) {
        ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS);
      }
    }
  }

  private void writeNumDroppedConnectionsLog() {
    loggingScheduled.set(false);
    final long dropped = numDroppedConnections.sumThenReset();
    if (dropped > 0) {
      log.warn("Dropped {} connection(s) to limit the number of open connections to {}",
          dropped, maxConnectionNum);
    }
  }

  public int getMaxConnectionNum() {
    return maxConnectionNum;
  }

  public AtomicLong getNumConnections() {
    return numConnections;
  }

  public LongAdder getNumDroppedConnections() {
    return numDroppedConnections;
  }

  public Set<Channel> getChildChannels() {
    return childChannels;
  }
}

然后通過ServerBootstrap注冊到MainReactor上

final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.handler(new ConnectionLimitHandler(Integer.MAX_VALUE));

如果需要獲取到ConnectionLimitHandler中的數據,比如當前的連接數,Channel,拒絕的連接數等,可以通過ServerBootstrapConfig獲取到注冊的Handler

ConnectionLimitHandler handler = (ConnectionLimitHandler) serverBootstrap.config().handler();
Set<Channel> childChannels = handler.getChildChannels();
int maxConnectionNum = handler.getMaxConnectionNum();
LongAdder numDroppedConnections = handler.getNumDroppedConnections();

然后通過獲取的handler對象進行獲取


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM