因為網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。由於寫操作是非阻塞的,所以即使沒有寫出所有的數據,寫操作也會在完成時返回並通知ChannelFuture。當這種情況發生時,如果仍然不停地寫入,就有內存耗盡的風險。所以在寫大型數據時,需要准備好處理到遠程節點的連接是慢速連接的情況,這種情況會導致內存釋放的延遲。讓我們考慮下將一個文件內容寫出到網絡的情況。
我們知道在傳輸的過程中,由於NIO的零拷貝特性,這種特性消除了將文件的內容從文件系統移動到網絡棧的復制過程。所有的這一切都發生在Netty的核心中,所以應用程序所有需要做的就是使用一個FileRegion接口的實現,其在Netty的API文檔中的定義是:“通過支持零拷貝的文件傳輸的Channel來發送的文件區域。”
下面的代碼清單展示了如何通過從FileInputStream創建一個DefaultFileRegion,並將其寫入Channel,從而利用零拷貝特性來傳輸一個文件的內容。
使用FileRegion傳輸文件的內容
FileInputStream in = new FileInputStream(file); // 創建一個FileInputStream FileRegion region = new DefaultFileRegion(in.getChannel(), 0, // 以該文件的完整長度創建一個新的DefaultFileRegion file.length());channel.writeAndFlush(region).addListener(// 發送該DefaultFileRegion,並注冊一個ChannelFutureListener
new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { Throwable cause = future.cause();// 處理失敗 // Do something } } });
這個示例只適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理。在需要將數據從文件系統復制到用戶內存中時,可以使用ChunkedWriteHandler,它支持異步寫大型數據流,而又不會導致大量的內存消耗。
關鍵是interfaceChunkedInput<B>,其中類型參數B是readChunk()方法返回的類型。Netty預置了該接口的4個實現,如下表中所列出的。每個都代表了一個將由ChunkedWriteHandler處理的不定長度的數據流。
ChunkedInput 的實現
ChunkedFile 從文件中逐塊獲取數據,當你的平台不支持零拷貝或者你需要轉換數據時使用
ChunkedNioFile 和ChunkedFile 類似,只是它使用了FileChannel
ChunkedStream 從InputStream 中逐塊傳輸內容
ChunkedNioStream 從ReadableByteChannel 中逐塊傳輸內容
下面代碼清單說明了ChunkedStream的用法,它是實踐中最常用的實現。所示的類使用了一個File以及一個SslContext進行實例化。當initChannel()方法被調用時,它將使用所示的ChannelHandler 鏈初始化該Channel。
當Channel 的狀態變為活動的時,WriteStreamHandler 將會逐塊地把來自文件中的數據作為ChunkedStream 寫入。數據在傳輸之前將會由SslHandler加密。
使用ChunkedStream 傳輸文件內容
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> { private final File file; private final SslContext sslCtx; public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) { this.file = file; this.sslCtx = sslCtx; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());// 將SslHandler添加到ChannelPipeline中 pipeline.addLast(new ChunkedWriteHandler());// 添加ChunkedWriteHandler以處理作為ChunkedInput傳入的數據 pipeline.addLast(new WriteStreamHandler());// 一旦連接建立,WriteStreamHandler就開始寫文件數據 } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {// 當連接建立時,channelActive()方法將使用ChunkedInput寫文件數據 super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } } }
逐塊輸入要使用你自己的ChunkedInput實現,請在ChannelPipeline中安裝一個ChunkedWriteHandler。在本節中,我們討論了如何通過使用零拷貝特性來高效地傳輸文件,以及如何通過使用ChunkedWriteHandler來寫大型數據而又不必冒着導致OutOfMemoryError的風險。