20-Netty TCP Sticky and Split Packets and Their Solutions


Basic Introduction to TCP Sticky and Split Packets

  1. TCP is connection-oriented and stream-oriented and provides a reliable service. The two ends (client and server) hold paired sockets, and to deliver multiple outgoing packets more efficiently the sender applies an optimization (Nagle's algorithm) that merges several small, closely spaced chunks of data into one larger block before packaging it. This improves efficiency, but it makes it hard for the receiver to pick out complete packets, because stream-oriented communication preserves no message boundaries (a short sketch of disabling Nagle follows the list of read scenarios below).
  2. Since TCP preserves no message boundaries, the receiver has to work out where each message ends; this is what we call the sticky-packet / split-packet problem. See the diagram below.
  3. (Diagram: illustration of TCP sticky and split packets)

Explanation of the diagram

Suppose the client sends two packets, D1 and D2, to the server. Since the number of bytes the server reads at a time is not fixed, four situations are possible:

  1. The server reads the two packets in two separate reads, D1 and then D2; no sticking or splitting occurs.
  2. The server receives D1 and D2 stuck together in a single read; this is called TCP packet sticking.
  3. The server reads the data in two passes: first the complete D1 plus part of D2, then the remainder of D2; this is called TCP packet splitting.
  4. The server reads the data in two passes: first part of D1 (call it D1_1), then the rest of D1 (D1_2) together with the complete D2.
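
As an aside, the Nagle batching described above can be switched off per connection when small writes must go out immediately. A minimal sketch using Netty's standard ChannelOption (bootstrap variable names as in the examples below); note this only reduces coalescing on the sender and does nothing to solve framing on the receiver:

// Disable Nagle's algorithm: small writes are flushed immediately instead of
// being merged. TCP remains a byte stream, so framing must still be handled.
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); // server: accepted child sockets
bootstrap.option(ChannelOption.TCP_NODELAY, true);            // client: the connecting socket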

An Example of TCP Sticky and Split Packets in Action

When writing a Netty program, if nothing is done about it, the sticky/split packet problem will show up.

Here is a concrete example.

NettyServer

package com.dance.netty.netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class NettyServer {

    public static void main(String[] args) {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
            System.out.println("server is ready ......");
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    static class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private int count = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            byte[] bytes = new byte[msg.readableBytes()];
            msg.readBytes(bytes);
            count++;
            System.out.println("服務器第"+count+"次接收到來自客戶端的數據:" + new String(bytes, StandardCharsets.UTF_8));
            // 服務器回送數據給客戶端 回送隨機的UUID給客戶端
            ctx.writeAndFlush(Unpooled.copiedBuffer(UUID.randomUUID().toString(),StandardCharsets.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }

}

NettyClient

package com.dance.netty.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.StandardCharsets;

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    static class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private int count = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send 10 messages in a row
            for (int i = 0; i < 10; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!" + i, StandardCharsets.UTF_8));
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            byte[] bytes = new byte[msg.readableBytes()];
            msg.readBytes(bytes);
            // Receive the server's reply
            count++;
            System.out.println("Client receive #" + count + " from server: " + new String(bytes, StandardCharsets.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }
}

Execution results

Server

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
Server receive #1 from client: hello,server!0hello,server!1hello,server!2hello,server!3hello,server!4hello,server!5hello,server!6hello,server!7hello,server!8hello,server!9
Server receive #1 from client: hello,server!0
Server receive #2 from client: hello,server!1
Server receive #3 from client: hello,server!2hello,server!3hello,server!4
Server receive #4 from client: hello,server!5hello,server!6
Server receive #5 from client: hello,server!7hello,server!8hello,server!9

Client1

Client receive #1 from server: 84653e99-0e7f-431d-a897-c215af959a3b

Client2

Client receive #1 from server: 6f3b0e79-2f40-4066-bb6b-80f988ecec116b6bbd94-b345-46d6-8d36-a114534331a850628e04-ece1-4f58-b684-d30189f6cf26b2139027-6bda-4d40-9238-9fc0e59bc7a64b568ffe-f616-4f48-8f1c-05ecf3e817ee

Analysis:

After the server starts, it prints server is ready ......

When the first client ran, TCP coalesced its 10 writes into a single send, so the server received all the data in one read: the sticky-packet problem.

When the second client ran, TCP packed the 10 writes into 5 reads on the server side, again producing a mix of sticking and splitting. Exactly how the bytes are grouped varies from run to run, since it depends on timing and buffer state.

Solutions to TCP Sticky and Split Packets

  1. Use a custom protocol together with an encoder/decoder pair.
  2. The key is to make the number of bytes the server reads for each message deterministic. Once that is solved, the server can neither over-read nor under-read, which avoids sticking and splitting.
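
Concretely, the custom protocol implemented below frames every message as a 4-byte big-endian length followed by exactly that many bytes of content:

+---------------------+-------------------------+
| length (4-byte int) | content (length bytes)  |
+---------------------+-------------------------+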

Implementing the TCP Sticky/Split Packet Solution

  1. The client sends a series of Message objects, one Message per write (the code below sends 10).
  2. The server decodes one Message at a time and, for each Message it reads, replies with a Message object of its own.

Create the protocol class MessageProtocol

package com.dance.netty.netty.protocoltcp;

/**
 * Message protocol: a length field plus the content bytes
 */
public class MessageProtocol {

    private int length;

    private byte[] content;

    public MessageProtocol() {
    }

    public MessageProtocol(int length, byte[] content) {
        this.length = length;
        this.content = content;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

Create the encoder

package com.dance.netty.netty.protocoltcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Custom protocol encoder
 */
public class MyMessageProtocolEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
//        System.out.println("custom protocol ----> encoding started");
        // Write the frame out
        out.writeInt(msg.getLength()); // write the length first to define the frame boundary
        out.writeBytes(msg.getContent());
//        System.out.println("custom protocol ----> encoding finished");
    }
}
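
To make the frame layout tangible, here is a tiny standalone check (a hypothetical demo snippet, not part of the project above; ByteBufUtil.hexDump is Netty's own helper) of what this encoder logic writes for a 5-byte payload:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

import java.nio.charset.StandardCharsets;

public class FrameLayoutDemo {
    public static void main(String[] args) {
        byte[] content = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuf buf = Unpooled.buffer();
        buf.writeInt(content.length); // 4-byte big-endian length: 00 00 00 05
        buf.writeBytes(content);      // payload bytes: 68 65 6c 6c 6f
        System.out.println(ByteBufUtil.hexDump(buf)); // prints 0000000568656c6c6f
    }
}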

Create the decoder

package com.dance.netty.netty.protocoltcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyMessageProtocolDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//        System.out.println("custom protocol ----> decoding started");
        // Read the length field that defines the frame boundary
        // (note: readInt() assumes at least 4 readable bytes -- see the FAQ below)
        int length = in.readInt();
        if (in.readableBytes() >= length) {
            // Read exactly `length` bytes of content
            byte[] bytes = new byte[length];
            in.readBytes(bytes);
            // Rebuild a MessageProtocol and hand it to the next handler
            MessageProtocol messageProtocol = new MessageProtocol(length, bytes);
            out.add(messageProtocol);
//            System.out.println("custom protocol ----> decoding finished");
        } else {
            // Not enough bytes for the body yet; note that the 4-byte length
            // has already been consumed here and the reader index is not rewound
        }
    }
}
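
One thing worth knowing here: ByteToMessageDecoder keeps an internal cumulation buffer and invokes decode() repeatedly as bytes arrive, so decode() can easily run while only part of a frame is available. The decoder above does not guard against that case, which is exactly what the FAQ below runs into.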

Create the server

package com.dance.netty.netty.protocoltcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class NettyServer {

    public static void main(String[] args) {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the custom protocol decoder and encoder
                            pipeline.addLast(new MyMessageProtocolDecoder());
                            pipeline.addLast(new MyMessageProtocolEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
            System.out.println("server is ready ......");
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    static class NettyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

        private int count = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            byte[] bytes = msg.getContent();
            count++;
            System.out.println("服務器第"+count+"次接收到來自客戶端的數據:" + new String(bytes, StandardCharsets.UTF_8));
            // 服務器回送數據給客戶端 回送隨機的UUID給客戶端
            byte[] s = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
            ctx.writeAndFlush(new MessageProtocol(s.length,s));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }

}
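
Note that the handler now extends SimpleChannelInboundHandler<MessageProtocol> rather than SimpleChannelInboundHandler<ByteBuf>: MyMessageProtocolDecoder sits earlier in the pipeline and emits MessageProtocol objects, so the business handler receives fully decoded messages instead of raw bytes.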

Create the client

package com.dance.netty.netty.protocoltcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.StandardCharsets;

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Custom delimiter (enabled later, in the FAQ section)
//                            ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
//                            pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));

                            // Add the custom protocol decoder and encoder
                            pipeline.addLast(new MyMessageProtocolDecoder());
                            pipeline.addLast(new MyMessageProtocolEncoder());
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    static class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

        private int count = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send 10 messages in a row
            for (int i = 0; i < 10; i++) {
                String msg = "It's cold today, let's have hotpot" + i;
                byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
                // Wrap the payload in the custom protocol
                MessageProtocol messageProtocol = new MessageProtocol(bytes.length, bytes);
                ctx.writeAndFlush(messageProtocol);
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            byte[] bytes = msg.getContent();
            // Receive the server's reply
            count++;
            System.out.println("Client receive #" + count + " from server: " + new String(bytes, StandardCharsets.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }
}

Testing

Sending 10 messages

Server side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
Server receive #1 from client: It's cold today, let's have hotpot0
......
Server receive #10 from client: It's cold today, let's have hotpot9

Client side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Client receive #1 from server: a6b69f1c-daba-435a-802a-c19a6350ca94
......
Client receive #10 from server: 5af5c297-8668-48aa-b8c4-35656142f591

OK, no problems. But is everything really fine? The answer is: there is still a problem.

FAQ

Sending 1000 messages

Modify the number of messages the client sends:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Send 1000 messages in a row
    for (int i = 0; i < 1000; i++) {
        ......
    }
}

Re-test

Server side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
Server receive #1 from client: It's cold today, let's have hotpot0
......
Server receive #31 from client: It's cold today, let's have hotpot30
Server receive #32 from client: It's cold today, let's have hotpot31
io.netty.handler.codec.DecoderException: java.lang.NegativeArraySizeException
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1412)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:943)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at com.dance.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:17)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
    ... 16 more
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:359)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1407)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:925)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
    at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:786)
    at com.dance.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:14)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
    ... 17 more

What? It crashed outright: a NegativeArraySizeException, followed by an IndexOutOfBoundsException because read index 1022 + length 4 exceeds write index 1024.

The stack traces point at the decoder. Once a frame gets split across reads, decode() can be called with fewer than 4 readable bytes, so readInt() throws the IndexOutOfBoundsException; or it reads a length, finds the body incomplete, and returns without rewinding the reader index, so the next pass interprets payload bytes as a length, which can come out negative, hence the NegativeArraySizeException.

So what is going on here? There are very few write-ups about this bug online; most posts just restate the problem. After digging for close to an hour I found an article by someone who had worked it out. Thanks to the author.

Blog post:

https://blog.csdn.net/u011035407/article/details/80454511

Problem description (from the referenced post):

Packet transfer worked fine at first, but once a packet's size exceeded 512 bytes, an exception was thrown.

Solution

Use a DelimiterBasedFrameDecoder in combination with the custom decoder: append a newline \n at the end of each packet to mark that the packet is finished. After DelimiterBasedFrameDecoder has cut the stream into frames, the ByteToMessageDecoder's decode method turns each frame into a Message object.

We add the DelimiterBasedFrameDecoder to the ChannelPipeline,

on both the client and the server:

// Use \n as the delimiter
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
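
(For reference, Delimiters.lineDelimiter() matches both \r\n and \n, so this frame decoder splits on either line ending.)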

And in the MessageToByteEncoder implementation, append a trailing newline at the end of encode():

// Append \n at the end of the outgoing bytes to mark the end of the frame
out.writeBytes(new byte[]{'\n'});

Now the data could be processed happily again. But before I had finished celebrating, the problem came back, and it was the same error as before.

Wait, why is it failing again when delimiter handling is already in place? Time to actually solve it. First, look at the structure of the packet being parsed:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 01 01 00 00 00 06 00 00 01 0a 7b 22 69 64 22 |...........{"id"|
|00000010| 3a 33 2c 22 75 73 65 72 6e 61 6d 65 22 3a 22 31 |:3,"username":"1|
|00000020| 38 35 30 30 33 34 30 31 36 39 22 2c 22 6e 69 63 |8500340169","nic|
|00000030| 6b 6e 61 6d 65 22 3a 22 e4 bb 96 e5 9b 9b e5 a4 |kname":"........|
|00000040| a7 e7 88 b7 22 2c 22 72 6f 6f 6d 49 64 22 3a 31 |....","roomId":1|
|00000050| 35 32 37 32 33 38 35 36 39 34 37 34 2c 22 74 65 |527238569474,"te|
|00000060| 61 6d 4e 61 6d 65 22 3a 22 e4 bf 84 e7 bd 97 e6 |amName":".......|
|00000070| 96 af 22 2c 22 75 6e 69 74 73 22 3a 7b 22 75 6e |..","units":{"un|
|00000080| 69 74 31 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it1":{"x":10.0,"|
|00000090| 79 22 3a 31 30 2e 30 7d 2c 22 75 6e 69 74 32 22 |y":10.0},"unit2"|
|000000a0| 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 79 22 3a 31 |:{"x":10.0,"y":1|
|000000b0| 30 2e 30 7d 2c 22 75 6e 69 74 33 22 3a 7b 22 78 |0.0},"unit3":{"x|
|000000c0| 22 3a 31 30 2e 30 2c 22 79 22 3a 31 30 2e 30 7d |":10.0,"y":10.0}|
|000000d0| 2c 22 75 6e 69 74 34 22 3a 7b 22 78 22 3a 31 30 |,"unit4":{"x":10|
|000000e0| 2e 30 2c 22 79 22 3a 31 30 2e 30 7d 2c 22 75 6e |.0,"y":10.0},"un|
|000000f0| 69 74 35 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it5":{"x":10.0,"|
|00000100| 79 22 3a 31 30 2e 30 7d 7d 2c 22 73 74 61 74 75 |y":10.0}},"statu|
|00000110| 73 22 3a 31 7d 0a                               |s":1}.          |
+--------+-------------------------------------------------+----------------+

The received data is complete, yet it still fails, and the final byte really is 0a, which converts to \n. So nothing looks wrong.

After printing the length of the ByteBuf inside ByteToMessageDecoder's decode method, the problem surfaced: length = 10.

That means that by the time the data reached ByteToMessageDecoder, only 10 bytes were left; the long packet had been chopped apart mid-air by the upstream DelimiterBasedFrameDecoder. Where exactly? Look at the byte stream above and find the 11th byte: it is 0a. Because this is not plain JSON, the packet starts with 3 header bytes plus two int-length fields, so the header is 11 bytes long, and that 11th byte, the low byte of a length field (00 00 01 0a, i.e. 266), happens to be exactly 0x0a.

When DelimiterBasedFrameDecoder reached that 11th byte it read it as \n and naturally concluded the packet had ended there; the truncated data then entered ByteToMessageDecoder, where the body no longer matched the declared length, and decoding failed.

After some back and forth: stop using a single byte like \n as the terminator, since it is far too easy to hit inside binary data. Use the two-byte sequence \r\n instead; the odds of that pair occurring inside the two leading int fields are much smaller.

Final fix

In both the client's and the server's pipeline, add a frame decoder that treats "\r\n" as the boundary marking the end of a packet:

// Use a custom delimiter here
ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));
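
addFirst is used so that the frame splitter runs before the MyMessageProtocolDecoder that was registered with addLast: frames must be cut on \r\n before the length-based decode sees them.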


In the encoder, append this at the end of each outgoing frame:

// Final version: terminate each frame with \r\n
out.writeBytes(new byte[]{'\r','\n'});

Run the program again, and the packets are received correctly.

Final test

Server side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
Server receive #1 from client: It's cold today, let's have hotpot0
......
Server receive #999 from client: It's cold today, let's have hotpot998
Server receive #1000 from client: It's cold today, let's have hotpot999

Client side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Client receive #1 from server: 48fa6d78-8079-4700-b488-ca2af9eb3f8c
......
Client receive #999 from server: 581da47b-d77b-4972-af11-6d33057f6610
Client receive #1000 from server: 0014e906-69cb-4900-9409-f4d1af9148dd

Summary

In the past I had only used Netty to talk to hardware, and that hardware, constrained by cost, processed packets one at a time, so sticky packets basically never had to be considered.

ByteToMessageDecoder and MessageToByteEncoder are fairly low-level stream-processing classes: they come with no built-in sticky/split packet handling, so you have to declare the packet length in your own header, and on their own they cannot cope with oversized packets. Because I had started down this road, I never switched to DelimiterBasedFrameDecoder plus StringDecoder for parsing; in the end I used JSON to convert the payload directly into objects.
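
For completeness: the desynchronization seen in the FAQ can also be avoided without any delimiter by making the length-prefixed decoder itself tolerate partial frames. Below is a minimal sketch of that alternative (my illustration, not the approach settled on above), using mark/reset on the reader index; Netty's built-in LengthFieldBasedFrameDecoder packages the same idea in a reusable handler.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SafeMessageProtocolDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return; // the length field itself has not fully arrived yet
        }
        in.markReaderIndex();      // remember the position before reading the length
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex(); // body incomplete: rewind and wait for more bytes
            return;
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        out.add(new MessageProtocol(length, bytes));
    }
}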

The FAQ section is adapted from the referenced blog post, with some changes of my own.

