netty系列之:channel和channelGroup


簡介

channel是netty中數據傳輸和數據處理的渠道,也是netty程序中不可或缺的一環。在netty中channel是一個接口,針對不同的數據類型或者協議channel會有具體的不同實現。

雖然channel很重要,但是在代碼中確實很神秘,基本上我們很少能夠看到直接使用channel的情況,那么事實真的如此嗎?和channel相關的ChannelGroup又有什么作用呢?一起來看看吧。

神龍見首不見尾的channel

其實netty的代碼是有固定的模板的,首先根據是server端還是client端,然后創建對應的Bootstrap和ServerBootstrap。然后給這個Bootstrap配置對應的group方法。然后為Bootstrap配置channel和handler,最后啟動Bootstrap即可。

這樣一個標准的netty程序就完成了。你需要做的就是為其挑選合適的group、channel和handler。

我們先看一個最簡單的NioServerSocketChannel的情況:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChatServerInitializer());

            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

這里,我們將NioServerSocketChannel設置為ServerBootstrap的channel。

這樣就完了嗎?channel到底是在哪里用到的呢?

別急,我們仔細看一下try block中的最后一句:

b.bind(PORT).sync().channel().closeFuture().sync();

b.bind(PORT).sync()實際上返回了一個ChannelFuture對象,通過調用它的channel方法,就返回了和它關聯的Channel對象。

然后我們調用了channel.closeFuture()方法。closeFuture方法會返回一個ChannelFuture對象,這個對象將會在channel關閉的時候收到通知。

而sync方法會實現同步阻塞,一直等到channel關閉為止,從而進行后續的eventGroup的shutdown操作。

在ServerBootstrap中構建模板中,channel其實有兩個作用,第一個作用是指定ServerBootstrap的channel,第二個作用就是通過channel獲取到channel關閉的事件,最終關閉整個netty程序。

雖然我們基本上看不到channel的直接方法調用,但是channel毋庸置疑,它就是netty的靈魂。

接下來我們再看一下具體消息處理的handler的基本操作:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // channel活躍
        ctx.write("Channel Active狀態!\r\n");
        ctx.flush();
    }

通常如果需要在handler中向channel寫入數據,我們調用的是ChannelHandlerContext的write方法。這個方法和channel有什么關系呢?

首先write方法是ChannelOutboundInvoker接口中的方法,而ChannelHandlerContext和Channel都繼承了ChannelOutboundInvoker接口,也就是說,ChannelHandlerContext和Channel都有write方法:

ChannelFuture write(Object msg);

因為這里我們使用的是NioServerSocketChannel,所以我們來具體看一下NioServerSocketChannel中write的實現。

經過檢查代碼我們會發現NioServerSocketChannel繼承自AbstractNioMessageChannel,AbstractNioMessageChannel繼承自AbstractNioChannel,AbstractNioChannel繼承自AbstractChannel,而這個write方法就是AbstractChannel中實現的:

    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

Channel的write方法,實際上調用了pipeline的write方法。下面是pipeLine中的write方法:

    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

這里的tail是一個AbstractChannelHandlerContext對象。

這樣我們就得出了這樣的結論:channel中的write方法最終實際上調用的是ChannelHandlerContext中的write方法。

所以上面的:

ctx.write("Channel Active狀態!\r\n");

實際上可以直接從channel中調用:

Channel ch = b.bind(0).sync().channel();

// 將消息寫入channel中
ch.writeAndFlush("Channel Active狀態!\r\n").sync();

channel和channelGroup

channel是netty的靈魂,對於Bootstrap來說,要獲取到對應的channel,可以通過調用:

b.bind(PORT).sync().channel()

來得到,從上面代碼中我們也可以看到一個Bootstrap只會對應一個channel。

channel中有一個parent()方法,用來返回它的父channel,所以channel是有層級結構的,

我們再來看一下channelGroup的定義:

public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> 

可以看到ChannelGroup實際上是Channel的集合。ChannelGroup用來將類似的Channel構建成集合,從而可以對多個channel進行統一的管理。

可以能有小伙伴要問了,一個Bootstrap不是只對應一個channel嗎?那么哪里來的channel的集合?

事實上,在一些復雜的程序中,我們可能啟動多個Bootstrap來處理不同的業務,所以相應的就會有多個channel。

如果創建的channel過多,並且這些channel又是很同質化的時候,就有需求對這些channel進行統一管理。這時候就需要用到channelGroup了。

channelGroup的基本使用

先看下channelGroup的基本使用,首先是創建一個channelGroup:

ChannelGroup recipients =
           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

有了channelGroup之后,可以調用add方法,向其中添加不同的channel:

   recipients.add(channelA);
   recipients.add(channelB);

並且還可以統一向這些channel中發送消息:

recipients.write(Unpooled.copiedBuffer(
           "這是從channelGroup中發出的統一消息.",
           CharsetUtil.UTF_8));

基本上channelGroup提供了write,flush,flushAndWrite,writeAndFlush,disconnect,close,newCloseFuture等功能,用於對集合中的channel進行統一管理。

如果你有多個channel,那么可以考慮使用channelGroup。

另外channelGroup還有一些特性,我們來詳細了解一下。

將關閉的channel自動移出

ChannelGroup是一個channel的集合,當然我們只希望保存open狀態的channel,如果是close狀態的channel,還要手動從ChannelGroup中移出的話實在是太麻煩了。

所以在ChannelGroup中,如果一個channel被關閉了,那么它會自動從ChannelGroup中移出,這個功能是怎么實現的呢?

先來看下channelGroup的add方法:

   public boolean add(Channel channel) {
        ConcurrentMap<ChannelId, Channel> map =
            channel instanceof ServerChannel? serverChannels : nonServerChannels;

        boolean added = map.putIfAbsent(channel.id(), channel) == null;
        if (added) {
            channel.closeFuture().addListener(remover);
        }

        if (stayClosed && closed) {
            channel.close();
        }

        return added;
    }

可以看到,在add方法中,為channel區分了是server channel還是非server channel。然后根據channel id將其存入ConcurrentMap中。

如果添加成功,則給channel添加了一個closeFuture的回調。當channel被關閉的時候,會去調用這個remover方法:

private final ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            remove(future.channel());
        }
    };

remover方法會將channel從serverChannels或者nonServerChannels中移出。從而保證ChannelGroup中只保存open狀態的channel。

同時關閉serverChannel和acceptedChannel

雖然 ServerBootstrap的bind方法只會返回一個channel,但是對於server來說,可以有多個worker EventLoopGroup,所以當客戶端和服務器端建立連接之后建立的accepted Channel是server channel的子channel。

也就是說一個服務器端有一個server channel和多個accepted channel。

那么如果我們想要同時關閉這些channel的話, 就可以使用ChannelGroup的close方法。

因為如果Server channel和非Server channel在同一個ChannelGroup的話,所有的IO命令都會先發給server channel,然后才會發給非server channel。

所以我們可以將Server channel和非Server channel統統加入同一個ChannelGroup中,在程序的最后,統一調用ChannelGroup的close方法,從而達到該目的:

   ChannelGroup allChannels =
           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  
   public static void main(String[] args) throws Exception {
       ServerBootstrap b = new ServerBootstrap(..);
       ...
       b.childHandler(new MyHandler());
  
       // 啟動服務器
       b.getPipeline().addLast("handler", new MyHandler());
       Channel serverChannel = b.bind(..).sync();
       allChannels.add(serverChannel);
  
       ... 等待shutdown指令 ...
  
       // 關閉serverChannel 和所有的 accepted connections.
       allChannels.close().awaitUninterruptibly();
   }
  
   public class MyHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           // 將accepted channel添加到allChannels中
           allChannels.add(ctx.channel());
           super.channelActive(ctx);
       }
   }

ChannelGroupFuture

另外,和channel一樣,channelGroup的操作都是異步的,它會返回一個ChannelGroupFuture對象。

我們看下ChannelGroupFuture的定義:

public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture>

可以看到ChannelGroupFuture是一個Future,同時它也是一個ChannelFuture的遍歷器,可以遍歷ChannelGroup中所有channel返回的ChannelFuture。

同時ChannelGroupFuture提供了isSuccess,isPartialSuccess,isPartialFailure等方法判斷命令是否部分成功。

ChannelGroupFuture還提供了addListener方法用來監聽具體的事件。

總結

channel是netty的核心,當我們有多個channel不便進行管理的時候,就可以使用channelGroup進行統一管理。

本文已收錄於 http://www.flydean.com/04-1-netty-channel-group/

最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM