netty之ByteBuf詳解


【ChannelPromise作用:可以設置success或failure 是為了通知ChannelFutureListener】
Netty的數據處理API通過兩個組件暴露——abstract class ByteBuf和interface ByteBufHolder。

下面是一些ByteBuf API的優點:
  它可以被用戶自定義的緩沖區類型擴展;
  通過內置的復合緩沖區類型實現了透明的零拷貝;
  容量可以按需增長(類似於JDK的StringBuilder);
  在讀和寫這兩種模式之間切換不需要調用ByteBuffer的flip()方法;
  讀和寫使用了不同的索引;
  支持方法的鏈式調用;
  支持引用計數;
  支持池化。
  使用不同的讀索引和寫索引來控制數據訪問;
  readerIndex達到和writerIndex

  使用內存的不同方式——基於字節數組和直接緩沖區;
  通過CompositeByteBuf生成多個ByteBuf的聚合視圖;
  數據訪問方法——搜索、切片以及復制;
  隨機訪問索引 【0到capacity() - 1】
  順序訪問索引
  可丟棄字節 【discardReadBytes() clear()改變index值】
  可讀字節【readBytes(ByteBuf dest) 】
  可寫字節【writeBytes(ByteBuf dest);】
  索引管理【markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex( readerIndex(int)或者writerIndex(int) 】
  查找操作【buf.indexOf(),forEachByte(ByteBufProcessor.FIND_NUL), int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);】
  派生緩沖區【 返回新的buf,都具有 readIndex writeIndex markIndex
  buf.duplicate();
  ByteBuf rep = buf.copy();//創建副本
  buf.slice();//操作buf分段
  buf.slice(0, 5);//操作buf分段】
  讀、寫、獲取和設置API;
  讀/寫操作【get()和set()操作,從給定的索引開始,並且保持索引不變;read()和write()操作,從給定的索引開始,並且會根據已經訪問過的字節數對索引進行調整】
  ByteBufAllocator池化
  ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能減少碎片,高效分配算法
  ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建
  引用計數
  ByteBufAllocator allocator = ctx.channel().alloc();
  ByteBuf directBuf = allocator.directBuffer();
  if(directBuf.refCnt() == 1){//當引用技術為1時釋放對象
  directBuf.release();
  }

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
    logger.info("channelRead start");
    ByteBuf buf = (ByteBuf) msg;
    if (buf.hasArray()) {//檢查buf是否支持一個數組
        byte[] array = buf.array();
        //第一個偏移量
        int off = buf.arrayOffset() + buf.readerIndex();
        //獲取可讀取字節
        int len = buf.readableBytes();
        byte[] buffer = new byte[len];
        buf.getBytes(off, buffer);
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        ByteBuf header = (ByteBuf) msg;
        ByteBuf body = (ByteBuf) msg;
        compositeByteBuf.addComponent(header);
        compositeByteBuf.addComponent(body);
        compositeByteBuf.removeComponent(0);
        for (ByteBuf bufer : compositeByteBuf) {
            System.out.println(bufer.toString());
        }
        int comLen = compositeByteBuf.readableBytes();

        for (int i = 0; i < buf.capacity(); i++) {
            System.out.println((char) buf.getByte(i));
            //讀完成后進行丟棄
            buf.discardReadBytes();
            //或者調用clear
            buf.clear();
        }
        //標記索引
        buf.readerIndex(2);
        buf.writeByte(2);
        //重置索引
        buf.markReaderIndex();
        buf.markWriterIndex();
        buf.resetReaderIndex();
        buf.resetWriterIndex();
        //查找
        buf.indexOf(0, 5, (byte) 0);
        int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);
        int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);
        //派生緩沖區  返回新的buf,都具有 readIndex  writeIndex markIndex
        buf.duplicate();
        ByteBuf rep = buf.copy();//創建副本
        buf.slice();//操作buf分段
        buf.slice(0, 5);//操作buf分段
        //==========數據訪問方法——搜索、切片以及復制;
        Charset charset = Charset.forName("UTF-8");
        ByteBuf buf1 = Unpooled.copiedBuffer("Netty in Action rocks!", charset);  //← --  創建一個用於保存給定字符串的字節的ByteBuf
        ByteBuf sliced = buf1.slice(0, 15);  //← --  創建該ByteBuf 從索引0 開始到索引15結束的一個新切片
        System.out.println(sliced.toString(charset));  // ← --  將打印“Netty in Action”
        buf1.setByte(0, (byte) 'J');   //← --  更新索引0 處的字節
        assert buf1.getByte(0) == sliced.getByte(0); //← --  將會成功,因為數據是共享的,對其中一個所做的更改對另外一個也是可見的

        Charset utf8 = Charset.forName("UTF-8");
        ByteBuf buf2 = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); // ← --  創建ByteBuf 以保存所提供的字符串的字節
        ByteBuf copy = buf2.copy(0, 15);// ← --  創建該ByteBuf 從索引0 開始到索引15結束的分段的副本
        System.out.println(copy.toString(utf8));//  ← --   將打印“Netty in Action”
        buf2.setByte(0, (byte) 'J');//  ← --  更新索引0 處的字節
        assert buf2.getByte(0) != copy.getByte(0);//  ← --  將會成功,因為數據不是共享的

        Unpooled.unmodifiableBuffer(buf);
        buf.order();
        buf.readSlice(1);
       //使用不同的讀索引和寫索引來控制數據訪問;
        //讀寫操作 get/set不改變索引位置   read/write改變索引(readIndex/writeIndex)位置

        Charset u8 = Charset.forName("UTF-8");
        ByteBuf getSetBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 創建一個新的ByteBuf以保存給定字符串的字節
        System.out.println((char) getSetBuf.getByte(0));//  打印第一個字符'N'
        int readerIndex = getSetBuf.readerIndex(); //  存儲當前的readerIndex 和writerIndex
        int writerIndex = getSetBuf.writerIndex();
        getSetBuf.setByte(0, (byte) 'B'); //  將索引0 處的字節更新為字符'B'
        System.out.println((char) getSetBuf.getByte(0)); //  打印第一個字符,現在是'B' 
        assert readerIndex == getSetBuf.readerIndex();//  將會成功,因為這些操作並不會修改相應的索引
        assert writerIndex == getSetBuf.writerIndex();


        ByteBuf readWriteBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 創建一個新的ByteBuf以保存給定字符串的字節
        System.out.println((char) readWriteBuf.readByte());//  打印第一個字符'N'
        System.out.println((boolean) readWriteBuf.readBoolean());//  讀取當前boolean值,並將readIndex+1
        readWriteBuf.writeByte('F');//  將字符F追加到緩沖區中,並將writeIndex+1
        int reIndex = readWriteBuf.readerIndex(); //  存儲當前的readerIndex 和writerIndex
        int wrIndex = readWriteBuf.writerIndex();
        readWriteBuf.setByte(0, (byte) 'B'); //  將索引0 處的字節更新為字符'B'
        System.out.println((char) readWriteBuf.getByte(0)); //  打印第一個字符,現在是'B' 
        assert reIndex == readWriteBuf.readerIndex();//  將會成功,因為這些操作並不會修改相應的索引
        assert wrIndex == readWriteBuf.writerIndex();

        buf.isReadable();//至少有一個字符可讀,返回true
        buf.isWritable();//至少有一個字節可被寫入,返回true
        int readableByte = buf.readableBytes();//返回可被讀取的字節數
        int writableByte = buf.writableBytes();//返回可被寫入的字節數
        int capacity = buf.capacity();//返回可容納的字節數
        buf.maxCapacity();//返回可容納的最大字節數
        buf.hasArray();//如果buf由一個字節數組支撐,返回true
        buf.array();//將buf轉換為字節數組

        //除了數據外,還有一些其他的屬性,如http的狀態碼,cookie等
        ByteBufHolder byteBufHolder = new DefaultLastHttpContent();
        ByteBuf httpContent = byteBufHolder.content();//返回一個http格式的ByteBuf
        ByteBufHolder copyBufHolder = byteBufHolder.copy();//深拷貝,不共享
        ByteBufHolder duplicateBufHolder = byteBufHolder.duplicate();//淺拷貝,共享

        //ByteBufAllocator  ByteBuf分配
        // buffer()基於堆或直接內存的buf
        //ioBuffer() 返回一個iobuf
        //heapBuffer  堆buf
        //directBuffer 直接buf
        //compositeBuffer  compositeHeapBuffer   compositeDirectBuffer   復合buf
        ByteBufAllocator ctxAllocator = ctx.alloc();
        ByteBufAllocator channelAllocator = ctx.channel().alloc();
        ctxAllocator.buffer();
        ctxAllocator.ioBuffer();
        ctxAllocator.compositeBuffer();
        ctxAllocator.heapBuffer();
        ctxAllocator.directBuffer();

        ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能減少碎片,高效分配算法
        ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建

        ctx.writeAndFlush(new byte[10]);
        ctx.writeAndFlush(Unpooled.copiedBuffer(new byte[10]));//writeAndFlush參數是Object,使用非池化技術轉為buf提升效率
        //工具類ByteBufUtil
        ByteBufUtil.hexDump(buf);//可對buf進行轉換
        ByteBufUtil.hexDump(new byte[9999]);//可對字節進行轉換
        //引用計數:跟蹤特定對象的引用計數
        ByteBufAllocator allocator = ctx.channel().alloc();
        ByteBuf directBuf = allocator.directBuffer();
        if(directBuf.refCnt() == 1){//當引用技術為1時釋放對象
            directBuf.release();
        }

        if (buf.readableBytes() <= 0) {
            ReferenceCountUtil.safeRelease(msg);
            return;
        }
        byte[] msgContent = new byte[buf.readableBytes()];
        buf.readBytes(msgContent);
        logger.info("recv from client:length: {},toHexString: {}\n", buf.readableBytes(), HexStringUtils.toHexString(buf.array()));
        if (buf.getByte(0) == 0x7e && buf.getByte(buf.readableBytes() - 1) == 0x7e) {}
        if(msgContent[0] == 0x7e && msgContent[msgContent.length-1]==0x7e){}
    }

}


//通常如果集成ChannelInboundHandlerAdapter時,復寫channelRead(),需要進行手動的消息釋放

//消息消費完成后自動釋放
private void release(Object msg) {
    try {
        ReferenceCountUtil.release(msg);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
===================================================

//在SimpleChannelInboundHandler中的channelRead0()方法中自動添加了釋放消息的方法
//SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM