Netty中的內存分配是基於ByteBufAllocator這個接口實現的,通過對它的具體實現,可以用來分配我們之前描述過的任意類型的BytebBuf實例;我們先看一下ByteBufAllocator接口中的定義的關鍵方法
一、ByteBufAllocator 構造
public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; //根據具體實現返回基於直接內存或堆內內存的ByteBuf ByteBuf buffer(); //根據具體實現返回一個給定初始容量的基於直接內存或堆內內存的ByteBuf ByteBuf buffer(int initialCapacity); //根據具體實現返回一個給定初始容量與最大量的基於直接內存或堆內內存的ByteBuf ByteBuf buffer(int initialCapacity, int maxCapacity); //返回一個用於套接字操作的ByteBuf ByteBuf ioBuffer(); //返回一個用於套接字操作的給定初始量ByteBuf ByteBuf ioBuffer(int initialCapacity); //返回一個用於套接字操作的給定初始量與最大量的ByteBuf ByteBuf ioBuffer(int initialCapacity, int maxCapacity); //返回一個基於堆內內存的ByteBuf ByteBuf heapBuffer(); //返回一個給定初始量基於堆內內存的ByteBuf ByteBuf heapBuffer(int initialCapacity); //返回一個給定初始量與最大量的基於堆內內存的ByteBuf ByteBuf heapBuffer(int initialCapacity, int maxCapacity); //返回一個基於直接內存的ByteBuf ByteBuf directBuffer(); //返回一個給定初始量基於直接內存的ByteBuf ByteBuf directBuffer(int initialCapacity); //返回一個給定初始量與最大量的基於直接內存的ByteBuf ByteBuf directBuffer(int initialCapacity, int maxCapacity); //返回一個基於復合緩沖區的ByteBuf,基於堆還是直接內存看具體實現 CompositeByteBuf compositeBuffer(); //返回一個給定最大量的基於復合緩沖區的ByteBuf,基於堆還是直接內存看具體實現 CompositeByteBuf compositeBuffer(int maxNumComponents); //返回一個基於堆內存的CompositeByteBuf CompositeByteBuf compositeHeapBuffer(); //返回一個給定最大量基於堆內存的CompositeByteBuf CompositeByteBuf compositeHeapBuffer(int maxNumComponents); //返回一個基於直接內存的CompositeByteBuf CompositeByteBuf compositeDirectBuffer(); //返回一個給定最大量的基於直接內存的CompositeByteBuf CompositeByteBuf compositeDirectBuffer(int maxNumComponents); //直接內存是否池化管理 boolean isDirectBufferPooled(); //計算bytebuf需要擴展時的新容量 int calculateNewCapacity(int minNewCapacity, int maxCapacity); }
可以看到接口中定義的方法基本都是用於分配不同類型的內存,接下來我們看下基於ByteBufAllocator 接口的具體實現類。
看下實現ByteBufAllocator 接口的類結構圖
頂層抽象類AbstractByteBufAllocator下有兩大子類PooledByteBufAllocator與UnpooledByteBufAllocator,分別用於池化與非池化內存的構造;
二、ByteBufAllocator 使用
首先我們需要注意下ChannelOption.ALLOCATOR這個配置項,如果不進行特殊配置, 默認為PooledByteBufAllocator,默認ByteBuf類型為PooledUnsafeDirectByteBuf,這里演示需要改為UnpooledByteBufAllocator
b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
在數據入站Handler中,我們通過使用不同的BufAllocator實現類來分配ByteBuf進行對比,主要關注下分配的ByteBuf的類型與是否池化
public class BuffHandler extends ChannelInboundHandlerAdapter{ PooledByteBufAllocator pdallocator = new PooledByteBufAllocator(true);//池化直接內存 PooledByteBufAllocator pallocator = new PooledByteBufAllocator(false);//池化堆內存 UnpooledByteBufAllocator adllocator = new UnpooledByteBufAllocator(true);//非池化直接內存 UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);//非池化堆內存 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf outBuffer = (ByteBuf) msg; System.err.println("outBuffer is :"+outBuffer.getClass()); System.err.println("outBuffer's model is :"+outBuffer.isDirect()); outBuffer = ByteBufAllocator.DEFAULT.buffer();//ByteBufAllocator默認內存類型 System.err.println("ByteBufAllocator.DEFAULT.buffer() is :"+outBuffer.getClass()); System.err.println("ByteBufAllocator.DEFAULT.buffer()'s model is :"+outBuffer.isDirect()); outBuffer = pdallocator.buffer();////池化直接內存 System.err.println("PooledByteBufAllocator(true) is :"+outBuffer.getClass()); System.err.println("PooledByteBufAllocator(true)'s model is :"+outBuffer.isDirect()); outBuffer = pallocator.buffer();//池化隊堆內存 System.err.println("PooledByteBufAllocator(false) is :"+outBuffer.getClass()); System.err.println("PooledByteBufAllocator(false)'s model is :"+outBuffer.isDirect()); outBuffer = adllocator.buffer();////非池化直接內存 System.err.println("UnpooledByteBufAllocator(true) is :"+outBuffer.getClass()); System.err.println("UnpooledByteBufAllocator(true)'s model is :"+outBuffer.isDirect()); outBuffer = allocator.buffer();//非池化堆內存 System.err.println("UnpooledByteBufAllocator(false) is :"+outBuffer.getClass()); System.err.println("UnpooledByteBufAllocator(false)'s model is :"+outBuffer.isDirect()); byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e}; outBuffer.writeBytes(sendBytes); ctx.writeAndFlush(outBuffer); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
根據BufAllocator具體實現類與preferDirect參數會分配不同類型的ByteBuf,輸出結果如下:
outBuffer is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf outBuffer's model is :true ByteBufAllocator.DEFAULT.buffer() is :class io.netty.buffer.PooledUnsafeDirectByteBuf ByteBufAllocator.DEFAULT.buffer()'s model is :true PooledByteBufAllocator(true) is :class io.netty.buffer.PooledUnsafeDirectByteBuf PooledByteBufAllocator(true)'s model is :true PooledByteBufAllocator(false) is :class io.netty.buffer.PooledUnsafeHeapByteBuf PooledByteBufAllocator(false)'s model is :false UnpooledByteBufAllocator(true) is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf UnpooledByteBufAllocator(true)'s model is :true UnpooledByteBufAllocator(false) is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf UnpooledByteBufAllocator(false)'s model is :false
上面示例中,BufAllocator具體實現基本可以分為以下四種,下面我們對四種分配模式的具體實現進行下追蹤與分析
new PooledByteBufAllocator(true);//池化直接內存 new PooledByteBufAllocator(false);//池化堆內存 new UnpooledByteBufAllocator(true);//非池化直接內存 new UnpooledByteBufAllocator(false);//非池化堆內存
三、ByteBufAllocator 實現
ByteBufAllocator.DEFAULT.buffer()
這里使用了 ByteBufUtil DEFAULT_ALLOCATOR,我們進入ByteBufUtil類內部看下具體實現
static final ByteBufAllocator DEFAULT_ALLOCATOR; static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim();//讀取io.netty.allocator.type配置 //根據配置類型實例化不同類型的BufAllocator實現類 ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { // DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { // DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0); logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE); MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024); logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE); }
ByteBufUtil 的靜態構造中會根據io.netty.allocator.type配置的不同實例化不同類型的BufAllocator實現類,下面我們看下BufAllocator的兩個具體實現類PooledByteBufAllocator與UnpooledByteBufAllocator
PooledByteBufAllocator
在PooledByteBufAllocator構造函數中,首先會進行內存池的詳細配置
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super(preferDirect); //聲明一個PoolThreadLocalCache用於內存申請 threadCache = new PoolThreadLocalCache(useCacheForAllThreads); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); checkPositiveOrZero(nHeapArena, "nHeapArena"); checkPositiveOrZero(nDirectArena, "nDirectArena"); checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment"); if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) { throw new IllegalArgumentException("directMemoryCacheAlignment is not supported"); } if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) { throw new IllegalArgumentException("directMemoryCacheAlignment: " + directMemoryCacheAlignment + " (expected: power of two)"); } int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0) { heapArenas = newArenaArray(nHeapArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length); for (int i = 0; i < heapArenas.length; i ++) { PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); heapArenas[i] = arena; metrics.add(arena); } heapArenaMetrics = Collections.unmodifiableList(metrics); } else { heapArenas = null; heapArenaMetrics = Collections.emptyList(); } if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } metric = new PooledByteBufAllocatorMetric(this); }
緊接着看下PooledByteBufAllocator具體的內存分配方法
newDirectBuffer 分配直接內存
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena;//具體的內存分配類PoolArena final ByteBuf buf; if (directArena != null) {//PoolArena就是Netty的內存池實現類。實現具體內存分配 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
heapBuffer 分配堆內內存
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<byte[]> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { //通過PoolArena分配堆內內存 buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
UnpooledByteBufAllocator
由於采用非池化管理,UnpooledByteBufAllocator構造函數中需要指定內存清理策略
public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) { super(preferDirect); this.disableLeakDetector = disableLeakDetector; //內存清理策略,默認noCleaner需要使用 unSafe 的 freeMemory 方法釋放內存 //noCleaner 使用 unSafe.freeMemory(address); //hasCleaner 使用 DirectByteBuffer 的 Cleaner 的 clean 方法。 noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() && PlatformDependent.hasDirectBufferNoCleanerConstructor(); }
緊接着看下UnpooledByteBufAllocator具體的內存分配方法
newDirectBuffer 分配直接內存
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { final ByteBuf buf; if (PlatformDependent.hasUnsafe()) {//判斷是否適用Unsafe操作 buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf);//是否進行堆外內存泄露監控 }
newHeapBuffer 分配堆內內存
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
四、總結
通過以上內容我們梳理了Netty中BufAllocator的具體實現及分配內存的類型,從內存管理模式上分為池化與非池化,從內存分配類型上分為直接內存與堆內內存,本文我們只是初步對其進行了總結,Netty分配內存的具體實現及精巧設計都還未涉及,后續我們會繼續對其進行進一步的探究,希望本文對大家能有所幫助,其中如有不足與不正確的地方還望指出與海涵。
關注微信公眾號,查看更多技術文章。