Netty ByteBuf(圖解 )之一
瘋狂創客圈 Java 分布式聊天室【 億級流量】實戰系列之15 【 博客園 總入口 】
源碼工程
源碼IDEA工程獲取鏈接:Java 聊天室 實戰 源碼
寫在前面
大家好,我是作者尼恩。
今天是百萬級流量 Netty 聊天器 打造的系列文章的第15篇,這是一個基礎篇。
由於關於ByteBuf的內容比較多,分兩篇文章:
第一篇:圖解 ByteBuf的分配、釋放和如何避免內存泄露
第二篇:圖解 ByteBuf的具體使用
本篇為第一篇。
Netty ByteBuf 優勢
Netty 提供了ByteBuf,來替代Java NIO的 ByteBuffer 緩,來操縱內存緩沖區。
與Java NIO的 ByteBuffer 相比,ByteBuf的優勢如下:
-
Pooling (池化,這點減少了內存復制和GC,提升效率)
-
可以自定義緩沖類型
-
通過一個內置的復合緩沖類型實現零拷貝
-
擴展性好,比如 StringBuffer
-
不需要調用 flip()來切換讀/寫模式
-
讀取和寫入索引分開
-
方法鏈
-
引用計數
手動獲取與釋放ByteBuf
Netty環境下,業務處理的代碼,基本上都在Handler處理器中的各個入站和出站方法中。
一般情況下,采用如下方法獲取一個Java 堆中的緩沖區:
ByteBuf heapBuffer = ctx.alloc().heapBuffer();
使用完成后,通過如下的方法,釋放緩沖區:
ReferenceCountUtil.release(heapBuffer );
上面的代碼很簡單,通過release方法減去 heapBuffer 的使用計數,Netty 會自動回收 heapBuffer 。
緩沖區內存的回收、二次分配等管理工作,是 Netty 自動完成的。
自動獲取和釋放 ByteBuf
方式一:TailHandler 自動釋放
Netty默認會在ChannelPipline的最后添加的那個 TailHandler 幫你完成 ByteBuf的release。
先看看,自動創建的ByteBuf實例是如何登場的?
Netty自動創建 ByteBuf實例
Netty 的 Reactor 線程會在 AbstractNioByteChannel.NioByteUnsafe.read() 處調用 ByteBufAllocator創建ByteBuf實例,將TCP緩沖區的數據讀取到 Bytebuf 實例中,並調用 pipeline.fireChannelRead(byteBuf) 進入pipeline 入站處理流水線。
默認情況下,TailHandler自動釋放掉ByteBuf實例
Netty的ChannelPipleline的流水線的末端是TailHandler,默認情況下如果每個入站處理器Handler都把消息往下傳,TailHandler會釋放掉ReferenceCounted類型的消息。
說明:
上圖中,TailHandler 寫成了TailContext,這個是沒有錯的。
對於流水線的頭部和尾部Hander來說, Context和Hander ,是同一個類。
HeadContext 與HeadHandler ,也是同一個類。
關於Context與Handler 的關系,請看 瘋狂創客圈 的系列文章。
如果沒有到達末端呢?
一種沒有到達入站處理流水線pipeline末端的情況,如下圖所示:
這種場景下,也有一種自動釋放的解決辦法,它就是:
可以繼承 SimpleChannelInboundHandler,實現業務Handler。 SimpleChannelInboundHandler 會完成ByteBuf 的自動釋放,釋放的處理工作,在其入站處理方法 channelRead 中。
方式二:SimpleChannelInboundHandler 自動釋放
如果業務Handler需要將 ChannelPipleline的流水線的默認處理流程截斷,不進行后邊的inbound入站處理操作,這時候末端 TailHandler自動釋放緩沖區的工作,自然就失效了。
這種場景下,業務Handler 有兩種選擇:
-
手動釋放 ByteBuf 實例
-
繼承 SimpleChannelInboundHandler,利用它的自動釋放功能。
本小節,我們聚焦的是第二種選擇:看看 SimpleChannelInboundHandler是如何自動釋放的。
利用這種方法,業務處理Handler 必須繼承 SimpleChannelInboundHandler基類。並且,業務處理的代碼,必須 移動到 重寫的 channelRead0(ctx, msg)方法中。
如果好奇,想看看 SimpleChannelInboundHandler 是如何釋放ByteBuf 的,那就一起來看看Netty源碼。
截取的代碼如下所示:
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
{
//...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
源碼中,執行完重寫的channelRead0()后,在 finally 語句塊中,ByteBuf 的生命被結束掉了。
上面兩種,都是入站處理(inbound)過程中的自動釋放。
出站處理(outbound)流程,又是如何自動釋放呢?
方式三:HeadHandler 自動釋放
出站處理流程中,申請分配到的 ByteBuf,通過 HeadHandler 完成自動釋放。
出站處理用到的 Bytebuf 緩沖區,一般是要發送的消息,通常由應用所申請。在出站流程開始的時候,通過調用 ctx.writeAndFlush(msg),Bytebuf 緩沖區開始進入出站處理的 pipeline 流水線 。在每一個出站Handler中的處理完成后,最后消息會來到出站的最后一棒 HeadHandler,再經過一輪復雜的調用,在flush完成后終將被release掉。
強調一下,HeadContext (HeadHandler)是出站處理流程的最后一棒。
出站處理的全過程,請查看瘋狂創客圈的專門文章。
如何避免內存泄露
基本上,在 Netty的開發中,通過 ChannelHandlerContext 或 Channel 獲取的緩沖區ByteBuf 默認都是Pooled,所以需要再合適的時機對其進行釋放,避免造成內存泄漏。
自動釋放的注意事項
我們已經知道了三種自動釋放方法:
-
通過 TailHandler 自動釋放入站 ByteBuf
-
繼承 SimpleChannelInboundHandler 的完成 入站ByteBuf 自動釋放
-
通過HeadHandler自動釋放出站 ByteBuf
自動釋放,注意事項如下:
-
入站處理流程中,如果對原消息不做處理,默認會調用 ctx.fireChannelRead(msg) 把原消息往下傳,由流水線最后一棒 TailHandler 完成自動釋放。
-
如果截斷了入站處理流水線,則可以繼承 SimpleChannelInboundHandler ,完成入站ByteBuf 自動釋放。
-
出站處理過程中,申請分配到的 ByteBuf,通過 HeadHandler 完成自動釋放。
出站處理用到的 Bytebuf 緩沖區,一般是要發送的消息,通常由應用所申請。在出站流程開始的時候,通過調用 ctx.writeAndFlush(msg),Bytebuf 緩沖區開始進入出站處理的 pipeline 流水線 。在每一個出站Handler中的處理完成后,最后消息會來到出站的最后一棒 HeadHandler,再經過一輪復雜的調用,在flush完成后終將被release掉。
手動釋放的注意事項
手動釋放是自動釋放的重要補充和輔助。
手動釋放操作,大致有如下注意事項:
-
入站處理中,如果將原消息轉化為新的消息並調用 ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉;
-
入站處理中,如果已經不再調用 ctx.fireChannelRead(msg) 傳遞任何消息,也沒有繼承SimpleChannelInboundHandler 完成自動釋放,那更要把原消息release掉;
-
多層的異常處理機制,有些異常處理的地方不一定准確知道ByteBuf之前釋放了沒有,可以在釋放前加上引用計數大於0的判斷避免異常; 有時候不清楚ByteBuf被引用了多少次,但又必須在此進行徹底的釋放,可以循環調用reelase()直到返回true。
特別需要強調的,是上邊的第一種情況。
如果在入站處理的 handlers 傳遞過程中,傳遞了新的ByteBuf 值,老ByteBuf 值需要自己手動釋放。老的ByteBuf 值,就是從pipeline流水線入口傳遞過來的 ByteBuf 實例。
總之,只要是在傳遞過程中,沒有傳遞下去的ByteBuf就需要手動釋放,避免不必要的內存泄露。
緩沖區 Allocator 分配器
Netty通過 ByteBufAllocator分配緩沖區。
Netty提供了ByteBufAllocator的兩種實現:PoolByteBufAllocator和UnpooledByteBufAllocator。前者將ByteBuf實例放入池中,提高了性能,將內存碎片減少到最小。這個實現采用了一種內存分配的高效策略,稱為 jemalloc。它已經被好幾種現代操作系統所采用。后者則沒有把ByteBuf放入池中,每次被調用時,返回一個新的ByteBuf實例。
分配器 Allocator的類型
PooledByteBufAllocator:可以重復利用之前分配的內存空間。
為了減少內存的分配回收以及產生的內存碎片,Netty提供了PooledByteBufAllocator 用來分配可回收的ByteBuf,可以把PooledByteBufAllocator 看做一個池子,需要的時候從里面獲取ByteBuf,用完了放回去,以此提高性能。
UnpooledByteBufAllocator:不可重復利用,由JVM GC負責回收。
顧名思義Unpooled就是不會放到池子里,所以根據該分配器分配的ByteBuf,不需要放回池子,由JVM自己GC回收。
這兩個類,都是AbstractByteBufAllocator的子類,AbstractByteBufAllocator實現了一個接口,叫做ByteBufAllocator。
可以做一個對比試驗:
使用UnpooledByteBufAllocator的方式創建ByteBuf的時候,單台24核CPU的服務器,16G內存,剛啟動時候,10000個長連接,每秒所有的連接發一條消息,短時間內,可以看到內存占到10G多點,但隨着系統的運行,內存不斷增長,直到整個系統內存溢出掛掉。
把UnpooledByteBufAllocator換成PooledByteBufAllocator,通過試驗,內存使用量機器能維持在一個連接占用1M左右,內存在10G左右,經常長期的運行測試,發現都能維持在這個數量,系統內存不會崩潰。
默認的分配器
默認的分配器 ByteBufAllocator.DEFAULT ,可以通過 Java 系統參數(SystemProperty )選項 io.netty.allocator.type 去配置,使用字符串值:"unpooled","pooled"。
關於這一段,Netty的源代碼截取如下:
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
Object alloc;
if("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: unpooled (unknown: {})", allocType);
}
不同的Netty版本,源碼不一樣。
上面的代碼,是4.0版本的源碼,默認為UnpooledByteBufAllocator。
而4.1 版本,默認為 PooledByteBufAllocator。因此,4.1版本的代碼,是和上面的代碼稍微有些不同的。
設置通道Channel的分配器
在4.x版本中,UnpooledByteBufAllocator是默認的allocator,盡管其存在某些限制。
現在PooledByteBufAllocator已經廣泛使用一段時間,並且我們有了增強的緩沖區泄漏追蹤機制,所以是時候讓PooledByteBufAllocator成為默認了。
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(...);
}
});
使用Netty帶來的又一個好處就是內存管理。只需一行簡單的配置,就能獲得到內存池帶來的好處。在底層,Netty實現了一個Java版的Jemalloc內存管理庫,為我們做完了所有“臟活累活”!
緩沖區內存的類型
說完了分配器的類型,再來說下緩沖區的類型。
依據內存的管理方不同,分為堆緩存和直接緩存。也就是Heap ByteBuf 和 Direct ByteBuf。另外,為了方便緩沖區進行組合,提供了一種組合緩存區。
三種緩沖區的介紹如下:
使用模式 | 描述 | 優點 | 劣勢 |
---|---|---|---|
堆緩沖區 | 數據存存儲在JVM的堆空間中,又稱為支撐數組,通過 hasArray 來判斷是不是在堆緩沖區中 | 沒使用池化情況下能提供快速的分配和釋放 | 發送之前都會拷貝到直接緩沖區 |
直接緩沖區 | 存儲在物理內存中 | 能獲取超過jvm堆限制大小的空間; 寫入channel比堆緩沖區更快 |
釋放和分配空間昂貴(使用系統的方法) ; 操作時需要復制一次到堆上 |
復合緩沖 | 單個緩沖區合並多個緩沖區表示 | 操作多個更方便 | - |
上面三種緩沖區的類型,無論哪一種,都可以通過池化、非池化的方式,去獲取。
Unpooled 非池化緩沖區的使用方法
Unpooled也是用來創建緩沖區的工具類,Unpooled 的使用也很容易。
看下面代碼:
//創建復合緩沖區
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
//創建堆緩沖區
ByteBuf heapBuf = Unpooled.buffer(8);
//創建直接緩沖區
ByteBuf directBuf = Unpooled.directBuffer(16);
Unpooled 提供了很多方法,詳細方法大致如下:
方法名稱 | 描述 |
---|---|
buffer() buffer(int initialCapacity) buffer(int initialCapacity, int maxCapacity) |
返回 heap ByteBuf |
directBuffer() directBuffer(int initialCapacity) directBuffer(int initialCapacity, intmaxCapacity) |
返回 direct ByteBuf |
compositeBuffer() | 返回 CompositeByteBuf |
copiedBuffer() | 返回 copied ByteBuf |
Unpooled類的應用場景
Unpooled類讓ByteBuf也同樣適用於不需要其他的Netty組件的、無網絡操作的項目,這些項目可以從這個高性能的、可擴展的buffer API中獲益。
寫在最后
至此為止,終於完成ByteBuf的分配、釋放和如何避免內存泄露介紹。
接下來是:
第二篇:圖解 ByteBuf的具體使用
瘋狂創客圈 Java 死磕系列
- Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
- Netty 源碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
- 瘋狂創客圈 【 博客園 總入口 】