netty中的ByteBuf


網絡數據的基本單位總是字節。Java NIO 提供了 ByteBuffer 作為它 的字節容器,但是這個類使用起來過於復雜,而且也有些繁瑣。

Netty 的 ByteBuffer 替代品是 ByteBuf,一個強大的實現,既解決了 JDK API 的局限性, 又為網絡應用程序的開發者提供了更好的 API。

一、ByteBuf 的 API

Netty 的數據處理 API 通過兩個組件暴露——abstract class ByteBuf 和 interface ByteBufHolder。

下面是一些 ByteBuf API 的優點:

  •  它可以被用戶自定義的緩沖區類型擴展;
  •  通過內置的復合緩沖區類型實現了透明的零拷貝;
  •  容量可以按需增長(類似於 JDK 的 StringBuilder);
  •  在讀和寫這兩種模式之間切換不需要調用 ByteBuffer 的 flip()方法;
  •  讀和寫使用了不同的索引;
  •  支持方法的鏈式調用;
  • 支持引用計數;
  • 支持池化。

其他類可用於管理 ByteBuf 實例的分配,以及執行各種針對於數據容器本身和它所持有的 數據的操作。我們將在仔細研究 ByteBuf 和 ByteBufHolder 時探討這些特性。

二、ByteBuf 類——Netty 的數據容器

因為所有的網絡通信都涉及字節序列的移動,所以高效易用的數據結構明顯是必不可少的。 Netty 的 ByteBuf 實現滿足並超越了這些需求。讓我們首先來看看它是如何通過使用不同的索引 來簡化對它所包含的數據的訪問的吧。

2.1、它是如何工作的

ByteBuf 維護了兩個不同的索引:一個用於讀取,一個用於寫入。當你從 ByteBuf 讀取時, 它的 readerIndex 將會被遞增已經被讀取的字節數。同樣地,當你寫入 ByteBuf 時,它的 writerIndex 也會被遞增。圖 5-1 展示了一個空 ByteBuf 的布局結構和狀態。

要了解這些索引兩兩之間的關系,請考慮一下,如果打算讀取字節直到 readerIndex 達到 和 writerIndex 同樣的值時會發生什么。在那時,你將會到達“可以讀取的”數據的末尾。就 如同試圖讀取超出數組末尾的數據一樣,試圖讀取超出該點的數據將會觸發一個 IndexOutOfBoundsException。

 

ByteBuf是一個抽象類,內部全部是抽象的函數接口,AbstractByteBuf這個抽象類基本實現了ByteBuf,下面我們通過分析AbstractByteBuf里面的實現來分析ByteBuf的工作原理。

ByteBuf都是基於字節序列的,類似於一個字節數組。在AbstractByteBuf里面定義了下面5個變量:

//源碼
int readerIndex; //讀索引
int writerIndex; //寫索引
private int markedReaderIndex;//標記讀索引
private int markedWriterIndex;//標記寫索引
private int maxCapacity;//緩沖區的最大容量

ByteBuf 與JDK中的 ByteBuffer 的最大區別之一就是: 
(1)netty的ByteBuf采用了讀/寫索引分離,一個初始化的ByteBuf的readerIndex和writerIndex都處於0位置。 
(2)當讀索引和寫索引處於同一位置時,如果我們繼續讀取,就會拋出異常IndexOutOfBoundsException。 
(3)對於ByteBuf的任何讀寫操作都會分別單獨的維護讀索引和寫索引。maxCapacity最大容量默認的限制就是Integer.MAX_VALUE。

2.2、ByteBuf 的使用模式

JDK中的Buffer的類型 有heapBuffer和directBuffer兩種類型,但是在netty中除了heap和direct類型外,還有composite Buffer(復合緩沖區類型)。

2.2.1、Heap Buffer 堆緩沖區

這是最常用的類型,ByteBuf將數據存儲在JVM的堆空間,通過將數據存儲在數組中實現的。 
1)堆緩沖的優點是:由於數據存儲在JVM的堆中可以快速創建和快速釋放,並且提供了數組的直接快速訪問的方法。

