Netty 系列七(那些開箱即用的 ChannelHandler).


一、前言

    Netty 為許多通用協議提供了編解碼器和處理器,幾乎可以開箱即用, 這減少了你在那些相當繁瑣的事務上本來會花費的時間與精力。另外,這篇文章中,就不涉及 Netty 對 WebSocket協議 的支持了,因為涉及的篇幅有點大,會在下一篇文章做一個具體的介紹。

二、SSL 協議

    SSL 協議是安全協議,層疊在其他協議之上。為了支持 SSL/TLS, Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 類使得實現解密和加密相當簡單直接。 Netty 通過一個名為 SslHandler 的 ChannelHandler 實現利用了這個 API, 其中 SslHandler 在內部使用 SSLEngine 來完成實際的工作。下圖描述的是 SslHandler 的數據流。

    

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ByteBufAllocator byteBufAllocator = ch.alloc();
        //對於每個 SslHandler 實例,都使用 Channel 的 ByteBufAllocator 從 SslContext 獲取一個新的 SSLEngine
        SSLEngine sslEngine = context.newEngine(byteBufAllocator);
        //服務器端模式,客戶端模式設置為true
        sslEngine.setUseClientMode(false);
        //不需要驗證客戶端,客戶端不設置該項
        sslEngine.setNeedClientAuth(false);
        //要將 SslHandler 設置為第一個 ChannelHandler。這確保了只有在所有其他的 ChannelHandler 將他們的邏輯應用到數據之后,才會進行加密。
        //startTls 如果為true,第一個寫入的消息將不會被加密(客戶端應該設置為true)
        ch.pipeline().addFirst("ssl",new SslHandler(sslEngine, startTls));
    }

tips:對於 ChannelPipeline 鏈中 ChannelHandler 執行的順序 —— 入站事件順序執行、出站事件逆序執行。

三、HTTP 協議

    HTTP 是基於請求/響應模式的:客戶端向服務器發送一個 HTTP 請求,然后服務器將會返回一個 HTTP 響應。 下圖展示了 Netty 中 HTTP請求和響應的組成部分:

        

    Netty 對 HTTP 協議的支持主要提供了以下 ChannelHandler:

HttpResponseDecoder:解碼器,用於客戶端,解碼來自服務端的響應。
HttpRequestEncoder:編碼器,用戶客戶端,編碼向服務端發送的請求。
HttpRequestDecoder:解碼器,用於服務端,解碼來自客戶端的請求。
HttpResponseEncoder:編碼器,用於服務端,編碼向客戶端的響應。
HttpClientCodec:編解碼器,用戶客戶端,效果等於 HttpResponseDecoder + HttpRequestEncoder。
HttpServerCodec:編解碼器,用戶服務端,效果等於 HttpRequestDecoder + HttpResponseEncoder。
HttpObjectAggregator:聚合器,由於 HTTP 的請求和響應可能由許多部分組成,需要聚合它們以形成完整的消息,HttpObjectAggregator 可以將多個消息部分合並為 FullHttpRequest 或者 FullHttpResponse 消息。
HttpContentCompressor:壓縮,用戶服務端,壓縮要傳輸的數據,支持 gzip 和 deflate 壓縮格式。
HttpContentDecompressor:解壓縮,用於客戶端,解壓縮服務端傳輸的數據。

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
        if (isClient) {
            //使用 HTTPS,添加 SSL 認證
            pipeline.addFirst("ssl", new SslHandler(sslEngine, true));
            pipeline.addLast("codec", new HttpClientCodec());
            //1、建議開啟壓縮功能以盡可能多地減少傳輸數據的大小
            //2、客戶端處理來自服務器的壓縮內容
            pipeline.addLast("decompressor", new HttpContentDecompressor());
        }else {
            pipeline.addFirst("ssl", new SslHandler(sslEngine));
            //HttpServerCodec:將HTTP客戶端請求轉成HttpRequest對象,將HttpResponse對象編碼成HTTP響應發送給客戶端。
            pipeline.addLast("codec", new HttpServerCodec());
            //服務端,壓縮數據
            pipeline.addLast("compressor", new HttpContentCompressor());
        }
        //目的多個消息轉換為一個單一的FullHttpRequest或是FullHttpResponse
        //將最大的消息為 512KB 的HttpObjectAggregator 添加到 ChannelPipeline
        //在消息大於這個之后會拋出一個 TooLongFrameException 異常。
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
    }

 tips:當使用 HTTP 時,建議開啟壓縮功能以盡可能多地減小傳輸數據的大小。雖然壓縮會帶來一些 CPU 時鍾周期上的開銷。

四、拆包和粘包的解決方案

    TCP 傳輸過程中,客戶端發送了兩個數據包,而服務端卻只收到一個數據包,客戶端的兩個數據包粘連在一起,稱為粘包;

    TCP 傳輸過程中,客戶端發送了兩個數據包,服務端雖然收到了兩個數據包,但是兩個數據包都是不完整的,或多了數據,或少了數據,稱為拆包;

    發生TCP粘包、拆包主要是由於下面一些原因:

1、應用程序寫入的數據大於套接字緩沖區大小,這將會發生拆包。
2、應用程序寫入數據小於套接字緩沖區大小,網卡將應用多次寫入的數據發送到網絡上,這將會發生粘包。
3、進行MSS(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>MSS的時候將發生拆包。
4、接收方法不及時讀取套接字緩沖區數據,這將發生粘包。

    Netty 預定義了一些解碼器用於解決粘包和拆包現象,其中大體分為兩類:

基於分隔符的協議:在數據包之間使用定義的字符來標記消息或者消息段的開頭或者結尾。這樣,接收端通過這個字符就可以將不同的數據包拆分開。
基於長度的協議:發送端給每個數據包添加包頭部,頭部中應該至少包含數據包的長度,這樣接收端在接收到數據后,通過讀取包頭部的長度字段,便知道每一個數據包的實際長度了。

    基於分隔符的協議

        

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
                // 將提取到的楨轉發給下一個Channelhandler
                new LineBasedFrameDecoder(64 * 1024),
                // 添加 FrameHandler 以接收幀
                new FrameHandler()
        );
    }
    
    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            //Do something with the data extracted from the frame
        }
    }
}

    基於長度的協議

 

    LengthFieldBasedFrameDecoder 是 Netty 基於長度協議解決拆包粘包問題的一個重要的類,主要結構就是 header+body 結構。我們只需要傳入正確的參數就可以發送和接收正確的數據,那嗎重點就在於這幾個參數的意義。下面我們就具體了解一下這幾個參數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:

public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip)

maxFrameLength:最大幀長度。也就是可以接收的數據的最大長度。如果超過,此次數據會被丟棄。
lengthFieldOffset:長度域偏移。就是說數據開始的幾個字節可能不是表示數據長度,需要后移幾個字節才是長度域。
lengthFieldLength:長度域字節數。用幾個字節來表示數據長度。
lengthAdjustment:數據長度修正。因為長度域指定的長度可以使 header+body 的整個長度,也可以只是body的長度。如果表示header+body的整個長度,那么我們需要修正數據長度。
initialBytesToStrip:跳過的字節數。如果你需要接收 header+body 的所有數據,此值就是0,如果你只想接收body數據,那么需要跳過header所占用的字節數。

public class LengthBasedInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
                new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8),
                new FrameHandler()
        );
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            //處理楨的數據
        }
    }
}

 tips:UDP協議不會發生沾包或拆包現象, 因為UDP是基於報文發送的,在UDP首部采用了16bit來指示UDP數據報文的長度,因此在應用層能很好的將不同的數據報文區分開。

五、其他

    由於網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。Netty 通過一個 FileRegion 接口來實現,其在 Netty 的API 文檔中的定義是:"通過支持零拷貝的文件傳輸的 Channel 來發送的文件區域"。但是該接口只適用於文件內容的直接傳輸,不包括應用程序對文件數據的任何處理。

View Code

    如果大塊的數據要從文件系統復制到用戶內存中時,可以安裝一個 ChunkedWriteHandler,並用 ChunkedInput 實現寫入文件數據。 它支持異步寫大型數據流,而又不會導致大量的內存消耗。

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
                new SslHandler(sslCtx.newEngine(ch.alloc())),
                // 添加 ChunkedWriteHandler 以處理作為 ChunkedInput 傳入的數據
                new ChunkedWriteHandler(),
                new WriteStreamHandler()
        );
    }
    private final class WriteStreamHandler extends ChannelHandlerAdapter {
        //當連接建立時,channelActive() 方法將使用 ChunkedInput 寫文件數據
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
ChunkedWriteHandlerInitializer.java

    Netty提供的用於和JDK進行互操作的序列化類 :

    Netty提供的用於和 JBoss Marshalling 進行互操作的序列化類 :

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
                new MarshallingDecoder(unmarshallerProvider),
                new MarshallingEncoder(marshallerProvider),
                new ObjectHandler()
        );
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Serializable msg) throws Exception { }
    }
}
MarshallingInitializer.java

    Netty提供的用於和 Protocol Buffers 進行互操作的序列化類 :

 

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/protocol


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM