Netty 底層是基於 TCP 協議來處理網絡數據傳輸。我們知道 TCP 協議是面向字節流的協議,數據像流水一樣在網絡中傳輸那何來 “包” 的概念呢?
TCP是四層協議不負責數據邏輯的處理,但是數據在TCP層 “流” 的時候為了保證安全和節約效率會把 “流” 做一些分包處理,比如:
- 發送方約定了每次數據傳輸的最大包大小,超過該值的內容將會被拆分成兩個包發送;
- 發送端 和 接收端 約定每次發送數據包長度並隨着網絡狀況動態調整接收窗口大小,這里也會出現拆包的情況;
Netty 本身是基於 TCP 協議做的處理,如果它不去對 “流” 進行處理,到底這個 “流” 從哪到哪才是完整的數據就是個迷。我們先來看在 TCP 協議中有哪些步驟可能會讓 “流” 不完整或者是出現粘滯的可能。
1. TCP 中可能出現粘包/拆包的原因
數據流在TCP協議下傳播,因為協議本身對於流有一些規則的限制,這些規則會導致當前對端接收到的數據包不完整,歸結原因有下面三種情況:
- Socket 緩沖區與滑動窗口
- MSS/MTU限制
- Nagle算法
1. Socket緩沖區與滑動窗口
對於 TCP 協議而言,它傳輸數據是基於字節流傳輸的。應用層在傳輸數據時,實際上會先將數據寫入到 TCP 套接字的緩沖區,當緩沖區被寫滿后,數據才會被寫出去。每個TCP Socket 在內核中都有一個發送緩沖區(SO_SNDBUF )和一個接收緩沖區(SO_RCVBUF),TCP 的全雙工的工作模式以及 TCP 的滑動窗口便是依賴於這兩個獨立的 buffer 以及此 buffer 的填充狀態。
SO_SNDBUF:
進程發送的數據的時候假設調用了一個 send 方法,將數據拷貝進入 Socket 的內核發送緩沖區之中,然后 send 便會在上層返回。換句話說,send 返回之時,數據不一定會發送到對端去(和write寫文件有點類似),send 僅僅是把應用層 buffer 的數據拷貝進 Socket 的內核發送 buffer 中。
SO_RCVBUF:
把接收到的數據緩存入內核,應用進程一直沒有調用 read 進行讀取的話,此數據會一直緩存在相應 Socket 的接收緩沖區內。不管進程是否讀取 Socket,對端發來的數據都會經由內核接收並且緩存到 Socket 的內核接收緩沖區之中。read 所做的工作,就是把內核緩沖區中的數據拷貝到應用層用戶的 buffer 里面,僅此而已。
接收緩沖區保存收到的數據一直到應用進程讀走為止。對於 TCP,如果應用進程一直沒有讀取,buffer 滿了之后發生的動作是:通知對端 TCP 協議中的窗口關閉。這個便是滑動窗口的實現。保證 TCP 套接口接收緩沖區不會溢出,從而保證了 TCP 是可靠傳輸。因為對方不允許發出超過所通告窗口大小的數據。 這就是 TCP 的流量控制,如果對方無視窗口大小而發出了超過窗口大小的數據,則接收方 TCP 將丟棄它。
滑動窗口:
TCP連接在三次握手的時候,會將自己的窗口大小(window size)發送給對方,其實就是 SO_RCVBUF 指定的值。之后在發送數據的時,發送方必須要先確認接收方的窗口沒有被填充滿,如果沒有填滿,則可以發送。
每次發送數據后,發送方將自己維護的對方的 window size 減小,表示對方的 SO_RCVBUF 可用空間變小。
當接收方處理開始處理 SO_RCVBUF 中的數據時,會將數據從 Socket 在內核中的接受緩沖區讀出,此時接收方的 SO_RCVBUF 可用空間變大,即 window size 變大,接受方會以 ack 消息的方式將自己最新的 window size 返回給發送方,此時發送方將自己的維護的接受的方的 window size 設置為ack消息返回的 window size。
此外,發送方可以連續的給接受方發送消息,只要保證對方的 SO_RCVBUF 空間可以緩存數據即可,即 window size>0。當接收方的 SO_RCVBUF 被填充滿時,此時 window size=0,發送方不能再繼續發送數據,要等待接收方 ack 消息,以獲得最新可用的 window size。
2. MSS/MTU分片
MTU (Maxitum Transmission Unit,最大傳輸單元)是鏈路層對一次可以發送的最大數據的限制。MSS(Maxitum Segment Size,最大分段大小)是 TCP 報文中 data 部分的最大長度,是傳輸層對一次可以發送的最大數據的限制。
數據在傳輸過程中,每經過一層,都會加上一些額外的信息:
- 應用層:只關心發送的數據 data,將數據寫入 Socket 在內核中的緩沖區 SO_SNDBUF 即返回,操作系統會將 SO_SNDBUF 中的數據取出來進行發送;
- 傳輸層:會在 data 前面加上 TCP Header(20字節);
- 網絡層:會在 TCP 報文的基礎上再添加一個 IP Header,也就是將自己的網絡地址加入到報文中。IPv4 中 IP Header 長度是 20 字節,IPV6 中 IP Header 長度是 40 字節;
- 鏈路層:加上 Datalink Header 和 CRC。會將 SMAC(Source Machine,數據發送方的MAC地址),DMAC(Destination Machine,數據接受方的MAC地址 )和 Type 域加入。SMAC+DMAC+Type+CRC 總長度為 18 字節;
- 物理層:進行傳輸。
在回顧這個基本內容之后,再來看 MTU 和 MSS。MTU 是以太網傳輸數據方面的限制,每個以太網幀最大不能超過 1518bytes。刨去以太網幀的幀頭(DMAC+SMAC+Type域) 14Bytes 和幀尾 (CRC校驗 ) 4 Bytes,那么剩下承載上層協議的地方也就是 data 域最大就只能有 1500 Bytes 這個值 我們就把它稱之為 MTU。
MSS 是在 MTU 的基礎上減去網絡層的 IP Header 和傳輸層的 TCP Header 的部分,這就是 TCP 協議一次可以發送的實際應用數據的最大大小。
MSS = MTU(1500) -IP Header(20 or 40)-TCP Header(20)
由於 IPV4 和 IPV6 的長度不同,在 IPV4 中,以太網 MSS 可以達到 1460byte。在 IPV6 中,以太網 MSS 可以達到 1440byte。
發送方發送數據時,當 SO_SNDBUF 中的數據量大於 MSS 時,操作系統會將數據進行拆分,使得每一部分都小於 MSS,也形成了拆包。然后每一部分都加上 TCP Header,構成多個完整的 TCP 報文進行發送,當然經過網絡層和數據鏈路層的時候,還會分別加上相應的內容。
另外需要注意的是:對於本地回環地址(lookback)不需要走以太網,所以不受到以太網 MTU=1500 的限制。linux 服務器上輸入 ifconfig 命令,可以查看不同網卡的 MTU 大小,如下:
上圖顯示了 2 個網卡信息:
- eth0 需要走以太網,所以 MTU 是 1500;
- lo 是本地回環,不需要走以太網,所以不受 1500 的限制。
Nagle 算法
TCP/IP 協議中,無論發送多少數據,總是要在數據(data)前面加上協議頭(TCP Header+IP Header),同時,對方接收到數據,也需要發送 ACK 表示確認。
即使從鍵盤輸入的一個字符,占用一個字節,可能在傳輸上造成 41 字節的包,其中包括 1 字節的有用信息和 40 字節的首部數據。這種情況轉變成了 4000% 的消耗,這樣的情況對於重負載的網絡來是無法接受的。稱之為"糊塗窗口綜合征"。
為了盡可能的利用網絡帶寬,TCP 總是希望盡可能的發送足夠大的數據。(一個連接會設置 MSS 參數,因此,TCP/IP 希望每次都能夠以 MSS 尺寸的數據塊來發送數據)。Nagle 算法就是為了盡可能發送大塊數據,避免網絡中充斥着許多小數據塊。
Nagle 算法的基本定義是任意時刻,最多只能有一個未被確認的小段。 所謂 “小段”,指的是小於 MSS 尺寸的數據塊;所謂“未被確認”,是指一個數據塊發送出去后,沒有收到對方發送的 ACK 確認該數據已收到。
Nagle 算法的規則:
- 如果 SO_SNDBUF 中的數據長度達到 MSS,則允許發送;
- 如果該 SO_SNDBUF 中含有 FIN,表示請求關閉連接,則先將 SO_SNDBUF 中的剩余數據發送,再關閉;
- 設置了
TCP_NODELAY=true
選項,則允許發送。TCP_NODELAY 是取消 TCP 的確認延遲機制,相當於禁用了 Negale 算法。正常情況下,當 Server 端收到數據之后,它並不會馬上向 client 端發送 ACK,而是會將 ACK 的發送延遲一段時間(一般是 40ms),它希望在 t 時間內 server 端會向 client 端發送應答數據,這樣 ACK 就能夠和應答數據一起發送,就像是應答數據捎帶着 ACK 過去。當然,TCP 確認延遲 40ms 並不是一直不變的, TCP 連接的延遲確認時間一般初始化為最小值 40ms,隨后根據連接的重傳超時時間(RTO)、上次收到數據包與本次接收數據包的時間間隔等參數進行不斷調整。另外可以通過設置 TCP_QUICKACK 選項來取消確認延遲; - 未設置 TCP_CORK 選項時,若所有發出去的小數據包(包長度小於MSS)均被確認,則允許發送;
- 上述條件都未滿足,但發生了超時(一般為200ms),則立即發送。
基於以上問題,TCP層肯定是會出現當次接收到的數據是不完整數據的情況。出現粘包可能的原因有:
- 發送方每次寫入數據 < 套接字緩沖區大小;
- 接收方讀取套接字緩沖區數據不夠及時。
出現半包的可能原因有:
- 發送方每次寫入數據 > 套接字緩沖區大小;
- 發送的數據大於協議 MTU,所以必須要拆包。
解決問題肯定不是在4層來做而是在應用層,通過定義通信協議來解決粘包和拆包的問題。發送方 和 接收方約定某個規則:
- 當發生粘包的時候通過某種約定來拆包;
- 如果在拆包,通過某種約定來將數據組成一個完整的包處理。
2. 業界常用解決方案
1. 定長協議
指定一個報文具有固定長度。比如約定一個報文的長度是 5 字節,那么:
報文:1234,只有4字節,但是還差一個怎么辦呢,不足部分用空格補齊。就變為:1234 。
如果不補齊空格,那么就會讀到下一個報文的字節來填充上一個報文直到補齊為止,這樣粘包了。
定長協議的優點是使用簡單,缺點很明顯:浪費帶寬。
Netty 中提供了 FixedLengthFrameDecoder
,支持把固定的長度的字節數當做一個完整的消息進行解碼。
2. 特殊字符分割協議
很好理解,在每一個你認為是一個完整的包的尾部添加指定的特殊字符,比如:\n,\r等等。
需要注意的是:約定的特殊字符要保證唯一性,不能出現在報文的正文中,否則就將正文一分為二了。
Netty 中提供了 DelimiterBasedFrameDecoder
根據特殊字符進行解碼,LineBasedFrameDecoder
默認以換行符作為分隔符。
3. 變長協議
變長協議的核心就是:將消息分為消息頭和消息體,消息頭中標識當前完整的消息體長度。
- 發送方在發送數據之前先獲取數據的二進制字節大小,然后在消息體前面添加消息大小;
- 接收方在解析消息時先獲取消息大小,之后必須讀到該大小的字節數才認為是完整的消息。
Netty 中提供了 LengthFieldBasedFrameDecoder
,通過 LengthFieldPrepender
來給實際的消息體添加 length 字段。
3. Netty 粘包演示
代碼示例請看:github點我。
1. 實驗主要邏輯
演示客戶端發送多條消息,使用 Netty 自定義的 ByteBuf 作為傳輸數據格式,看看服務端接收數據是否是按每次發送的條數來接收還是按照當前緩沖區大小來接收。
主要代碼:
Server:
package com.rickiyang.learn.packageEvent1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* @author: rickiyang
* @date: 2020/3/15
* @description: server 端
*/
@Slf4j
public class PeServer {
private int port;
public PeServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server start fail",e);
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
PeServer server = new PeServer(7788);
server.start();
}
}
ServerInitialzr:
package com.rickiyang.learn.packageEvent1;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author: rickiyang
* @date: 2020/3/15
* @description:
*/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 自己的邏輯Handler
pipeline.addLast("handler", new PeServerHandler());
}
}
ServerHandler:
package com.rickiyang.learn.packageEvent1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author: rickiyang
* @date: 2020/3/15
* @description:
*/
@Slf4j
public class PeServerHandler extends SimpleChannelInboundHandler {
private int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("server channelActive");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("-----start------\n"+ body + "\n------end------");
String content = "receive" + ++counter;
ByteBuf resp = Unpooled.copiedBuffer(content.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
服務端的 handler 主要邏輯是接收客戶端發送過來的數據,看看是否是一條一條接收。然后每次接收到數據之后給客戶端回復一個確認消息。
Client:
package com.rickiyang.learn.packageEvent1;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* @author: rickiyang
* @date: 2020/3/15
* @description:
*/
@Slf4j
public class PeClient {
private int port;
private String address;
public PeClient(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().writeAndFlush("Hello world, i'm online");
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("client start fail",e);
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
PeClient client = new PeClient(7788,"127.0.0.1");
client.start();
}
}
ClientInitializer:
package com.rickiyang.learn.packageEvent1;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 客戶端的邏輯
pipeline.addLast("handler", new PeClientHandler());
}
}
ClientHandler:
package com.rickiyang.learn.packageEvent1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author: rickiyang
* @date: 2020/3/15
* @description:
*/
@Slf4j
public class PeClientHandler extends SimpleChannelInboundHandler {
private int counter;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println(body + " count:" + ++counter + "----end----\n");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("client channelActive");
byte[] req = ("我是一條測試消息,快來讀我吧,啦啦啦").getBytes();
for (int i = 0; i < 100; i++) {
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("Client is close");
}
}
客戶端 handler 主要邏輯是:循環100次給服務端發送測試消息。接收服務端的確認消息。
啟動項目之后我們來看看客戶端 和 服務端分別收到的消息結果:
服務端接收到的消息:
-----start------
我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦�
------end------
-----start------
��我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦�
------end------
-----start------
�啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦
------end------
-----start------
啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,�
------end------
-----start------
��啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧�
------end------
-----start------
�啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦我是一條測試消息,快來讀我吧,啦啦啦
------end------
這里能看到多條消息被粘到一起發送了。
客戶端接收到服務端回傳的消息:
receive1receive2receive3receive4receive5 count:1----end----
receive6 count:2----end----
服務端收到 6 次消息,所以回復了 6 次,同樣客戶端接收消息也出現粘包的現象。
因為我們並沒有對數據包做任何聲明,站在 TCP 協議端看, Netty 屬於應用層,我們上面的示例代碼中未對原始的數據包做任何處理。
4. Netty 粘包處理
處理 TCP 粘包的唯一方法就是制定應用層的數據通訊協議,通過協議來規范現有接收的數據是否滿足消息數據的需要。
1. Netty 提供的能力
為了解決網絡數據流的拆包粘包問題,Netty 為我們內置了如下的解碼器:
- ByteToMessageDecoder:如果想實現自己的半包解碼器,實現該類;
- MessageToMessageDecoder:一般作為二次解碼器,當我們在 ByteToMessageDecoder 將一個 bytes 數組轉換成一個 java 對象的時候,我們可能還需要將這個對象進行二次解碼成其他對象,我們就可以繼承這個類;
- LineBasedFrameDecoder:通過在包尾添加回車換行符 \r\n 來區分整包消息;
- StringDecoder:字符串解碼器;
- DelimiterBasedFrameDecoder:特殊字符作為分隔符來區分整包消息;
- FixedLengthFrameDecoder:報文大小固定長度,不夠空格補全;
- ProtoBufVarint32FrameDecoder:通過 Protobuf 解碼器來區分整包消息;
- ProtobufDecoder: Protobuf 解碼器;
- LengthFieldBasedFrameDecoder:指定長度來標識整包消息,通過在包頭指定整包長度來約定包長。
Netty 還內置了如下的編碼器:
- ProtobufEncoder:Protobuf 編碼器;
- MessageToByteEncoder:將 Java 對象編碼成 ByteBuf;
- MessageToMessageEncoder:如果不想將 Java 對象編碼成 ByteBuf,而是自定義類就繼承這個;
- LengthFieldPrepender:LengthFieldPrepender 是一個非常實用的工具類,如果我們在發送消息的時候采用的是:消息長度字段+原始消息的形式,那么我們就可以使用 LengthFieldPrepender。這是因為 LengthFieldPrepender 可以將待發送消息的長度(二進制字節長度)寫到 ByteBuf 的前兩個字節。
編解碼相關類結構圖如下:
上面的類關系能看到所有的自定義解碼器都是繼承自 ByteToMessageDecoder
。在Netty 中 Decoder 主要分為兩大類:
- 一種是將字節流轉換為某種協議的數據格式:
ByteToMessageDecoder
和ReplayingDecoder
; - 一種是將一直協議的數據轉為另一種協議的數據格式:
MessageToMessageDecoder
。
將字節流轉為對象是一種很常見的操作,也是一個消息框架應該提供的基礎功能。因為 Decoder 的作用是將輸入的數據解析成特定協議,上圖中可以看到所有的 Decoder 都實現了 ChannelInboundHandler接口。在應用層將 byte 轉為 message 的難度在於如何確定當前的包是一個完整的數據包,有兩種方案可以實現:
- 監聽當前 socket 的線程一直等待,直到收到的 byte 可以完成的構成一個包為止。這種方式的弊端就在於要浪費一個線程去等。
- 第二種方案是為每個監聽的 socket 都構建一個本地緩存,當前監聽線程如果遇到字節數不夠的情況就先將獲取到的數據存入緩存,繼而處理別的請求,等到這里有數據的時候再來將新數據繼續寫入緩存直到數據構成一個完整的包取出。
ByteToMessageDecoder 采用的是第二種方案。在 ByteToMessageDecoder 中有一個對象 ByteBuf,該對象用於存儲當前 Decoder接收到的 byte 數據。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 用來保存累計讀取到的字節. 我們讀到的新字節會保存(緩沖)在這里
ByteBuf cumulation;
// 用來做累計的,負責將讀到的新字節寫入 cumulation,有兩個實現 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR
private Cumulator cumulator = MERGE_CUMULATOR;
//設置為true后, 單個解碼器只會解碼出一個結果
private boolean singleDecode;
private boolean decodeWasNull;
//是否是第一次讀取數據
private boolean first;
//多少次讀取后, 丟棄數據 默認16次
private int discardAfterReads = 16;
//已經累加了多少次數據
private int numReads;
//每次接收到數據,就會調用channelRead 進行處理
//該處理器用於處理二進制數據,所以 msg 字段的類型應該是 ByteBuf。
//如果不是,則交給pipeLine的下一個處理器進行處理。
//下面的代碼中可以看出
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果不是ByteBuf則不處理
if (msg instanceof ByteBuf) {
//out用於存儲解析二進制流得到的結果,一個二進制流可能會解析出多個消息,所以out是一個list
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//判斷cumulation == null;並將結果賦值給first。因此如果first為true,則表示第一次接受到數據
first = cumulation == null;
//如果是第一次接受到數據,直接將接受到的數據賦值給緩存對象cumulation
if (first) {
cumulation = data;
} else {
// 第二次解碼,就將 data 向 cumulation 追加,並釋放 data
//如果cumulation中的剩余空間,不足以存儲接收到的data,將cumulation擴容
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加后的 cumulation 后,調用 decode 方法進行解碼
// 解碼過程中,調用 fireChannelRead 方法,主要目的是將累積區的內容 decode 到 數組中
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
//如果cumulation沒有數據可讀了,說明所有的二進制數據都被解析過了
//此時對cumulation進行釋放,以節省內存空間。
//反之cumulation還有數據可讀,那么if中的語句不會運行,因為不對cumulation進行釋放
//因此也就緩存了用戶尚未解析的二進制數據。
if (cumulation != null && !cumulation.isReadable()) {
// 將次數歸零
numReads = 0;
// 釋放累計區
cumulation.release();
// 等待 gc
cumulation = null;
// 如果超過了 16 次,就壓縮累計區,主要是將已經讀過的數據丟棄,將 readIndex 歸零。
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
// 如果沒有向數組插入過任何數據
decodeWasNull = !out.insertSinceRecycled();
// 循環數組,向后面的 handler 發送數據,如果數組是空,那不會調用
fireChannelRead(ctx, out, size);
// 將數組中的內容清空,將數組的數組的下標恢復至原來
out.recycle();
}
} else {
//如果msg類型是不是ByteBuf,直接調用下一個handler進行處理
ctx.fireChannelRead(msg);
}
}
//callDecode方法主要用於解析cumulation 中的數據,並將解析的結果放入List<Object> out中。
//由於cumulation中緩存的二進制數據,可能包含了出多條有效信息,因此在callDecode方法中,默認會調用多次decode方法
//我們在覆寫decode方法時,每次只解析一個消息,添加到out中,callDecode通過多次回調decode
//每次傳遞進來都是相同的List<Object> out實例,因此每一次解析出來的消息,都存儲在同一個out實例中。
//當cumulation沒有數據可以繼續讀,或者某次調用decode方法后,List<Object> out中元素個數沒有變化,則停止回調decode方法。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//如果cumulation中有數據可讀的話,一直循環調用decode
while (in.isReadable()) {
//獲取上一次decode方法調用后,out中元素數量,如果是第一次調用,則為0。
int outSize = out.size();
//上次循環成功解碼
if (outSize > 0) {
//用后面的業務 handler 的 ChannelRead 方法讀取解析的數據
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//回調decode方法,由開發者覆寫,用於解析in中包含的二進制數據,並將解析結果放到out中。
decode(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//outSize是上一次decode方法調用時out的大小,out.size()是當前out大小
//如果二者相等,則說明當前decode方法調用沒有解析出有效信息。
if (outSize == out.size()) {
//此時,如果發現上次decode方法和本次decode方法調用候,in中的剩余可讀字節數相同
//則說明本次decode方法沒有讀取任何數據解析
//(可能是遇到半包等問題,即剩余的二進制數據不足以構成一條消息),跳出while循環。
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
//處理人為失誤 。如果走到這段代碼,則說明outSize != out.size()。
//也就是本次decode方法實際上是解析出來了有效信息放到out中。
//但是oldInputLength == in.readableBytes(),說明本次decode方法調用並沒有讀取任何數據
//但是out中元素卻添加了。
//這可能是因為開發者錯誤的編寫了代碼,例如mock了一個消息放到List中。
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
}
這里 channelRead()
的主要邏輯是:
- 從對象池中取出一個空的數組;
- 判斷成員變量是否是第一次使用,要注意的是,這里既然使用了成員變量,所以這個 handler 不能是 @Shareble 狀態的 handler,不然你就分不清成員變量是哪個 channel 的。將 unsafe 中傳遞來的數據寫入到這個 cumulation 累積區中;
- 寫到累積區后,調用子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就調用后面節點的 channelRead 方法。若沒有解碼成功,什么都不做;
- 如果累積區沒有未讀數據了,就釋放累積區;
- 如果還有未讀數據,且解碼超過了 16 次(默認),就對累積區進行壓縮。將讀取過的數據清空,也就是將 readIndex 設置為0;
- 設置 decodeWasNull 的值,如果上一次沒有插入任何數據,這個值就是 ture。該值在 調用 channelReadComplete 方法的時候,會觸發 read 方法(不是自動讀取的話),嘗試從 JDK 的通道中讀取數據,並將之前的邏輯重來。主要應該是怕如果什么數據都沒有插入,就執行 channelReadComplete 會遺漏數據;
- 調用 fireChannelRead 方法,將數組中的元素發送到后面的 handler 中;
- 將數組清空。並還給對象池。
當數據添加到累積區之后,需要調用 decode 方法進行解碼,代碼見上面的 callDecode()
方法。在 callDecode()
中最關鍵的代碼就是將解析完的數據拿取調用decode(ctx, in, out)
方法。所以如果繼承 ByteToMessageDecoder 類實現自己的字節流轉對象的邏輯我們就要覆寫該方法。
2. LineBasedFrameDecoder 使用
LineBasedFrameDecoder
通過在包尾添加回車換行符 \r\n
來區分整包消息。邏輯比較簡單,示例代碼見:
示例代碼見:LineBasedFrameDecoder gitHub示例
3. FixedLengthFrameDecoder 使用
LineBasedFrameDecoder
即固定消息長度解碼器,個人認為這個貌似不能適用通用場景。
示例代碼見:FixedLengthFrameDecoder gitHub 示例
4. DelimiterBasedFrameDecoder 使用
DelimiterBasedFrameDecoder
即自定義分隔符解碼器。相當於是 LineBasedFrameDecoder
的高階版。
示例代碼見:DelimiterBasedFrameDecoder gitHub示例
5. LengthFieldBasedFrameDecoder 使用
LengthFieldBasedFrameDecoder
相對就高端一點。前面我們使用到的拆包都是基於一些約定來做的,比如固定長度,特殊分隔符,這些方案總是有一定的弊端。最好的方案就是:發送方告訴我當前消息總長度,接收方如果沒有收到該長度大小的數據就認為是沒有收完繼續等待。
先看一下該類的構造函數:
/**
* Creates a new instance.
*
* @param maxFrameLength 幀的最大長度
*
* @param lengthFieldOffset 長度字段偏移的地址
*
* @param lengthFieldLength 長度字段所占的字節長
* 修改幀數據長度字段中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,
* 若為負數,則說明要推后多少個字段
* @param lengthAdjustment 解析時候跳過多少個長度
*
* @param initialBytesToStrip 解碼出一個數據包之后,去掉開頭的字節數
*
* @param initialBytesToStrip 為true,當frame長度超過maxFrameLength時立即報
* TooLongFrameException異常,為false,讀取完整個幀再報異
*
*/
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
在 LengthFieldBasedFrameDecoder
類的注解上給出了一些關於該類使用的示例:
示例1:
lengthFieldOffset = 0,長度字段偏移位置為0表示從包的第一個字節開始讀取;
lengthFieldLength = 2,長度字段長為2,從包的開始位置往后2個字節的長度為長度字段;
lengthAdjustment = 0 ,解析的時候無需跳過任何長度;
initialBytesToStrip = 0,無需去掉當前數據包的開頭字節數, header + body。
0x000C 轉為 int = 12。
* <pre>
* <b>lengthFieldOffset</b> = <b>0</b>
* <b>lengthFieldLength</b> = <b>2</b>
* lengthAdjustment = 0
* initialBytesToStrip = 0 (= do not strip header)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* </pre>
上面這個設置表示:body長度為12,從當前包的第0個字節開始讀取,前兩個字節表示包長度,讀取數據 body的時候不偏移從0字節開始,所以整包大小14個字節,包含包頭長度字節在內。
示例2:
lengthFieldOffset = 0,長度字段偏移位置為0表示從包的第一個字節開始讀取;
lengthFieldLength = 2,長度字段長為2,從包的開始位置往后2個字節的長度為長度字段;
lengthAdjustment = 0 ,解析的時候無需跳過任何長度;
initialBytesToStrip = 2,去掉當前數據包的開頭2字節,去掉 header。
0x000C 轉為 int = 12。
* <pre>
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = 0
* <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
* </pre>
這個配置跟上面的而區別就在於,initialBytesToStrip = 2,表示當前包中的有效數據是從整包偏移2個字節開始計算的,即包頭中的長度字段 2 byte 不屬於包內容的一部分。
示例3:
lengthFieldOffset = 0,長度字段偏移位置為0表示從包的第一個字節開始讀取;
lengthFieldLength = 2,長度字段長為2,從包的開始位置往后2個字節的長度為長度字段;
lengthAdjustment = -2 ,解析的時候無需跳過任何長度;
initialBytesToStrip = 0,無需去掉當前數據包的開頭字節數。
0x000C 轉為 int = 12。
* <pre>
* lengthFieldOffset = 0
* lengthFieldLength = 2
* <b>lengthAdjustment</b> = <b>-2</b> (= the length of the Length field)
* initialBytesToStrip = 0
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* </pre>
length = 14,長度字段為 2 字節,真實的數據長度為 12 個字節,但是 length = 14,那么說明 length的長度也算上了數據包長度了。lengthAdjustment = -2 ,表示當前length長度往回調2個字節,這樣總包長度就是14個字節。
示例4:
lengthFieldOffset = 2,長度字段偏移位置為2表示從包的第3個字節開始讀取;
lengthFieldLength = 3,長度字段長為3,從包的開始位置往后3個字節的長度為長度字段;
lengthAdjustment = 0 ,解析的時候無需跳過任何長度;
initialBytesToStrip = 0,無需去掉當前數據包的開頭字節數。
0x000E 轉為 int = 14。
* <pre>
* <b>lengthFieldOffset</b> = <b>2</b> (= the length of Header 1)
* <b>lengthFieldLength</b> = <b>3</b>
* lengthAdjustment = 0
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* </pre>
*
header頭占2個字節,長度字段占3個字節,content字段占12個字節,總共17個字節。body讀取無偏移要求,所以body整體也是17個字節。
示例5:
lengthFieldOffset = 0,長度字段偏移位置為0表示從包的第0個字節開始讀取;
lengthFieldLength = 3,長度字段長為3,從包的開始位置往后3個字節的長度為長度字段;
lengthAdjustment = 2 ,解析的時候跳過2個字節;
initialBytesToStrip = 0,無需去掉當前數據包的開頭字節數。
0x000C 轉為 int = 12。
* <pre>
* lengthFieldOffset = 0
* lengthFieldLength = 3
* <b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1)
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* </pre>
*
這個包 length在最前面傳輸占3個字節,header在中間占兩個字節,content在最后占12個字節。body字段只有content,所以讀取content的時候需要在length字段的基礎上往前偏移2個字節跳過heade字段。
關於 LengthFieldBasedFrameDecoder
構造函數的示例用法我們先將這么多,下來舉一個示例我們看看實際中的使用:
示例代碼見:LengthFieldBasedFrameDecoder基本使用 gitHub示例
代碼解釋:
@Slf4j
public class PeClientHandler extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("client channelActive");
for (int i = 0; i < 100; i++) {
byte[] req = ("我是一條測試消息,快來讀我吧,啦啦啦" + i).getBytes();
ByteBuf message = Unpooled.buffer(req.length);
message.writeInt(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
}
客戶端發送消息是:int型的length字段占4個字節,剩余字節為content內容。那么對應到客戶端接收的解碼器設置:
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, // 幀的最大長度,即每個數據包最大限度
0, // 長度字段偏移量
4, // 長度字段所占的字節數
0, // 消息頭的長度,可以為負數
4) // 需要忽略的字節數,從消息頭開始,這里是指整個包
);
長度字段4個字節,消息體忽略4字節,即排除長度字段之后的內容算是body。
以上的這段演示代碼的重點,大家可以下載示例功能,自己演示一下。
但是有個問題是:我們上面寫的示例代碼在生產環境中只能是玩具。消息體的讀取配置不應該在這里通過參數配置來設置,應該有一個約定的消息結構體,每一個字段是什么數據結構會占用多大空間都應該在結構體中約定清楚。每個字段讀取對應空間大小的數據剩下的就是別人的部分互不侵犯。
所以下面的一個示例給出了通過繼承 LengthFieldBasedFrameDecoder 重寫 decode 方法來實現解析出約定對象的實現。
6. 自定義編解碼器的 LengthFieldBasedFrameDecoder 使用
首先我們自定義了一個消息體:
public class MsgReq {
private byte type;
private int length;
private String content;
}
包含3個字段。
發送消息出去的時候肯定是要將對象轉為 byte 發送,所以需要一個消息編碼器,我們繼承 MessageToByteEncoder 來實現編碼器:
package com.rickiyang.learn.packageEvent5;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
/**
* @author rickiyang
* @date 2020-05-14
* @Desc 自定義編碼器
*/
public class MyProtocolEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
MsgReq req = (MsgReq) msg;
out.writeByte(req.getType());
out.writeInt(req.getLength());
out.writeBytes(req.getContent().getBytes(StandardCharsets.UTF_8));
}
}
即將 MsgReq 對象轉為對應的 byte 發送。
發送出去的是 byte 字節,對應的解碼器應該是將 byte 轉為對象。自然解碼器應該是繼承 ByteToMessageDecoder。我們的目的不是自己實現一個完完全全的自定義解碼器,而是在消息長度解碼器的基礎上完成對象解析的工作,所以解碼器如下:
package com.rickiyang.learn.packageEvent5;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.nio.charset.StandardCharsets;
/**
* @author rickiyang
* @date 2020-05-14
* @Desc 自定義解碼器
*/
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
/**
* @param maxFrameLength 幀的最大長度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字節長
* @param lengthAdjustment 修改幀數據長度字段中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推后多少個字段
* @param initialBytesToStrip 解析時候跳過多少個長度
* @param failFast 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在這里調用父類的方法
in = (ByteBuf) super.decode(ctx, in);
if (in == null) {
return null;
}
//讀取type字段
byte type = in.readByte();
//讀取length字段
int length = in.readInt();
if (in.readableBytes() != length) {
throw new RuntimeException("長度與標記不符");
}
//讀取body
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
return MsgReq.builder().length(length).type(type).content(new String(bytes, StandardCharsets.UTF_8)).build();
}
}
通過這種方式,我們只用約定好消息的最大長度,比如一條消息超過多少字節就拒收,約定好消息長度字段所占的字節,一般來說int類型4個字節足夠。剩下的幾個參數都無需設置,按照約定的消息格式進行解析即可。
示例代碼見:LengthFieldBasedFrameDecoder自定義編解碼器 gitHub示例
5. 小結
本篇將了關於 Netty 中處理拆包粘包的一些實用工具以及如果實現自定義的編解碼器的方式。每種處理方式都給出了對應的案例操作,大家有興趣的可以下載代碼自行運行看看處理效果。后面也給出了關於自定義編解碼器的示例,大家如果有興趣可以自己寫一下編解碼操作,下一篇再一起看看編解碼器在消息讀寫過程被使用在哪個階段。