[06] Protobuf&粘包拆包


1. ProtoBuf 說明

1.1 編碼和解碼

編寫網絡應用程序時,因為數據在網絡中傳輸的都是二進制字節碼數據,在發送數據時就需要編碼,接收數據時就需要解碼。

codec(編解碼器)的組成部分有兩個:decoder(解碼器)和 encoder(編碼器)。encoder 負責把業務數據轉換成字節碼數據,decoder 負責把字節碼數據轉換成業務數據。

Netty 自身提供了一些 codec(編解碼器):

  • Netty 提供的編碼器
    • StringEncoder,對字符串數據進行編碼
    • ObjectEncoder,對 Java 對象進行編碼
  • Netty 提供的解碼器
    • StringDecoder, 對字符串數據進行解碼
    • ObjectDecoder,對 Java 對象進行解碼

Netty 本身自帶的 ObjectDecoder 和 ObjectEncoder 可以用來實現 POJO 對象或各種業務對象的編碼和解碼,底層使用的仍是 Java 序列化技術 , 而 Java 序列化技術本身效率就不高,存在如下問題:

  • 無法跨語言;
  • 序列化后的體積太大,是二進制編碼的 5 倍多;
  • 序列化性能太低

故引出新的解決方案:GoogleのProtobuf ↓

1.2 數據結構化/序列化

https://www.jianshu.com/p/a24c88c0526a

Protocol Buffers 是一種語言無關、平台無關、可擴展的序列化結構數據的方法,它可用於數據通信協議、數據存儲等。

Protocol Buffers 是一種靈活,高效,自動化機制的結構數據序列化方法-可類比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更為簡單。

你可以定義數據的結構,然后使用特殊生成的源代碼輕松的在各種數據流中使用各種語言進行編寫和讀取結構數據。你甚至可以更新數據結構,而不破壞由舊數據結構編譯的已部署程序。

簡單來講, ProtoBuf 是結構數據序列化方法,可簡單類比於 XML,其具有以下特點:

  • 語言無關、平台無關。即 ProtoBuf 支持 Java、C++、Python 等多種語言,支持多個平台
  • 高效。即比 XML 更小(3 ~ 10 倍)、更快(20 ~ 100 倍)、更為簡單
  • 擴展性、兼容性好。你可以更新數據結構,而不影響和破壞原有的舊程序

【補充】

(1)序列化:將結構數據或對象轉換成能夠被存儲和傳輸(例如網絡傳輸)的格式,同時應當要保證這個序列化結果在之后(可能在另一個計算環境中)能夠被重建回原來的結構數據或對象。
(2)類比於 XML:這里主要指在數據通信和數據存儲應用場景中序列化方面的類比,但個人認為 XML 作為一種擴展標記語言和 ProtoBuf 還是有着本質區別的。


官方文檔以及網上很多文章提到 ProtoBuf 可類比 XML 或 JSON。那么 ProtoBuf 是否就等同於 XML 和 JSON 呢,它們是否具有完全相同的應用場景呢?

個人認為如果要將 ProtoBuf、XML、JSON 三者放到一起去比較,應該區分兩個維度。一個是數據結構化,一個是數據序列化。這里的數據結構化主要面向開發或業務層面,數據序列化面向通信或存儲層面,當然數據序列化也需要“結構”和“格式”,所以這兩者之間的區別主要在於面向領域和場景不同,一般要求和側重點也會有所不同。數據結構化側重人類可讀性甚至有時會強調語義表達能力,而數據序列化側重效率和壓縮。

從這兩個維度,我們可以做出下面的一些思考。

XML 作為一種擴展標記語言,JSON 作為源於 JS 的數據格式,都具有數據結構化的能力。

例如 XML 可以衍生出 HTML (雖然 HTML 早於 XML,但從概念上講,HTML 只是預定義標簽的 XML),HTML 的作用是標記和表達萬維網中資源的結構,以便瀏覽器更好的展示萬維網資源,同時也要盡可能保證其人類可讀以便開發人員進行編輯,這就是面向業務或開發層面的數據結構化。

再如 XML 還可衍生出 RDF/RDFS,進一步表達語義網中資源的關系和語義,同樣它強調數據結構化的能力和人類可讀。

JSON 也是同理,在很多場合更多的是體現了數據結構化的能力,例如作為交互接口的數據結構的表達。在 MongoDB 中采用 JSON 作為查詢語句,也是在發揮其數據結構化的能力。

當然,JSON、XML 同樣也可以直接被用來數據序列化,實際上很多時候它們也是這么被使用的,例如直接采用 JSON、XML 進行網絡通信傳輸,此時 JSON、XML 就成了一種序列化格式,它發揮了數據序列化的能力。但是經常這么被使用,不代表這么做就是合理。實際將 JSON、XML 直接作用數據序列化通常並不是最優選擇,因為它們在速度、效率、空間上並不是最優。換句話說它們更適合數據結構化而非數據序列化。

扯完 XML 和 JSON,我們來看看 ProtoBuf,同樣的 ProtoBuf 也具有數據結構化的能力,其實也就是上面介紹的 message 定義。我們能夠在 .proto 文件中,通過 message、import、內嵌 message 等語法來實現數據結構化,但是很容易能夠看出,ProtoBuf 在數據結構化方面和 XML、JSON 相差較大,人類可讀性較差,不適合上面提到的 XML、JSON 的一些應用場景。

但是如果從數據序列化的角度你會發現 ProtoBuf 有着明顯的優勢,效率、速度、空間幾乎全面占優,看完后面的 ProtoBuf 編碼的文章(https://www.jianshu.com/p/b33ca81b19b5),你更會了解 ProtoBuf 是如何極盡所能的壓榨每一寸空間和性能,而其中的編碼原理正是 ProtoBuf 的關鍵所在,message 的表達能力並不是 ProtoBuf 最關鍵的重點。所以可以看出 ProtoBuf 重點側重於數據序列化 而非數據結構化。

最終對這些個人思考做一些小小的總結:

  • XML、JSON、ProtoBuf 都具有數據結構化和數據序列化的能力;
  • XML、JSON 更注重數據結構化,關注人類可讀性和語義表達能力。ProtoBuf 更注重數據序列化,關注效率、空間、速度,人類可讀性差,語義表達能力不足(為保證極致的效率,會舍棄一部分元信息);
  • ProtoBuf 的應用場景更為明確,XML、JSON 的應用場景更為豐富。
  • RPC 調用建議使用 TCP+Protobuf 來替換 HTTP+JSON。

1.3 Protobuf 示例

a. 前置步驟

(1)導入依賴

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>

(2)下載插件

(3)protoc

https://github.com/google/protobuf/releases

下載對應的 protoc,本實例使用 protoc-3.6.1-win32.zip。

使用 protobuf 編譯器能自動生成代碼,Protobuf 是將類的定義使用 .proto 文件進行描述。然后通過 protoc.exe 編譯器根據 .proto 自動生成 .java 文件。

protoc.exe --java_out=<生成文件的存儲路徑> Student.proto

Protoc 語法:https://www.cnblogs.com/tohxyblog/p/8974763.html

b. 示例一

(1)prototype

// 版本
syntax = "proto3";
// 生成的外部類名,同時也是文件名
option java_outer_classname = "StudentPOJO";
// 使用 message 管理數據
// 會在 StudentPOJO 中生成一個內部類(真正被發送的POJO)
message Student {
  // 1、2 是屬性序號,不是賦值操作
  int32 id = 1;
  string name = 2;
}

生成的 StudentPOJO.java:

(2)client

NettyClient

public class NettyClient {
  public static void main(String[] args) {
    // 客戶端需要一個事件循環組
    NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
    // 創建客戶端啟動對象
    Bootstrap bootstrap = new Bootstrap();
    try {
        // 設置相關參數
        bootstrap
            .group(eventExecutors)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast("encoder", new ProtobufEncoder())
                            .addLast(new NettyClientHandler());
                }
            });
        // 啟動客戶端去連接服務器端
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6677).sync();
        // 給 ‘關閉通道’ 進行監聽
        channelFuture.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        eventExecutors.shutdownGracefully();
    }
  }
}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

  /**
   * 當通道就緒就會觸發該方法
   * @param ctx
   * @throws Exception
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1101).setName("Nikki").build();
    ctx.writeAndFlush(student);
  }

  /**
   * 當通道有讀取事件時,會觸發
   * @param ctx
   * @param msg
   * @throws Exception
   */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    System.out.println("[Server response] " + byteBuf.toString(CharsetUtil.UTF_8));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

(3)server

NettyServer

public class NettyServer {
  public static void main(String[] args) {
    // [while (true)] bossGroup 只處理連接請求;和客戶端的業務處理會交給 workerGroup
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    // 創建服務器端的啟動對象,配置啟動參數
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    try {
      // 啟動
      ChannelFuture channelFuture = serverBootstrap
        // 設置兩個線程組
        .group(bossGroup, workerGroup)
        // 使用 NioSocketChannel 作為服務器的通道實現
        .channel(NioServerSocketChannel.class)
        // 設置線程隊列等待連接的個數
        .option(ChannelOption.SO_BACKLOG, 128)
        // 設置保持活動連接狀態
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        // 創建一個通道測試對象(匿名對象)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            /**
             * 給 workerGroup 的 EventLoop 對應的 Pipeline 設置處理器
             *
             * @param ch
             * @throws Exception
             */
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline()
                // 指定對‘指定對象類型’進行解碼
                .addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()))
                // .addLast(new NettyServerHandler());
                .addLast(new StudentHandler());
            }
        })
        .bind(6677).sync().channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
  }
}

NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 獲取從客戶端發送的StudentPOJO.Student
    StudentPOJO.Student student = (StudentPOJO.Student) msg;
    System.out.println(StrUtil.format("[client] {}-{}", student.getId(), student.getName()));
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // 將數據寫入緩沖(先對發送的數據進行編碼)並刷新
    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client!", CharsetUtil.UTF_8));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // 處理異常,一般是要關閉通道
    ctx.channel().close();
  }
}

StudentHandler

public class StudentHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
    // 獲取從客戶端發送的StudentPOJO.Student
    System.out.println(StrUtil.format("[client] {}-{}", msg.getId(), msg.getName()));
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // 將數據寫入緩沖(先對發送的數據進行編碼)並刷新
    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client!", CharsetUtil.UTF_8));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // 處理異常,一般是要關閉通道
    ctx.channel().close();
  }
}

c. 示例二

(1)prototype

syntax = "proto3";
// 快速解析
option optimize_for = SPEED;
// 指定生成到指定包下
option java_package = "org.example.netty.proto2";
// 指定生成的外部類名
option java_outer_classname = "DataInfo";

// ProtoBuf 可以使用 message 管理其他的 message
message MyMessage {
  // 定義枚舉類型(enum 編號從 0 開始)
  enum DataType {
    StudentType = 0;
    WorkerType = 1;
  }

  // 用 data_type 來標識傳的是哪個枚舉類型
  // 這個 1 表示 data_type 是 MyMessage 的第一個屬性
  DataType data_type = 1;

  // 表示 Student、Worker 類型最多只能出現其中的一個(節省空間)
  oneof dataBody {
    Student student = 2;
    Worker worker = 3;
  }
}

message Student {
  int32 id = 1;
  string name = 2;
}

message Worker {
  string name = 1;
  int32 age = 2;
}

(2)client

其余代碼復用。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 隨機的發送Student/Worker對象
    int random = new Random().nextInt(3);
    DataInfo.MyMessage message = null;
    if (random == 0) {
      message = DataInfo.MyMessage.newBuilder()
            .setDataType(DataInfo.MyMessage.DataType.StudentType)
            .setStudent(DataInfo.Student.newBuilder().setId(1101).setName("學生").build())
            .build();
    } else {
      message = DataInfo.MyMessage.newBuilder()
            .setDataType(DataInfo.MyMessage.DataType.WorkerType)
            .setWorker(DataInfo.Worker.newBuilder().setName("社畜").setAge(24).build())
            .build();
    }
    ctx.writeAndFlush(message);
  }

  // ...
}

(3)server

其余代碼復用。

// .addLast("decoder", new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()))

public class NettyServerHandler extends SimpleChannelInboundHandler<DataInfo.MyMessage> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, DataInfo.MyMessage msg) throws Exception {
    if (msg.getDataType() == DataInfo.MyMessage.DataType.StudentType) {
      System.out.println(StrUtil.format("[Student] {}-{}", 
                  msg.getStudent().getId(), msg.getStudent().getName()));
    } else {
      System.out.println(StrUtil.format("[Worker] {}-{}", 
                  msg.getWorker().getName(), msg.getWorker().getAge()));
    }
  }

  // ...
}

2. TCP 粘包/拆包

摘自:https://www.cnblogs.com/rickiyang/p/12904552.html

2.1 問題說明

盡管我們在應用層面使用了 Netty,但 Netty 底層是基於 TCP 協議來處理網絡數據傳輸。盡管我們的應用層按照 ByteBuf 為單位來發送數據,但是到了底層操作系統,仍然是按照字節流發送數據的。

我們知道 TCP 協議是面向字節流(就是沒有界限的一長串二進制數據)的協議,數據像流水一樣在網絡中傳輸那何來 “包” 的概念呢?

TCP 是四層協議,其不負責數據邏輯的處理,但是數據在 TCP 層 “流” 的時候為了保證安全和節約效率會把 “流” 做一些分包處理,比如:

  • 發送方約定了每次數據傳輸的最大包大小,超過該值的內容將會被拆分成兩個包發送;
  • 發送端和接收端約定每次發送數據包長度並隨着網絡狀況動態調整接收窗口大小,這里也會出現拆包的情況;

Netty 本身是基於 TCP 協議做的處理,如果它不去對 “流” 進行處理,到底這個 “流” 從哪到哪才是完整的數據就是個迷,因為面向流的通信是無消息保護邊界的。

因此,數據到了服務端,也按照字節流的方式讀入,然后到了 Netty 應用層面,重新拼裝成 ByteBuf。這里的 ByteBuf 與客戶端按照順序發送的 ByteBuf 可能是不對等的。因此,我們需要在客戶端根據自定義協議來組裝應用層的數據包,然后在服務端根據應用層的協議來組裝數據包,這個過程通常在服務端被稱為“拆包”,在客戶端被稱為“粘包”。拆包和粘包是相對的,一端粘了包,另外一端就需要將粘過的包拆開。

【舉例】假設客戶端分別發送了兩個數據包 D1 和 D2 給服務端,由於服務端一次讀取到字節數是不確定的,故可能存在以下 4 種情況:

  1. 服務端分兩次讀取到了兩個獨立的數據包,分別是 D1 和 D2,沒有粘包和拆包;
  2. 服務端一次接受到了兩個數據包,D1 和 D2 粘合在一起,稱之為 TCP 粘包;
  3. 服務端分兩次讀取到了數據包,第一次讀取到了完整的 D1 包和 D2 包的部分內容,第二次讀取到了 D2 包的剩余內容,這稱之為 TCP 拆包;
  4. 服務端分兩次讀取到了數據包,第一次讀取到了 D1 包的部分內容 D1_1,第二次讀取到了 D1 包的剩余部分內容 D1_2 和完整的 D2 包。

下面先來看在下 TCP 協議中有哪些步驟可能會讓 “流” 不完整或者是出現粘滯的可能。

2.2 出現原因

數據流在 TCP 協議下傳播,因為協議本身對於流有一些規則的限制,這些規則會導致當前對端接收到的數據包不完整,歸結原因有下面 3 種情況:

  • Socket 緩沖區與滑動窗口
  • MSS/MTU 限制
  • Nagle 算法

a. Socket 緩沖區與滑動窗口

對於 TCP 協議而言,它傳輸數據是基於字節流傳輸的。應用層在傳輸數據時,實際上會先將數據寫入到 TCP 套接字的緩沖區,當緩沖區被寫滿后,數據才會被寫出去。每個 TCP Socket 在內核中都有一個發送緩沖區(SO_SNDBUF)和一個接收緩沖區(SO_RCVBUF),TCP 的全雙工的工作模式以及 TCP 的滑動窗口便是依賴於這兩個獨立的 buffer 以及此 buffer 的填充狀態。

(1)SO_SNDBUF

進程發送的數據的時候假設調用了一個 send 方法,將數據拷貝進入 Socket 的內核發送緩沖區之中,然后 send 便會在上層返回。換句話說,send 返回之時,數據不一定會發送到對端去(和 write 寫文件有點類似),send 僅僅是把應用層 buffer 的數據拷貝進 Socket 的內核發送緩沖區中。

(2)SO_RCVBUF

把接收到的數據緩存入內核,應用進程一直沒有調用 read 進行讀取的話,此數據會一直緩存在相應 Socket 的接收緩沖區內。不管進程是否讀取 Socket,對端發來的數據都會經由內核接收並且緩存到 Socket 的內核接收緩沖區之中。read 所做的工作,就是把內核緩沖區中的數據拷貝到應用層用戶的 buffer 里面,僅此而已。

接收緩沖區保存收到的數據一直到應用進程讀走為止。對於 TCP,如果應用進程一直沒有讀取,buffer 滿了之后發生的動作是:通知對端 TCP 協議中的窗口關閉。這個便是「滑動窗口」的實現。保證 TCP 套接口接收緩沖區不會溢出,從而保證了 TCP 是可靠傳輸。因為對方不允許發出超過所通告窗口大小的數據。 這就是 TCP 的流量控制,如果對方無視窗口大小而發出了超過窗口大小的數據,則接收方 TCP 將丟棄它。

(3)滑動窗口

TCP 連接在三次握手的時候,會將自己的窗口大小(window size)發送給對方,其實就是 SO_RCVBUF 指定的值。之后在發送數據的時,發送方必須要先確認接收方的窗口沒有被填充滿,如果沒有填滿,則可以發送。

每次發送數據后,發送方將自己維護的對方的 window size 減小,表示對方的 SO_RCVBUF 可用空間變小。

當接收方開始處理 SO_RCVBUF 中的數據時,會將數據從 Socket 在內核中的接收緩沖區讀出,此時接收方的 SO_RCVBUF 可用空間變大,即 window size 變大,接收方會以 ack 消息的方式將自己最新的 window size 返回給發送方,此時發送方將自己維護的接收方的 window size 設置為 ack 消息返回的 window size。

此外,發送方可以連續的給接收方發送消息,只要保證對方的 SO_RCVBUF 空間可以緩存數據即可,即 window size>0。當接收方的 SO_RCVBUF 被填充滿時,此時 window size=0,發送方不能再繼續發送數據,要等待接收方 ack 消息,以獲得最新可用的 window size。

b. MSS/MTU 限制

MTU(Maxitum Transmission Unit,最大傳輸單元)是鏈路層對一次可以發送的最大數據的限制。MSS(Maxitum Segment Size,最大分段大小)是 TCP 報文中 data 部分的最大長度,是傳輸層對一次可以發送的最大數據的限制。

數據在傳輸過程中,每經過一層,都會加上一些額外的信息:

  • 應用層:只關心發送的數據 data,將數據寫入 Socket 在內核中的緩沖區 SO_SNDBUF 即返回,操作系統會將 SO_SNDBUF 中的數據取出來進行發送;
  • 傳輸層:會在 data 前面加上 TCP Header(20 字節);
  • 網絡層:會在 TCP 報文的基礎上再添加一個 IP Header,也就是將自己的網絡地址加入到報文中。IPv4 中 IP Header 長度是 20 字節,IPV6 中 IP Header 長度是 40 字節;
  • 鏈路層:加上 Datalink Header 和 CRC。會將 SMAC(Source Machine,數據發送方的 MAC 地址)、DMAC(Destination Machine,數據接受方的 MAC 地址)和 Type 域加入。SMAC+DMAC+Type+CRC 總長度為 18 字節;
  • 物理層:進行傳輸。

在回顧這個基本內容之后,再來看 MTU 和 MSS。

MTU 是以太網傳輸數據方面的限制,每個以太網幀最大不能超過 1518 Bytes。刨去以太網幀的幀頭(DMAC+SMAC+Type 域)的 14 Bytes 和幀尾 (CRC 校驗 ) 的 4 Bytes,那么剩下承載上層協議的地方也就是 data 域最大就只能有 1500 Bytes 這個值 我們就把它稱之為 MTU。

MSS 是在 MTU 的基礎上減去網絡層的 IP Header 和傳輸層的 TCP Header 的部分,這就是 TCP 協議一次可以發送的實際應用數據的最大大小。

MSS = MTU(1500) - IP Header(20 or 40) - TCP Header(20)

由於 IPV4 和 IPV6 的長度不同,在 IPV4 中,以太網 MSS 可以達到 1460 byte。在 IPV6 中,以太網 MSS 可以達到 1440 byte。

發送方發送數據時,當 SO_SNDBUF 中的數據量大於 MSS 時,操作系統會將數據進行拆分,使得每一部分都小於 MSS,也形成了“拆包”。然后每一部分都加上 TCP Header,構成多個完整的 TCP 報文進行發送,當然經過網絡層和數據鏈路層的時候,還會分別加上相應的內容。

另外需要注意的是:對於本地回環地址(lookback)不需要走以太網,所以不受到以太網 MTU=1500 的限制。Linux 上輸入 ifconfig 命令,可以查看不同網卡的 MTU 大小,如下:

上圖顯示了 2 個網卡信息:

  • ens3 需要走以太網,所以 MTU 是 1500;
  • lo 是本地回環,不需要走以太網,所以不受 1500 的限制。

c. Nagle 算法

TCP/IP 協議中,無論發送多少數據,總是要在數據(data)前面加上協議頭(TCP Header + IP Header),同時,對方接收到數據,也需要發送 ACK 表示確認。

即使從鍵盤輸入的一個字符,占用一個字節,可能在傳輸上造成 41 字節的包,其中包括 1 字節的有用信息和 40 字節的首部數據。這種情況轉變成了 4000% 的消耗,這樣的情況對於重負載的網絡來是無法接受的,稱之為“糊塗窗口綜合征”。

為了盡可能的利用網絡帶寬,TCP 總是希望盡可能的發送足夠大的數據(一個連接會設置 MSS 參數,因此,TCP/IP 希望每次都能夠以 MSS 尺寸的數據塊來發送數據)。Nagle 算法就是為了盡可能發送大塊數據,避免網絡中充斥着許多小數據塊。

Nagle 算法的基本定義是任意時刻,最多只能有一個未被確認的小段。 所謂 “小段”,指的是小於 MSS 尺寸的數據塊;所謂“未被確認”,是指一個數據塊發送出去后,沒有收到對方發送的 ACK 確認該數據已收到。

Nagle 算法的規則:

  • 如果 SO_SNDBUF 中的數據長度達到 MSS,則允許發送;
  • 如果該 SO_SNDBUF 中含有 FIN,表示請求關閉連接,則先將 SO_SNDBUF 中的剩余數據發送,再關閉;
  • 設置了 TCP_NODELAY=true 選項,則允許發送。TCP_NODELAY 是取消 TCP 的確認延遲機制,相當於禁用了 Nagle 算法。正常情況下,當 Server 端收到數據之后,它並不會馬上向 client 端發送 ACK,而是會將 ACK 的發送延遲一段時間(一般是 40ms),它希望在 t 時間內 server 端會向 client 端發送應答數據,這樣 ACK 就能夠和應答數據一起發送,就像是應答數據捎帶着 ACK 過去。當然,TCP 確認延遲 40ms 並不是一直不變的, TCP 連接的延遲確認時間一般初始化為最小值 40ms,隨后根據連接的重傳超時時間(RTO)、上次收到數據包與本次接收數據包的時間間隔等參數進行不斷調整。另外可以通過設置 TCP_QUICKACK 選項來取消確認延遲;
  • 未設置 TCP_CORK 選項時,若所有發出去的小數據包(包長度小於 MSS)均被確認,則允許發送;
  • 上述條件都未滿足,但發生了超時(一般為 200ms),則立即發送。

d. 小結

基於以上問題,TCP 層肯定是會出現當次接收到的數據是不完整數據的情況。

出現“粘包”可能的原因有:

  • 發送方每次寫入數據 < 套接字緩沖區大小;
  • 接收方讀取套接字緩沖區數據不夠及時。

出現“半包”的可能原因有:

  • 發送方每次寫入數據 > 套接字緩沖區大小;
  • 發送的數據大於協議 MTU,所以必須要拆包。

解決問題肯定不能在 TCP 層來做,而是在〈應用層〉通過定義通信協議來解決“粘包”和“拆包”的問題。讓發送方和接收方約定某個規則:

  • 當發生粘包的時候通過某種約定來拆包;
  • 如果有拆包則通過某種約定來將數據組成一個完整的包處理。

2.3 解決策略

由於底層的 TCP 無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決。業界的主流協議的解決方案,可以歸納如下:

a. 定長協議

指定一個報文具有固定長度。比如約定一個報文的長度是 5 字節,報文為 1234,只有 4 字節,還差一個怎么辦呢,不足部分用空格補齊,就變為 1234 。如果不補齊空格,那么就會讀到下一個報文的字節來填充上一個報文直到補齊為止,這就“粘包”了。

定長協議的優點是使用簡單,缺點很明顯:浪費帶寬。

Netty 中提供了 FixedLengthFrameDecoder ,支持把固定的長度的字節數當做一個完整的消息進行解碼。

b. 特殊字符分割協議

很好理解,在每一個你認為是一個完整的包的尾部添加指定的特殊字符,比如:\n\r 等。但需要注意的是:約定的特殊字符要保證唯一性,不能出現在報文的正文中,否則就將正文一分為二了。

Netty 中提供了 DelimiterBasedFrameDecoder 根據特殊字符進行解碼,LineBasedFrameDecoder 默認以換行符作為分隔符。

c. 變長協議

這種也是最通用的一種拆包器,只要你的自定義協議中包含「長度域」字段,均可以使用這個拆包器來實現〈應用層〉拆包

變長協議的核心就是:將消息分為「消息頭」和「消息體」,「消息頭」中標識當前完整的「消息體長度」。

  • 發送方在發送數據之前先獲取數據的二進制字節大小,然后在消息體前面添加消息大小;
  • 接收方在解析消息時先獲取消息大小,之后必須讀到該大小的字節數才認為是完整的消息。

Netty 中提供了 LengthFieldBasedFrameDecoder 來基於“長度域”拆解數據包。

// 1. 數據包的最大長度
// 2. 長度域的偏移量
// 3. 長度域的長度
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);

2.4 粘包處理

【粘包演示】具體的粘包演示代碼,看 #2 開頭🔗的那篇博客。

演示客戶端發送多條消息,使用 Netty 自定義的 ByteBuf 作為傳輸數據格式,看看服務端接收數據是否是按每次發送的條數來接收還是按照當前緩沖區大小來接收。其中,客戶端 Handler 主要邏輯是循環 100 次給服務端發送測試消息。

處理 TCP 粘包的唯一方法就是制定應用層的數據通訊協議,通過協議來規范現有接收的數據是否滿足消息數據的需要。

為了解決網絡數據流的拆包粘包問題,Netty 為我們內置了如下的解碼器:

解碼器 說明
ByteToMessageDecoder 如果想實現自己的半包解碼器,實現該類
MessageToMessageDecoder 一般作為二次解碼器,當我們在 ByteToMessageDecoder 將一個 bytes 數組轉換成一個 Java 對象的時候,我們可能還需要將這個對象進行二次解碼成其他對象,我們就可以繼承這個類;
LineBasedFrameDecoder 通過在包尾添加回車換行符 \r\n 來區分整包消息;
StringDecoder 字符串解碼器;
DelimiterBasedFrameDecoder 特殊字符作為分隔符來區分整包消息;
FixedLengthFrameDecoder 報文大小固定長度,不夠空格補全;
ProtoBufVarint32FrameDecoder 通過 Protobuf 解碼器來區分整包消息;
ProtobufDecoder Protobuf 解碼器;
LengthFieldBasedFrameDecoder 指定長度來標識整包消息,通過在包頭指定整包長度來約定包長。

以及如下的編碼器:

編碼器 說明
ProtobufEncoder Protobuf 編碼器;
MessageToByteEncoder 將 Java 對象編碼成 ByteBuf;
MessageToMessageEncoder 如果不想將 Java 對象編碼成 ByteBuf,而是自定義類就繼承這個;
LengthFieldPrepender 如果我們在發送消息的時候采用的是:消息長度字段+原始消息的形式,那么我們就可以使用 LengthFieldPrepender。這是因為 LengthFieldPrepender 可以將待發送消息的長度(二進制字節長度)寫到 ByteBuf 的前兩個字節。

編解碼相關類結構圖如下:

上面的類關系能看到所有的自定義解碼器都是繼承自 ByteToMessageDecoder。在 Netty 中 Decoder 主要分為兩大類:

  1. 將字節流轉換為某種協議的數據格式:ByteToMessageDecoder 和 ReplayingDecoder
  2. 將一種協議的數據轉為另一種協議的數據格式:MessageToMessageDecoder

將字節流轉為對象是一種很常見的操作,也是一個消息框架應該提供的基礎功能。因為 Decoder 的作用是將輸入的數據解析成特定協議,上圖中可以看到所有的 Decoder 都實現了 ChannelInboundHandler 接口。在應用層將 byte 轉為 message 的難度在於如何確定當前的包是一個完整的數據包,有兩種方案可以實現:

  1. 監聽當前 Socket 的線程一直等待,直到收到的 byte 可以完成的構成一個包為止。這種方式的弊端就在於要浪費一個線程去等;
  2. 為每個監聽的 Socket 都構建一個本地緩存,當前監聽線程如果遇到字節數不夠的情況就先將獲取到的數據存入緩存,繼而處理別的請求,等到這里有數據的時候再來將新數據繼續寫入緩存直到數據構成一個完整的包取出。

a. ByteToMessageDecoder

ByteToMessageDecoder 采用的是第二種方案。在 ByteToMessageDecoder 中有一個對象 ByteBuf,該對象用於存儲當前 Decoder 接收到的 byte 數據。

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
  // 用來保存累計讀取到的字節,我們讀到的新字節會保存(緩沖)在這里
  ByteBuf cumulation;
  // 用來做累計的,負責將讀到的新字節寫入 cumulation,有兩個實現 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR
  private Cumulator cumulator = MERGE_CUMULATOR;
  // 設置為 true 后,單個解碼器只會解碼出一個結果
  private boolean singleDecode;
  private boolean decodeWasNull;
  // 是否是第一次讀取數據
  private boolean first;
  // 多少次讀取后,丟棄數據(默認16次)
  private int discardAfterReads = 16;
  // 已經累加了多少次數據
  private int numReads;

  /**
   * 每次接收到數據,就會調用 channelRead 進行處理,該處理器用於處理二進制數據。
   * 所以 msg 字段的類型應該是 ByteBuf。如果不是,則交給 pipeLine 的下一個處理器進行處理。
   */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如果不是 ByteBuf 則不處理
    if (msg instanceof ByteBuf) {
      // out 用於存儲解析二進制流得到的結果,一個二進制流可能會解析出多個消息,所以 out 是一個 list
      CodecOutputList out = CodecOutputList.newInstance();
      try {
            ByteBuf data = (ByteBuf) msg;
            // 判斷 cumulation == null 並將結果賦值給 first
            // 因此如果 first 為 true,則表示第一次接受到數據
            first = cumulation == null;
            // 如果是第 1 次接收到數據,直接將接受到的數據賦值給緩存對象 cumulation
            if (first) {
                cumulation = data;
            } else {
                // 第 2 次解碼,就將 data 向 cumulation 追加,並釋放 data
                // 如果 cumulation 中的剩余空間,不足以存儲接收到的 data,將 cumulation 擴容
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 得到追加后的 cumulation 后,調用 decode() 進行解碼
            // 解碼過程中,調用 fireChannelRead 方法,主要目的是將累積區的內容 decode 到數組中
            callDecode(ctx, cumulation, out);
      } catch (DecoderException e) {
        throw e;
      } catch (Throwable t) {
        throw new DecoderException(t);
      } finally {
        // 如果 cumulation 沒有數據可讀了,說明所有的二進制數據都被解析過了。此時對 cumulation
        // 進行釋放,以節省內存空間。反之 cumulation 還有數據可讀,那么 if 中的語句不會運行,
        // 因為不對 cumulation 進行釋放,因此也就緩存了用戶尚未解析的二進制數據。
        if (cumulation != null && !cumulation.isReadable()) {
            // 將次數歸零
            numReads = 0;
            // 釋放累計區
            cumulation.release();
            // 等待 GC
            cumulation = null;
            // 如果超過了 16 次,就壓縮累計區,主要是將已經讀過的數據丟棄,將 readIndex 歸零。
        } else if (++numReads >= discardAfterReads) {
            // We did enough reads already try to discard some bytes so we not risk to see a OOME.
            // See https://github.com/netty/netty/issues/4275
            numReads = 0;
            discardSomeReadBytes();
        }
        int size = out.size();
        // 如果沒有向數組插入過任何數據
        decodeWasNull = !out.insertSinceRecycled();
        // 循環數組,向后面的 handler 發送數據,如果數組是空,那不會調用
        fireChannelRead(ctx, out, size);
        // 將數組中的內容清空,將數組的數組的下標恢復至原來
        out.recycle();
      }
    } else {
      // 如果 msg 類型是不是 ByteBuf,直接調用下一個 handler 進行處理
      ctx.fireChannelRead(msg);
    }
  }

  /**
   * callDecode 方法主要用於解析 cumulation 中的數據,並將解析的結果放入 List<Object> out 中。由於
   * cumulation 中緩存的二進制數據,可能包含了出多條有效信息,因此在 callDecode() 中,默認會調用多次
   * decode 方法。我們在覆寫 decode 方法時,每次只解析一個消息,添加到 out 中,callDecode 通過多次
   * 回調 decode。每次傳遞進來都是相同的 List<Object> out 實例,因此每一次解析出來的消息,都存儲在同
   * 一個 out 實例中。當 cumulation 沒有數據可以繼續讀,或者某次調用 decode 方法后,List<Object> out
   * 中元素個數沒有變化,則停止回調 decode 方法。
   */
  protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
      // 如果 cumulation 中有數據可讀的話,一直循環調用 decode 方法
      while (in.isReadable()) {
        // 獲取上一次 decode 方法調用后,out 中元素數量,如果是第 1 次調用,則為 0。
        int outSize = out.size();
        // 上次循環成功解碼
        if (outSize > 0) {
          // 用后面的業務 handler 的 ChannelRead 方法讀取解析的數據
          fireChannelRead(ctx, out, outSize);
          out.clear();
          if (ctx.isRemoved()) {
            break;
          }
          outSize = 0;
        }

        int oldInputLength = in.readableBytes();
        // 回調 decode 方法,由開發者覆寫,用於解析 in 中包含的二進制數據,並將解析結果放到 out 中。
        decode(ctx, in, out);

        if (ctx.isRemoved()) {
          break;
        }
        // outSize 是上一次 decode 方法調用時 out 的大小,out.size() 是當前 out 大小。
        // 如果二者相等,則說明當前 decode 方法調用沒有解析出有效信息。
        if (outSize == out.size()) {
          // 此時,如果發現上次 decode 方法和本次 decode 方法調用候,in 中的剩余可讀字節數相同,則說明本次 decode
          // 方法沒有讀取任何數據解析(可能是遇到半包等問題,即剩余的二進制數據不足以構成一條消息),跳出 while 循環。
          if (oldInputLength == in.readableBytes()) {
            break;
          } else {
            continue;
          }
        }
        // 處理人為失誤。如果走到這段代碼,則說明 outSize != out.size()。也就是本次 decode
        // 實際上是解析出來了有效信息放到 out 中。若 oldInputLength == in.readableBytes()
        // 說明本次 decode 方法調用並沒有讀取任何數據,但是 out 中元素卻添加了。
        // 這可能是因為開發者錯誤的編寫了代碼,例如 mock 了一個消息放到 List
        if (oldInputLength == in.readableBytes()) {
            throw new DecoderException(StringUtil.simpleClassName(getClass())
                     + ".decode() did not read anything but decoded a message.");
        }
        if (isSingleDecode()) {
          break;
        }
      }
    } catch (DecoderException e) {
      throw e;
    } catch (Throwable cause) {
      throw new DecoderException(cause);
    }
  }
}

這里 channelRead() 的主要邏輯是:

  1. 從對象池中取出一個空的數組;
  2. 判斷成員變量是否是第 1 次使用,要注意的是,這里既然使用了成員變量,所以這個 handler 不能是 @Shareble 狀態的 handler,不然你就分不清成員變量是哪個 channel 的。將 unsafe 中傳遞來的數據寫入到這個 cumulation 累積區中;
  3. 寫到累積區后,調用子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就調用后面節點的 channelRead 方法。若沒有解碼成功,什么都不做;
  4. 如果累積區沒有未讀數據了,就釋放累積區;
  5. 如果還有未讀數據,且解碼超過了 16 次(默認),就對累積區進行壓縮。將讀取過的數據清空,也就是將 readIndex 設置為 0;
  6. 設置 decodeWasNull 的值,如果上一次沒有插入任何數據,這個值就是 ture。該值在調用 channelReadComplete 方法的時候,會觸發 read 方法(不是自動讀取的話),嘗試從 JDK 的通道中讀取數據,並將之前的邏輯重來。主要應該是怕如果什么數據都沒有插入就執行 channelReadComplete 會遺漏數據;
  7. 調用 fireChannelRead 方法,將數組中的元素發送到后面的 handler 中;
  8. 將數組清空,並還給對象池。

當數據添加到累積區之后,需要調用 decode 方法進行解碼,代碼見上面的 callDecode()。在 callDecode() 中最關鍵的代碼就是將解析完的數據拿取調用 decode(ctx, in, out)。所以如果繼承 ByteToMessageDecoder 類實現自己的字節流轉對象的邏輯我們就要覆寫該方法。

b. LengthFieldBasedFrameDecoder

最好的方案就是:發送方告訴我當前消息總長度,接收方如果沒有收到該長度大小的數據就認為是沒有收完繼續等待。

/**
 * Creates a new instance.
 *
 * @param maxFrameLength      幀的最大長度
 * @param lengthFieldOffset   長度字段偏移的地址
 * @param lengthFieldLength   長度字段所占的字節長
 * @param lengthAdjustment    解析時候跳過多少個長度
 * @param initialBytesToStrip 解碼出一個數據包之后,去掉開頭的字節數
 * @param initialBytesToStrip 為 true,當 frame 長度超過 maxFrameLength 時立即報 TooLongFrameException
 *                            為 false,讀取完整個幀再報異
 */
public LengthFieldBasedFrameDecoder(
        int maxFrameLength,
        int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip) {
    this(
            maxFrameLength,
            lengthFieldOffset, lengthFieldLength, lengthAdjustment,
            initialBytesToStrip, true);
}

/**
 * Creates a new instance.
 *
 * @param maxFrameLength
 *        the maximum length of the frame.  If the length of the frame is
 *        greater than this value, {@link TooLongFrameException} will be
 *        thrown.
 * @param lengthFieldOffset
 *        the offset of the length field
 * @param lengthFieldLength
 *        the length of the length field
 */
public LengthFieldBasedFrameDecoder(
        int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
    this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
}

在 LengthFieldBasedFrameDecoder 類的注解上給出了一些關於該類使用的示例:

/**
 * A decoder that splits the received {@link ByteBuf}s dynamically by the
 * value of the length field in the message.  It is particularly useful when you
 * decode a binary message which has an integer header field that represents the
 * length of the message body or the whole message.
 * <p>
 * {@link LengthFieldBasedFrameDecoder} has many configuration parameters so
 * that it can decode any message with a length field, which is often seen in
 * proprietary client-server protocols. Here are some example that will give
 * you the basic idea on which option does what.
 *
 * <h3>2 bytes length field at offset 0, do not strip header</h3>
 *
 * The value of the length field in this example is <tt>12 (0x0C)</tt> which
 * represents the length of "HELLO, WORLD".  By default, the decoder assumes
 * that the length field represents the number of the bytes that follows the
 * length field.  Therefore, it can be decoded with the simplistic parameter
 * combination.
 * <pre>
 * <b>lengthFieldOffset</b>   = <b>0</b>
 * <b>lengthFieldLength</b>   = <b>2</b>
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0 (= do not strip header)
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
 * </pre>
 *
 * <h3>2 bytes length field at offset 0, strip header</h3>
 *
 * Because we can get the length of the content by calling
 * {@link ByteBuf#readableBytes()}, you might want to strip the length
 * field by specifying <tt>initialBytesToStrip</tt>.  In this example, we
 * specified <tt>2</tt>, that is same with the length of the length field, to
 * strip the first two bytes.
 * <pre>
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2
 * lengthAdjustment    = 0
 * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+
 * </pre>
 *
 * <h3>2 bytes length field at offset 0, do not strip header, the length field
 *     represents the length of the whole message</h3>
 *
 * In most cases, the length field represents the length of the message body
 * only, as shown in the previous examples.  However, in some protocols, the
 * length field represents the length of the whole message, including the
 * message header.  In such a case, we specify a non-zero
 * <tt>lengthAdjustment</tt>.  Because the length value in this example message
 * is always greater than the body length by <tt>2</tt>, we specify <tt>-2</tt>
 * as <tt>lengthAdjustment</tt> for compensation.
 * <pre>
 * lengthFieldOffset   =  0
 * lengthFieldLength   =  2
 * <b>lengthAdjustment</b>    = <b>-2</b> (= the length of the Length field)
 * initialBytesToStrip =  0
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
 * </pre>
 *
 * <h3>3 bytes length field at the end of 5 bytes header, do not strip header</h3>
 *
 * The following message is a simple variation of the first example.  An extra
 * header value is prepended to the message.  <tt>lengthAdjustment</tt> is zero
 * again because the decoder always takes the length of the prepended data into
 * account during frame length calculation.
 * <pre>
 * <b>lengthFieldOffset</b>   = <b>2</b> (= the length of Header 1)
 * <b>lengthFieldLength</b>   = <b>3</b>
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0
 *
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
 * </pre>
 *
 * <h3>3 bytes length field at the beginning of 5 bytes header, do not strip header</h3>
 *
 * This is an advanced example that shows the case where there is an extra
 * header between the length field and the message body.  You have to specify a
 * positive <tt>lengthAdjustment</tt> so that the decoder counts the extra
 * header into the frame length calculation.
 * <pre>
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 3
 * <b>lengthAdjustment</b>    = <b>2</b> (= the length of Header 1)
 * initialBytesToStrip = 0
 *
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
 * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
 * </pre>
 *
 * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header,
 *     strip the first header field and the length field</h3>
 *
 * This is a combination of all the examples above.  There are the prepended
 * header before the length field and the extra header after the length field.
 * The prepended header affects the <tt>lengthFieldOffset</tt> and the extra
 * header affects the <tt>lengthAdjustment</tt>.  We also specified a non-zero
 * <tt>initialBytesToStrip</tt> to strip the length field and the prepended
 * header from the frame.  If you don't want to strip the prepended header, you
 * could specify <tt>0</tt> for <tt>initialBytesToSkip</tt>.
 * <pre>
 * lengthFieldOffset   = 1 (= the length of HDR1)
 * lengthFieldLength   = 2
 * <b>lengthAdjustment</b>    = <b>1</b> (= the length of HDR2)
 * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
 *
 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
 * </pre>
 *
 * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header,
 *     strip the first header field and the length field, the length field
 *     represents the length of the whole message</h3>
 *
 * Let's give another twist to the previous example.  The only difference from
 * the previous example is that the length field represents the length of the
 * whole message instead of the message body, just like the third example.
 * We have to count the length of HDR1 and Length into <tt>lengthAdjustment</tt>.
 * Please note that we don't need to take the length of HDR2 into account
 * because the length field already includes the whole header length.
 * <pre>
 * lengthFieldOffset   =  1
 * lengthFieldLength   =  2
 * <b>lengthAdjustment</b>    = <b>-3</b> (= the length of HDR1 + LEN, negative)
 * <b>initialBytesToStrip</b> = <b> 3</b>
 *
 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
 * </pre>
 * @see LengthFieldPrepender
 */

3. 編解碼器

我們把解決半包粘包問題的解碼器叫「一次解碼器」,其作用是將原始數據流(可能會出現粘包和半包的數據流)轉換為用戶數據(ByteBuf 中存儲),但仍然是字節數據,所以我們需要「二次解碼器」將字節數組轉換為 Java 對象,或者將將一種格式轉化為另一種格式,方便上層應用程序使用。

一次解碼器繼承自 ByteToMessageDecoder,二次解碼器繼承自 MessageToMessageDecoder,但他們的本質都是繼承 ChannelInboundHandlerAdapter。

是不是也可以合並 1 次解碼和 2 次解碼? 可以,但不推薦。沒有分層,不夠清晰;耦合性高,不容易置換方案。

當 Netty 發送或者接受一個消息的時候,就將會發生一次數據轉換。入站消息會被解碼:從字節轉換為另一種格式(比如某種類型的對象);如果是出站消息,它會被編碼成字節。

Netty 提供一系列實用的編解碼器,他們都實現了 ChannelInboundHandler 或 ChannelOutboundHandler 接口。在這些類中,channelRead 方法已經被重寫了。以入站為例,對於每個從入站 Channel 讀取的消息,這個方法會被調用。隨后,它將調用由解碼器所提供的 decode() 方法進行解碼,並將已經解碼的字節轉發給 ChannelPipeline 中的下一個 ChannelInboundHandler。

3.1 示例代碼

(1)Byte2LongDecoder

public class Byte2LongDecoder extends ByteToMessageDecoder {
  /**
   * 這個例子,每次入站從 ByteBuf 中讀取 8 字節,將其解碼為一個 long,然后將它添加到下一個 List 中。
   * 當沒有更多元素可以被添加到該 List 中時,它的內容將會被發送給下一個 ChannelInboundHandler。
   * long 在被添加到 List 中時,會被自動裝箱為 Long。
   * @param ctx
   * @param in
   * @param out
   * @throws Exception
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println("[Byte2LongDecoder#decode] Server decode data...");
    if (in.readableBytes() >= 8) {
      out.add(in.readLong());
    }
  }
}

(2)Long2ByteEncoder

public class Long2ByteEncoder extends MessageToByteEncoder<Long> {
  /**
   * 接收的消息類型必須與待處理的消息類型必須一致,否則調用的處理邏輯是 super.encode()
   *
   *    if (acceptOutboundMessage(msg)) {
   *       @SuppressWarnings("unchecked")
   *       I cast = (I) msg;
           buf = allocateBuffer(ctx, cast, preferDirect);
   *       try {
   *           encode(ctx, cast, buf);
   *       } finally {
   *           ReferenceCountUtil.release(cast);
   *       }
   *
   *       if (buf.isReadable()) {
   *           ctx.write(buf, promise);
   *       } else {
   *           buf.release();
   *           ctx.write(Unpooled.EMPTY_BUFFER, promise);
   *       }
   *       buf = null;
   *   } else {
   *       ctx.write(msg, promise);
   *   }
   *
   * @param ctx
   * @param msg
   * @param out
   * @throws Exception
   */
  @Override
  protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
    System.out.println("[Long2ByteEncoder#encode] Client encode data...");
    out.writeLong(msg);
  }
}

(3)NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

  /**
   * 當通道就緒就會觸發該方法
   *
   * @param ctx
   * @throws Exception
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("[NettyClientHandler#channelActive] Client send data...");
    ctx.writeAndFlush(12345L);
    // 當發送的類型不是編碼器處理的類型時,則會調用 MessageToByteEncoder#encode()。
    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server! ヾ(•ω•`)o", CharsetUtil.UTF_8));
  }
}

(4)NettyServerHandler

public class NettyServerHandler extends SimpleChannelInboundHandler<Long> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
    System.out.println("[NettyServerHandler#channelRead] " + msg);
  }
}

(5)ClientServer

ch.pipeline()
    .addLast(new Long2ByteEncoder())
    .addLast(new NettyClientHandler());

(6)NettyServer

ch.pipeline()
    .addLast(new Byte2LongDecoder())
    .addLast(new NettyServerHandler());

(7)控制台打印

3.2 其它編解碼器

4. 二進制/文本·協議


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM