一、前言
上一篇文章我們提到 Netty 的核心組件是 Channel、回調、Future、ChannelHandler、EventLoop,這篇文章主要是對 Channel (Netty傳入和傳出數據的載體)做一些詳細的講解,以及介紹下 Netty 內置的傳輸類型。
二、傳輸的核心
傳輸 API 的核心是 interface Channel ,她被用於所有的 I/O 操作。Channel 類的層次結構如圖所示:
如圖,每個Channel 都會被分配一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設置,並且支持熱更新。ChannelPipeline 是 ChannelHandler鏈的容器,持有所有入站和出站數據以及ChannelHandler 實例。
由於 Channel 是獨一無二的,所以為了保證順序將 Channel 聲明為java.lang.Compareable的一個子接口,因此每個Channel都有不同的散列碼,否則會報Error。
Netty的 Channel 實現是線程安全的,因此你可以存儲一個Channel的引用,並且每當你需要向遠程節點寫數據時,都可以使用它,即使當時許多線程都在使用它。
Channel 的其他方法如下:
tips:
1、ChannelHandler 的典型用途:
-- 將數據從一種格式轉換為另一種格式。
-- 提供異常的通知。
-- 提供Channel 變為活動的或者非活動的通知。
-- 提供當Channel 注冊到 EventLoop 或者 從 EventLoop 注銷時的通知。
-- 提供有關用戶自定義事件的通知。
2、Netty 所提供的廣泛功能只依賴於少量的接口。這意味着,你可以對你的應用程序邏輯進行重大的修改,而無需大規模的重構你的代碼庫。
三、Netty 內置的傳輸類型
Netty 內置了一些可開箱即用的傳輸。因為並不是它們所有的傳輸都支持每一種協議,所以你必須選擇一個和你的應用程序所使用的協議都相容的傳輸。
名稱 |
包 |
描述 |
應用場景 |
NIO |
io.netty.channel.socket.nio |
使用java.nio.channels 包作為基礎——基於選擇器的方式 |
非阻塞I/O使用 |
Epoll |
io.netty.channel.epoll |
由 JNI 驅動的 epoll()和非阻塞 IO。 這個傳輸支持只有在 Linux上可用的多種特性,如SO_REUSEPORT,比 NIO 傳輸更快, 而且是完全非阻塞的 |
Linux上的非阻塞 I/O 使用 |
OIO |
io.netty.channel.socket.oio |
使用 java.net 包作為基礎——使用阻塞流 |
阻塞 I/O 使用 |
Local |
io.netty.channel.local |
可以在 VM 內部通過管道進行通信的本地傳輸 |
客戶端和服務端都使用同個JVM通信 |
Embedded |
io.netty.channel.embedded |
Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基於網絡的傳輸。這在測試你的ChannelHandler 實現時非常有用 |
測試 ChannelHandler 的實現 |
1、NIO — 非阻塞I/O
Java NIO 提供了一個所有I/O操作的全異步實現。其中,選擇器的背后實際上是充當了一個注冊表,如圖展示了該處理流程:
對於所有Netty的傳輸實現都共有的用戶級別API完全隱藏了Java NIO的實現細節,如上一篇展示的Demo一樣,Netty 這樣使用Java NIO:
2、Epoll—用於 Linux 的本地非阻塞傳輸
Netty為Linux提供了一組NIO API, 其以一種和它本身的設計更加一致的方式使用epoll,並且以一種更加輕量的方式使用中斷。 如果你的應用程序旨在運行於Linux系統, 那么請考慮利用這個版本的傳輸;你將發現在高負載下它的性能要優於JDK的NIO實現。
Netty 在代碼中支持 Epoll 也非常簡單,只需做如下的轉改變:
3、OIO—舊的阻塞 I/O
Netty是如何能夠使用和用於異步傳輸相同的API來支持OIO的呢?
上文提到,在NIO中,一個 EventLoop 對應一個線程,一個Channel 綁定一個 EventLoop,而一個EventLoop 可以綁定多個Channel 來實現異步,也就是說一個線程可以處理多個 Channel。而OIO中,一個 EventLoop 僅綁定一個 Channel,也就是說每個線程只處理一個Channel ,這就有點像傳統IO中,在服務端(ServerSocket)寫了一個多線程來處理客戶端的並發請求。
現在還有一個問題,channel是雙向的,既可以讀,也可以寫。而stream是單向的,OIO中利用 InputStream 來讀,OutputStream 來寫。那么Channel 是如何實現阻塞的讀和寫的呢?答案就是, Netty利用了SO_TIMEOUT這個Socket標志,它指定了等待一個I/O操作完成的最大毫秒數,I/O 操作期間Channel是阻塞的,如果操作在指定的時間間隔內沒有完成,則將會拋出一個SocketTimeout Exception。 Netty將捕獲這個異常並繼續處理循環。在EventLoop下一次運行時,它將再次嘗試。這實際上也是類似於Netty這樣的異步框架能夠支持OIO的唯一方式。
Netty 在代碼中支持 OIO,也和NIO類似:
tips:
我從硬盤讀取數據,然后程序一直等,數據讀完后,繼續操作。這種方式是最簡單的,叫 阻塞IO。
我從硬盤讀取數據,然后程序繼續向下執行,等數據讀取完后,通知當前程序(對硬件來說叫中斷,對程序來說叫回調),然后此程序可以立即處理數據,也可以執行完當前操作在讀取數據。叫 非阻塞IO。
4、Local —— 用於 JVM 內部通信的 Local 傳輸
Netty 提供了一個Local傳輸, 用於在同一個 JVM 中運行的客戶端和服務器程序之間的異步通信。
在這個傳輸中,和服務器 Channel 相關聯的 SocketAddress 並沒有綁定物理網絡地址;相反,只要服務器還在運行, 它就會被存儲在注冊表里,並在 Channel 關閉時注銷。 因為這個傳輸並不接受真正的網絡流量,所以它並不能夠和其他傳輸實現進行互操作。因此,客戶端希望連接到(在同一個 JVM 中)使用了這個傳輸的服務器端時也必須使用它。
服務端代碼:

public void server() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new DefaultEventLoop(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group, group) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override protected void initChannel(LocalChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(serverHandler); } }); ChannelFuture channelFuture = bootstrap.bind(new LocalAddress("foo")).sync(); System.out.println(EchoServer.class.getName() + "--started and listening for connections on--" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
客戶端代碼:

public void client() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(LocalChannel.class) .handler(new ChannelInitializer<LocalChannel>() { @Override protected void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(new LocalAddress("foo")).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
備注:現在客戶端和服務端的連接一直報一個異常,查了很多資料,也看了 Github 上的諸多demo,仍然沒有解決。有沒有大神幫我解答下?
5、Embedded
Netty 提供了一種額外的傳輸, 使得你可以將一組 ChannelHandler 作為幫助器類嵌入到其他的 ChannelHandler 內部。 通過這種方式,你將可以擴展一個 ChannelHandler 的功能,而又不需要修改其內部代碼。
Embedded 傳輸的關鍵是一個被稱為 EmbeddedChannel 的具體的Channel實現。
如果你想要為自己的 ChannelHandler 實現編寫單元測試, 那么請考慮使用 Embedded 傳輸。
參考資料:《Netty IN ACTION》