Undertow服務器基礎分析 - XNIO


我們從名字上就能看出這是一個NIO思想為基礎的IO框架,X是指這個框架可以有多種實現,我們可以從代碼庫 https://github.com/xnio 中發現一個項目xnio-native,里面有用C實現的nio層,就能體會到這個X的含義,可以直接基於操作系統C庫。目前在Xnio中默認的實現是nio-impl,也就是JDK的NIO。我們可以認為xnio是在JDK的NIO之上,進行了擴展,融入了一些JBoss開發人員對於並發訪問異步通信的理解和思考,總結出一套API,並用基於JDK NIO進行實現。

要學習XNIO,必須得有JDK NIO的基礎知識,本文假設讀者已經學習過NIO,如果還沒有可以閱讀參考書籍[1],[2]。另外對Netty有所了解的話,就會融會貫通,比較容易的理解XNIO的基本原理,因為兩個項目有很多相似之處。

XNIO有兩個重要的概念:
1. Channel,是傳輸管道的抽象概念,在NIO的Channel上進行的擴展加強,使用ChannelListener API進行事件通知。在創建Channel時,就賦予IO線程,用於執行所有的ChannelListener回調方法。
2. 區分IO線程和工作線程,創建一個工作線程池可以用來執行阻塞任務。一般情況下,非阻塞的Handler由IO線程執行,而阻塞任務比如Servlet則被調度到工作線程池執行。這樣就很好的區分了阻塞和非阻塞的兩種情形。

我們知道NIO的基本要求是不阻塞當前線程的執行,對於非阻塞請求的結果,可以用兩種方式獲得:一種是對於請求很快返回一個引用(如JDK中Future,XNIO中稱為IoFuture,其中很多方法是類似的),過一段時間再查詢結果;還有一種是當結果就緒時,調用事先注冊的回調方法來通知(如NIO2的CompletionHandler,XNIO的ChannelListener)。顯而易見后者效率更高一些,避免了數據未就緒情景下的無用處理過程。但JDK7之前無法將函數作為方法參數,所以只能用Java的匿名內部類來模擬函數式方法,造成代碼嵌套層次過多,難以理解和維護,所以Netty和XNIO這樣的框架通過調度方法調用過程,簡化了編程工作。

XNIO和Netty的最主要的一個區別是,XNIO繼承重用了JDK NIO的ByteBuffer類,而不像Netty另起爐灶,完全重建自己的ByteBuf體系。我們知道NIO的ByteBuffer使用時有個狀態切換的過程,讀和寫要顯式的通過調用slice, reset等方法切換,就和unix使用vi編輯器編輯和處理文本需要用'i'和Esc切換狀態類似。Netty通過讀寫指針索引值移除了這個“不便操作”,但XNIO保留了和JDK NIO一致的做法。

無論NIO還是Netty,都有heap buffer和direct buffer的概念,前者可以認為是byte數組的封裝,緩沖區存放在堆上,后者可以直接通過調用操作系統的系統調用在內存上分配緩沖,這樣在一些IO操作時,比如從網卡上讀出大量數據,再寫到硬盤文件中,就不必拷貝數據到應用層,直接在操作系統內核或者驅動上進行數據復制。ByteBuffer的管理和應用是NIO最核心的思想,開發人員應該根據應用類型,對其反復調優,做到占用資源最少和效率最大。

XNIO和Netty都對ByteBuffer進行池化管理,簡單來說就是開發者在程序開始時就計划好讀寫緩存區大小,統一分配好放到池中,Xnio中有Pool和Pooled接口用來管理池化緩存區。開發過高並發應用就知道,JVM GC經常出現並難以控制是很頭疼的問題。我們通常在接收網絡數據時,往往簡單的new出一塊數據區,填充,解析,使用,最后丟棄,這種方法隨着大量的數據讀入,必然造成GC反復出現。重用緩存區就可以在這個方面解決一部分問題。

和Netty的ChannelHandler不同,XNIO對應的ChannelListener只有一個方法handleEvent(),也就意味着所有的事件都要經由這個方法。在實際實行過程中,會進行若干狀態機的轉變,比如在服務器端,開始時accept狀態就緒,當連接建立后轉變為可讀或者可寫狀態。請參見下面的例子。

在一些情況下,阻塞的IO調用也是很有用的,比如事務過程中。XNIO也提供了阻塞方法awaitReadable()和awaitWritable()。

利用Stream channel,可以在數據源頭和目的地之間直接讀寫數據,有一種zero-copy(零拷貝)的方式,即讀寫過程使用同一塊緩存區,這樣就不必進行數據拷貝移動過程。XNIO通過封裝NIO FileChannel中的方法transferTo和transferFrom來實現。

還有一種Messgae channel,用來傳輸幀數據,因為有些報文格式是固定長度或者按照某種已知幀式格式定義的,緩沖區的長度可以按照報文幀來定義,當數據填滿后就及時發送,減少了數據長度的計算工作,代碼邏輯簡潔很多,所以這種channel用於傳遞'消息',websocket就是這樣的。

閱讀XNIO時,有幾個詞出現頻率很高,Source表示信息源頭,Sink是信息目的地,Conduit是源頭到目的地管道的抽象。

前面提過,XNIO中有兩類線程:
WORKER_IO_THREADS, IO thread處理非阻塞任務,要保證不做阻塞操作,因為很多連接同時用到這類線程,類似於nodejs中的loop,這個線程只要有任務就去執行,實際配置時每個CPU一個線程比較好。
WORKER_TASK_CORE_THREADS,用於執行阻塞任務,從線程池中獲得,任務完成后返回到線程池中。因為不同應用對應的服務器負載不同,所以不易給出具體數值,一般建議每個CPU core設置10個。

代碼分析,摘自
https://github.com/ecki/xnio-samples/blob/master/src/main/java/org/xnio/samples/SimpleEchoServer.java

服務器:

Java代碼   收藏代碼
  1. import java.io.IOException;  
  2. import java.net.InetSocketAddress;  
  3. import java.nio.ByteBuffer;  
  4.   
  5. import org.xnio.ChannelListener;  
  6. import org.xnio.IoUtils;  
  7. import org.xnio.OptionMap;  
  8. import org.xnio.Xnio;  
  9. import org.xnio.XnioWorker;  
  10. import org.xnio.channels.AcceptingChannel;  
  11. import org.xnio.channels.Channels;  
  12. import org.xnio.channels.ConnectedStreamChannel;  
  13.   
  14. public final class SimpleEchoServer {  
  15.   
  16.     public static void main(String[] args) throws Exception {  
  17.   
  18.         // 定義讀數據listener  
  19.         final ChannelListener<ConnectedStreamChannel> readListener =   
  20.             new ChannelListener<ConnectedStreamChannel>() {  
  21.             public void handleEvent(ConnectedStreamChannel channel) {  
  22.                 //分配緩沖  
  23.                 final ByteBuffer buffer = ByteBuffer.allocate(512);  
  24.                 int res;  
  25.                 try {  
  26.                     while ((res = channel.read(buffer)) > 0) {  
  27.                         //切換到寫的狀態並用阻塞的方式寫回  
  28.                         buffer.flip();  
  29.                         Channels.writeBlocking(channel, buffer);  
  30.                     }  
  31.                     // 保證全部送出  
  32.                     Channels.flushBlocking(channel);  
  33.                     if (res == -1) {  
  34.                         channel.close();  
  35.                     } else {  
  36.                         channel.resumeReads();  
  37.                     }  
  38.                 } catch (IOException e) {  
  39.                     e.printStackTrace();  
  40.                     IoUtils.safeClose(channel);  
  41.                 }  
  42.             }  
  43.         };  
  44.         // 創建接收 listener.  
  45.         final ChannelListener<AcceptingChannel<ConnectedStreamChannel>> acceptListener =   
  46.             new ChannelListener<AcceptingChannel<ConnectedStreamChannel>>() {  
  47.             public void handleEvent(  
  48.                     final AcceptingChannel<ConnectedStreamChannel> channel) {  
  49.                 try {  
  50.                     ConnectedStreamChannel accepted;  
  51.                     // channel就緒,准備接收連接請求  
  52.                     while ((accepted = channel.accept()) != null) {  
  53.                         System.out.println("accepted " + accepted.getPeerAddress());  
  54.                         // 已經連接,設置讀數據listener  
  55.                         accepted.getReadSetter().set(readListener);  
  56.                         // 恢復讀的狀態  
  57.                         accepted.resumeReads();  
  58.                     }  
  59.                 } catch (IOException ignored) {  
  60.                 }  
  61.             }  
  62.         };  
  63.   
  64.         //創建Xnio實例,並構造XnioWorker  
  65.         final XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);  
  66.         // 創建server,在本地12345端口上偵聽  
  67.         AcceptingChannel<? extends ConnectedStreamChannel> server = worker  
  68.                 .createStreamServer(new InetSocketAddress(12345),  
  69.                         acceptListener, OptionMap.EMPTY);  
  70.         // 開始接受連接  
  71.         server.resumeAccepts();  
  72.         System.out.println("Listening on " + server.getLocalAddress());  
  73.     }  
  74. }  

 
客戶端:

Java代碼   收藏代碼
  1. import java.net.InetSocketAddress;  
  2. import java.nio.ByteBuffer;  
  3. import java.nio.CharBuffer;  
  4. import java.nio.charset.Charset;  
  5.   
  6. import org.xnio.IoFuture;  
  7. import org.xnio.IoUtils;  
  8. import org.xnio.OptionMap;  
  9. import org.xnio.Xnio;  
  10. import org.xnio.XnioWorker;  
  11. import org.xnio.channels.Channels;  
  12. import org.xnio.channels.ConnectedStreamChannel;  
  13.   
  14. public final class SimpleHelloWorldBlockingClient {  
  15.   
  16.     public static void main(String[] args) throws Exception {  
  17.         final Charset charset = Charset.forName("utf-8");  
  18.         //創建Xnio實例,並構造XnioWorker  
  19.         final Xnio xnio = Xnio.getInstance();  
  20.         final XnioWorker worker = xnio.createWorker(OptionMap.EMPTY);  
  21.   
  22.         try {  
  23.             //連接服務器,本地12345端口,注意返回值是IoFuture類型,並不阻塞,返回后可以做些別的事情  
  24.             final IoFuture<ConnectedStreamChannel> futureConnection = worker.connectStream(  
  25.                 new InetSocketAddress("localhost"12345), null, OptionMap.EMPTY);  
  26.             final ConnectedStreamChannel channel = futureConnection.get(); // get是阻塞調用  
  27.             try {  
  28.                 // 發送消息  
  29.                 Channels.writeBlocking(channel, ByteBuffer.wrap("Hello world!\n".getBytes(charset)));  
  30.                 // 保證全部送出  
  31.                 Channels.flushBlocking(channel);  
  32.                 // 發送EOF  
  33.                 channel.shutdownWrites();  
  34.                 System.out.println("Sent greeting string! The response is...");  
  35.                 ByteBuffer recvBuf = ByteBuffer.allocate(128);  
  36.                 // 接收消息  
  37.                 while (Channels.readBlocking(channel, recvBuf) != -1) {  
  38.                     recvBuf.flip();  
  39.                     final CharBuffer chars = charset.decode(recvBuf);  
  40.                     System.out.print(chars);  
  41.                     recvBuf.clear();  
  42.                 }  
  43.             } finally {  
  44.                 IoUtils.safeClose(channel);  
  45.             }  
  46.         } finally {  
  47.             worker.shutdown();  
  48.         }  
  49.     }  
  50. }  

 
[1]: JavaNIO http://www.amazon.com/Java-Nio-Ron-Hitchens/dp/0596002882
[2]: Pro Java 7 NIO.2 http://www.amazon.com/Pro-Java-NIO-2-Anghel-Leonard/dp/1430240113


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM