如果在工作當中需要限制每個服務要求有個最大連接限制,比如最大連接限制為1000,當前連接數超過1000則超出的部分直接拒絕。
如何通過netty實現呢?可以先理一下思路。
首先Netty的線程模型是基於主從 Reactors 多線程模型,其中主從 Reactor 多線程模型有多個 Reactor各司其職,如下:
- MainReactor 負責客戶端的連接請求,並將請求轉交給 SubReactor
- SubReactor 負責相應通道的 IO 讀寫請求
- 非 IO 請求(具體邏輯處理)的任務則會直接寫入隊列,等待 worker threads 進行處理
那這里就可以通過自定義一個ConnectionLimitHandler實現連接數限制,然后將其注冊到MainReactor上,每次處理連接請求的時候,即可判斷是否超過設置的最大連接數,超過則拒絕訪問並記錄日志。
那么需求明確了,我們可以先設計一下,代碼大概需要做什么,需要哪些邏輯
- 首先需要繼承ChannelInboundHandlerAdapter,並且重寫channelRead方法
- 需要傳入一個最大連接數maxConnectionNum用於設置最大連接數配置
- 通過一個Set
保存MainReactor處理連接請求的Channel - 通過numConnections來計數總連接數
- 通過numDroppedConnections來計數拒絕的連接數
- 通過loggingScheduled來判斷是否需要記錄連接拒絕的日志
代碼如下:
/**
* @author i1619kHz
*/
@Sharable
final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ConnectionLimitHandler.class);
private final int maxConnectionNum;
private final AtomicLong numConnections = new AtomicLong(0);
private final LongAdder numDroppedConnections = new LongAdder();
private final AtomicBoolean loggingScheduled = new AtomicBoolean(false);
private final Set<Channel> childChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
ConnectionLimitHandler(int maxConnectionNums) {
this.maxConnectionNum = maxConnectionNums;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = (Channel) msg;
long conn = numConnections.incrementAndGet();
if (conn > 0 && conn <= maxConnectionNum) {
this.childChannels.add(channel);
channel.closeFuture().addListener(future -> {
childChannels.remove(channel);
numConnections.decrementAndGet();
});
super.channelRead(ctx, msg);
} else {
numConnections.decrementAndGet();
// Set linger option to 0 so that the server doesn't get too many TIME_WAIT states.
channel.config().setOption(ChannelOption.SO_LINGER, 0);
channel.unsafe().closeForcibly();
numDroppedConnections.increment();
if (loggingScheduled.compareAndSet(false, true)) {
ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS);
}
}
}
private void writeNumDroppedConnectionsLog() {
loggingScheduled.set(false);
final long dropped = numDroppedConnections.sumThenReset();
if (dropped > 0) {
log.warn("Dropped {} connection(s) to limit the number of open connections to {}",
dropped, maxConnectionNum);
}
}
public int getMaxConnectionNum() {
return maxConnectionNum;
}
public AtomicLong getNumConnections() {
return numConnections;
}
public LongAdder getNumDroppedConnections() {
return numDroppedConnections;
}
public Set<Channel> getChildChannels() {
return childChannels;
}
}
然后通過ServerBootstrap注冊到MainReactor上
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.handler(new ConnectionLimitHandler(Integer.MAX_VALUE));
如果需要獲取到ConnectionLimitHandler中的數據,比如當前的連接數,Channel,拒絕的連接數等,可以通過ServerBootstrapConfig獲取到注冊的Handler
ConnectionLimitHandler handler = (ConnectionLimitHandler) serverBootstrap.config().handler();
Set<Channel> childChannels = handler.getChildChannels();
int maxConnectionNum = handler.getMaxConnectionNum();
LongAdder numDroppedConnections = handler.getNumDroppedConnections();
然后通過獲取的handler對象進行獲取