Netty最大连接数限制


如果在工作当中需要限制每个服务要求有个最大连接限制,比如最大连接限制为1000,当前连接数超过1000则超出的部分直接拒绝。

如何通过netty实现呢?可以先理一下思路。

首先Netty的线程模型是基于主从 Reactors 多线程模型,其中主从 Reactor 多线程模型有多个 Reactor各司其职,如下:

  1. MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor
  2. SubReactor 负责相应通道的 IO 读写请求
  3. 非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理

那这里就可以通过自定义一个ConnectionLimitHandler实现连接数限制,然后将其注册到MainReactor上,每次处理连接请求的时候,即可判断是否超过设置的最大连接数,超过则拒绝访问并记录日志。

那么需求明确了,我们可以先设计一下,代码大概需要做什么,需要哪些逻辑

  1. 首先需要继承ChannelInboundHandlerAdapter,并且重写channelRead方法
  2. 需要传入一个最大连接数maxConnectionNum用于设置最大连接数配置
  3. 通过一个Set 保存MainReactor处理连接请求的Channel
  4. 通过numConnections来计数总连接数
  5. 通过numDroppedConnections来计数拒绝的连接数
  6. 通过loggingScheduled来判断是否需要记录连接拒绝的日志

代码如下:

/**
 * @author i1619kHz
 */
@Sharable
final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
  private static final Logger log = LoggerFactory.getLogger(ConnectionLimitHandler.class);
  private final int maxConnectionNum;
  private final AtomicLong numConnections = new AtomicLong(0);
  private final LongAdder numDroppedConnections = new LongAdder();
  private final AtomicBoolean loggingScheduled = new AtomicBoolean(false);

  private final Set<Channel> childChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
  ConnectionLimitHandler(int maxConnectionNums) {
    this.maxConnectionNum = maxConnectionNums;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = (Channel) msg;
    long conn = numConnections.incrementAndGet();
    if (conn > 0 && conn <= maxConnectionNum) {
      this.childChannels.add(channel);
      channel.closeFuture().addListener(future -> {
        childChannels.remove(channel);
        numConnections.decrementAndGet();
      });
      super.channelRead(ctx, msg);
    } else {
      numConnections.decrementAndGet();
      // Set linger option to 0 so that the server doesn't get too many TIME_WAIT states.
      channel.config().setOption(ChannelOption.SO_LINGER, 0);
      channel.unsafe().closeForcibly();
      numDroppedConnections.increment();
      if (loggingScheduled.compareAndSet(false, true)) {
        ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS);
      }
    }
  }

  private void writeNumDroppedConnectionsLog() {
    loggingScheduled.set(false);
    final long dropped = numDroppedConnections.sumThenReset();
    if (dropped > 0) {
      log.warn("Dropped {} connection(s) to limit the number of open connections to {}",
          dropped, maxConnectionNum);
    }
  }

  public int getMaxConnectionNum() {
    return maxConnectionNum;
  }

  public AtomicLong getNumConnections() {
    return numConnections;
  }

  public LongAdder getNumDroppedConnections() {
    return numDroppedConnections;
  }

  public Set<Channel> getChildChannels() {
    return childChannels;
  }
}

然后通过ServerBootstrap注册到MainReactor上

final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.handler(new ConnectionLimitHandler(Integer.MAX_VALUE));

如果需要获取到ConnectionLimitHandler中的数据,比如当前的连接数,Channel,拒绝的连接数等,可以通过ServerBootstrapConfig获取到注册的Handler

ConnectionLimitHandler handler = (ConnectionLimitHandler) serverBootstrap.config().handler();
Set<Channel> childChannels = handler.getChildChannels();
int maxConnectionNum = handler.getMaxConnectionNum();
LongAdder numDroppedConnections = handler.getNumDroppedConnections();

然后通过获取的handler对象进行获取


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM