框架篇:ByteBuffer和netty.ByteBuf詳解


前言

數據序列化存儲,或者數據通過網絡傳輸時,會遇到不可避免將數據轉成字節數組的場景。字節數組的讀寫不會太難,但又有點繁瑣,為了避免重復造輪子,jdk推出了ByteBuffer來幫助我們操作字節數組;而netty是一款當前流行的java網絡IO框架,它內部定義了一個ByteBuf來管理字節數組,和ByteBuffer大同小異

  • ByteBuffer
  • 零拷貝之MappedByteBuffer
  • DirectByteBuffer堆外內存回收機制
  • netty之ByteBuf

關注公眾號,一起交流,微信搜一搜: 潛行前行

Buffer結構

public abstract class Buffer {
	//關系: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    long address; // Used only by direct buffers,直接內存的地址
  • mark:調用mark()方法的話,mark值將存儲當前position的值,等下次調用reset()方法時,會設定position的值為之前的標記值
  • position:是下一個要被讀寫的byte元素的下標索引
  • limit:是緩沖區中第一個不能讀寫的元素的數組下標索引,也可以認為是緩沖區中實際元素的數量
  • capacity:是緩沖區能夠容納元素的最大數量,這個值在緩沖區創建時被設定,而且不能夠改變

Buffer.API

Buffer(int mark, int pos, int lim, int cap)
//Buffer創建時設置的最大數組容量值
public final int capacity()
//當前指針的位置
public final int position() 
//限制可讀寫大小
public final Buffer limit(int newLimit)
//標記當前position的位置
public final Buffer mark()
//配合mark使用,position成之前mark()標志的位置。先前沒調用mark則報錯
public final Buffer reset()
//寫->讀模式翻轉,單向的
//position變成了初值位置0,而limit變成了寫模式下position位置
public final Buffer flip()
//重置position指針位置為0,mark為-1;相對flip方法是limit不變
public final Buffer rewind() //復位
//和rewind一樣,多出一步是limit會被設置成capacity
public final Buffer clear() 
//返回剩余未讀字節數
public final int remaining()

ByteBuffer結構

public abstract class ByteBuffer extends Buffer 
			implements Comparable<ByteBuffer>{
    final byte[] hb;  //僅限堆內內存使用
    final int offset;
    boolean isReadOnly; 

ByteBuffer.API

//申請堆外內存
public static ByteBuffer allocateDirect(int capacity)
//申請堆內內存
public static ByteBuffer allocate(int capacity) 
//原始字節包裝成ByteBuffer
public static ByteBuffer wrap(byte[] array, int offset, int length)
//原始字節包裝成ByteBuffer
public static ByteBuffer wrap(byte[] array)
//創建共享此緩沖區內容的新字節緩沖區
public abstract ByteBuffer duplicate()
//分片,創建一個新的字節緩沖區
//新ByteBuffer的開始位置是此緩沖區的當前位置position
public abstract ByteBuffer slice()
//獲取字節內容
public abstract byte get()
//從ByteBuffer偏移offset的位置,獲取length長的字節數組,然后返回當前ByteBuffer對象
public ByteBuffer get(byte[] dst, int offset, int length)
//設置byte內存
public abstract ByteBuffer put(byte b);
//以offset為起始位置設置length長src的內容,並返回當前ByteBuffer對象
public ByteBuffer put(byte[] src, int offset, int length長)
//將沒有讀完的數據移到到緩沖區的初始位置,position設置為最后一沒讀字節數據的下個索引,limit重置為為capacity
//讀->寫模式,相當於flip的反向操作
public abstract ByteBuffer compact()
//是否是直接內存
public abstract boolean isDirect()
  • ByteBuffer bf = ByteBuffer.allocate(10);`,創建大小為10的ByteBuffer對象
    image.png
  • 寫入數據
    ByteBuffer buf ByteBuffer.allocate(10);
    buf.put("csc".getBytes());

image.png

  • 調用flip轉換緩沖區為讀模式; buf.flip();
    image.png
  • 讀取緩沖區中到內容:get(); System.out.println((char) buf.get());
    image.png
    image.png

零拷貝之MappedByteBuffer

FileChannel readChannel = FileChannel.open(Paths.get("./cscw.txt"), StandardOpenOption.READ);
MappedByteBuffer data = readChannel.map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 40);

DirectByteBuffer堆外內存回收機制Cleaner

  • 下面我們看看直接內存的回收機制(java8);DirectByteBuffer內部存在一個Cleaner對象,並且委托內部類Deallocator對象進行內存回收
class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
{
	//構造函數
    DirectByteBuffer(int cap) { 
		.... //內存分配
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    	...
    }    
    private static class Deallocator implements Runnable{
    	...
    	public void run() {
            if (address == 0) {
                // Paranoia
                return;
            }
            unsafe.freeMemory(address); //回收內存
            address = 0;
            Bits.unreserveMemory(size, capacity);
        }
}
  • 細看下Cleaner,繼承於PhantomReference,並且在public void clean()方法會調用Deallocator進行清除操作
public class Cleaner extends PhantomReference<Object> {
    //如果DirectByteBuffer對象被回收,相應的Cleaner會被放入dummyQueue隊列
    private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue();
    //構造函數
    public static Cleaner create(Object var0, Runnable var1) {
        return var1 == null ? null : add(new Cleaner(var0, var1));
    }
    private Cleaner(Object var1, Runnable var2) {
        super(var1, dummyQueue);
        this.thunk = var2;
    }
    private final Runnable thunk;
    public void clean() {
            if (remove(this)) {
                try {
                    this.thunk.run();
                } catch (final Throwable var2) {
                ....
  • 在Reference內部存在一個守護線程,循環獲取Reference,並判斷是否Cleaner對象,如果是則調用其clean方法
public abstract class Reference<T> 
    static {
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        for (ThreadGroup tgn = tg; tgn != null; g = tgn, tgn = tg.getParent());
        Thread handler = new ReferenceHandler(tg, "Reference Handler");
        ...
        handler.setDaemon(true);
        handler.start();
        ...
    }
	...
    //內部類調用 tryHandlePending
    private static class ReferenceHandler extends Thread {
        public void run() {
                    while (true) {
                        tryHandlePending(true);
                    }
                }
  	... 
    static boolean tryHandlePending(boolean waitForNotify) {
        Cleaner c;
        .... //從鏈表獲取對象被回收的引用
        // 判斷Reference是否Cleaner,如果是則調用其clean方法
        if (c != null) {
            c.clean(); //調用Cleaner的clean方法
            return true;
        }
        ReferenceQueue<? super Object> q = r.queue;
        if (q != ReferenceQueue.NULL) q.enqueue(r);
        return true;

netty之ByteBuf

  • ByteBuf原理
  • Bytebuf通過兩個位置指針來協助緩沖區的讀寫操作,分別是readIndex和writerIndex
 *      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0 <= readerIndex <= writerIndex <= capacity
  • ByteBuf.API
//獲取ByteBuf分配器
public abstract ByteBufAllocator alloc()
//丟棄可讀字節
public abstract ByteBuf discardReadBytes()
//返回讀指針
public abstract int readerIndex()
//設置讀指針
public abstract ByteBuf readerIndex(int readerIndex);
//標志當前讀指針位置,配合resetReaderIndex使用
public abstract ByteBuf markReaderIndex()
public abstract ByteBuf resetReaderIndex()
//返回可讀字節數
public abstract int readableBytes()
//返回寫指針
public abstract int writerIndex()
//設置寫指針
public abstract ByteBuf writerIndex(int writerIndex);
//標志當前寫指針位置,配合resetWriterIndex使用
public abstract ByteBuf markWriterIndex()
public abstract ByteBuf resetWriterIndex()
//返回可寫字節數
public abstract int writableBytes()
public abstract ByteBuf clear();
//設置讀寫指針
public abstract ByteBuf setIndex(int readerIndex, int writerIndex)
//指針跳過length
public abstract ByteBuf skipBytes(int length)
//以當前位置切分ByteBuf todo
public abstract ByteBuf slice();
//切割起始位置為index,長度為length的ByteBuf todo
public abstract ByteBuf slice(int index, int length);
//Returns a copy of this buffer's readable bytes. //復制ByteBuf todo
public abstract ByteBuf copy()
//是否可讀
public abstract boolean isReadable()
//是否可寫
public abstract boolean isWritable()
//字節編碼順序
public abstract ByteOrder order()
//是否在直接內存申請的ByteBuf
public abstract boolean isDirect()
//轉為jdk.NIO的ByteBuffer類
public abstract ByteBuffer nioBuffer()
  • 使用示例
public static void main(String[] args) {
    //分配大小為10的內存
    ByteBuf buf = Unpooled.buffer(10);
    //寫入
    buf.writeBytes("csc".getBytes());
    //讀取
    byte[] b =  new byte[3];
    buf.readBytes(b);
    System.out.println(new String(b));
    System.out.println(buf.writerIndex());
    System.out.println(buf.readerIndex());
}
----result----
csc
3
3
  • ByteBuf初始化時,readIndex和writerIndex等於0,調用writeXXX()方法寫入數據,writerIndex會增加(setXXX方法無作用);調用readXXX()方法讀取數據,則會使readIndex增加(getXXX方法無作用),但不會超過writerIndex
  • 在讀取數據之后,0-readIndex之間的byte數據被視為discard,調用discardReadBytes(),釋放這部分空間,作用類似於ByteBuffer的compact方法

參考文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM