我們從名字上就能看出這是一個NIO思想為基礎的IO框架,X是指這個框架可以有多種實現,我們可以從代碼庫 https://github.com/xnio 中發現一個項目xnio-native,里面有用C實現的nio層,就能體會到這個X的含義,可以直接基於操作系統C庫。目前在Xnio中默認的實現是nio-impl,也就是JDK的NIO。我們可以認為xnio是在JDK的NIO之上,進行了擴展,融入了一些JBoss開發人員對於並發訪問異步通信的理解和思考,總結出一套API,並用基於JDK NIO進行實現。
要學習XNIO,必須得有JDK NIO的基礎知識,本文假設讀者已經學習過NIO,如果還沒有可以閱讀參考書籍[1],[2]。另外對Netty有所了解的話,就會融會貫通,比較容易的理解XNIO的基本原理,因為兩個項目有很多相似之處。
XNIO有兩個重要的概念:
1. Channel,是傳輸管道的抽象概念,在NIO的Channel上進行的擴展加強,使用ChannelListener API進行事件通知。在創建Channel時,就賦予IO線程,用於執行所有的ChannelListener回調方法。
2. 區分IO線程和工作線程,創建一個工作線程池可以用來執行阻塞任務。一般情況下,非阻塞的Handler由IO線程執行,而阻塞任務比如Servlet則被調度到工作線程池執行。這樣就很好的區分了阻塞和非阻塞的兩種情形。
我們知道NIO的基本要求是不阻塞當前線程的執行,對於非阻塞請求的結果,可以用兩種方式獲得:一種是對於請求很快返回一個引用(如JDK中Future,XNIO中稱為IoFuture,其中很多方法是類似的),過一段時間再查詢結果;還有一種是當結果就緒時,調用事先注冊的回調方法來通知(如NIO2的CompletionHandler,XNIO的ChannelListener)。顯而易見后者效率更高一些,避免了數據未就緒情景下的無用處理過程。但JDK7之前無法將函數作為方法參數,所以只能用Java的匿名內部類來模擬函數式方法,造成代碼嵌套層次過多,難以理解和維護,所以Netty和XNIO這樣的框架通過調度方法調用過程,簡化了編程工作。
XNIO和Netty的最主要的一個區別是,XNIO繼承重用了JDK NIO的ByteBuffer類,而不像Netty另起爐灶,完全重建自己的ByteBuf體系。我們知道NIO的ByteBuffer使用時有個狀態切換的過程,讀和寫要顯式的通過調用slice, reset等方法切換,就和unix使用vi編輯器編輯和處理文本需要用'i'和Esc切換狀態類似。Netty通過讀寫指針索引值移除了這個“不便操作”,但XNIO保留了和JDK NIO一致的做法。
無論NIO還是Netty,都有heap buffer和direct buffer的概念,前者可以認為是byte數組的封裝,緩沖區存放在堆上,后者可以直接通過調用操作系統的系統調用在內存上分配緩沖,這樣在一些IO操作時,比如從網卡上讀出大量數據,再寫到硬盤文件中,就不必拷貝數據到應用層,直接在操作系統內核或者驅動上進行數據復制。ByteBuffer的管理和應用是NIO最核心的思想,開發人員應該根據應用類型,對其反復調優,做到占用資源最少和效率最大。
XNIO和Netty都對ByteBuffer進行池化管理,簡單來說就是開發者在程序開始時就計划好讀寫緩存區大小,統一分配好放到池中,Xnio中有Pool和Pooled接口用來管理池化緩存區。開發過高並發應用就知道,JVM GC經常出現並難以控制是很頭疼的問題。我們通常在接收網絡數據時,往往簡單的new出一塊數據區,填充,解析,使用,最后丟棄,這種方法隨着大量的數據讀入,必然造成GC反復出現。重用緩存區就可以在這個方面解決一部分問題。
和Netty的ChannelHandler不同,XNIO對應的ChannelListener只有一個方法handleEvent(),也就意味着所有的事件都要經由這個方法。在實際實行過程中,會進行若干狀態機的轉變,比如在服務器端,開始時accept狀態就緒,當連接建立后轉變為可讀或者可寫狀態。請參見下面的例子。
在一些情況下,阻塞的IO調用也是很有用的,比如事務過程中。XNIO也提供了阻塞方法awaitReadable()和awaitWritable()。
利用Stream channel,可以在數據源頭和目的地之間直接讀寫數據,有一種zero-copy(零拷貝)的方式,即讀寫過程使用同一塊緩存區,這樣就不必進行數據拷貝移動過程。XNIO通過封裝NIO FileChannel中的方法transferTo和transferFrom來實現。
還有一種Messgae channel,用來傳輸幀數據,因為有些報文格式是固定長度或者按照某種已知幀式格式定義的,緩沖區的長度可以按照報文幀來定義,當數據填滿后就及時發送,減少了數據長度的計算工作,代碼邏輯簡潔很多,所以這種channel用於傳遞'消息',websocket就是這樣的。
閱讀XNIO時,有幾個詞出現頻率很高,Source表示信息源頭,Sink是信息目的地,Conduit是源頭到目的地管道的抽象。
前面提過,XNIO中有兩類線程:
WORKER_IO_THREADS, IO thread處理非阻塞任務,要保證不做阻塞操作,因為很多連接同時用到這類線程,類似於nodejs中的loop,這個線程只要有任務就去執行,實際配置時每個CPU一個線程比較好。
WORKER_TASK_CORE_THREADS,用於執行阻塞任務,從線程池中獲得,任務完成后返回到線程池中。因為不同應用對應的服務器負載不同,所以不易給出具體數值,一般建議每個CPU core設置10個。
代碼分析,摘自
https://github.com/ecki/xnio-samples/blob/master/src/main/java/org/xnio/samples/SimpleEchoServer.java
服務器:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import org.xnio.ChannelListener;
- import org.xnio.IoUtils;
- import org.xnio.OptionMap;
- import org.xnio.Xnio;
- import org.xnio.XnioWorker;
- import org.xnio.channels.AcceptingChannel;
- import org.xnio.channels.Channels;
- import org.xnio.channels.ConnectedStreamChannel;
- public final class SimpleEchoServer {
- public static void main(String[] args) throws Exception {
- // 定義讀數據listener
- final ChannelListener<ConnectedStreamChannel> readListener =
- new ChannelListener<ConnectedStreamChannel>() {
- public void handleEvent(ConnectedStreamChannel channel) {
- //分配緩沖
- final ByteBuffer buffer = ByteBuffer.allocate(512);
- int res;
- try {
- while ((res = channel.read(buffer)) > 0) {
- //切換到寫的狀態並用阻塞的方式寫回
- buffer.flip();
- Channels.writeBlocking(channel, buffer);
- }
- // 保證全部送出
- Channels.flushBlocking(channel);
- if (res == -1) {
- channel.close();
- } else {
- channel.resumeReads();
- }
- } catch (IOException e) {
- e.printStackTrace();
- IoUtils.safeClose(channel);
- }
- }
- };
- // 創建接收 listener.
- final ChannelListener<AcceptingChannel<ConnectedStreamChannel>> acceptListener =
- new ChannelListener<AcceptingChannel<ConnectedStreamChannel>>() {
- public void handleEvent(
- final AcceptingChannel<ConnectedStreamChannel> channel) {
- try {
- ConnectedStreamChannel accepted;
- // channel就緒,准備接收連接請求
- while ((accepted = channel.accept()) != null) {
- System.out.println("accepted " + accepted.getPeerAddress());
- // 已經連接,設置讀數據listener
- accepted.getReadSetter().set(readListener);
- // 恢復讀的狀態
- accepted.resumeReads();
- }
- } catch (IOException ignored) {
- }
- }
- };
- //創建Xnio實例,並構造XnioWorker
- final XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);
- // 創建server,在本地12345端口上偵聽
- AcceptingChannel<? extends ConnectedStreamChannel> server = worker
- .createStreamServer(new InetSocketAddress(12345),
- acceptListener, OptionMap.EMPTY);
- // 開始接受連接
- server.resumeAccepts();
- System.out.println("Listening on " + server.getLocalAddress());
- }
- }
客戶端:
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.charset.Charset;
- import org.xnio.IoFuture;
- import org.xnio.IoUtils;
- import org.xnio.OptionMap;
- import org.xnio.Xnio;
- import org.xnio.XnioWorker;
- import org.xnio.channels.Channels;
- import org.xnio.channels.ConnectedStreamChannel;
- public final class SimpleHelloWorldBlockingClient {
- public static void main(String[] args) throws Exception {
- final Charset charset = Charset.forName("utf-8");
- //創建Xnio實例,並構造XnioWorker
- final Xnio xnio = Xnio.getInstance();
- final XnioWorker worker = xnio.createWorker(OptionMap.EMPTY);
- try {
- //連接服務器,本地12345端口,注意返回值是IoFuture類型,並不阻塞,返回后可以做些別的事情
- final IoFuture<ConnectedStreamChannel> futureConnection = worker.connectStream(
- new InetSocketAddress("localhost", 12345), null, OptionMap.EMPTY);
- final ConnectedStreamChannel channel = futureConnection.get(); // get是阻塞調用
- try {
- // 發送消息
- Channels.writeBlocking(channel, ByteBuffer.wrap("Hello world!\n".getBytes(charset)));
- // 保證全部送出
- Channels.flushBlocking(channel);
- // 發送EOF
- channel.shutdownWrites();
- System.out.println("Sent greeting string! The response is...");
- ByteBuffer recvBuf = ByteBuffer.allocate(128);
- // 接收消息
- while (Channels.readBlocking(channel, recvBuf) != -1) {
- recvBuf.flip();
- final CharBuffer chars = charset.decode(recvBuf);
- System.out.print(chars);
- recvBuf.clear();
- }
- } finally {
- IoUtils.safeClose(channel);
- }
- } finally {
- worker.shutdown();
- }
- }
- }
[1]: JavaNIO http://www.amazon.com/Java-Nio-Ron-Hitchens/dp/0596002882
[2]: Pro Java 7 NIO.2 http://www.amazon.com/Pro-Java-NIO-2-Anghel-Leonard/dp/1430240113