簡介
channel是netty中數據傳輸和數據處理的渠道,也是netty程序中不可或缺的一環。在netty中channel是一個接口,針對不同的數據類型或者協議channel會有具體的不同實現。
雖然channel很重要,但是在代碼中確實很神秘,基本上我們很少能夠看到直接使用channel的情況,那么事實真的如此嗎?和channel相關的ChannelGroup又有什么作用呢?一起來看看吧。
神龍見首不見尾的channel
其實netty的代碼是有固定的模板的,首先根據是server端還是client端,然后創建對應的Bootstrap和ServerBootstrap。然后給這個Bootstrap配置對應的group方法。然后為Bootstrap配置channel和handler,最后啟動Bootstrap即可。
這樣一個標准的netty程序就完成了。你需要做的就是為其挑選合適的group、channel和handler。
我們先看一個最簡單的NioServerSocketChannel的情況:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChatServerInitializer());
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
這里,我們將NioServerSocketChannel設置為ServerBootstrap的channel。
這樣就完了嗎?channel到底是在哪里用到的呢?
別急,我們仔細看一下try block中的最后一句:
b.bind(PORT).sync().channel().closeFuture().sync();
b.bind(PORT).sync()實際上返回了一個ChannelFuture對象,通過調用它的channel方法,就返回了和它關聯的Channel對象。
然后我們調用了channel.closeFuture()方法。closeFuture方法會返回一個ChannelFuture對象,這個對象將會在channel關閉的時候收到通知。
而sync方法會實現同步阻塞,一直等到channel關閉為止,從而進行后續的eventGroup的shutdown操作。
在ServerBootstrap中構建模板中,channel其實有兩個作用,第一個作用是指定ServerBootstrap的channel,第二個作用就是通過channel獲取到channel關閉的事件,最終關閉整個netty程序。
雖然我們基本上看不到channel的直接方法調用,但是channel毋庸置疑,它就是netty的靈魂。
接下來我們再看一下具體消息處理的handler的基本操作:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// channel活躍
ctx.write("Channel Active狀態!\r\n");
ctx.flush();
}
通常如果需要在handler中向channel寫入數據,我們調用的是ChannelHandlerContext的write方法。這個方法和channel有什么關系呢?
首先write方法是ChannelOutboundInvoker接口中的方法,而ChannelHandlerContext和Channel都繼承了ChannelOutboundInvoker接口,也就是說,ChannelHandlerContext和Channel都有write方法:
ChannelFuture write(Object msg);
因為這里我們使用的是NioServerSocketChannel,所以我們來具體看一下NioServerSocketChannel中write的實現。
經過檢查代碼我們會發現NioServerSocketChannel繼承自AbstractNioMessageChannel,AbstractNioMessageChannel繼承自AbstractNioChannel,AbstractNioChannel繼承自AbstractChannel,而這個write方法就是AbstractChannel中實現的:
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
Channel的write方法,實際上調用了pipeline的write方法。下面是pipeLine中的write方法:
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
這里的tail是一個AbstractChannelHandlerContext對象。
這樣我們就得出了這樣的結論:channel中的write方法最終實際上調用的是ChannelHandlerContext中的write方法。
所以上面的:
ctx.write("Channel Active狀態!\r\n");
實際上可以直接從channel中調用:
Channel ch = b.bind(0).sync().channel();
// 將消息寫入channel中
ch.writeAndFlush("Channel Active狀態!\r\n").sync();
channel和channelGroup
channel是netty的靈魂,對於Bootstrap來說,要獲取到對應的channel,可以通過調用:
b.bind(PORT).sync().channel()
來得到,從上面代碼中我們也可以看到一個Bootstrap只會對應一個channel。
channel中有一個parent()方法,用來返回它的父channel,所以channel是有層級結構的,
我們再來看一下channelGroup的定義:
public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup>
可以看到ChannelGroup實際上是Channel的集合。ChannelGroup用來將類似的Channel構建成集合,從而可以對多個channel進行統一的管理。
可以能有小伙伴要問了,一個Bootstrap不是只對應一個channel嗎?那么哪里來的channel的集合?
事實上,在一些復雜的程序中,我們可能啟動多個Bootstrap來處理不同的業務,所以相應的就會有多個channel。
如果創建的channel過多,並且這些channel又是很同質化的時候,就有需求對這些channel進行統一管理。這時候就需要用到channelGroup了。
channelGroup的基本使用
先看下channelGroup的基本使用,首先是創建一個channelGroup:
ChannelGroup recipients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
有了channelGroup之后,可以調用add方法,向其中添加不同的channel:
recipients.add(channelA);
recipients.add(channelB);
並且還可以統一向這些channel中發送消息:
recipients.write(Unpooled.copiedBuffer(
"這是從channelGroup中發出的統一消息.",
CharsetUtil.UTF_8));
基本上channelGroup提供了write,flush,flushAndWrite,writeAndFlush,disconnect,close,newCloseFuture等功能,用於對集合中的channel進行統一管理。
如果你有多個channel,那么可以考慮使用channelGroup。
另外channelGroup還有一些特性,我們來詳細了解一下。
將關閉的channel自動移出
ChannelGroup是一個channel的集合,當然我們只希望保存open狀態的channel,如果是close狀態的channel,還要手動從ChannelGroup中移出的話實在是太麻煩了。
所以在ChannelGroup中,如果一個channel被關閉了,那么它會自動從ChannelGroup中移出,這個功能是怎么實現的呢?
先來看下channelGroup的add方法:
public boolean add(Channel channel) {
ConcurrentMap<ChannelId, Channel> map =
channel instanceof ServerChannel? serverChannels : nonServerChannels;
boolean added = map.putIfAbsent(channel.id(), channel) == null;
if (added) {
channel.closeFuture().addListener(remover);
}
if (stayClosed && closed) {
channel.close();
}
return added;
}
可以看到,在add方法中,為channel區分了是server channel還是非server channel。然后根據channel id將其存入ConcurrentMap中。
如果添加成功,則給channel添加了一個closeFuture的回調。當channel被關閉的時候,會去調用這個remover方法:
private final ChannelFutureListener remover = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
remove(future.channel());
}
};
remover方法會將channel從serverChannels或者nonServerChannels中移出。從而保證ChannelGroup中只保存open狀態的channel。
同時關閉serverChannel和acceptedChannel
雖然 ServerBootstrap的bind方法只會返回一個channel,但是對於server來說,可以有多個worker EventLoopGroup,所以當客戶端和服務器端建立連接之后建立的accepted Channel是server channel的子channel。
也就是說一個服務器端有一個server channel和多個accepted channel。
那么如果我們想要同時關閉這些channel的話, 就可以使用ChannelGroup的close方法。
因為如果Server channel和非Server channel在同一個ChannelGroup的話,所有的IO命令都會先發給server channel,然后才會發給非server channel。
所以我們可以將Server channel和非Server channel統統加入同一個ChannelGroup中,在程序的最后,統一調用ChannelGroup的close方法,從而達到該目的:
ChannelGroup allChannels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static void main(String[] args) throws Exception {
ServerBootstrap b = new ServerBootstrap(..);
...
b.childHandler(new MyHandler());
// 啟動服務器
b.getPipeline().addLast("handler", new MyHandler());
Channel serverChannel = b.bind(..).sync();
allChannels.add(serverChannel);
... 等待shutdown指令 ...
// 關閉serverChannel 和所有的 accepted connections.
allChannels.close().awaitUninterruptibly();
}
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 將accepted channel添加到allChannels中
allChannels.add(ctx.channel());
super.channelActive(ctx);
}
}
ChannelGroupFuture
另外,和channel一樣,channelGroup的操作都是異步的,它會返回一個ChannelGroupFuture對象。
我們看下ChannelGroupFuture的定義:
public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture>
可以看到ChannelGroupFuture是一個Future,同時它也是一個ChannelFuture的遍歷器,可以遍歷ChannelGroup中所有channel返回的ChannelFuture。
同時ChannelGroupFuture提供了isSuccess,isPartialSuccess,isPartialFailure等方法判斷命令是否部分成功。
ChannelGroupFuture還提供了addListener方法用來監聽具體的事件。
總結
channel是netty的核心,當我們有多個channel不便進行管理的時候,就可以使用channelGroup進行統一管理。
本文已收錄於 http://www.flydean.com/04-1-netty-channel-group/
最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!