如果在工作当中需要限制每个服务要求有个最大连接限制,比如最大连接限制为1000,当前连接数超过1000则超出的部分直接拒绝。
如何通过netty实现呢?可以先理一下思路。
首先Netty的线程模型是基于主从 Reactors 多线程模型,其中主从 Reactor 多线程模型有多个 Reactor各司其职,如下:
- MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor
- SubReactor 负责相应通道的 IO 读写请求
- 非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理
那这里就可以通过自定义一个ConnectionLimitHandler实现连接数限制,然后将其注册到MainReactor上,每次处理连接请求的时候,即可判断是否超过设置的最大连接数,超过则拒绝访问并记录日志。
那么需求明确了,我们可以先设计一下,代码大概需要做什么,需要哪些逻辑
- 首先需要继承ChannelInboundHandlerAdapter,并且重写channelRead方法
- 需要传入一个最大连接数maxConnectionNum用于设置最大连接数配置
- 通过一个Set
保存MainReactor处理连接请求的Channel - 通过numConnections来计数总连接数
- 通过numDroppedConnections来计数拒绝的连接数
- 通过loggingScheduled来判断是否需要记录连接拒绝的日志
代码如下:
/**
* @author i1619kHz
*/
@Sharable
final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ConnectionLimitHandler.class);
private final int maxConnectionNum;
private final AtomicLong numConnections = new AtomicLong(0);
private final LongAdder numDroppedConnections = new LongAdder();
private final AtomicBoolean loggingScheduled = new AtomicBoolean(false);
private final Set<Channel> childChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
ConnectionLimitHandler(int maxConnectionNums) {
this.maxConnectionNum = maxConnectionNums;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = (Channel) msg;
long conn = numConnections.incrementAndGet();
if (conn > 0 && conn <= maxConnectionNum) {
this.childChannels.add(channel);
channel.closeFuture().addListener(future -> {
childChannels.remove(channel);
numConnections.decrementAndGet();
});
super.channelRead(ctx, msg);
} else {
numConnections.decrementAndGet();
// Set linger option to 0 so that the server doesn't get too many TIME_WAIT states.
channel.config().setOption(ChannelOption.SO_LINGER, 0);
channel.unsafe().closeForcibly();
numDroppedConnections.increment();
if (loggingScheduled.compareAndSet(false, true)) {
ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS);
}
}
}
private void writeNumDroppedConnectionsLog() {
loggingScheduled.set(false);
final long dropped = numDroppedConnections.sumThenReset();
if (dropped > 0) {
log.warn("Dropped {} connection(s) to limit the number of open connections to {}",
dropped, maxConnectionNum);
}
}
public int getMaxConnectionNum() {
return maxConnectionNum;
}
public AtomicLong getNumConnections() {
return numConnections;
}
public LongAdder getNumDroppedConnections() {
return numDroppedConnections;
}
public Set<Channel> getChildChannels() {
return childChannels;
}
}
然后通过ServerBootstrap注册到MainReactor上
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.handler(new ConnectionLimitHandler(Integer.MAX_VALUE));
如果需要获取到ConnectionLimitHandler中的数据,比如当前的连接数,Channel,拒绝的连接数等,可以通过ServerBootstrapConfig获取到注册的Handler
ConnectionLimitHandler handler = (ConnectionLimitHandler) serverBootstrap.config().handler();
Set<Channel> childChannels = handler.getChildChannels();
int maxConnectionNum = handler.getMaxConnectionNum();
LongAdder numDroppedConnections = handler.getNumDroppedConnections();
然后通过获取的handler对象进行获取