Netty 框架學習 —— 預置的 ChannelHandler 和編解碼器



Netty 為許多提供了許多預置的編解碼器和處理器,幾乎可以開箱即用,減少了在煩瑣事務上話費的時間和精力


空閑的連接和超時

檢測空閑連接以及超時對於釋放資源來說至關重要,Netty 特地為它提供了幾個 ChannelHandler 實現

名稱 描述
IdleStateHandler 當連接空閑時間太長時,將會觸發一個 IdleStateEvent 事件,然后,你可以通過在 ChannelInboundHandler 重寫 userEventTriggered() 方法來處理該 IdleStateEvent 事件
ReadTimeoutHandler 如果在指定的時間間隔內沒有收到入站數據,則拋出一個 ReadTimeoutException 並關閉對應的 Channel。可以通過重寫你的 ChannelHandler 中的 exceptionCaught() 方法來檢測該 ReadTimeoutException
WriteTimeoutHandler 如果在指定的時間間隔內沒有出站數據寫入,則拋出一個 WriteTimeoutException 並關閉對應的 Channel。可以通過重寫你的 ChannelHandler 中的 exceptionCaught() 方法來檢測該 WriteTimeoutException

下述代碼展示了當使用通常的發送心跳消息到遠程節點的方法時,如果 60 秒內沒有接收或者發送任何數據,我們將得到通知,如果沒有響應,則連接會關閉

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // IdleStateHandler 將在被觸發時發送一個 IdleStateEvent 事件
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        // 將一個 HeartbeatHandler 添加到 ChannelPipeline
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends SimpleChannelInboundHandler {

        // 發送到遠程節點的心跳消息
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
                .unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // 發送心跳消息,並在發送失敗時關閉該連接
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            
        }
    }
}

解碼基於分隔符的協議

基於分隔符的消息協議使用定義的字符來標記消息或者消息段的開頭或者結尾,下表列出的解碼器能幫助你定義可以提取由任意標記序列分隔的幀的自定義解碼器

名稱 描述
DelimiterBasedFrameDecoder 使用由用戶提供的分隔符來提取幀
LineBasedFrameDecoder 由行尾符(\n 或者 \r\n)分隔幀

下述代碼展示了如何使用 LineBasedFrameDecoder 處理由行尾符分隔的幀

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 該 LineBasedFrameDecoder 將提取的幀轉發給下一個 ChannelInboundHandler
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        // 添加 FrameHandler 以接收幀
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something
        }
    }
}

如果你還使用除了行尾符之外的分隔符來分隔幀,那么你還可以使用 DelimiterBasedFrameDecoder,只需要將特定的分隔符序列指定到其構造函數即可

作為示例,我們將使用下面的協議規范:

  • 傳入數據流是一系列的幀,每個幀都由換行符 \n 來分隔
  • 每個幀都由一系列元素組成,每個元素由單個空格字符分隔
  • 一個幀的內容代表一個命令,定義為一個命令名稱后跟着數目可變的參數

基於這個協議,我們的自定義解碼器將定義以下類:

  • Cmd —— 將幀的命令存儲在 ByteBuf 中,一個 ByteBuf 用於名稱,另一個用於參數
  • CmdDecoder —— 從被重寫了的 decode() 方法中獲取一行字符串,從它的內容構建一個 Cmd 實例
  • CmdHandler —— 從 CmdDecoder 獲取解碼的 Cmd 對象,並對它進行一些處理
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {

    static final byte SPACE = (byte) ' ';

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CmdDecoder(64 * 1024));
        pipeline.addLast(new CmdHandler());
    }

    /**
     * Cmd POJO
     */
    public static final class Cmd {

        private final ByteBuf name;
        private final ByteBuf args;

        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }

        public ByteBuf getArgs() {
            return args;
        }

        public ByteBuf getName() {
            return name;
        }
    }

    public static final class CmdDecoder extends LineBasedFrameDecoder {

        public CmdDecoder(int maxLength) {
            super(maxLength);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            // 從 ByteBuf 中提取由行尾符序列分隔的幀
            ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
            // 如果輸入中沒有幀,則返回 null
            if (frame == null) {
                return null;
            }
            // 查找第一個空格字符的索引,前面是命令名稱,后面是參數
            int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
            // 使用包含命令名稱和參數的切片創建新的 Cmd 對象
            return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));
        }
    }

    public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {
            // 處理傳經 ChannelPipeline 的 Cmd 對象
        }
    }
}

基於長度的協議

基於長度的協議通過將它的長度編碼到幀的頭部來定義幀,而不是使用特殊的分隔符來標記它的結束,下表列出 Netty 提供的用於處理這種類型的協議的兩種解碼器

名稱 描述
FixedLengthFrameDecoder 提取在調用構造函數時指定的定長幀
LengthFieldBasedFrameDecoder 根據幀頭部中的長度值來提取幀:該字段的偏移量以及長度在構造函數中指定

你經常會遇到被編碼到消息頭部的幀大小不是固定值的協議,為了處理這種變長幀,你可以使用 LengthFieldBasedFrameDecoder,它將從頭部字段確定幀長,然后從數據流中提取指定的字節數

下圖展示了一個示例,其中長度字段在幀中的偏移量為 0,並且長度為 2 字節