2)堆緩沖缺點是:每次讀寫數據都要先將數據拷貝到直接緩沖區再進行傳遞。

這種模式被稱為支撐數組 (backing array),它能在沒有使用池化的情況下提供快速的分配和釋放。這種方式,如代碼清單 5-1 所示,非常適合於有遺留的數據需要處理的情況。

ByteBuf heapBuf = ...;
if (heapBuf.hasArray()) {
  byte[] array = heapBuf.array();
  int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
  int length = heapBuf.readableBytes();
  handleArray(array, offset, length);
}

 

2.2.2、Direct Buffer 直接緩沖區

NIO 在 JDK 1.4 中引入的 ByteBuffer 類允許 JVM 實現通過本地調用來分配內存。這主要是為了避免在每次調用本地 I/O 操作之前(或者之后)將緩沖區的內容復 制到一個中間緩沖區(或者從中間緩沖區把內容復制到緩沖區)。

Direct Buffer在堆之外直接分配內存,直接緩沖區不會占用堆的容量。事實上,在通過套接字發送它之前,JVM將會在內部把你的緩沖 區復制到一個直接緩沖區中。所以如果使用直接緩沖區可以節約一次拷貝。

(1)Direct Buffer的優點是:在使用Socket傳遞數據時性能很好,由於數據直接在內存中,不存在從JVM拷貝數據到直接緩沖區的過程,性能好。

(2)缺點是:相對於基於堆的緩沖區,它們的分配和釋放都較為昂貴。如果你 正在處理遺留代碼,你也可能會遇到另外一個缺點:因為數據不是在堆上,所以你不得不進行一 次復制。

雖然netty的Direct Buffer有這個缺點,但是netty通過內存池來解決這個問題。直接緩沖池不支持數組訪問數據,但可以通過間接的方式訪問數據數組:

ByteBuf directBuf = ...;
if (!directBuf.hasArray()) {
  int length = directBuf.readableBytes();
  byte[] array = new byte[length];
  directBuf.getBytes(directBuf.readerIndex(), array);
  handleArray(array, 0, length);
}

不過對於一些IO通信線程中讀寫緩沖時建議使用DirectByteBuffer,因為這涉及到大量的IO數據讀寫。對於后端的業務消息的編解碼模塊使用HeapByteBuffer。

2.2.3、Composite Buffer 復合緩沖區

第三種也是最后一種模式使用的是復合緩沖區,它為多個 ByteBuf 提供一個聚合視圖。在 這里你可以根據需要添加或者刪除 ByteBuf 實例,這是一個 JDK 的 ByteBuffer 實現完全缺 失的特性。

Netty 通過一個 ByteBuf 子類——CompositeByteBuf——實現了這個模式,它提供了一 個將多個緩沖區表示為單個合並緩沖區的虛擬表示

Netty提供了Composite ByteBuf來處理復合緩沖區。例如:一條消息由Header和Body組成,將header和body組裝成一條消息發送出去。下圖顯示了Composite ByteBuf組成header和body: 

如果使用的是JDK的ByteBuffer就不能簡單的實現,只能通過創建數組或則新的ByteBuffer,再將里面的內容復制到新的ByteBuffer中,下面給出了一個CompositeByteBuf的使用示例:

//組合緩沖區
CompositeByteBuf compBuf = Unpooled.compositeBuffer();   
//堆緩沖區
ByteBuf heapBuf = Unpooled.buffer(8);   
//直接緩沖區
ByteBuf directBuf = Unpooled.directBuffer(16);   
//添加ByteBuf到CompositeByteBuf   
compBuf.addComponents(heapBuf, directBuf);   
//刪除第一個ByteBuf   
compBuf.removeComponent(0);   
Iterator<ByteBuf> iter = compBuf.iterator();   
while(iter.hasNext()){   
    System.out.println(iter.next().toString());   
}   

//使用數組訪問數據      
if(!compBuf.hasArray()){   
    int len = compBuf.readableBytes();   
    byte[] arr = new byte[len];   
    compBuf.getBytes(0, arr);   
}   

Netty使用了CompositeByteBuf來優化套接字的I/O操作,盡可能地消除了 由JDK的緩沖區實現所導致的性能以及內存使用率的懲罰。( 這尤其適用於 JDK 所使用的一種稱為分散/收集 I/O(Scatter/Gather I/O)的技術,定義為“一種輸入和 輸出的方法,其中,單個系統調用從單個數據流寫到一組緩沖區中,或者,從單個數據源讀到一組緩沖 區中”。《Linux System Programming》,作者 Robert Love(O’Reilly, 2007)) 這種優化發生在Netty的核心代碼中, 因此不會被暴露出來,但是你應該知道它所帶來的影響。

2.3、ByteBuf 字節級操作

2.3.1、隨機訪問索引getByte(i),i是隨機值

ByteBuf提供讀/寫索引,從0開始的索引,第一個字節索引是0,最后一個字節的索引是capacity-1,下面給出一個示例遍歷ByteBuf的字節:

public static void main(String[] args) {
    //創建一個16字節的buffer,這里默認是創建heap buffer
    ByteBuf buf = Unpooled.buffer(16);
    //寫數據到buffer
    for(int i=0; i<16; i++){
        buf.writeByte(i+1);
    }
    //讀數據
    for(int i=0; i<buf.capacity(); i++){
        System.out.print(buf.getByte(i)+", ");
    }
}

/***output:
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
*/

這里有一點需要注意的是:通過那些需要一個索引值參數的方法(getByte(i))之一索引訪問byte時不會改變真實的讀索引和寫索引,我們可以通過ByteBuf的readerIndex()或則writerIndex()函數來分別推進讀索引和寫索引。

2.3.2、順序訪問索引

@Override
public ByteBuf writeByte(int value) {
    ensureAccessible();//檢驗是否可以寫入
    ensureWritable0(1);
    _setByte(writerIndex++, value);//這里寫索引自增了
    return this;
}
@Override
public byte readByte() {
    checkReadableBytes0(1);
    int i = readerIndex;
    byte b = _getByte(i);
    readerIndex = i + 1;//這里讀索引自增了
    return b;
}

 

 

雖然 ByteBuf 同時具有讀索引和寫索引,但是 JDK 的 ByteBuffer 卻只有一個索引,這 也就是為什么必須調用 flip()方法來在讀模式和寫模式之間進行切換的原因。

首先圖 5-3 展示了 ByteBuf 是如何被它的兩個索引划分成 3 個區域的

2.3.3、ByteBuf索引分區

2.3.3.1、可丟棄字節

對於已經讀過的字節,我們需要回收,通過調用ByteBuf.discardReadBytes()來回收已經讀取過的字節,discardReadBytes()將回收從索引0到readerIndex之間的字節。調用discardReadBytes()方法之后會變成如下圖所示; 

雖然你可能會傾向於頻繁地調用 discardReadBytes()方法以確保可寫分段的最大化,但是 請注意,很明顯discardReadBytes()函數很可能會導致內存的復制,它需要移動ByteBuf中可讀字節到開始位置,所以該操作會導致時間開銷。說白了也就是時間換空間。

2.3.3.2、可讀字節

ByteBuf 的可讀字節分段存儲了實際數據。新分配的、包裝的或者復制的緩沖區的默認的 readerIndex 值為 0。任何名稱以 read 或者 skip 開頭的操作都將檢索或者跳過位於當前 readerIndex 的數據,並且將它增加已讀字節數。

當我們讀取字節的時候,一般要先判斷buffer中是否有字節可讀,這時候可以調用isReadable()函數來判斷:源碼如下:

@Override
public boolean isReadable() {
    return writerIndex > readerIndex;
}

2.3.3.3、可寫字節

可寫字節分段是指一個擁有未定義內容的、寫入就緒的內存區域。新分配的緩沖區的 writerIndex 的默認值為 0。任何名稱以 write 開頭的操作都將從當前的 writerIndex 處 開始寫數據,並將它增加已經寫入的字節數。如果寫操作的目標也是 ByteBuf,並且沒有指定 源索引的值,則源緩沖區的 readerIndex 也同樣會被增加相同的大小。

其實也就是判斷 讀索引是否小於寫索引 來判斷是否還可以讀取字節。在判斷是否可寫時也是判斷寫索引是否小於最大容量來判斷。

@Override
public boolean isWritable() {
    return capacity() > writerIndex;
}

 

清除緩沖區

清除ByteBuf來說,有兩種形式,第一種是clear()函數:源碼如下:

@Override public ByteBuf clear() { readerIndex = writerIndex = 0; return this; }
  • 1
  • 2
  • 3
  • 4
  • 5

很明顯這種方式並沒有真實的清除緩沖區中的數據,而只是把讀/寫索引值重新都置為0了,這與discardReadBytes()方法有很大的區別。

標記Mark和重置reset

從源碼可知,每個ByteBuf有兩個標注索引,

private int markedReaderIndex;//標記讀索引 private int markedWriterIndex;//標記寫索引
  • 1
  • 2

可以通過重置方法返回上次標記的索引的位置。

衍生的緩沖區

調用duplicate()、slice()、slice(int index, int length)等方法可以創建一個現有緩沖區的視圖(現有緩沖區與原有緩沖區是指向相同內存)。衍生的緩沖區有獨立的readerIndex和writerIndex和標記索引。如果需要現有的緩沖區的全新副本,可以使用copy()獲得。

4. 創建ByteBuf的方法

前面我們也講過了,ByteBuf主要有三種類型,heap、direct和composite類型,下面介紹創建這三種Buffer的方法: 
(1)通過ByteBufAllocator這個接口來創建ByteBuf,這個接口可以創建上面的三種Buffer,一般都是通過channel的alloc()接口獲取。

(2)通過Unpooled類里面的靜態方法,創建Buffer

CompositeByteBuf compBuf = Unpooled.compositeBuffer();           
ByteBuf heapBuf = Unpooled.buffer(8); ByteBuf directBuf = Unpooled.directBuffer(16); 
  • 1
  • 2
  • 3

還有一點就是,ByteBuf里面的數據都是保存在字節數組里面的:

byte[] array;
  • 1

5. ByteBuf與ByteBuffer的對比:

先來說說ByteBuffer的缺點: 
(1)下面是NIO中ByteBuffer存儲字節的字節數組的定義,我們可以知道ByteBuffer的字節數組是被定義成final的,也就是長度固定。一旦分配完成就不能擴容和收縮,靈活性低,而且當待存儲的對象字節很大可能出現數組越界,用戶使用起來稍不小心就可能出現異常。如果要避免越界,在存儲之前就要只要需求字節大小,如果buffer的空間不夠就創建一個更大的新的ByteBuffer,再將之前的Buffer中數據復制過去,這樣的效率是奇低的。

final byte[] hb;// Non-null only for heap buffers
  • 1

(2)ByteBuffer只用了一個position指針來標識位置,讀寫模式切換時需要調用flip()函數和rewind()函數,使用起來需要非常小心,不然很容易出錯誤。

下面說說對應的ByteBuf的優點: 
(1)ByteBuf是吸取ByteBuffer的缺點之后重新設計,存儲字節的數組是動態的,最大是Integer.MAX_VALUE。這里的動態性存在write操作中,write時得知buffer不夠時,會自動擴容。

(2) ByteBuf的讀寫索引分離,使用起來十分方便。此外ByteBuf還新增了很多方便實用的功能。

6. ByteBuf的引用計數類AbstractReferenceCountedByteBuf分析

看類名我們就可以知道,該類主要是對引用進行計數,有點類似於JVM中判斷對象是否可回收的引用計數算法。這個類主要是根據ByteBuf的引用次數判斷ByteBuf是否可被自動回收。下面來看看源碼:

成員變量

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater; //靜態代碼段初始化refCntUpdater static { AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; } private volatile int refCnt = 1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

首先我們能看到refCntUpdater這個變量,這是一個原子變量類AtomicIntegerFieldUpdater,她是一個靜態變量,而且是在static代碼段里面實例化的,這說明這個類是單例的。這個類的主要作用是以原子的方式對成員變量進行更新操作以實現線程安全(這里線程安全的保證也就是CAS+volatile)。

然后是定義了refCnt變量,用於跟蹤對象的引用次數,使用volatile修飾解決原子變量可視性問題。

對象引用計數器 
那么,對對象的引用計數與釋放是怎么實現的呢?核心就是兩個函數:

//計數加1 retain(); //計數減一 release();
  • 1
  • 2
  • 3
  • 4
  • 5

下面分析這兩個函數源碼: 
每調用一次retain()函數一次,引用計數器就會加一,由於可能存在多線程並發使用的情景,所以必須保證累加操作是線程安全的,那么是怎么保證的呢?我們來看一下源碼:

public ByteBuf retain() { return retain0(1); } public ByteBuf retain(int increment) { return retain0(checkPositive(increment, "increment")); } /** 最后都是調用這個函數。 */ private ByteBuf retain0(int increment) { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow. if (nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

在retain0()函數中, 通過for(;;)來實現了自旋鎖。通過自旋來對引用計數器refCnt執行加1操作。這里的加一操作是通過原子變量refCntUpdater的compareAndSet(this, refCnt, nextCnt)方法實現的,這個通過硬件級別的CAS保證了原子性,如果修改失敗了就會不停的自旋,直到修改成功為止。

下面再看看釋放的過程:release()函數:

private boolean release0(int decrement) { for (;;) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

這里基本和retain()函數一樣,也是通過自旋和CAS保證執行的正確的將計數器減一。這里需要注意的是當refCnt == decrement 也就是引用對象不可達時,就需要調用deallocate();方法來釋放ByteBuf對象。

7. UnpooledHeapByteBuf源碼分析

從類名就可以知道UnpooledHeapByteBuf 是基於堆內存的字節緩沖區,沒有基於對象池實現,這意味着每次的IO讀寫都會創建一個UnpooledHeapByteBuf對象,會造成一定的性能影響,但是也不容易出現內存管理的問題。

這里寫圖片描述 
成員變量 
有三個成員變量,各自的含義見注釋。

//緩沖區分配器,用於UnpooledHeapByteBuf的內存分配。在UnpooledHeapByteBuf構造器中實例化 private final ByteBufAllocator alloc; //字節數組作為緩沖區 byte[] array; //實現ByteBuf與NIO中ByteBuffer的轉換 private ByteBuffer tmpNioBuf;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

動態擴展緩沖區 
在說道AbstractByteBuf的時候,ByteBuf是可以自動擴展緩沖區大小的,這里我們分析一下在UnpooledHeapByteBuf中是怎么實現的。

public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

里面的實現並不復雜: 
(1)首先獲取原本的容量oldCapacity;

(2)如果新需求容量大於oldCapacity,以新的容量newCapacity創建字節數組,將原來的字節數組內容通過調用System.arraycopy(array, 0, newArray, 0, array.length);復制過去,並將新的字節數組設為ByteBuf的字節數組。

(3)如果新需求容量小於oldCapacity就不需要動態擴展,但是需要截取出一段新緩沖區。

8. PooledDirectByteBuf 內存池原理分析

PooledDirectByteBuf基於內存池實現的,具體的內存池的實現原理,比較復雜,我沒分析清楚,具體的只知道,內存池就是一片提前申請的內存,當需要ByteBuf的時候,就從內存池中申請一片內存,這樣效率比較高。

PooledDirectByteBuf和UnPooledDirectByteBuf基本一樣,唯一不同的就是內存分配策略。

創建字節緩沖區實例 

由於PooledDirectByteBuf基於內存池實現的,所以不能通過new關鍵字直接實例化一個對象,而是直接從內存池中獲取,然后設置引用計數器的值。看下源碼:

static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; }
  • 1
  • 2
  • 3
  • 4
  • 5

通過RECYCLER對象的get()函數從內存池獲取PooledDirectByteBuf對象。然后在buf.reuse(maxCapacity); 函數里面設置引用計數器為1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM