1、什么是粘包/拆包
一般所謂的TCP粘包是在一次接收數據不能完全地體現一個完整的消息數據。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理數據,再加上網絡上MTU的往往小於在應用處理的消息數據,所以就會引發一次接收的數據無法滿足消息的需要,導致粘包的存在。處理粘包的唯一方法就是制定應用層的數據通訊協議,通過協議來規范現有接收的數據是否滿足消息數據的需要。
2、解決辦法
2.1、消息定長,報文大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方編程實現獲取定長報文也能區分。
2.2、包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符作為報文分隔符,接收方通過特殊分隔符切分報文區分。
2.3、將消息分為消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段
3、自定義協議,來實現TCP的粘包/拆包問題
3.0 自定義協議,開始標記
3.1 自定義協議的介紹
3.2 自定義協議的類的封裝
3.3 自定義協議的編碼器
3.4 自定義協議的解碼器
4、協議相關的實現
4.1 協議的封裝
- import java.util.Arrays;
- /**
- * <pre>
- * 自己定義的協議
- * 數據包格式
- * +——----——+——-----——+——----——+
- * |協議開始標志| 長度 | 數據 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標志head_data,為int類型的數據,16進制表示為0X76
- * 2.傳輸數據的長度contentLength,int類型
- * 3.要傳輸的數據
- * </pre>
- */
- public class SmartCarProtocol {
- /**
- * 消息的開頭的信息標志
- */
- private int head_data = ConstantValue.HEAD_DATA;
- /**
- * 消息的長度
- */
- private int contentLength;
- /**
- * 消息的內容
- */
- private byte[] content;
- /**
- * 用於初始化,SmartCarProtocol
- *
- * @param contentLength
- * 協議里面,消息數據的長度
- * @param content
- * 協議里面,消息的數據
- */
- public SmartCarProtocol(int contentLength, byte[] content) {
- this.contentLength = contentLength;
- this.content = content;
- }
- public int getHead_data() {
- return head_data;
- }
- public int getContentLength() {
- return contentLength;
- }
- public void setContentLength(int contentLength) {
- this.contentLength = contentLength;
- }
- public byte[] getContent() {
- return content;
- }
- public void setContent(byte[] content) {
- this.content = content;
- }
- @Override
- public String toString() {
- return "SmartCarProtocol [head_data=" + head_data + ", contentLength="
- + contentLength + ", content=" + Arrays.toString(content) + "]";
- }
- }
4.2 協議的編碼器
- /**
- * <pre>
- * 自己定義的協議
- * 數據包格式
- * +——----——+——-----——+——----——+
- * |協議開始標志| 長度 | 數據 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標志head_data,為int類型的數據,16進制表示為0X76
- * 2.傳輸數據的長度contentLength,int類型
- * 3.要傳輸的數據
- * </pre>
- */
- public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {
- @Override
- protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,
- ByteBuf out) throws Exception {
- // 寫入消息SmartCar的具體內容
- // 1.寫入消息的開頭的信息標志(int類型)
- out.writeInt(msg.getHead_data());
- // 2.寫入消息的長度(int 類型)
- out.writeInt(msg.getContentLength());
- // 3.寫入消息的內容(byte[]類型)
- out.writeBytes(msg.getContent());
- }
- }
4.3 協議的解碼器
- import java.util.List;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- /**
- * <pre>
- * 自己定義的協議
- * 數據包格式
- * +——----——+——-----——+——----——+
- * |協議開始標志| 長度 | 數據 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標志head_data,為int類型的數據,16進制表示為0X76
- * 2.傳輸數據的長度contentLength,int類型
- * 3.要傳輸的數據,長度不應該超過2048,防止socket流的攻擊
- * </pre>
- */
- public class SmartCarDecoder extends ByteToMessageDecoder {
- /**
- * <pre>
- * 協議開始的標准head_data,int類型,占據4個字節.
- * 表示數據的長度contentLength,int類型,占據4個字節.
- * </pre>
- */
- public final int BASE_LENGTH = 4 + 4;
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
- List<Object> out) throws Exception {
- // 可讀長度必須大於基本長度
- if (buffer.readableBytes() >= BASE_LENGTH) {
- // 防止socket字節流攻擊
- // 防止,客戶端傳來的數據過大
- // 因為,太大的數據,是不合理的
- if (buffer.readableBytes() > 2048) {
- buffer.skipBytes(buffer.readableBytes());
- }
- // 記錄包頭開始的index
- int beginReader;
- while (true) {
- // 獲取包頭開始的index
- beginReader = buffer.readerIndex();
- // 標記包頭開始的index
- buffer.markReaderIndex();
- // 讀到了協議的開始標志,結束while循環
- if (buffer.readInt() == ConstantValue.HEAD_DATA) {
- break;
- }
- // 未讀到包頭,略過一個字節
- // 每次略過,一個字節,去讀取,包頭信息的開始標記
- buffer.resetReaderIndex();
- buffer.readByte();
- // 當略過,一個字節之后,
- // 數據包的長度,又變得不滿足
- // 此時,應該結束。等待后面的數據到達
- if (buffer.readableBytes() < BASE_LENGTH) {
- return;
- }
- }
- // 消息的長度
- int length = buffer.readInt();
- // 判斷請求數據包數據是否到齊
- if (buffer.readableBytes() < length) {
- // 還原讀指針
- buffer.readerIndex(beginReader);
- return;
- }
- // 讀取data數據
- byte[] data = new byte[length];
- buffer.readBytes(data);
- SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
- out.add(protocol);
- }
- }
- }
4.4 服務端加入協議的編/解碼器
4.5 客戶端加入協議的編/解碼器
5、服務端的實現
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- public class Server {
- public Server() {
- }
- public void bind(int port) throws Exception {
- // 配置NIO線程組
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- // 服務器輔助啟動類配置
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChildChannelHandler())//
- .option(ChannelOption.SO_BACKLOG, 1024) // 設置tcp緩沖區 // (5)
- .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
- // 綁定端口 同步等待綁定成功
- ChannelFuture f = b.bind(port).sync(); // (7)
- // 等到服務端監聽端口關閉
- f.channel().closeFuture().sync();
- } finally {
- // 優雅釋放線程資源
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- /**
- * 網絡事件處理器
- */
- private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 添加自定義協議的編解碼工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 處理網絡IO
- ch.pipeline().addLast(new ServerHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Server().bind(9999);
- }
- }
6、服務端Handler的實現
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- public class ServerHandler extends ChannelHandlerAdapter {
- // 用於獲取客戶端發送的信息
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- // 用於獲取客戶端發來的數據信息
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Server接受的客戶端的信息 :" + body.toString());
- // 會寫數據給客戶端
- String str = "Hi I am Server ...";
- SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
- str.getBytes());
- // 當服務端完成寫操作后,關閉與客戶端的連接
- ctx.writeAndFlush(response);
- // .addListener(ChannelFutureListener.CLOSE);
- // 當有寫操作時,不需要手動釋放msg的引用
- // 當只有讀操作時,才需要手動釋放msg的引用
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- // cause.printStackTrace();
- ctx.close();
- }
- }
7、客戶端的實現
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class Client {
- /**
- * 連接服務器
- *
- * @param port
- * @param host
- * @throws Exception
- */
- public void connect(int port, String host) throws Exception {
- // 配置客戶端NIO線程組
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- // 客戶端輔助啟動類 對客戶端配置
- Bootstrap b = new Bootstrap();
- b.group(group)//
- .channel(NioSocketChannel.class)//
- .option(ChannelOption.TCP_NODELAY, true)//
- .handler(new MyChannelHandler());//
- // 異步鏈接服務器 同步等待鏈接成功
- ChannelFuture f = b.connect(host, port).sync();
- // 等待鏈接關閉
- f.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- System.out.println("客戶端優雅的釋放了線程資源...");
- }
- }
- /**
- * 網絡事件處理器
- */
- private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 添加自定義協議的編解碼工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 處理網絡IO
- ch.pipeline().addLast(new ClientHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Client().connect(9999, "127.0.0.1");
- }
- }
8、客戶端Handler的實現
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- //用於讀取客戶端發來的信息
- public class ClientHandler extends ChannelHandlerAdapter {
- // 客戶端與服務端,連接成功的售后
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 發送SmartCar協議的消息
- // 要發送的信息
- String data = "I am client ...";
- // 獲得要發送信息的字節數組
- byte[] content = data.getBytes();
- // 要發送信息的長度
- int contentLength = content.length;
- SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);
- ctx.writeAndFlush(protocol);
- }
- // 只是讀數據,沒有寫數據的話
- // 需要自己手動的釋放的消息
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- try {
- // 用於獲取客戶端發來的數據信息
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Client接受的客戶端的信息 :" + body.toString());
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- ctx.close();
- }
- }
Netty ByteBuf
ByteBuf的基本結構
ByteBuf由一段地址空間,一個read index和一個write index組成。兩個index分別記錄讀寫進度,省去了NIO中ByteBuffer手動調用flip和clear的煩惱。
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
通過上圖可以很好的理解ByteBuf的數據划分。writer index到capacity之間的部分是空閑區域,可以寫入數據;reader index到writer index之間是已經寫過還未讀取的可讀數據;0到reader index是已讀過可以釋放的區域。
三個index之間的關系是:reader index <= writer index <= capacity
存儲空間
ByteBuf根據其數據存儲空間不同有可以分為三種:基於JVM堆內的,基於直接內存的和組合的。
堆內受JVM垃圾收集器的管轄,使用上相對安全一些,不用每次手動釋放。弊端是GC是會影響性能的;還有就是內存的拷貝帶來的性能損耗(JVM進程到Socket)。
直接內存則不受JVM的管轄,省去了向JVM拷貝數據的麻煩。但是壞處就是別忘了釋放內存,否則就會發生內存泄露。相比於堆內存,直接內存的的分配速度也比較慢。
最佳實踐:在IO通信的線程中的讀寫Buffer使用DirectBuffer(省去內存拷貝的成本),在后端業務消息的處理使用HeapBuffer(不用擔心內存泄露)。
通過hasArray檢查一個ByteBuf heap based還是direct buffer。
創建ByteBuf
ByteBuf提供了兩個工具類來創建ByteBuf,分別是支持池化的Pooled和普通的Unpooled。Pooled緩存了ByteBuf的實例,提高性能並且減少內存碎片。它使用Jemalloc來高效的分配內存。
如果在Channel中我們可以通過channel.alloc()來拿到ByteBufAllocator,具體它使用Pool還是Unpool,Directed還是Heap取決於程序的配置。
索引的標記與恢復
markReaderIndex和resetReaderIndex是一個成對的操作。markReaderIndex可以打一個標記,調用resetReaderIndex可以把readerIndex重置到原來打標記的位置。
空間釋放
discardReadByte可以把讀過的空間釋放,這時buffer的readerIndex置為0,可寫空間和writerIndex也會相應的改變。discardReadBytes在內存緊張的時候使用用,但是調用該方法會伴隨buffer的內存整理的。這是一個expensive的操作。
clear是把readerIndex和writerIndex重置到0。但是,它不會進行內存整理,新寫入的內容會覆蓋掉原有的內容。
ByteBuf的派生與復制
派生操作會產生一個新的ByteBuf實例。這里的新指得是ByteBuf的引用是新的所有的index也是新的。但是它們共用着一套底層存儲。派生函數:
- duplicate()
- slice()
- slice(int, int)
- readSlice(int)
- retainedDuplicate()
- retainedSlice()
- retainedSlice(int, int)
- readRetainedSlice(int)
如果想要復制一個全新的ByteBuffer請使用copy,這會完全的復制一個新的ByteBuf出來。
引用計數
引用計數記錄了當前ByteBuf被引用的次數。新建一個ByteBuf它的refCnt是1,當refCnt == 0時,這個ByteBuf即可被回收。
引用技術主要用於內存泄露的判斷,Netty提供了內存泄露檢測工具。通過使用參數-Dio.netty.leakDetectionLevel=${level}
可以配置檢測級別:
- 禁用(DISABLED: 完全禁止泄露檢測,省點消耗。
- 簡單(SIMPLE): 默認等級,告訴我們取樣的1%的ByteBuf是否發生了泄露,但總共一次只打印一次,看不到就沒有了。
- 高級(ADVANCED): 告訴我們取樣的1%的ByteBuf發生泄露的地方。每種類型的泄漏(創建的地方與訪問路徑一致)只打印一次。對性能有影響。
- 偏執(PARANOID): 跟高級選項類似,但此選項檢測所有ByteBuf,而不僅僅是取樣的那1%。對性能有絕大的影響。
查詢
很多時候需要從ByteBuf中查找特定的字符,比如LineBasedFrameDecoder需要在ByteBuf中查找'\r\n'。ByteBuf提供了簡單的indexOf這樣的函數。同時也可以使用ByteProcesser來查找。
以下gist提供了一些example。
作者:福克斯記
鏈接:https://www.jianshu.com/p/20329efe3ca4
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
9、參考的博客地址
- http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
- http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html