[編織消息框架][netty源碼分析]11 ByteBuf 實現類UnpooledHeapByteBuf職責與實現


 

每種ByteBuf都有相應的分配器ByteBufAllocator,類似工廠模式。我們先學習UnpooledHeapByteBuf與其對應的分配器UnpooledByteBufAllocator

如何知道alloc分配器那是個?

可以從官方下載的TimeServer 例子來學習,本項目已有源碼可在 TestChannelHandler.class里斷點追蹤

從圖可以看出netty 4.1.8默認的ByteBufAllocator是PooledByteBufAllocator,可以參過啟動參數-Dio.netty.allocator.type unpooled/pooled 設置

細心的讀者可以看出分配ByteBuf只有pool跟unpool,但ByteBuf有很多類型,可能出於使用方面考慮,有時不一定設計太死板,太規范反而使學習成本很大

public final class ByteBufUtil {
    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
        }
        DEFAULT_ALLOCATOR = alloc;
    }
}

 AbstractReferenceCountedByteBuf是統計引用總數處理,用到Atomic*技術。

refCnt是從1開始,每引用一次加1,釋放引用減1,當refCnt變成1時執行deallocate由子類實現

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    private volatile int refCnt = 1;
 
    @Override
    public ByteBuf retain() {
        return retain0(1);
    }

    private ByteBuf retain0(int increment) {
        for (;;) {
            int refCnt = this.refCnt;
            final int nextCnt = refCnt + increment;
            if (nextCnt <= increment) {
                throw new IllegalReferenceCountException(refCnt, increment);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
                break;
            }
        }
        return this;
    }
 
    @Override
    public boolean release() {
        return release0(1);
    }

    private boolean release0(int decrement) {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt < decrement) {
                throw new IllegalReferenceCountException(refCnt, -decrement);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                if (refCnt == decrement) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

    protected abstract void deallocate();
}

 

對於ByteBuf I/O 操作經常用的是 writeByte readByte兩種
由於ByteBuf支持多種bytes對象,如 OutputStream、GatheringByteChannel、ByteBuffer、ByteBuf等,
我們只拿兩三種常用的API來做分析,其它邏輯大同小異
如果讀者有印象的話,通常底層只負責流程控制,實現交給應用層/子類處理,AbstractByteBuf.class writeByte/readByte 也是這種處理方式

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
    //分配器
    private final ByteBufAllocator alloc;
    //數據
    byte[] array;
    //臨時ByteBuffer,用於內部緩存
    private ByteBuffer tmpNioBuf;
    
    private UnpooledHeapByteBuf(
            ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
        //省去部分代碼同邊界處理
        super(maxCapacity);
        this.alloc = alloc;
        array = initialArray;
        this.readerIndex = readerIndex;
        this.writerIndex = writerIndex;
    }
    //獲取ByteBuffer容量
    @Override
    public int capacity() {
        ensureAccessible();
        return array.length;
    }
    @Override
    public boolean hasArray() {
        return true;
    }
    //獲取原始數據
    @Override
    public byte[] array() {
        ensureAccessible();
        return array;
    }
    //擴容/縮容
    @Override
    public ByteBuf capacity(int newCapacity) {
        ensureAccessible();
        //newCapacity參數邊界判斷
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = array.length;
        //擴容處理,直接cp到新的array
        if (newCapacity > oldCapacity) {
            byte[] newArray = new byte[newCapacity];
            System.arraycopy(array, 0, newArray, 0, array.length);
            setArray(newArray);
        } else if (newCapacity < oldCapacity) {
            //減容處理
            //這里有兩種處理情況 
            //1.readerIndex > newCapacity 說明還有數據未處理直接將 readerIndex,writerIndex相等 newCapacity
            //2.否則 writerIndex =Math.min(writerIndex,newCapacity),取最少值,然后直接復制數據
            
            //可以看出netty處理超出readerIndex、writerIndex 限界直接丟棄數據。。。。。。
            
            byte[] newArray = new byte[newCapacity];
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity
                    this.writerIndex = writerIndex;
                }
                System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
              //System.arraycopy(復制來源數組, 來源組起始坐標, 目標數組, 目標數組起始坐標, 復制數據長度);

            } else {
                this.readerIndex = newCapacity;
                this.writerIndex = newCapacity;
            }
            setArray(newArray);
        }
        return this;
    }
}

 

AbstractByteBuf.class readBytes 調用子類實現 getBytes方法,區別是調用readBytes會改變readerIndex記錄

public abstract class AbstractByteBuf extends ByteBuf {
    @Override
    public ByteBuf readBytes(ByteBuffer dst) {
        int length = dst.remaining();
        //checkReadableBytes(length);
         if (readerIndex > (writerIndex - length)) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                    readerIndex, length, writerIndex, this));
        }
        //調用子類實現
        getBytes(readerIndex, dst);
        //記錄已讀長度
        readerIndex += length;
        return this;
    }
    @Override
    public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
        checkReadableBytes(length);
        getBytes(readerIndex, dst, dstIndex, length);
        readerIndex += length;
        return this;
    }
    
    //這里如果index不為負的話只需要 capacity - (index + length) < 0 判斷就可以
    //用到 | 運算 如果 index為-1的話 index | length 還是負數 第二個 | (index + length)運算有可能 index + length相加為負
    public static boolean isOutOfBounds(int index, int length, int capacity) {
        return (index | length | (index + length) | (capacity - (index + length))) < 0;
    }
}
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
    //支持ByteBuffer讀取
    @Override
    public ByteBuf getBytes(int index, ByteBuffer dst) {
        //checkIndex(index, dst.remaining());
        if (isOutOfBounds(index,  dst.remaining(), capacity())) {
            throw new IndexOutOfBoundsException(String.format(
                    "index: %d, length: %d (expected: range(0, %d))", index, dst.remaining(), capacity()));
        }
        dst.put(array, index, dst.remaining());
        return this;
    }
    //支持ByteBuf讀取
    @Override
    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.capacity());
        //是unsafe類型,要調用jdk unsafe方法復制
        if (dst.hasMemoryAddress()) {
            PlatformDependent.copyMemory(array, index, dst.memoryAddress() + dstIndex, length);
        } else if (dst.hasArray()) { //如果是數組即 heap類型,直接復制過去
            getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
        } else {
            dst.setBytes(dstIndex, array, index, length);
        }
        return this;
    }
    
    //支持數組讀取
    @Override
    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.length);
        System.arraycopy(array, index, dst, dstIndex, length);
        return this;
    }
}

 AbstractByteBuf.class writeBytes 調用子類實現 setBytes方法,區別是調用writeBytes會改變writerIndex記錄

public abstract class AbstractByteBuf extends ByteBuf {
    @Override
    public ByteBuf writeBytes(ByteBuf src) {
        writeBytes(src, src.readableBytes());
        return this;
    }

    @Override
    public ByteBuf writeBytes(ByteBuf src, int length) {
        if (length > src.readableBytes()) {
            throw new IndexOutOfBoundsException(String.format(
                    "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
        }
        writeBytes(src, src.readerIndex(), length);
        //讀取src數據到this.ByteBuf 所以要更改src readerIndex
        src.readerIndex(src.readerIndex() + length);
        return this;
    }
    @Override
    public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
        ensureAccessible();
        //是否擴容處理
        ensureWritable(length);
        //調用子類實現
        setBytes(writerIndex, src, srcIndex, length);
        //記錄已寫長度
        writerIndex += length;
        return this;
    }
    
    private void ensureWritable0(int minWritableBytes) {
        if (minWritableBytes <= writableBytes()) {
            return;
        }
        //寫入數據長度大於最大空間剩余長度拋異常
        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
        
        //通過分配器計算,參數1寫完后的writerIndex記錄,參數2最大容量長度
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        //子類實現
        capacity(newCapacity);
    }
    //////////////////////////////AbstractByteBufAllocator.class//////////////////////////////////////
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
        }
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        
        final int threshold = 1048576 * 4; // 4 MiB page
        if (minNewCapacity == threshold) {
            return threshold;
        }

        //如果新容量大於4M,不走雙倍擴大算法,數值范圍取 minNewCapacity <= maxCapacity
        if (minNewCapacity > threshold) {
            // 除以threshold再乘以threshold得出的結果是 threshold的倍數,可以理解是去掉余數
            int newCapacity = minNewCapacity / threshold * threshold;
            //如果剩余容量不夠4M直接給maxCapacity,否則自增4M
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        //newCapacity <<= 1 意思是 newCapacity*2,雙倍自增
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }
}

 

//setBytes邏輯跟getBytes一樣
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
    @Override
    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.capacity());
        if (src.hasMemoryAddress()) {
            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
        } else  if (src.hasArray()) {
            setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
        } else {
            src.getBytes(srcIndex, array, index, length);
        }
        return this;
    }

    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.length);
        System.arraycopy(src, srcIndex, array, index, length);
        return this;
    }
}

 

總結:

1.writeBytes跟setBytes、readBytes跟getBytes區別是前者有記錄,后者沒有,而后者是子類的實現

2.擴容算法是兩種策略:

  2.1.大於4M時不走double自增,數值范圍取 minNewCapacity <= maxCapacity

  2.2.少於4M時從64開始double自增

3.更改容量也是每個子類實現,要考慮兩種情況

  3.1.大於當前容量

  3.2.小於當前容量,當小於的時候要考慮 readerIndex、writerIndex邊界,當超過 readerIndex、writerIndex邊界heap的策略是丟去原來的數據

4.heap是繼承 AbstractReferenceCountedByteBuf的,當refCnt記錄為1時釋放數據

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM