基於netty 報文的拆包粘包處理方法


一、拆包/粘包的問題

  正常情況下客戶端發上來的報文都是單獨,一條報文就是一個完善的。但是特殊情況下會出現2個報文粘在一起發上來。

  正常情況的報文:

  757200501011313130323630383237374137393738323030303532000000000000000055012238393836303242343130313638303038333035320000000000000000000000000000000000000000D58A

  7572 是幀起始序列,也就是包頭

  0050 是報文的長度

  D58A 為CRC16 檢驗

  粘包:

        757200501011313130323630383237374137393738323030303532000000000000000055012238393836303242343130313638303038333035320000000000000000000000000000000000000000D58A757200501011313130323630383237374137393738323030303532000000000000000055

  這是報文第二條是個不完整的包,我們服務端需要做到將包拆成完整的包,並且第二個包需要等到下一條報文拼接成完整的包。

 

二、 netty的解決方案

  1.消息定長,報文大小固定長度,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方編程實現獲取定長報文也能區分。

     2.包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符作為報文分隔符,接收方通過特殊分隔符切分報文區分。

        3.將消息分為消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段

三、實現方法

  創建一個實現類繼承netty的 MessageToMessageDecoder方法

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    private byte[] remainingBytes;
    private static byte[] HEAD_DATA = new byte[]{0x75, 0x72}; //協議幀起始序列 7572 2個字節
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        ByteBuf currBB = null;
        if(remainingBytes == null) {
            currBB = msg;
        }else {
            byte[] tb = new byte[remainingBytes.length + msg.readableBytes()];
            System.arraycopy(remainingBytes, 0, tb, 0, remainingBytes.length);
            byte[] vb = new byte[msg.readableBytes()];
            msg.readBytes(vb);
            System.arraycopy(vb, 0, tb, remainingBytes.length, vb.length);
            currBB = Unpooled.copiedBuffer(tb);
        }
        while(currBB.readableBytes() > 0) {
            if(!doDecode(ctx, currBB, out)) {
                break;
            }
        }
        if(currBB.readableBytes() > 0) {
            remainingBytes = new byte[currBB.readableBytes()];
            currBB.readBytes(remainingBytes);
        }else {
            remainingBytes = null;
        }
//        out.add(remainingBytes);
//        remainingBytes=null;
    }

    private boolean doDecode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        if(msg.readableBytes() < 2)
            return false;
        msg.markReaderIndex();
        byte[] header = new byte[2];
        msg.readBytes(header);
        byte[] dataLength=new byte[2]; //報文的長度
        msg.readBytes(dataLength);
        if (!Arrays.equals(header, HEAD_DATA)) {
            return false;
           // throw new DecoderException("errorMagic: " + Arrays.toString(header));
        }
        int len = Integer.parseInt(DatatypeConverter.printHexBinary(dataLength), 16);
       // int len =msg.readInt();
        if(msg.readableBytes() < len-4) {
            msg.resetReaderIndex();
            return false;
        }
        msg.resetReaderIndex();
        byte[] body = new byte[len];
        msg.readBytes(body);
        out.add(Unpooled.copiedBuffer(body));
        if(msg.readableBytes() > 0)
            return true;
        return false;
    }
}

netty 客戶端ChannelPipeline加入創建的 MessageDecoder 類

public synchronized boolean openDev() {
        if(isOpen()){
            return true;
        }
        if(group ==null){
            group =new NioEventLoopGroup();
        }
        Bootstrap b =new Bootstrap();
        final MessageHandler hander =new MessageHandler();
        hander.setMedia(this);
        final ServerHandler serverHandler =new ServerHandler();
        serverHandler.setMedia(this);
        b.handler(new HeartbeatHandlerInitializer(this));//心跳
        b.group(group).channel(NioSocketChannel.class).
                option(ChannelOption.TCP_NODELAY, true).
                handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline=socketChannel.pipeline();
                        //pipeline.addLast(new MessageEncoder());//協議編碼器
                        pipeline.addLast(new MessageDecoder());//協議解碼器
                        pipeline.addLast(hander);
                        pipeline.addLast(new ClientHandler(TcpClient.this));
                    }
                });
        try {
            f =b.connect(mediaPara.getIp(),mediaPara.getPort()).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
        return channel.isActive();
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM