每種ByteBuf都有相應的分配器ByteBufAllocator,類似工廠模式。我們先學習UnpooledHeapByteBuf與其對應的分配器UnpooledByteBufAllocator
如何知道alloc分配器那是個?
可以從官方下載的TimeServer 例子來學習,本項目已有源碼可在 TestChannelHandler.class里斷點追蹤
從圖可以看出netty 4.1.8默認的ByteBufAllocator是PooledByteBufAllocator,可以參過啟動參數-Dio.netty.allocator.type unpooled/pooled 設置
細心的讀者可以看出分配ByteBuf只有pool跟unpool,但ByteBuf有很多類型,可能出於使用方面考慮,有時不一定設計太死板,太規范反而使學習成本很大
public final class ByteBufUtil { static final ByteBufAllocator DEFAULT_ALLOCATOR; static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; } else { alloc = PooledByteBufAllocator.DEFAULT; } DEFAULT_ALLOCATOR = alloc; } }
AbstractReferenceCountedByteBuf是統計引用總數處理,用到Atomic*技術。
refCnt是從1開始,每引用一次加1,釋放引用減1,當refCnt變成1時執行deallocate由子類實現
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf { private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); private volatile int refCnt = 1; @Override public ByteBuf retain() { return retain0(1); } private ByteBuf retain0(int increment) { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; if (nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; } @Override public boolean release() { return release0(1); } private boolean release0(int decrement) { for (;;) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } } protected abstract void deallocate(); }
對於ByteBuf I/O 操作經常用的是 writeByte readByte兩種
由於ByteBuf支持多種bytes對象,如 OutputStream、GatheringByteChannel、ByteBuffer、ByteBuf等,
我們只拿兩三種常用的API來做分析,其它邏輯大同小異
如果讀者有印象的話,通常底層只負責流程控制,實現交給應用層/子類處理,AbstractByteBuf.class writeByte/readByte 也是這種處理方式
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { //分配器 private final ByteBufAllocator alloc; //數據 byte[] array; //臨時ByteBuffer,用於內部緩存 private ByteBuffer tmpNioBuf; private UnpooledHeapByteBuf( ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) { //省去部分代碼同邊界處理 super(maxCapacity); this.alloc = alloc; array = initialArray; this.readerIndex = readerIndex; this.writerIndex = writerIndex; } //獲取ByteBuffer容量 @Override public int capacity() { ensureAccessible(); return array.length; } @Override public boolean hasArray() { return true; } //獲取原始數據 @Override public byte[] array() { ensureAccessible(); return array; } //擴容/縮容 @Override public ByteBuf capacity(int newCapacity) { ensureAccessible(); //newCapacity參數邊界判斷 if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; //擴容處理,直接cp到新的array if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { //減容處理 //這里有兩種處理情況 //1.readerIndex > newCapacity 說明還有數據未處理直接將 readerIndex,writerIndex相等 newCapacity //2.否則 writerIndex =Math.min(writerIndex,newCapacity),取最少值,然后直接復制數據 //可以看出netty處理超出readerIndex、writerIndex 限界直接丟棄數據。。。。。。 byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex = newCapacity this.writerIndex = writerIndex; } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); //System.arraycopy(復制來源數組, 來源組起始坐標, 目標數組, 目標數組起始坐標, 復制數據長度); } else { this.readerIndex = newCapacity; this.writerIndex = newCapacity; } setArray(newArray); } return this; } }
AbstractByteBuf.class readBytes 調用子類實現 getBytes方法,區別是調用readBytes會改變readerIndex記錄
public abstract class AbstractByteBuf extends ByteBuf { @Override public ByteBuf readBytes(ByteBuffer dst) { int length = dst.remaining(); //checkReadableBytes(length); if (readerIndex > (writerIndex - length)) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, length, writerIndex, this)); } //調用子類實現 getBytes(readerIndex, dst); //記錄已讀長度 readerIndex += length; return this; } @Override public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; } //這里如果index不為負的話只需要 capacity - (index + length) < 0 判斷就可以 //用到 | 運算 如果 index為-1的話 index | length 還是負數 第二個 | (index + length)運算有可能 index + length相加為負 public static boolean isOutOfBounds(int index, int length, int capacity) { return (index | length | (index + length) | (capacity - (index + length))) < 0; } }
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { //支持ByteBuffer讀取 @Override public ByteBuf getBytes(int index, ByteBuffer dst) { //checkIndex(index, dst.remaining()); if (isOutOfBounds(index, dst.remaining(), capacity())) { throw new IndexOutOfBoundsException(String.format( "index: %d, length: %d (expected: range(0, %d))", index, dst.remaining(), capacity())); } dst.put(array, index, dst.remaining()); return this; } //支持ByteBuf讀取 @Override public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.capacity()); //是unsafe類型,要調用jdk unsafe方法復制 if (dst.hasMemoryAddress()) { PlatformDependent.copyMemory(array, index, dst.memoryAddress() + dstIndex, length); } else if (dst.hasArray()) { //如果是數組即 heap類型,直接復制過去 getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); } else { dst.setBytes(dstIndex, array, index, length); } return this; } //支持數組讀取 @Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; } }
AbstractByteBuf.class writeBytes 調用子類實現 setBytes方法,區別是調用writeBytes會改變writerIndex記錄
public abstract class AbstractByteBuf extends ByteBuf { @Override public ByteBuf writeBytes(ByteBuf src) { writeBytes(src, src.readableBytes()); return this; } @Override public ByteBuf writeBytes(ByteBuf src, int length) { if (length > src.readableBytes()) { throw new IndexOutOfBoundsException(String.format( "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src)); } writeBytes(src, src.readerIndex(), length); //讀取src數據到this.ByteBuf 所以要更改src readerIndex src.readerIndex(src.readerIndex() + length); return this; } @Override public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { ensureAccessible(); //是否擴容處理 ensureWritable(length); //調用子類實現 setBytes(writerIndex, src, srcIndex, length); //記錄已寫長度 writerIndex += length; return this; } private void ensureWritable0(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; } //寫入數據長度大於最大空間剩余長度拋異常 if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } //通過分配器計算,參數1寫完后的writerIndex記錄,參數2最大容量長度 int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); //子類實現 capacity(newCapacity); } //////////////////////////////AbstractByteBufAllocator.class////////////////////////////////////// @Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } //如果新容量大於4M,不走雙倍擴大算法,數值范圍取 minNewCapacity <= maxCapacity if (minNewCapacity > threshold) { // 除以threshold再乘以threshold得出的結果是 threshold的倍數,可以理解是去掉余數 int newCapacity = minNewCapacity / threshold * threshold; //如果剩余容量不夠4M直接給maxCapacity,否則自增4M if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } //newCapacity <<= 1 意思是 newCapacity*2,雙倍自增 int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); } }
//setBytes邏輯跟getBytes一樣 public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { @Override public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.capacity()); if (src.hasMemoryAddress()) { PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length); } else if (src.hasArray()) { setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); } else { src.getBytes(srcIndex, array, index, length); } return this; } @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } }
總結:
1.writeBytes跟setBytes、readBytes跟getBytes區別是前者有記錄,后者沒有,而后者是子類的實現
2.擴容算法是兩種策略:
2.1.大於4M時不走double自增,數值范圍取 minNewCapacity <= maxCapacity
2.2.少於4M時從64開始double自增
3.更改容量也是每個子類實現,要考慮兩種情況
3.1.大於當前容量
3.2.小於當前容量,當小於的時候要考慮 readerIndex、writerIndex邊界,當超過 readerIndex、writerIndex邊界heap的策略是丟去原來的數據
4.heap是繼承 AbstractReferenceCountedByteBuf的,當refCnt記錄為1時釋放數據