Netty 系列二(傳輸).


一、前言

    上一篇文章我們提到 Netty 的核心組件是 Channel、回調、Future、ChannelHandler、EventLoop,這篇文章主要是對 Channel (Netty傳入和傳出數據的載體)做一些詳細的講解,以及介紹下 Netty 內置的傳輸類型。

二、傳輸的核心

    傳輸 API 的核心是 interface Channel ,她被用於所有的 I/O 操作。Channel 類的層次結構如圖所示:

    如圖,每個Channel 都會被分配一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設置,並且支持熱更新。ChannelPipeline 是 ChannelHandler鏈的容器,持有所有入站和出站數據以及ChannelHandler 實例。

    由於 Channel 是獨一無二的,所以為了保證順序將 Channel 聲明為java.lang.Compareable的一個子接口,因此每個Channel都有不同的散列碼,否則會報Error。

    Netty的 Channel 實現是線程安全的,因此你可以存儲一個Channel的引用,並且每當你需要向遠程節點寫數據時,都可以使用它,即使當時許多線程都在使用它。

    Channel 的其他方法如下:

    tips:

1、ChannelHandler 的典型用途:

-- 將數據從一種格式轉換為另一種格式。
-- 提供異常的通知。
-- 提供Channel 變為活動的或者非活動的通知。
-- 提供當Channel 注冊到 EventLoop 或者 從 EventLoop 注銷時的通知。
-- 提供有關用戶自定義事件的通知。

2、Netty 所提供的廣泛功能只依賴於少量的接口。這意味着,你可以對你的應用程序邏輯進行重大的修改,而無需大規模的重構你的代碼庫。

三、Netty 內置的傳輸類型

    Netty 內置了一些可開箱即用的傳輸。因為並不是它們所有的傳輸都支持每一種協議,所以你必須選擇一個和你的應用程序所使用的協議都相容的傳輸。

名稱

描述

應用場景

NIO

io.netty.channel.socket.nio

使用java.nio.channels 包作為基礎——基於選擇器的方式

非阻塞I/O使用

Epoll

io.netty.channel.epoll

由 JNI 驅動的 epoll()和非阻塞 IO。 這個傳輸支持只有在 Linux上可用的多種特性,如SO_REUSEPORT,比 NIO 傳輸更快, 而且是完全非阻塞的

Linux上的非阻塞 I/O 使用

OIO

io.netty.channel.socket.oio

使用 java.net 包作為基礎——使用阻塞流

阻塞 I/O 使用

Local

io.netty.channel.local

可以在 VM 內部通過管道進行通信的本地傳輸

客戶端和服務端都使用同個JVM通信

Embedded

io.netty.channel.embedded

Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基於網絡的傳輸。這在測試你的ChannelHandler 實現時非常有用

測試 ChannelHandler 的實現

    1、NIO — 非阻塞I/O

    Java NIO 提供了一個所有I/O操作的全異步實現。其中,選擇器的背后實際上是充當了一個注冊表,如圖展示了該處理流程:

    對於所有Netty的傳輸實現都共有的用戶級別API完全隱藏了Java NIO的實現細節,如上一篇展示的Demo一樣,Netty 這樣使用Java NIO:

    2、Epoll—用於 Linux 的本地非阻塞傳輸

    Netty為Linux提供了一組NIO API, 其以一種和它本身的設計更加一致的方式使用epoll,並且以一種更加輕量的方式使用中斷。 如果你的應用程序旨在運行於Linux系統, 那么請考慮利用這個版本的傳輸;你將發現在高負載下它的性能要優於JDK的NIO實現。

    Netty 在代碼中支持 Epoll 也非常簡單,只需做如下的轉改變:

    3、OIO—舊的阻塞 I/O

     Netty是如何能夠使用和用於異步傳輸相同的API來支持OIO的呢?

    上文提到,在NIO中,一個 EventLoop 對應一個線程,一個Channel 綁定一個 EventLoop,而一個EventLoop 可以綁定多個Channel 來實現異步,也就是說一個線程可以處理多個 Channel。而OIO中,一個 EventLoop 僅綁定一個 Channel,也就是說每個線程只處理一個Channel ,這就有點像傳統IO中,在服務端(ServerSocket)寫了一個多線程來處理客戶端的並發請求。

    現在還有一個問題,channel是雙向的,既可以讀,也可以寫。而stream是單向的,OIO中利用 InputStream 來讀,OutputStream 來寫。那么Channel 是如何實現阻塞的讀和寫的呢?答案就是, Netty利用了SO_TIMEOUT這個Socket標志,它指定了等待一個I/O操作完成的最大毫秒數,I/O 操作期間Channel是阻塞的,如果操作在指定的時間間隔內沒有完成,則將會拋出一個SocketTimeout Exception。 Netty將捕獲這個異常並繼續處理循環。在EventLoop下一次運行時,它將再次嘗試。這實際上也是類似於Netty這樣的異步框架能夠支持OIO的唯一方式。

    Netty 在代碼中支持 OIO,也和NIO類似:

    tips:

我從硬盤讀取數據,然后程序一直等,數據讀完后,繼續操作。這種方式是最簡單的,叫 阻塞IO。
我從硬盤讀取數據,然后程序繼續向下執行,等數據讀取完后,通知當前程序(對硬件來說叫中斷,對程序來說叫回調),然后此程序可以立即處理數據,也可以執行完當前操作在讀取數據。叫 非阻塞IO。

    4、Local —— 用於 JVM 內部通信的 Local 傳輸

    Netty 提供了一個Local傳輸, 用於在同一個 JVM 中運行的客戶端和服務器程序之間的異步通信。

    在這個傳輸中,和服務器 Channel 相關聯的 SocketAddress 並沒有綁定物理網絡地址;相反,只要服務器還在運行, 它就會被存儲在注冊表里,並在 Channel 關閉時注銷。 因為這個傳輸並不接受真正的網絡流量,所以它並不能夠和其他傳輸實現進行互操作。因此,客戶端希望連接到(在同一個 JVM 中)使用了這個傳輸的服務器端時也必須使用它。

    服務端代碼:

    public void server() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new DefaultEventLoop();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    .channel(LocalServerChannel.class)
                    .childHandler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(new LocalAddress("foo")).sync();
            System.out.println(EchoServer.class.getName() + "--started and listening for connections on--" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }
View Code

    客戶端代碼:

    public void client() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(LocalChannel.class)
                    .handler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new LocalAddress("foo")).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
View Code

    備注:現在客戶端和服務端的連接一直報一個異常,查了很多資料,也看了 Github 上的諸多demo,仍然沒有解決。有沒有大神幫我解答下?

    5、Embedded

    Netty 提供了一種額外的傳輸, 使得你可以將一組 ChannelHandler 作為幫助器類嵌入到其他的 ChannelHandler 內部。 通過這種方式,你將可以擴展一個 ChannelHandler 的功能,而又不需要修改其內部代碼。

    Embedded 傳輸的關鍵是一個被稱為 EmbeddedChannel 的具體的Channel實現。

    如果你想要為自己的 ChannelHandler 實現編寫單元測試, 那么請考慮使用 Embedded 傳輸。

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM