Netty實戰五之ByteBuf


網絡數據的基本單位總是字節,Java NIO 提供了ByteBuffer作為它的字節容器,但是其過於復雜且繁瑣。

Netty的ByteBuffer替代品是ByteBuf,一個強大的實現,即解決了JDK API的局限性,又為網絡應用程序的開發者提供了更好的API。

1、ByteBuf的API

Netty的數據處理API通過兩個組件暴露——abstract class ByteBuf 和 interface ByteBufHolder。

以下是其優點:

-可以被用戶自定義的緩沖區類型擴展

-通過內置的復合緩沖區類型實現了透明的零拷貝

-容量可以按需增長(類似JDK的StringBuilder)

-在讀和寫這兩個模式之間切換不需要調用ByteBuffer的flip()方法

-讀和寫使用了不同的索引

-支持方法的鏈式調用

-支持引用計數

-支持池化

其他類可用於管理ByteBuf實例的分配,以及執行各種針對於數據容器本身和它所持有的數據的操作。

2、ByteBuf如何工作

因為所有的網絡通信都涉及字節序列的移動,所以高效易用的數據結構明顯是必不可少的。

ByteBuf維護了兩個不同的索引:一個用於讀取、一個用於寫入。當你從ByteBuf讀取時,它的readerIndex將會被遞增已經被讀取的字節數。同樣地,當你寫入BytBuf時,它的writerIndex也會被遞增。下圖展示了一個空ByteBuf的布局結構和狀態。 輸入圖片說明

如果我們打算讀取字節直到readerIndex達到和writeIndex同樣的值時會發生什么,則將會到達“可以讀取的”數據的末尾。就如同視圖讀取超出數組末尾的數據一樣,試圖讀取超出該點的數據將會觸發一個indexOutOfBoundsException。

名稱以read或者write開頭的ByteBuf方法,將會推進其對應的索引,而名稱以set或者get開頭的操作則不會。后面的這些方法將在作為一個參數傳入的一個相對索引上執行操作。

可以指定ByteBuf的最大容量。試圖移動寫索引(即writerIndex)超過這個值將會觸發一個異常。(默認的限制是Integer.MAX_VALUE)

3、ByteBuf的使用模式-堆緩沖區

一個由不同的索引分別控制讀訪問和寫訪問的字節數組。

最常用的ByteBuf模式是將數據存儲在JVM的堆空間中。這種模式被稱為支撐數組(backing array),它能在沒有使用池化的情況下提供快速的分配和釋放。這種方式,非常適合於有遺留的數據需要處理的情況。

ByteBuf directBuf = ...;
        //檢查ByteBuf是否由數組支撐。如果不是,則這是一個直接緩沖區 if (!directBuf.hasArray()){ //獲取可讀字節數 int length = directBuf.readableBytes(); //分配一個新的數組來保存具有該長度的字節數據 byte[] array = new byte[length]; //將字節復制到該數組 directBuf.getBytes(directBuf.readerIndex(),array); //使用數組、偏移量和長度作為參數調用你的方法 handleArray(array, 0 ,length); } 

4、ByteBuf的使用模式-直接緩沖區

直接緩沖區是另外一種ByteBuf模式。我們期望用於對象創建的內存分配永遠都來自於堆中,但這並不是必須的——NIO在JDK1.4中引入的ByteBuffer類允許JVM實現通過本地調用來分配內存。這主要是為了避免在每次調用本地I/O操作之前(或者之后)將緩存區的內容復制到一個中間緩沖區(或者從中間緩沖區把內容復制到緩沖區)。

ByteBuffer的Javadoc明確指出:“直接緩沖區的內容將駐留在常規的會被垃圾回收的堆之外”。這也就解釋了為何直接緩沖區對於網絡數據傳輸是理想的選擇。如果你的數據包含在一個在堆上分配的緩沖區中,那么事實上,在通過套接字發送它之前,JVM將會在內部把你的緩沖區復制到一個直接緩沖區中。

直接緩沖區的主要缺點是,相對於基於堆的緩沖區,它們的分配和釋放都教委昂貴。如果你正在處理遺留代碼,你也可能會遇到另一個缺點:因為數據不是在堆上,所以你不得不進行一次復制。如下代碼所示。顯然,這比使用支撐數組相比工作量更多。

ByteBuf heapBuf = ...;
        //檢查ByteBuf是否有一個支撐數組 //當hasArray()方法返回false時,嘗試訪問支撐數組將觸發一個UnsupportedOperationException //這個模式類似於JDK的ByteBuffer的用法 if (heapBuf.hasArray()){ //如果有,則獲取對該數組的引用 byte[] array = heapBuf.array(); //計算第一個字節的偏移量 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); //獲得可讀字節數 int length = heapBuf.readableBytes(); //使用數組、偏移量和長度作為參數調用你的方法 handleArray(array,offset,length); } 

5、ByteBuf的使用模式-復合緩沖區

它為多個ByteBuf提供一個聚合視圖。在這里你可以根據需要添加或者刪除ByteBuf實例,這是一個JDK的ByteBuffer實現完全缺失的特性。

Netty通過一個ByteBuf子類——CompositeByteBuf——實現了這個模式,他提供了一個將多個緩沖區表示為單個合並緩沖區的虛擬表示。

警告:CompositeByteBuf中的ByteBuf實例可能同時包含直接內存分配和非直接內存分配。如果其中只有一個實例,那么對CompositeByteBuf上的hsaArray()方法的調用將返回該組件上的hasArray()方法的值;否則它將返回false。

為了舉例說明,讓我們考慮一下一個由兩個部分——頭部和主體——組成的將通過HTTP協議傳輸的消息。這兩部分由應用程序的不同模塊產生,將會在消息被發送的時候組裝。該應用程序可以選擇多個消息重用相同的消息主體。當這種情況發生時,對於每個消息都將會創建一個新的頭部。

因為我們不想為每個消息都重新分配這兩個緩沖區,所以使用CompositeByteBuf是一個完美的選擇。它在消除了沒必要的復制的同時,暴露了通用的ByteBuf API。

輸入圖片說明

以下代碼展示了如何通過使用JDK的ByteBuffer來實現這一需求。創建一個包含兩個ByteBuffer的數組用來保存這些消息組件,同時創建了第三個ByteBuffer用來保存所有這些數據的副本。

//Use an array to hold message parts ByteBuffer[] message = new ByteBuffer[]{header,body}; //Create a new ByteBuffer and use copy to merge the header and body ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining()); message2.put(header); message2.put(body); message2.flip(); 

分配和復制操作,以及伴隨着數組管理的需要,使得這個版本的實現效率低下而且笨拙。

CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
        ByteBuf headerBuf = ...;//can be backing or direct ByteBuf bodyBuf = ...;//can be backing or direct //將ByteBuf實例追加到CompositeByteBuf messageBuf.addComponents(headerBuf,bodyBuf); ...... //刪除位於索引位置為0(第一個組件)的ByteBuf messageBuf.removeComponent(0);//remove the header //循環遍歷所有的ByteBuf實例 for(ByteBuf buf: messageBuf){ System.out.println(buf.toString()); } 

CompositeByteBuf可能不支持訪問其支撐數組,因此訪問CompositeByteBuf中的數據類似於(訪問)直接緩沖區的模式。

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
        //獲得可讀字節數 int length = compBuf.readableBytes(); //分配一個具有可讀字節數長度的新數組 byte[] array = new byte[length]; //將字節讀到該數組中 compBuf.getBytes(compBuf.readerIndex(),array); //使用偏移量和長度作為參數使用該數組 handleArray(array,0,array.length); 

需要注意的是,Netty使用了CompositeByteBuf來優化套接字的I/O操作,盡可能地消除了由JDK的緩沖區實現所導致的性能以及內存使用率的懲罰。這種優化發生在Netty的核心代碼中,因此不會被暴露出來,但是你應該知道它帶來的影響。

6、字節級操作——隨機訪問索引

如同在普通的Java字節數組中一樣,ByteBuf的索引是從零開始的:第一個字節的索引是0,最后一個字節總是capacity()-1.以下代碼表明,對存儲機制的封裝使得遍歷ByteBuf的內容非常簡單。

ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++){ byte b = buffer.getByte(i); System.out.println((char)b); } 

需要注意的是,使用那些需要一個索引值參數的方法之一來訪問數據既不會改變readerIndex也不會改變weriterIndex。如果有需要,也可以通過調用readerIndex(index)或者writerIndex(index)來手動移動這兩者。

7、字節級操作——順序訪問索引

雖然ByteBuf同時具有讀索引和寫索引,但是JDK的ByteBuffer卻只有一個索引,這也就是為什么必須調用flip()方法來在讀模式和寫模式之間進行切換的原因。下圖展示了ByteBuf是如何被它的兩個索引划分成3個區域的。 輸入圖片說明

8、字節級操作——可丟棄字節

在上圖中標記為可丟棄字節的分段包含了已經被讀過的字節。通過調用discardReadBytes()方法,可以丟棄它們並回收空間。這個分段的初始大小為0,存儲在readerIndex中,會隨着read操作的執行而增加(get*操作不會移動readerIndex)。

下圖展示了上圖所展示的緩沖區上調用discardReadBytes()方法后的結果。可以看到,可丟棄字節分段中的空間已經變為可寫的了。注意,在調用discardReadBytes()之后,對可寫分段的內容並沒有任何的保證。(因為只是移動了可以讀取的字節以及writerIndex,而沒有對所有可寫入的字節進行擦除寫。) 輸入圖片說明

雖然你可能會傾向於頻繁地調用discardReadBytes()方法以確保可寫分段的最大化,但是請注意,這將極有可能會導致內存復制,因為可讀字節(圖中標記為CONTENT的部分)必須被移動到緩沖區的開始位置。我們建議只在真正需要的時候才這樣做,例如,當內存非常寶貴的時候。

9、字節級操作——可讀字節

ByteBuf的可讀字節分段存儲了實際數據。新分配的、包裝的或者復制的緩沖區的默認的readerIndex值為0。任何名稱以read或者skip開頭的操作都將檢索或者跳過位於當前readerIndex的數據,並且將它增加已讀字節數。

如果被調用的方法需要一個ByteBuf參數作為寫入的目標,並且沒有指定目標索引參數,那么該目標緩沖區的writerIndex也將被增加,例如: readBytes(ByteBuf dest);

如果嘗試在緩沖區的可讀字節數已經耗盡時從中讀取數據,那么將會引發一個IndexOutOfBoundsException。

下圖展示了如何讀取所有可以讀的字節。

ByteBuf buffer = ...; while (buffer.isReadable()){ System.out.println(buffer.readByte()); } 

10、字節級操作——可寫字節

可寫字節分段是指一個擁有未定義內容的、寫入就緒的內存區域。新分配的緩沖區的writeIndex的默認值為0.任何名稱以write開頭的操作都將從當前的writeIndex處開始寫數據,並將它增加已經寫入的字節數。如果寫操作的目標也是ByteBuf,並且沒有指定源索引的值,則源緩沖區的readerIndex也同樣會被增加相同的大小。這個調用如下所示:

writeBytes(ByteBuf dest);

如果嘗試往目標寫入超過目標容量的數據,將會引發一個IndexOutOfBoundException。以下代碼是一個用隨機整數值填充緩存區,直到它空間不足為止的例子。writeableBytes()方法在這里被用來確定該緩沖區中是否還有足夠的空間。

 //Fills the writable bytes of a buffer with random integers ByteBuf buffer = ...; while (buffer.writableBytes() >= 4){ buffer.writeInt(random.nextInt()); } 

11、索引管理

JDK的InputStream定義了mark(int readlimit)和reset()方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置。

同樣,可以通過調用markReaderIndex()、markWriterindex()、resetWriterIndex()和resetReaderIndex()來標記和重置ByteBuf的readerIndex和writerIndex。這些和InputStream上的調用類似,只是沒有readlimit參數來指定標記什么時候失效。

也可以通過調用readerIndex(int)或者writerIndex(int)來將索引移動到指定位置。試圖將任何一個索引設置到一個無效的位置都將導致一個IndexOutOfBoundsException。

可以通過調用clear()方法來將readerIndex和writerIndex都設置為0.注意,這並不會清除內存中的內容。 輸入圖片說明

和之前一樣,ByteBuf包含3個分段,下圖展示了在clear()方法被調用之后ByteBuf的狀態。 輸入圖片說明

調用clear()比調用discardReadBytes()輕量得多,因為它將只是重置索引而不會復制任何的內存。

12、查找操作

在ByteBuf中有多種可以用來確定指定值的索引的方法,最簡單的是使用indexOf()方法。較復雜的查找可以通過那些需要一個ByteBufProcessor作為參數的方法達成。這個接口只定義了一個方法:

boolean process(byte value)

它將檢查輸入值是否是正在查找的值

ByteBufProcessor針對一些常見的值定義了許多遍歷的方法。假設你的應用程序需要和所謂的包含有以NULL結尾的內容的FLash套接字集成。調用、 forEachByte(ByteBufProcessor.FIND_NUL) 將簡單高效地消費該Flash數據,因為在處理期間只會執行較少的邊界檢查。 以下代碼展示了一個查找回車符(\r)的例子

ByteBuf buffer = ...;  int index = buffer.forEachByte(ByteBufProcessor.FIND_CR); 

13、派生緩沖區

派生緩沖區為ByteBuf提供了以專門的方式來呈現其內容的視圖。這類視圖是通過以下方法被創建的:

·duplicate()

·slice()

·slice(int,int)

·Unpooled.unmodifiableBuffer(...)

·order(ByteOrder)

·readSlice(int)

每個這些方法都將返回一個新的ByteBuf實例,它具有自己的讀索引、寫索引和標記索引。其內部存儲和JDK的ByteBuffer一樣也是共享的。這使得派生緩沖區的創建成本是很低廉的,但是這也意味着,如果你修改了它的內容,也同時修改了其對應的源實例,所以要小心。

ByteBuf復制 : 如果需要一個現有緩沖區的真實副本,請使用copy()或者copy(int,int)方法,不同於派生緩沖區,由這個調用所返回的ByteBuf擁有獨立的數據副本。

以下代碼展示了如何使用slice(int,int)方法來操作ByteBuf的一個分段

Charset utf8 = Charset.forName("UTF-8"); //創建一個用於保存給定字符串的字節的ByteBuf ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8); //創建該ByteBuf從索引0開始到索引15結束的一個新切片 ByteBuf sliced = buf.slice(0,15); //打印內容 System.out.println(sliced.toString(utf8)); //更新索引0處的字節 buf.setByte(0,(byte)'J'); //將會成功,因為數據是共享的,對其中一個所做的更改對另一個也是可見的 assert buf.getByte(0) == sliced.getByte(0); 

以下讓我們看看,ByteBuf的分段的副本和切片有何區別

Charset utf8 = Charset.forName("UTF-8"); //創建ByteBuf以保存所提供的字符串的字節 ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8); //創建該ByteBuf從索引0開始到索引15結束的分段的副本 ByteBuf copy = buf.copy(0,15); //打印內容 System.out.println(copy.toString(utf8)); //更新索引0處的字節 buf.setByte(0,(byte)'J'); //將會成功,因為數據不是共享的 assert buf.getByte(0) != copy.getByte(0); 

除了修改原始ByteBuf的切片或者副本的效果以外,這兩種場景是相同的。只要有可能,使用slice()方法來避免復制內存的開銷。

14、讀/寫操作

get()和set()操作,從給定的索引開始,並且保持索引不變

read()和write()操作,從給定的索引開始,並且會根據已經訪問過的字節數對索引進行調整。 以下代碼說明了其用法,表明了他們不會改變讀索引和寫索引。

Charset utf8 = Charset.forName("UTF-8"); //創建一個新的ByteBuf以保存給定字符串的字節 ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8); //打印第一個字符‘N’ System.out.println((char)buf.getByte(0)); //存儲當前的readIndex和writeIndex int readerIndex = buf.readerIndex(); int writeIndex = buf.writerIndex(); //將索引0處的字節更新為字符‘B’ buf.setByte(0,(byte)'B'); //打印第一個字符,現在是‘B’ System.out.println((char)buf.getByte(0)); //將會成功,因為這些操作並不會修改相應的索引 assert readerIndex == buf.readerIndex(); assert writeIndex == buf.writerIndex(); 

還有read()操作,其作用於當前的readerIndex或writeIndex。這些方法將用於從ByteBuf中讀取數據。如同它是一個流。

幾乎每個read()方法都有對應的write()方法,用於將數據追加到ByteBuf中,以下代碼展示了read()和write()操作

Charset utf8 = Charset.forName("UTF-8"); //創建一個新的ByteBuf以保存給定字符串的字節 ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8); //打印字符‘N’ System.out.println((char)buf.readByte()); //存儲當前的readerIndex int readerIndex = buf.readerIndex(); //存儲當前的writeIndex int writeIdnex = buf.writerIndex(); //將字符‘?’追加到緩沖區 buf.writeByte((byte)'?'); assert readerIndex == buf.readerIndex(); //將會成功,因為writeByte()方法移動了writeIndex assert writeIdnex != buf.writerIndex(); 

15、ByteBufHolder接口

我們經常發現,除了實際的數據負載之外,我們還需要存儲各種屬性值。HTTP響應便是一個很好的例子,除了表示為字節的內容,還包括狀態碼、cookie等。

為了處理這種常見的用例,Netty提供了ByteBufHolder。ByteBufHolder也為Netty的高級特性提供了支持,如緩沖區池化,其中可以從池中借用ByteBuf,並且在需要時自動釋放。

ByteBufHolder只有幾種用於訪問底層數據和引用計數的方法。

如果想要實現一個將其有效負載存儲在ByteBuf中的消息對象,那么ByteBufHolder將是個不錯的選擇。

16、按需分配:ByteBufAllocator接口

為了降低分配和釋放內存的開銷,Netty通過interface ByteBufAllocator實現了(ByteBuf的)池化,它可以用來分配我們所描述過的任何類型的ByteBuf實例。使用池化是特定於應用程序的決定,其並不會以任何方式改變ByteBuf API。

可以通過Channel(每個都可以有一個不同的ByteBufAllocator實例)或者綁定到ChannelHandler的ChannelHanlderContext獲取一個到ByteBufAllocator的引用。

io.netty.channel.Channel channel = ...;
        ByteBufAllocator allocator = channel.alloc();
        .....
        ChannelHandlerContext ctx = ...;
        ByteBufAllocator allocator1 = ctx.alloc();
        ....

Netty提供了兩種ByteBufAllocator的實現:PooledByteAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的實例以提高性能並最大限度地減少內存碎片。此實現使用了一種稱為jemalloc的已被大量現代操作系統所采用的高效方法來分配內存。后者的實現不池化ByteBuf實例,並且在每次它被調用時都會返回一個新的實例。

雖然Netty默認使用了PooledByteBufAllocator,但這可以很容易地通過ChannelConfig API或者在引導你的應用程序時指定一個不同的分配器來更改。

17、Unpooled緩沖區

可能某些情況下,你未能獲取一個到ByteBufAllocator調用,對於這種情況,Netty提供了一個簡單的成為Unpooled的工具類,它提供了靜態的輔助方法來創建未池化的ByteBuf實例。

Unpooled類還使得ByteBuf同樣可用於那些並不需要Netty的其它組件的非網絡項目,使得其能得益於高性能的可擴展的緩沖區API。

18、ByteBufUtil類

ByteBufUtil提供了用於操作ByteBuf的靜態的輔助方法。因為這個API是通用的,並且和池化無關,所以這些方法已然在分配類的外部實現。

這些靜態方法中最有價值的可能是hexdump()方法,它以十六進制的表示形式打印ByteBuf的內容。這在各種情況下都很有用,例如,出於調試的目的記錄ByteBuf的內容。十六進制的表示通常會提供一個比字節值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的字節表示。

另一個有用的方法是boolean equals(ByteBuf,ByteBuf),它被用來判斷兩個ByteBuf實例的相等性,如果你實現了自己的ByteBuf子類,你可能會發現ByteBufUtil的其它有用方法。

19、引用計數

引用計數是一種通過在某個對象所持有的資源不再被其它對象引用時釋放該對象所持有的資源來優化內存使用和性能的技術。

引用技術背后的想法並不是特別的復雜,他主要設計跟蹤到某個特定對象的活動引用的數量。一個ReferenceCounted實現的實例將通常以活動的引用計數為1作為開始。只要引用計數大於0,就能保證對象不會被釋放。當活動引用的數量減少到0時,該實例就會被釋放。注意,雖然釋放的確切語義可能是特定於實現的,但是至少已經釋放的對象應該不可再用了。

引用技術對於池化實現(PooledByteBufAllocator)來說是至關重要的,它降低了內存分配的開銷。

io.netty.channel.Channel channel = ...; //從Channel獲取ByteBufAllocator ByteBufAllocator allocator = channel.alloc(); ... //從ByteBufAllocator分配一個ByteBuf ByteBuf buffer = allocator.directBuffer(); //檢查引用技術是否為預期的1 assert buffer.refCnt() == 1; 
ByteBuf buffer = ...; //減少到該對象的活動引用。當減少到0時,該對象被釋放,並且該方法返回true boolean released = buffer.release(); 

試圖訪問一個已經被釋放的引用計數對象,將會導致一個IllegalReferenceCountException。 注意,一個特定的(ReferenceCounted的實現)類,可以用它自己的獨特方式來定義它的引用計數規則。例如,我們可以設想一個類,其release()方法的實現總是將引用計數設為零,而不用關心它的當前值,從而一次性地使所有的活動都失效。

誰負責釋放 : 一般來說,是由最后訪問(引用計數)對象的那一方來負責將它釋放。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM