一、ByteBuf工作原理
1. ByteBuf是ByteBuffer的升級版:
jdk中常用的是ByteBuffer,從功能角度上,ByteBuffer可以完全滿足需要,但是有以下缺點:
- ByteBuffer一旦分配完成,長度固定,不能動態擴展和收縮,當需要編碼的POJO對象大於分配容量時發生索引越界異常
- ByteBuffer只要一個標識位置的指針postion,讀寫切換比較麻煩,flip rewind等操作
- 功能有限
ByteBuf依然是Byte數組緩沖區,擁有ByteBuffer的一切功能:
- 7種Java基礎類型、byte數組、ByteBuffer(ByteBuf)等讀寫;
- 緩沖區自身的copy和slice等;
- 設置網絡字節序;
- 構造緩沖區實例;
- 操作位置指針方法;
2. ByteBuf的工作原理:
ByteBuf使用2個位置指針來協助緩沖區的讀寫操作,讀操作使用readerIndex,寫操作使用writerIndex。
(1) 一開始readerIndex和writerIndex都是0
(2) 隨着寫入writerIndex增加,隨着讀取readerIndex增加,但是不會超過writerIndex。
(3) 讀取之后,0~readerIndex這部分視為discard,調用discardReadBytes方法,可以釋放這部分空間
(4) readerIndex到writerIndex之間的數據是可以讀取的,等價於ByteBuffer中position-limit之間的數據
(5) writerIndex和capacity之間的空間是可寫的,等價於ByteBuffer limit和capacity之間的可用空間。
3. 用圖演示上述過程:
初始分配的ByteBuf:

寫入了N個字節之后:

讀取了M(<N)個字節之后的ByteBuf如圖所示:

調用了discardReadBytes操作之后的ByteBuf如圖所示:

調用了clear之后的ByteBuf如圖所示:

4. 動態擴展
跟大多數的自動擴容數組一樣,在進行put操作的時候,如果空間不足,就創建新的ByteBuffer實現自動擴容,並將之前的ByteBuffer復制到新的ByteBuffer中,最后釋放老的ByteBuffer。
public ByteBuf writeBytes(ByteBuffer src) { ensureAccessible(); int length = src.remaining(); ensureWritable(length); setBytes(writerIndex, src); writerIndex += length; return this; }
注意到讀寫都是使用ByteBuffer,在容量不足的時候會自動擴容:
private void ensureWritable0(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // Adjust to the new capacity. capacity(newCapacity); }
二、ByteBuf功能介紹
1. 順序讀操作
詳見api,類似於ByteBuffer的get
2. 順序寫操作
詳見api,類似於ByteBuffer的put
3. readerIndex和writerIndex
讀索引和寫索引,將ByteBuf分割為3個區域:

可讀區域和可寫區域,以及已經讀取過的區域,可以調用discardReadBytes操作來重用這部分的空間,以節省內存,防止ByteBuf的動態擴張。這在私有協議棧解碼的時候非常有用,因為TCP層可能會粘包,幾百個整包消息被TCP作為一個整包發送。這樣,使用discardReadBytes操作可以重用之前的已經解碼過的緩沖區,從而防止接收緩沖區因為容量不足擴張。
但是,discardReadBytes是把雙刃劍,不能濫用。
4. Discardable bytes
動態擴張比較耗時,因此為了提高性能,往往需要最大努力提升緩沖區的重用率。
discardbytes操作則可以重用已經讀過的空間,減少擴容的次數。
但是,discardbytes操作本身也是自己數組的內存復制,所以頻繁調用也會導致性能下降,因此調用之前,請確認,你想要時間換取空間,而不是擴容。
@Override public ByteBuf discardReadBytes() { ensureAccessible(); if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) { setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; }
5. Readable bytes和Writable bytes
- 可讀區域中是數據實際存儲的區域,以read或者skip開頭的任何操作都將會從readerIndex開始讀取或者跳過指定的數據。如果讀的字節數>可讀字節數,throw IndexOutOfBoundsException。
- 可寫區域是尚可填充的空間,任何以write開頭的操作都會從writeIndex開始向空閑空間開始寫入字節,如果寫入字節數>可寫字節數,也會拋出IndexOutOfBoundsException。
6. Clear操作
ByteBuffer的clear操作不會操作內容本身,而是修改指針位置。ByteBuf也一樣,clear之后0=readerIndex=writerIndex。
7. Mark和Reset
某些情況需要能夠回滾,Netty提供了類似的方法。
- markReaderIndex:將當前的readerIndex備份到markedReaderIndex中;
- resetReaderIndex:將當前的readerIndex設置為mardedReaderIndex;
- markWriterIndex:將當前的readerIndex備份到markedWriterIndex中;
- resetWriterIndex:將當前的readerIndex設置為mardedWriterIndex;
8. 查找操作
ByteBuf提供了查找方法用於滿足不同的應用場景,詳細分類如下.
(1) indexOf(int fromIndex, int toIndex, byte value):從from到to查找value的值
(2) bytesBefore(byte value): readerIndex到writerIndex中查找value的值
(3) bytesBefore(int length, byte value):從readerIndex到readerIndex+length
(4) bytesBefore(int index, int length, byte value): 從index到Index+length
(5) forEachByte(ByteBufProcessor processor): 遍歷可讀字節數組,與ByteBufProcessor 設置的條件進行對比
(6) forEachByte(int index, int length, ByteBufProcessor processor): 類似上面
(7) forEachByteDesc(ByteBufProcessor processor): 同上,采用逆序
(8) forEachByteDesc(int index, int length, ByteBufProcessor processor):逆序
對於被查詢的字節,Netty在ByteBufProcessor中做好了抽象,定義如下:
- FIND_NUL: NUL(0x00)
- FIND_CR
- FIND_LF
- FIND_CRLF
- FIND_LINER_WHITESPACE
9. Derived buffers
類似於數據庫的視圖。以下方法用於創建視圖或者恢復ByteBuf。
(1) duplicate: 返回當前ByteBuf的復制對象,二者共享緩沖區內容,但是讀寫索引獨立,即修改內容內容會變,索引變化不影響原ByteBuf。
(2) copy: 復制一個對象,不共享,內容和索引都是獨立的;
(3) copy(int index, int length)
(4) slice: 返回當前ByteBuf的可讀子緩沖區,即從readerIndex到writerIndex的部分,共享內容,索引獨立。
(5) slice(int index, int length):共享內容,索引獨立。
10. 轉換為ByteBuffer
(1) ByteBuffer nioBuffer(): 當前可讀緩沖區轉換為ByteBuffer,共享內容,索引獨立,且無法感知動態擴容;
(2) ByteBuffer nioBuffer(int index, int length)
11. 隨機讀寫 (set和get)
隨機讀api:

隨機寫api,同樣方式可以查看。
無論是get還是set,都會對其索引進行合法性校驗。
但是,set不支持動態擴展。
三、源碼分析
3.1 主要類繼承關系
除了這些類之外,還有非常多的類:

1. 從內存分配的角度有2類:
(1) 堆內存:優點是內存的分配和回收速度快,可以被自動回收,缺點是如果進行SocketIO的讀寫,需要額外一次的內存復制,將堆內存對應的緩沖區復制到內核Channel中,性能有一定程度損失。
(2) 直接內存:在堆外進行分配,相對分配和回收速度會慢一些,但是將它寫入或者從Socket Channel讀取時,少了一次內存復制,速度更快。
經驗表明:ByteBuf的最佳實踐是在I/O通信線程讀寫緩沖區使用DirectByteBuf,后端業務消息的編解碼模塊使用HeapByteBuf,這樣組合可以達到性能最優。
2. 從內存回收角度上:
(1) 基於對象池的ByteBuf:內存池,可以循環創建ByteBuf,提升內存的利用率,降低高負載導致的頻繁GC。
(2) 普通ByteBuf。
測試表明高負載使用內存池會更加的平穩。
盡管推薦使用基於內存池的ByteBuf,但是內存池的管理和維護更加復雜,也需要更加謹慎。
3.2 AbstactByteBuf
骨架類
1. 主要成員變量
private static final boolean checkAccessible; static { checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true); if (logger.isDebugEnabled()) { logger.debug("-D{}: {}", PROP_MODE, checkAccessible); } } static final ResourceLeakDetector<ByteBuf> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class); //讀索引 int readerIndex; //寫索引 int writerIndex; private int markedReaderIndex; private int markedWriterIndex; private int maxCapacity;
一些公共屬性的定義,這里關注下leakDetector:用於檢測對象是否有泄露。
2. 讀操作系列
這里沒有定義緩沖區的實現,因為不知道是直接內存還是堆內存,但是無論是基於何種內存實現讀操作,一些基本的操作都在骨架類中已經實現,實現代碼復用,這也是抽象和繼承的價值所在。
這里以方法readBytes為例:
@Override public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { //1. 檢測緩沖區可讀長度 checkReadableBytes(length); //2. 抽象獲取,由子類實現 getBytes(readerIndex, dst, dstIndex, length); //3. 讀索引增加 readerIndex += length; return this; }
檢測長度方法非常簡單,檢測可讀長度是否有length,這里略過。
3. 寫操作系列
與讀類似
@Override public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureAccessible(); //1 .檢測寫長度合法 ensureWritable(length); //2. 抽象方法,由子類實現 setBytes(writerIndex, src, srcIndex, length); //3. 寫索引增加 writerIndex += length; return this; }
為什么需要動態擴展?
很多時候都是依據經驗來判斷Pojo對象的大小,如果這個估計值偏大則造成內存浪費,如果偏小直接拋出異常,這種做法對用戶非常不友好。
而Netty的ByteBuf支持動態擴展,為了保證安全,可以指定最大容量。
如何進行計算?
參數是writerIndex+minWriableBytes,即滿足要求的最小容量。
設置閥門值是4MB,如果新增的內存空間大於這個值,不采用倍增,而采用每次步進4MB的方式,每次增加后和maxCapacity比較,選擇其小者。
如果擴容之后的新容量小於閥值,則以64進行倍增。
這樣做的原因無非是綜合2點因素:不希望一次增加容量太小,導致需要頻繁的擴容,不希望一次增加太多,造成空間上的浪費。
因此,在內存比較小的時候(<4MB)的時候,倍增64->128->256字節,這種方式大多數應用可以接收
當內存達到閥值時,再倍增就會帶來額外的內存浪費,例如10MB->20MB,因此使用步增的方式進行擴張。
代碼如下:
@Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
計算完長度,再創建新的緩沖區,由於內存申請方式不同子類不同,依舊設置為一個抽象方法:
public abstract ByteBuf capacity(int newCapacity);
4. 操作索引
與索引相關的操作主要是讀寫索引 mark reset等等。這部分代碼相當簡單。
5. 重用緩沖區
0->readerIndex這部分的空間可以重用。
public ByteBuf discardReadBytes() { ensureAccessible(); if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) {
//1. 字節組進行復制 setBytes(0, this, readerIndex, writerIndex - readerIndex);
//2. 重新設置索引 writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; }
注意到還要重新調整markedReaderIndex和markedWriterIndex。
6. skipBytes
在解碼的時候,有時候需要丟棄非法的數據報文。非常簡單,修改readerIndex即可,
@Override public ByteBuf skipBytes(int length) { checkReadableBytes(length); readerIndex += length; return this; }
3.3 AbstractReferenceCountedByteBuf源碼分析
從類的名字可以看出,該類的功能主要是引用計數,類似於JVM內存回收的對象引用計數器,用於跟蹤對象的分配和銷毀,用於自動的內存回收。
1. 成員變量
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater; static { AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; } private volatile int refCnt = 1;
refCntUpdater是一個CAS類型變量,通過原子操作對成員變量進行更新。
refCnt是一個volatile修飾的字段,用於跟蹤對象的引用次數。
2. 對象引用計數器
CAS算法,每調用一次retain方法,引用計數器就會+1.
@Override public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + 1; // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow. if (nextCnt <= 1) { throw new IllegalReferenceCountException(refCnt, 1); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; }
下面看釋放引用計數器的代碼,也是使用CAS在一個自旋循環里進行判斷和更新的。需要注意的是:當refCnt==1的時候意味着申請和釋放相等,說明對象引用已經不可達,該對象需要被垃圾回收掉,因此調用deallocate方法來釋放ByteBuf對象,代碼如下:
@Override public boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { deallocate(); return true; } return false; } } }
3.4 UnpooledHeapByteBuf
基於堆內存,沒有對象池,意味着每次I/O的讀寫都會創建一個新的UnpooledHeapByteBuf,頻繁進行大塊內存的分配和回收可能會對性能有一定的影響,但是相比於堆外內存的申請和釋放,成本還是要低一些。
相比於PooledHeapByteBuf,其原理更加的簡單,也不容易出現內存管理方面的問題,因此在滿足性能的情況下,推薦使用UnpooledHeapByteBuf。
1. 成員變量
//1. 用於內存分配
private final ByteBufAllocator alloc;
//2. 數組緩沖區 byte[] array;
//3. private ByteBuffer tmpNioBuf;
2. 動態擴展緩沖區
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length;
//1. 如果新的容量值大於當前的緩沖區容量,需要動態擴展 if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity];
//進行數組復制 System.arraycopy(array, 0, newArray, 0, array.length);
// 替換舊的數組 setArray(newArray); } else if (newCapacity < oldCapacity) {
//此時,需要截取當前緩沖區創建一個新的子緩沖區 byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex();
//如果讀索引<新的容量值 if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); }
//拷貝內容 System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else {
//如果此時讀索引更大,無須拷貝數據 setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
3. 字節數組復制
@Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; }
此時不會修改readerIndex和writerIndex,只是修改內容.
4. 轉換為ByteBuf
public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); }
5. 子類相關方法
isDirect: 由於是基於heap,所以返回false
hasArray: 返回true
array: 返回array
@Override public boolean hasArray() { return true; } @Override public byte[] array() { ensureAccessible(); return array; } @Override public int arrayOffset() { return 0; } @Override public boolean hasMemoryAddress() { return false; } @Override public long memoryAddress() { throw new UnsupportedOperationException(); }
其它:
由於UnpooledDirectByteBuf原理和UnpooledHeapByteBuf相同,不同之處在於使用內部緩沖區DirectByteBuffer實現,這里不再描述。
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
3.5 PooledByteBuf內存池原理分析
細節非常復雜,這里僅僅從設計角度上講解。
1. PoolArena
Arena本身是一塊區域,在內存管理中,Memory Arena是指內存中的一大塊連續的區域,PoolArena就是Netty的內存池實現類。
為了集中管理內存的分配和釋放,同時提高分配和釋放內存時候的性能,很多框架和應用都會通過預先申請一大塊內存,然后通過提供相應的分配和釋放接口來使用內存。
這樣,對內存的管理就會被集中到幾個類或者函數中,由於不再頻繁使用系統調用來申請和釋放內存,應用或者系統的性能也會大大提高。這種設計思路中,預先申請的那一大塊內存就會被稱為Memeory Arena。
不同的框架中,Memory Arena的實現不同,Netty的PoolArena是由多個Chunk組成的大塊內存區域,而每個Chunk則由一個或者多個Page組成。
代碼片段如下:
abstract class PoolArena<T> implements PoolArenaMetric { static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); enum SizeClass { Tiny, Small, Normal } static final int numTinySubpagePools = 512 >>> 4; final PooledByteBufAllocator parent; private final int maxOrder; final int pageSize; final int pageShifts; final int chunkSize; final int subpageOverflowMask; final int numSmallSubpagePools; private final PoolSubpage<T>[] tinySubpagePools; private final PoolSubpage<T>[] smallSubpagePools; private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100; private final List<PoolChunkListMetric> chunkListMetrics;
2. PoolChunk
Chunk中Page被構建為一棵二叉樹。假設一個Chunk由16個Page組成,那么這些Page會被按照下圖方式組織起來。

Page的大小是4個byte,而Chunk的大小是64即4*16。樹有5層,葉子節點所在層用來分配所有的Page內存,而上一層用來分配2個Page,以此類推...
每個節點都記錄了自己在整個Memory Arena中的偏移地址,當一個節點代表的內存區域被分配出去之后,這個節點就被標記為已分配,自這個節點以下的所有節點在后面的內存請求都會被忽略。例如,需要16個byte的時候,就會在第三層尋找,然后標記已經分配,再分配只能尋找其他的三個節點了。
對樹的遍歷算法采用的是深度優先的算法,但是在選擇哪個子節點繼續遍歷的時候是隨機的。
3. PoolSubpage
對於小於一個Page的內存,Netty在Page中完成分配。每個Page會被切分為大小相同的多個存儲塊,存儲塊的大小由第一次申請內存的塊大小決定。
假設一個Page是8個字節,第一次申請的塊大小是4個字節,那么這個Page就包含了2個存儲塊;如果第一次申請的是8個字節,那么這個Page就被分成一個存儲塊。同時,之后能分配的也是和第一次一樣的字節,如果不一樣,需要在一個新的Page中進行分配。
Page中存儲區域的使用狀態通過一個long數組來維護,數組中每個long的每一位表示一個塊存儲區域的占用情況:0表示未占用,1表示已占用。對於個4bytes的Page來說,如果這個Page用來分配1個字節的存儲區域,那么long數組只用一個元素的低4位就可以描述,如果對於一個128bytes的Page,如果也是1byte分配就有128個,就需要2個long元素來代表區域占用情況。
final class PoolSubpage<T> implements PoolSubpageMetric { final PoolChunk<T> chunk; private final int memoryMapIdx; private final int runOffset; private final int pageSize; private final long[] bitmap; PoolSubpage<T> prev; PoolSubpage<T> next; boolean doNotDestroy; int elemSize; private int maxNumElems; private int bitmapLength; private int nextAvail; private int numAvail;
4. 內存回收策略
無論是Chunk還是Page,都使用狀態位(bitmap)來標識內存是否可用,不同之處在於Chunk通過在二叉樹上對節點進行標識,Page則是通過維護塊的狀態標識來實現。
3.6 PooledDirectByteBuf
基於內存池實現,基於直接緩沖,與UnPooledDirectByteBuf唯一的不同就是內存分配和銷毀策略不同,其他都是相同的。
1. 創建字節緩沖區實例
通過靜態工廠創建:
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; }
設置引用計數器為1,設置緩沖區最大容量后返回。
/** * Method must be called before reuse this {@link PooledByteBufAllocator} */ final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1); setIndex0(0, 0); discardMarks(); }
2. 復制新的字節緩沖區實例
如果使用者確實需要復制一個新的實例,與原來的PooledDirectByteBuf獨立,則調用它的copy(int index, int length) 可以達到上述目標,代碼:
@Override public ByteBuf copy(int index, int length) { checkIndex(index, length); ByteBuf copy = alloc().directBuffer(length, maxCapacity()); copy.writeBytes(this, index, length); return copy; }
上述代碼中,首先對index和length進行合法性校驗,通過之后調用PooledByteBufAllocator分配一個新的ByteBuf,最終調用的是AbstractByteAllocator的directBuffer方法。
@Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); }
newDirectBuffer方法根據子類實現不同策略,此處是Pooled,從池中獲取而不是創建一個新的對象。
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
3. 其它相關功能
@Override public boolean hasArray() { return false; } @Override public byte[] array() { throw new UnsupportedOperationException("direct buffer"); } @Override public int arrayOffset() { throw new UnsupportedOperationException("direct buffer"); } @Override public boolean hasMemoryAddress() { return false; } @Override public long memoryAddress() { throw new UnsupportedOperationException(); }
四、ByteBuf相關輔助類介紹
4.1 ByteBufHolder
ByteBufHolder是ByteBuf相關的容器,在Netty中非常有用。
例如HTTP協議的請求消息和應答消息都可以攜帶消息體,這個消息體在NIO ByteBuffer中就是ByteBuffer對象,在Netty中就是ByteBuf對象。而不同的協議消息體中可以含有不同的協議字段和功能,因此需要對ByteBuf進行包裝和抽象。
為了滿足這些定制化的需求,Netty抽象出了ByteBufHolder對象,它包含了一個ByteBuf,另外還提供了一些其他實用的方法,使用者繼承ByteBufHolder接口可以按需封裝自己的實現。
下面是其類圖,非常豐富。

4.2 ByteBufAllocator
ByteBufAllocator是字節緩沖區分配器,按照Netty的緩沖區實現不同,共有2種不同的分配器,基於內存池的字節緩沖分配器和普通的字節緩沖區分配器。

下圖是主要API列表:

4.3 CompositeByteBuf
CompositeByteBuf允許將多個ByteBuf的實例組裝到一起,形成一個統一的視圖。
某些場景下非常有用,例如某個協議POJO對象包含2部分:消息頭和消息體,它們都是ByteBuf對象。當需要對消息進行編碼的時候需要進行整合,如果使用JDK的話,有以下2種思路:
(1) 將某個ByteBuffer復制到另一個ByteBuffer中,或者創建一個新的ByteBuffer
(2) 通過List等容器,統一維護和處理
Netty的做法則是使用組合模式進行優化。
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> { private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer(); private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator(); private final ByteBufAllocator alloc; private final boolean direct; private final List<Component> components; private final int maxNumComponents; private boolean freed;
它定義了一個Component的集合,Component就是ByteBuf的包裝類:
private static final class Component { final ByteBuf buf; final int length; int offset; int endOffset; Component(ByteBuf buf) { this.buf = buf; length = buf.readableBytes(); } void freeIfNecessary() { buf.release(); // We should not get a NPE here. If so, it must be a bug. } }
增加和刪除的代碼:
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { checkNotNull(buffer, "buffer"); addComponent0(increaseWriterIndex, components.size(), buffer); consolidateIfNeeded(); return this; }
/** * Remove the {@link ByteBuf} from the given index. * * @param cIndex the index on from which the {@link ByteBuf} will be remove */ public CompositeByteBuf removeComponent(int cIndex) { checkComponentIndex(cIndex); Component comp = components.remove(cIndex); comp.freeIfNecessary(); if (comp.length > 0) { // Only need to call updateComponentOffsets if the length was > 0 updateComponentOffsets(cIndex); } return this; }
4.4 ByteBufUtil
工具類,提供靜態方法用於操作ByteBuf對象。

最有用的是對字符串進行編碼和解碼:
public static ByteBuf encodeString(ByteBufAllocator alloc, CharBuffer src, Charset charset): 對字符串進行編碼,使用指定的ByteBufAllocator生成一個新的ByteBuf。
還有方法是hexDump,將ByteBuf內容以16進制的字符串打印出來。