下述代碼展示了如何使用其 3 個構造函數分別為 maxFrameLength、lengthFieldOffser 和 lengthFieldLength 的構造函數。在這個場景下,幀的長度被編碼到了幀起始的前 8 個字節中

public class LengthBasedInitializer extends ChannelInitializer<Channel> {

    /**
     * 使用 LengthFieldBasedFrameDecoder 解碼將幀長度編碼到幀起始的前 8 個字節中的消息
     */
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something
        }
    }
}

寫大型數據

因為網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。由於寫操作是非阻塞的,所以即時沒有寫出所有的數據,寫操作也會在完成時返回並通知 ChannelFuture。當這種情況發生時,如果仍然不停地寫入,就有內存耗盡的風險。所以在寫大型數據時,需要考慮處理遠程節點的連接是慢速連接的情況,這種情況會導致內存釋放的延遲。讓我們考慮下將一個文件內容寫出到網絡的情況

NIO 的零拷貝特性,這種特性消除了將文件的內容從文件系統移動到網絡棧的復制過程。所有這一切都發生在 Netty 的核心中,所以應用程序需要做的就是使用一個 FileRegion 接口的實現

下述代碼展示了如何通過從 FileInputStream 創建一個 DefaultFileRegion,並將其寫入 Channel

// 創建一個 FileInputStream
leInputStream in = new FileInputStream(file);
// 以該文件的的完整長度創建一個新的 DefaultFileRegion
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// 發送該 DefaultFileRegion,並注冊一個 ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // 處理失敗
        if(!future.isSuccess()) {
            Throwable cause = future.cause();
            // do something
        }
    }
});

這個示例只適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理。在需要將數據從文件系統復制到用戶內存中時,可以使用 ChunkedWriteHandler,它支持異步寫大型數據流,而又不會導致大量的內存消耗

interface ChunkedInput<B> 中的類型參數 B 是 readChunk() 方法返回的類型。Netty 預置了該接口的四個實現,如表所示,每個都代表了一個將由 ChunkedWriteHandler 處理的不定長度的數據流

名稱 描述
ChunkedFile 從文件中逐塊獲取數據,當你的平台不支持零拷貝或者你需要轉換數據時使用
ChunkedNioFile 和 ChunkedFile 類似,只是它使用了 FileChannel
ChunkedStream 從 InputStream 中逐塊傳輸內容
ChunkedNioStream 從 ReadableByteChannel 中逐步傳輸內容

下述代碼說明了 ChunkedStream 的用法,它是實踐中最常用的實現。所示的類使用了一個 File 以及一個 SSLContext 進行實例化,當 initChannel() 方法被調用時,它將使用所示的 ChannelHandler 鏈初始化該 Channel

當 Channel 的狀態變為活動時,WriteStreamHandler 將會逐塊地把來自文件中的數據作為 ChunkedStream 寫入

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {

    private final File file;
    private final SslContext sslContext;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) {
        this.file = file;
        this.sslContext = sslContext;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslContext.newEngine(ch.alloc())));
        // 添加 ChunkedWriteHandler 以處理作為 ChunkedInput 傳入的數據
        pipeline.addLast(new ChunkedWriteHandler());
        // 一旦連接建立,WriteStreamHandler 就開始寫文件數據
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends SimpleChannelInboundHandler<Channel> {

        /**
         * 當連接建立時,channelActive() 方法將使用 ChunkedInput 寫文件數據
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Channel msg) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

序列化數據

JDK 提供了 ObjectOutputStream 和 ObjectInputStream,用於通過網絡對 POJO 的基本數據類型和圖進行序列化和反序列化。該 API 並不復雜,可以被應用於任何實現了 java.io.Serializable 接口的對象。但它的性能並不高效,在這一節,我們將看到 Netty 如何實現序列化

1. JDK 序列化

如果你的程序必須要和使用了 ObjectOutputStream 和 ObjectInputStream 的遠程節點交互,並且考慮兼容性,那么 JDK 序列化將是正確的選擇,下表列出了 Netty 提供的用於和 JDK 進行交互操作的序列化類

名稱 描述
CompatibleObjectDecoder 和使用 JDK 序列化的非基於 Netty 的遠程節點進行互操作的解碼器
CompatibleObjectEncoder 和使用 JDK 序列化的非基於 Netty 的遠程節點進行互操作的編碼器
ObjectDecoder 構建於 JDK 序列化之上的使用自定義的序列化來解碼的解碼器
ObjectEncoder 構建於 JDK 序列化之上的使用自定義的序列化來編碼的編碼器

2. Protocol Buffers 序列化

Protocol Buffers 是一種由 Google 公司開發的、開源的數據交換格式,以一種緊湊而高效的方式對結構化的數據進行編碼以及解碼,能跨多語言使用。下表展示了 Netty 為支持 Protobuf 所提供的 ChannelHandler 實現

名稱 描述
ProtobufDecoder 使用 Protobuf 對消息進行解碼
ProtobufEncoder 使用 Protobuf 對消息進行編碼
ProtobufVarint32FrameDecoder 根據消息中的 Google Protobuf Buffers 的 Base 128 Varints 整型長度字段值動態地分割所接收到的 ByteBuf
ProtobufVarint32LengthFieldPrepender 由 ByteBuf 前追加一個 Google Protobuf Buffers 的 Base 128 Varints 整型的長度字段值


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM