ByteBuf使用實例


  之前我們有個netty5的拆包解決方案(參加netty5拆包問題解決實例),現在我們采用另一種思路,不需要新增LengthFieldBasedFrameDecoder,直接修改NettyMessageDecoder:

package com.wlf.netty.nettyapi.msgpack;

import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class NettyMessageDecoder extends ByteToMessageDecoder {

    /**
     * 消息體字節大小:分割符字段4字節+長度字段4字節+請求類型字典1字節+預留字段1字節=10字節
     */
    private static final int HEAD_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        while (true) {

            // 標記字節流開始位置
            byteBuf.markReaderIndex();

            // 若讀取到分割標識,說明讀取當前字節流開始位置了
            if (byteBuf.readInt() == Delimiter.DELIMITER) {
                break;
            }

            // 重置讀索引為0
            byteBuf.resetReaderIndex();

            // 長度校驗,字節流長度至少10字節,小於10字節則等待下一次字節流過來
            if (byteBuf.readableBytes() < HEAD_LENGTH) {
                byteBuf.resetReaderIndex();
                return;
            }
        }

        // 2、獲取data的字節流長度
        int dataLength = byteBuf.readInt();

        // 校驗數據包是否全部發送過來,總字節流長度(此處讀取的是除去delimiter和length之后的總長度)-
        // type和reserved兩個字節=data的字節流長度
        int totalLength = byteBuf.readableBytes();
        if ((totalLength - 2) < dataLength) {

            // 長度校驗,字節流長度少於數據包長度,說明數據包拆包了,等待下一次字節流過來
            byteBuf.resetReaderIndex();
            return;
        }

        // 3、請求類型
        byte type = byteBuf.readByte();

        // 4、預留字段
        byte reserved = byteBuf.readByte();


        // 5、數據包內容
        byte[] data = null;
        if (dataLength > 0) {
            data = new byte[dataLength];
            byteBuf.readBytes(data);
        }

        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        header.setDelimiter(Delimiter.DELIMITER);
        header.setLength(dataLength);
        header.setType(type);
        header.setReserved(reserved);
        nettyMessage.setHeader(header);
        nettyMessage.setData(data);

        list.add(nettyMessage);

        // 回收已讀字節
        byteBuf.discardReadBytes();
    }
}

  我們的改動很小,只不過將原來的讀索引改為標記索引,然后在拆包時退出方法前重置讀索引,這樣下次數據包過來,我們的讀索引依然從0開始,delimiter的標記就可以讀出來,而不會陷入死循環了。

  ByteBuf是ByteBuffer的進化版,ByteBuffer(參見ByteBuffer使用實例)才一個索引,讀寫模式需要通過flip來轉換,而ByteBuf有兩個索引,readerIndex讀索引和writerIndex寫索引,讀寫轉換無縫連接,青出於藍而勝於藍:

      +-------------------+------------------+------------------+
      | discardable bytes |  readable bytes  |  writable bytes  |
      |                           |     (CONTENT)    |                         |
      +-------------------+------------------+------------------+
      |                           |                            |                         |
      0      <=      readerIndex   <=   writerIndex    <=    capacity

  既然有兩個索引,那么標記mask、重置reset必然也是兩兩對應,上面的代碼中我們只需要用到讀標記和讀重置。

  我們把客戶端handler也修改下,先把LengthFieldBasedFrameDecoder去掉:

// channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));

 

  再讓數據包更大一些:

    /**
     * 構造PCM請求消息體
     *
     * @return
     */
    private byte[] buildPcmData() throws Exception {
        byte[] resultByte = longToBytes(System.currentTimeMillis());

        // 讀取一個本地文件
        String AUDIO_PATH = "D:\\input\\test_1.pcm";
        try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {

            int len = -1;
            byte[] content = new byte[1024];
            while((len = raf.read(content)) != -1)
            {
               resultByte = addAll(resultByte, content);
            }
        }

        return resultByte;
    }
            

  再debug下看看,第一次解析客戶端發送的數據,讀取1024字節,我們可以看到讀索引是8(delimiter+length=8),寫索引就是1024,我們的大包里有3939116個字節,去掉10個字節的header,剩下小包是3939106::

 

   第二次再讀1024,代碼已經執行reset重置讀索引了,所以讀索引由8改為0,寫索引累增到2048:

 

   第三次再讀1024,寫索引繼續累增到3072:

 

   最后一次發1024,寫索引已經到達3939116,大包傳輸結束了:

   從上面看出,我們對ByteBuf的capacity一直在翻倍,讀指針一直標記在大包的起始位置0,這樣做的目的是每次都能讀取小包的長度length(3939106),拿來跟整個ByteBuf的長度作比較,只要它取到的小包沒到達到length,我們就繼續接受新包,寫索引不停的累加,直到整個大包長度>=3939116(也就是小包>=3939106),這時我們開始移動讀索引,將字節流寫入對象,最后回收已讀取的字節(調用discardReaderBytes方法):

  BEFORE discardReadBytes()

      +-------------------+------------------+------------------+
      | discardable bytes |  readable bytes  |  writable bytes  |
      +-------------------+------------------+------------------+
      |                         |                      |                            |
      0      <=      readerIndex   <=   writerIndex    <=    capacity


  AFTER discardReadBytes()

      +------------------+--------------------------------------+
      |  readable bytes  |    writable bytes (got more space)   |
      +------------------+--------------------------------------+
      |                        |                                               |
readerIndex (0) <= writerIndex (decreased)        <=        capacity

  其他方法參見測試類:

package com.wlf.netty.nettyserver;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Assert;
import org.junit.Test;

public class ByteBufTest {
    @Test
    public void byteBufTest() {
        ByteBuf byteBuf = Unpooled.buffer(10);
        byteBuf.writeInt(0xabef0101);
        byteBuf.writeInt(1024);
        byteBuf.writeByte((byte) 1);
        byteBuf.writeByte((byte) 0);

        // 開始讀取
        printDelimiter(byteBuf);
        printLength(byteBuf);

        // 派生一個ByteBuf,取剩下2個字節,但讀索引不動
        ByteBuf duplicatBuf = byteBuf.duplicate();
        printByteBuf(byteBuf);

        // 派生一個ByteBuf,取剩下2個字節,讀索引動了
        ByteBuf sliceBuf = byteBuf.readSlice(2);
        printByteBuf(byteBuf);

        // 兩個派生的對象其實是一樣的
        Assert.assertEquals(duplicatBuf, sliceBuf);
    }

    private void printDelimiter(ByteBuf buf) {
        int newDelimiter = buf.readInt();
        System.out.printf("delimeter: %s\n", Integer.toHexString(newDelimiter));
        printByteBuf(buf);
    }

    private void printLength(ByteBuf buf) {
        int length = buf.readInt();
        System.out.printf("length: %d\n", length);
        printByteBuf(buf);
    }

    private void printByteBuf(ByteBuf buf) {
        System.out.printf("reader Index: %d, writer Index: %d, capacity: %d\n", buf.readerIndex(), buf.writerIndex(), buf.capacity());
    }
}

  輸出:

delimeter: abef0101
reader Index: 4, writer Index: 10, capacity: 10
length: 1024
reader Index: 8, writer Index: 10, capacity: 10
reader Index: 8, writer Index: 10, capacity: 10
reader Index: 10, writer Index: 10, capacity: 10

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM