Netty中如何寫大型數據


 

因為網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。由於寫操作是非阻塞的,所以即使沒有寫出所有的數據,寫操作也會在完成時返回並通知ChannelFuture。當這種情況發生時,如果仍然不停地寫入,就有內存耗盡的風險。所以在寫大型數據時,需要准備好處理到遠程節點的連接是慢速連接的情況,這種情況會導致內存釋放的延遲。讓我們考慮下將一個文件內容寫出到網絡的情況。

我們知道在傳輸的過程中,由於NIO的零拷貝特性,這種特性消除了將文件的內容從文件系統移動到網絡棧的復制過程。所有的這一切都發生在Netty的核心中,所以應用程序所有需要做的就是使用一個FileRegion接口的實現,其在Netty的API文檔中的定義是:“通過支持零拷貝的文件傳輸的Channel來發送的文件區域。”

下面的代碼清單展示了如何通過從FileInputStream創建一個DefaultFileRegion,並將其寫入Channel,從而利用零拷貝特性來傳輸一個文件的內容。

使用FileRegion傳輸文件的內容

FileInputStream in = new FileInputStream(file); // 創建一個FileInputStream
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, // 以該文件的完整長度創建一個新的DefaultFileRegion
        file.length());channel.writeAndFlush(region).addListener(// 發送該DefaultFileRegion,並注冊一個ChannelFutureListener

  new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { Throwable cause = future.cause();// 處理失敗 // Do something } } });

這個示例只適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理。在需要將數據從文件系統復制到用戶內存中時,可以使用ChunkedWriteHandler,它支持異步寫大型數據流,而又不會導致大量的內存消耗。
關鍵是interfaceChunkedInput<B>,其中類型參數B是readChunk()方法返回的類型。Netty預置了該接口的4個實現,如下表中所列出的。每個都代表了一個將由ChunkedWriteHandler處理的不定長度的數據流。

ChunkedInput 的實現

ChunkedFile 從文件中逐塊獲取數據,當你的平台不支持零拷貝或者你需要轉換數據時使用
ChunkedNioFile 和ChunkedFile 類似,只是它使用了FileChannel
ChunkedStream 從InputStream 中逐塊傳輸內容
ChunkedNioStream 從ReadableByteChannel 中逐塊傳輸內容

下面代碼清單說明了ChunkedStream的用法,它是實踐中最常用的實現。所示的類使用了一個File以及一個SslContext進行實例化。當initChannel()方法被調用時,它將使用所示的ChannelHandler 鏈初始化該Channel。
當Channel 的狀態變為活動的時,WriteStreamHandler 將會逐塊地把來自文件中的數據作為ChunkedStream 寫入。數據在傳輸之前將會由SslHandler加密。

使用ChunkedStream 傳輸文件內容

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());// 將SslHandler添加到ChannelPipeline中
        pipeline.addLast(new ChunkedWriteHandler());// 添加ChunkedWriteHandler以處理作為ChunkedInput傳入的數據
        pipeline.addLast(new WriteStreamHandler());// 一旦連接建立,WriteStreamHandler就開始寫文件數據
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {// 當連接建立時,channelActive()方法將使用ChunkedInput寫文件數據 super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

逐塊輸入要使用你自己的ChunkedInput實現,請在ChannelPipeline中安裝一個ChunkedWriteHandler。在本節中,我們討論了如何通過使用零拷貝特性來高效地傳輸文件,以及如何通過使用ChunkedWriteHandler來寫大型數據而又不必冒着導致OutOfMemoryError的風險。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM