Netty 源碼分析之ByteBuf
ByteBuf基礎
Java Nio 的Buffer
在進行數據傳輸的過程中,我們經常會用到緩沖區。
在Java NIO 為我們提供了原生的七種緩沖區實現,對應着Java 的七種基本類型。一般使用ByteBuffer較多。原生的Buffer雖然能滿足我們的日常使用,但是要進行復雜的應用的時候,確有點力不從心了,原生Buffer存在着以下缺點。因此Netty對其進行了封裝,提供了更為友好的接口供我們使用。
- 當我們調用對應Buffer類的allocate方法來創建緩沖區實例的時候,會分配指定的空間,同時緩沖區的長度就會被固定,不能進行動態的增長或者收縮。如果我們寫入的數據大於緩沖區的capacity的時候,就會發生數組越界錯誤。
-
Buffer只有一個位置標志位屬性Position,我們只能flip或者rewind方法來對position進行修改來處理數據的存取位置,一不小心就可能會導致錯誤。
-
Buffer只提供了存取、翻轉、釋放、標志、比較、批量移動等緩沖區的基本操作,我們想使用高級的功能,就得自己手動進行封裝及維護,使用非常不方便。
ByteBuf工作原理
ByteBuf也是通過字節數組作為緩沖區來存取數據,通過外觀模式聚合了JDK NIO元素的ByteBuffer,進行封裝。
ByteBuf是通過readerIndex跟writerIndex兩個位置指針來協助緩沖區的讀寫操作的。
在對象初始化的時候,readerIndex和writerIndex的值為0,隨着讀操作和寫操作的進行,writerIndex和readerIndex都會增加,不過readerIndex不能超過writerIndex,在進行讀取操作之后,0到readerIndex之間的空間會被視為discard,調用ByteBuf的discardReadBytes方法,可以對這部分空間進行釋放重用,類似於ByteBuffer的compact操作,對緩沖區進行壓縮。readerIndex到writerIndex的空間,相當於ByteBuffer的position到limit的空間,可以對其進行讀取,WriterIndex到capacity的空間,則相當於ByteBuffer的limit到capacity的空間,是可以繼續寫入的。
readerIndex跟writerIndex讓讀寫操作的位置指針分離,不需要對同一個位置指針進行調整,簡化了緩沖區的讀寫操作。
同樣,ByteBuf對讀寫操作進行了封裝,提供了動態擴展的能力,當我們對緩沖區進行寫操作的時候,需要對剩余的可用空間進行校驗,如果可用空間不足,同時要寫入的字節數小於可寫的最大字節數,會對緩沖區進行動態擴展,它會重新創建一個緩沖區,然后將以前的數據復制到新創建的緩沖區中,
ByteBuf基本功能
- 順序讀
在進行讀操作之前,首先對緩沖區可用的空間進行校驗。如果要讀取的字節長度小於0,就會拋出IllegalArgumentException異常,如果要讀取的字節長度大於已寫入的字節長度,會拋出IndexOutOfBoundsException異常。通過校驗之后,調用getBytes方法,從當前的readerIndex開始,讀取length長度的字節數據到目標dst中,由於不同的子類實現不一樣,getBytes是個抽象方法,由對應的子類去實現。如果讀取數據成功,readerIndex將會增加相應的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
- 順序寫
讀操作是將源字節數組從srcIndex開始,length長度的數據寫入到當前的ByteBuf中的。
一開始需要對寫入數組的字節數進行校驗,如果寫入長度小於0,將會拋出IllegalArgumentException異常,如果寫入字節數小於當前ByteBuf的可寫入字節數,則通過檢驗。如果寫入字節數大於緩沖區最大可動態擴展的容量maxCapacity,就會拋出
IndexOutOfBoundsException異常,否則的話,就會通過動態擴展來滿足寫入需要的字節數。首先通過calculateNewCapacity計算出重新擴展后的容量,然后調用capacity方法進行擴展,不同的子類有不同實現,所以也是一個抽象方法。- 計算擴展容量,首先設置門閥值為4m,如果要擴展的容量等於閥值就使用閥值作為緩沖區新的容量,如果大於閥值就以4M作為步長,每次增加4M,如果擴展期間,要擴展的容量比最大可擴展容量還大的話,就以最大可擴展容量maxCapacity為新的容量。否則的話,就從64開始倍增,直到倍增之后的結果大於要擴展的容量,再把結果作為緩沖區的新容量。
- 通過先倍增再步長來擴展容量,如果我們只是writerIndex+length的值作為緩沖區的新容量,那么再以后進行寫操作的時候,每次都需要進行容量擴展,容量擴展的過程需要進行內存復制,過多內存復制會導致系統的性能下降,之所以是倍增再部長,在最初空間比較小的時候,倍增操作並不會帶來太多的內存浪費,但是內存增長到一定的時候,再進行倍增的時候,就會對內存造成浪費,因此,需要設定一個閥值,到達閥值之后就通過步長的方法進行平滑的增長。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity實現
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- Clear操作
clear操作只是把readerIndex和writerIndex設置為0,不會對存儲的數據進行修改。
public ByteBuf clear() {
readerIndex = writerIndex = 0;
return this;
}
-
索引操作
- 讀寫位置索引設置:主要是對邊界條件進行校驗,設置readerIndex的時候,newReaderIndex不能小於0跟大於writerIndex;設置writerIndex的時候,newWriterIndex必須大於readerIndex和小於當前的capacity。如果不能通過校驗的話,就會拋出IndexOutOfBoundsException異常。
- mark和reset操作:由於有readerIndex和writerIndex,因此進行mark或者reset需要指定相應的操作位置索引,mark操作會把當前的readerIndex或者writerIndex設置為markedReaderIndex或者markedWriterIndex;reset操作的話,它是參入對應的mark值調用對應readerIndex()或者writerIndex();
-
緩沖區重用
可以通過discardReadByte方法去重用已經讀取過的緩沖區。
首先對readerIndex進行判斷:- 如果readerIndex等於0,就說明沒有讀取數據,沒有可以用來重用的空間,直接返回;
- 如果readerIndex大於0且不等於writerIndex的話,說明有進行數據讀取被丟棄的緩沖區,也有還沒有被讀取的緩沖區。調用setBytes方法進行字節數組的復制,將沒被讀取的數據移動到緩沖區的起始位置,重新去設置readerIndex和writerIndex,readerIndex為0,writerIndex為原writerIndex-readerIndex;同時,也需要對mark進行重新設置。
- 首先對markedReaderIndex進行備份然后跟decrement進行比較,如果markedReaderIndex比decrement小的話,markedReaderIndex設置為0,再用markedWriterIndex跟decrement比較,如果小於的話,markedWriterIndex也設置為0,否則的話markedWriterIndex較少decrement;
- 如果markedReaderIndex比decrement大的話,markedReaderIndex和markedReaderIndex都減去decrement就可以了。
- 如果readerIndex等於writerIndex的話,說明沒有可以進行重用的緩沖區,直接對mark重新設置就可以了,不需要內存復制。
public ByteBuf discardReadBytes() {
ensureAccessible();
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
} else {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
protected final void adjustMarkers(int decrement) {
int markedReaderIndex = this.markedReaderIndex;
if (markedReaderIndex <= decrement) {
this.markedReaderIndex = 0;
int markedWriterIndex = this.markedWriterIndex;
if (markedWriterIndex <= decrement) {
this.markedWriterIndex = 0;
} else {
this.markedWriterIndex = markedWriterIndex - decrement;
}
} else {
this.markedReaderIndex = markedReaderIndex - decrement;
markedWriterIndex -= decrement;
}
}
- skipBytes
當我們需要跳過某些不需要的字節的時候,可以調用skipBytes方法來跳過指定長度的字節來讀取后面的數據。
首先對跳躍長度進行判斷,如果跳躍長度小於0的話,會拋出IllegalArgumentException異常,或者跳躍長度大於當前緩沖區可讀長度的話,會拋出IndexOutOfBoundsException異常。如果校驗通過,新的readerindex為原readerIndex+length,如果新的readerIndex大於writerIndex的話,會拋出IndexOutOfBoundsException異常,否則就更新readerIndex。
public ByteBuf skipBytes(int length) {
checkReadableBytes(length);
int newReaderIndex = readerIndex + length;
if (newReaderIndex > writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
length, readerIndex, writerIndex));
}
readerIndex = newReaderIndex;
return this;
}
ByteBuf源碼分析
AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf是ByteBuf實現對引用進行計數的基類,用來跟蹤對象的分配和銷毀,實現自動內存回收。
- 成員變量
- refCntUpdater refCntUpdater是一個AtomicIntegerFieldUpdater類型的成員變量,它可以對成員變量進行原子性更新操作,達到線程安全。
- REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是標識refCnt字段在AbstractReferenceCountedByteBuf的內存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf兩個子類中都會使用到這個偏移量。
- refCnt volatile修飾保證變量的線程可見性,用來跟蹤對象的引用次數
- 對象引用計數器
每調用retain方法一次,引用計數器就會加一。retain方法通過自旋對引用計數器進行加一操作,引用計數器的初始值為1,只要程序是正確執行的話,它的最小值應該為1,當申請和釋放次數相等的時候,對應的ByteBuf就會被回收。當次數為0時,表明對象被錯誤的引用,就會拋出IllegalReferenceCountException異常,如果次數等於Integer類型的最大值,就會拋出
IllegalReferenceCountException異常。retain通過refCntUpdater的compareAndSet方法進行原子操作更新,compareAndSet會使用獲取的值與期望值進行比較,如果在比較器件,有其他線程對變量進行修改,那么比較失敗,會再次自旋,獲取引用計數器的值再次進行比較,否則的話,就會進行加一操作,退出自旋。
release方法的話與retain方法類似,也是通過自旋循環進行判斷和更新,不過當refCnt的值等於1的時候,表明引用計數器的申請跟釋放次數一樣,對象引用已經不可達了,對象應該要被垃圾收集回收掉了,調用deallocate方法釋放ByteBuf對象
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
UnpooledHeapByteBuf
UnpooledHeapByteBuf是一個非線程池實現的在堆內存進行內存分配的字節緩沖區,在每次IO操作的都會去創建一個UnpooledHeapByteBuf對象,如果頻繁地對內存進行分配或者釋放會對性能造成影響。
- 成員變量
- ByteBufAllocator 用於內存分配
- array 字節數組作為緩沖區,用於存儲字節數據
- ByteBuffer 用來實現Netty ByteBuf 到Nio ByteBuffer的變換
- 動態擴展緩沖區
調用capacity方法動態擴展緩沖區,首先要對擴展容量進行校驗,如果新容量的大小小於0或者大於最大可擴展容量maxCapacity的話,拋出IllegalArgumentException異常。
通過校驗之后,如果新擴展容量比原來大的話,則創建一個新的容量為新擴展容量的字節數組緩沖區,然后調用System.arraycopy進行內存復制,將舊的數據復制到新數組中去,然后用setArray進行數組替換。動態擴展之后需要原來的視圖tmpNioBuffer設置為控。
如果新的容量小於當前緩沖區容量的話,不需要進行動態擴展,但是需要截取部分數據作為子緩沖區。- 首先對當前的readerIndex是否小於newCapacity,如果小於的話繼續對writerIndex跟newCapacity進行比較,如果writerIndex大於newCapacity的話,就將writerIndex設置為newCapacity,更新完索引之后就通過System.arrayCopy內存復制將當前可讀的數據復制到新的緩沖區字節數組中。
- 如果newCapacity小於readerIndex的話,說明沒有新的可讀數據要復制到新的字節數組緩沖區中,只需要把writerIndex跟readerIndex都更新為newCapacity既可,最后調用setArray更換字節數組。
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- setBytes
字節數組復制,首先對數據進行合法性檢驗,如果srcIndex或者index的值小於0,就會拋出IllegalArgumentException,如果index+length的值大於capacity的值或者srcIndex+length的值大於src.length的話,就會拋出IndexOutOfBoundsException異常。通過校驗之后,就調用System.arraycopy進行字節數組復制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
checkIndex(index, length);
if (srcIndex < 0 || srcIndex > srcCapacity - length) {
throw new IndexOutOfBoundsException(String.format(
"srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
}
}
- Netty ByteBuf與Nio ByteBuffer轉換
要將Netty的ByteBuf轉化為Nio ByteBuffer,在ByteBuffer中有wrap靜態方法,只需要傳入對應的字節數組即可創建轉化為ByteBuffer,在nioBuffer方法還調用了slice方法,它可以創建一個從原ByteBuffer的position開始緩沖區,與原緩沖區共享同一段數據元素。nioBuffer方法不會重用緩沖區,只能保證writerIndex跟readerIndex的獨立性。
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
PooledByteBuf
在Netty4之后加入內存池管理,通過內存池管理比之前ByteBuf的創建性能得到了極大提高。
- PoolChunk
- Page 可以用來分配的最小內存塊單位
- Chunk page的集合
PoolChunk主要負責內存塊的分配及釋放,chunk中的page會構建成一顆二叉樹,默認情況下page的大小是8K,chunk的大小是2^11 page,即16M,構成了11層的二叉樹,最下面一層的葉子節點有8192個,與page的數目一樣,每一次內存的分配必須保證連續性,方便內存操作。每個節點會記錄自己在Memory Area的偏移地址,當一個節點表示的內存區域被分配之后,那么該節點會被標志為已分配,該節點的所有子節點的內存請求都會忽略。每次內存分配的都是8k(2^n)大小的內存塊,當需要分配大小為chunkSize/(2^k)的內存端時,為了找到可用的內存段,會從第K層左邊開始尋找可用節點。
- PoolArena
在內存分配中,為了能夠集中管理內存的分配及釋放,同時提供分配和釋放內存的性能,一般都是會先預先分配一大塊連續的內存,不需要重復頻繁地進行內存操作,那一大塊連續的內存就叫做memory Arena,而PoolArena是Netty的內存池實現類。
在Netty中,PoolArena是由多個Chunk組成的,而每個Chunk則由多個Page組成。PoolArena是由Chunk和Page共同組織和管理的。
- PoolSubpage
當對於小於一個Page的內存分配的時候,每個Page會被划分為大小相等的內存塊,它的大小是根據第一次申請內存分配的內存塊大小來決定的。一個Page只能分配與第一次內存內存的內存塊的大小相等的內存塊,如果想要想要申請大小不想等的內存塊,只能在新的Page上申請內存分配了。
Page中的存儲區域的使用情況是通過一個long數組bitmap來維護的,每一位表示一個區域的占用情況。
PooledDirectByteBuf
- 創建字節緩沖區
由於內存池實現,每次創建字節緩沖區的時候,不是直接new,而是從內存池中去獲取,然后設置引用計數器跟讀寫Index,跟緩沖區最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
- 復制字節緩沖區實例
copy方法可以復制一個字節緩沖區實例,與原緩沖區獨立。
首先要對index和length進行合法性判斷,然后調用PooledByteBufAllocator的directBuffer方法分配一個新的緩沖區。newDirectBuffer方法是一個抽象方法,對於不同的子類有不同的實現。如果是unpooled的話,會直接創建一個新的緩沖區,如果是pooled的話,它會從內存池中獲取一個可用的緩沖區。
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
ByteBuf輔助類分析
ByteBufHolder
ByteBufHolder是ByteBuf的一個容器,它可以更方便地訪問ByteBuf中的數據,在使用不同的協議進行數據傳輸的時候,不同的協議消息體包含的數據格式和字段不一樣,所以抽象一個ByteBufHolder對ByteBuf進行包裝,不同的子類有不同的實現,使用者可以根據自己的需要進行實現。Netty提供了一個默認實現DefaultByteBufHolder。
ByteBufAllocator
ByteBufAllocator是字節緩沖區分配器,根據Netty字節緩沖區的實現不同,分為兩種不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不同ByteBuf的分配方法。
CompositeByteBuf
CompositeByteBuf是一個虛擬的Buffer,它可以將多個ByteBuf組裝為一個ByteBuf視圖。
在Java NIO中,我們有兩種實現的方法
- 將其他ByteBuffer的數據復制到一個ByteBuffer中,或者重新創建一個新的ByteBuffer,將其他的ByteBuffer復制到新建的ByteBuffer中。
- 通過容器將多個ByteBuffer存儲在一起,進行統一的管理和維護。
在Netty中,CompositeByByteBuf中維護了一個Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護在集合中的位置偏移量等信息。一般情況下,我們應該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來創建CompositeByteBuf,而不是直接通過構造函數去實例化一個CompositeByteBuf對象。
private int addComponent0(int cIndex, ByteBuf buffer) {
checkComponentIndex(cIndex);
if (buffer == null) {
throw new NullPointerException("buffer");
}
int readableBytes = buffer.readableBytes();
// No need to consolidate - just add a component to the list.
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
}
return cIndex;
}
private void consolidateIfNeeded() {
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = allocBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
}
public CompositeByteBuf removeComponent(int cIndex) {
checkComponentIndex(cIndex);
Component comp = components.remove(cIndex);
comp.freeIfNecessary();
if (comp.length > 0) {
updateComponentOffsets(cIndex);
}
return this;
}
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
buf.release(); // We should not get a NPE here. If so, it must be a bug.
}
}
ByteBufUtil
ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態方法來操作ByteBuf。