為了更好的理解Netty異步事件驅動網絡通信框架,有必要先了解一點Java NIO原生的通信理論,下面將結合基於TCP的例子程序,含客戶端和服務端的源碼,實現了Echo流程。
Java NIO的核心概念有三個:Channel,Selector,ByteBuffer。
而這當中,Channel的比重最大,NIO的功能主要基於Channel來實現,進行業務邏輯操作。Selector主要是IO事件選擇器,當一個Channel創建並配置好后,注冊到Selector上,與Selector相關的重要概念是SelectionKey,這個上面綁定了IO事件相關的Channel。在獲取到Channel后,進行數據的讀寫操作,Channel的數據讀寫是不能直接操作數據的,必須基於ByteBuffer進行,然而,Java NIO原生的ByteBuffer操作比較繁瑣,要flip和clear操作。
1. 而我們在業務邏輯操作中,用到的channel,主要有ServerSocketChannel,SocketChannel,DataGramChannel。下面,用一個圖,來簡要的描述下Channel到這三個具體之類之間的繼承/實現關系(該圖來自網絡,若有不妥,請告知,謝謝)。

2. Selector,是事件選擇器,創建Selector后,在調用select之前,在注冊Channel到這個Selector上時,必須指定關注的事件類型(interestOps)。通過這個類的select函數,可以獲取選擇上監聽到的IO事件。一旦select函數檢測到事件,就可以從Selector上獲取到具體有哪些IO事件,這些事件通過SelectionKey承載,SelectionKey上標記出該事件的類型,比如是OP_CONNECT,OP_ACCEPT還是OP_READ等。另外,SelectionKey還記錄了對應該IO事件發生的Channel,可以通過SelectionKey得到該Channel。

3. ByteBuffer。 因為字節操作,是操作系統與IO設備之間進行通信的基本數據單元,在Java NIO中,各通道Channel之間進行數據通信時,指定必須是基於ByteBuffer的。 ByteBuffer有兩個重要的函數,flip和clear。當Channel調用read函數,將數據讀到ByteBuffer中后,ByteBuffer的數據長度指針將會移動到數據長度所在的位置,這個位置是小於等於ByteBuffer容量capacity值的。當業務邏輯操作讀取到的數據前,需要對ByteBuffer做一下flip操作,就是將limit指針指向當前數據指針position的位置,然后,將position指針指向0的位置。數據邏輯結束后,一般要恢復ByteBuffer,即調用clear函數。

這三個重要的概念,做了一番解釋和描述后,就以一個demo程序,基於Java NIO的TCP C/S源碼,代碼中帶有了重要邏輯的注釋,后續不再單獨解釋。
A. TCP Server:
/** * @author "shihuc" * @date 2017年3月16日 */ package javaSocket.tcp.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; import javaSocket.tcp.Constants; /** * @author chengsh05 * */ public class TcpServer { /** * @param args */ public static void main(String[] args) { try { startServer(Constants.SERVER_PORT); } catch (IOException e) { e.printStackTrace(); } } public static void startServer(int port) throws IOException{ /* *開啟一個服務channel, *A selectable channel for stream-oriented listening sockets. */ ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.bind(new InetSocketAddress(port)); /* * 創建一個selector */ Selector selector = Selector.open(); /* * 將創建的serverChannel注冊到selector選擇器上,指定這個channel只關心OP_ACCEPT事件 */ serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { /* * select()操作,默認是阻塞模式的,即,當沒有accept或者read時間到來時,將一直阻塞不往下面繼續執行。 */ int readyChannels = selector.select(); if (readyChannels <= 0) { continue; } /* * 從selector上獲取到了IO事件,可能是accept,也有可能是read */ Set<SelectionKey> SelectonKeySet = selector.selectedKeys(); Iterator<SelectionKey> iterator = SelectonKeySet.iterator(); /* * 循環遍歷SelectionKeySet中的所有的SelectionKey */ while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { //處理OP_ACCEPT事件 SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //處理OP_READ事件 SocketChannel socketChannel = (SocketChannel) key.channel(); StringBuilder sb = new StringBuilder(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int readBytes = 0; int ret = 0; /* * 注意讀數據的時候,ByteBuffer的操作,需要flip,clear進行指針位置的調整 */ while ((ret = socketChannel.read(byteBuffer)) > 0) { readBytes += ret; byteBuffer.flip(); sb.append(Charset.forName("UTF-8").decode(byteBuffer).toString()); byteBuffer.clear(); } if (readBytes == 0) { System.err.println("handle opposite close Exception"); socketChannel.close(); } String message = sb.toString(); System.out.println("Message from client: " + message); if (Constants.CLIENT_CLOSE.equalsIgnoreCase(message.toString().trim())) { System.out.println("Client is going to shutdown!"); socketChannel.close(); } else if (Constants.SERVER_CLOSE.equalsIgnoreCase(message.trim())) { System.out.println("Server is going to shutdown!"); socketChannel.close(); serverChannel.close(); selector.close(); System.exit(0); } else { String outMessage = "Server response:" + message; socketChannel.write(Charset.forName("UTF-8").encode(outMessage)); } } /* * 將selector上當前已經監聽到的且已經處理了的事件標記清除掉。 */ iterator.remove(); } } } }
B. TCP Client
/** * @author "shihuc" * @date 2017年3月16日 */ package javaSocket.tcp.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; import javaSocket.tcp.Constants; /** * @author chengsh05 * */ public class TcpClient { /** * @param args */ public static void main(String[] args) { try { startClient(Constants.SERVER_IP, Constants.SERVER_PORT); } catch (IOException e) { e.printStackTrace(); } } public static void startClient(String serverIp, int serverPort) throws IOException{ /* * 創建一個SocketChannel,指定為非阻塞模式 * A selectable channel for stream-oriented connecting sockets. */ SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); /* * 連接到指定的服務地址 */ socketChannel.connect(new InetSocketAddress(serverIp, serverPort)); /* * 創建一個事件選擇器Selector */ Selector selector = Selector.open(); /* * 將創建的SocketChannel注冊到指定的Selector上,並指定關注的事件類型為OP_CONNECT */ socketChannel.register(selector, SelectionKey.OP_CONNECT); /* * 從系統輸入終端讀取數據,作為客戶端信息輸入源 */ Scanner sc = new Scanner(System.in); String cont = null; while(true){ if(socketChannel.isConnected()){ cont = sc.nextLine(); socketChannel.write(Charset.forName("UTF-8").encode(cont)); if(cont == null || cont.equalsIgnoreCase(Constants.CLIENT_CLOSE)){ socketChannel.close(); selector.close(); sc.close(); System.out.println("See you, 客戶端退出系統了"); System.exit(0); } } /* * 設置1sec的超時時間,進行IO事件選擇操作 */ int nSelectedKeys = selector.select(5000); if(nSelectedKeys > 0){ for(SelectionKey skey: selector.selectedKeys()){ /* * 判斷檢測到的channel是不是可連接的,將對應的channel注冊到選擇器上,指定關心的事件類型為OP_READ */ if(skey.isConnectable()){ SocketChannel connChannel = (SocketChannel) skey.channel(); connChannel.configureBlocking(false); connChannel.register(selector, SelectionKey.OP_READ); connChannel.finishConnect(); } /* * 若檢測到的IO事件是讀事件,則處理相關數據的讀相關的業務邏輯 */ else if(skey.isReadable()){ SocketChannel readChannel = (SocketChannel) skey.channel(); StringBuilder sb = new StringBuilder(); /* * 定義一個ByteBuffer的容器,容量為1k */ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int readBytes = 0; int ret = 0; /* * 注意,對ByteBuffer的操作,需要關心的是flip,clear等。 */ while ((ret = readChannel.read(byteBuffer)) > 0) { readBytes += ret; byteBuffer.flip(); sb.append(Charset.forName("UTF-8").decode(byteBuffer).toString()); byteBuffer.clear(); } if (readBytes == 0) { System.err.println("handle opposite close Exception"); readChannel.close(); } } } /* * 一次監聽的事件處理完畢后,需要將已經記錄的事件清除掉,准備下一輪的事件標記 */ selector.selectedKeys().clear(); }else{ System.err.println("handle select timeout Exception"); socketChannel.close(); } } } }
閱讀上述代碼時,請注意,server和client的實現風格不太一樣,主要是針對SelectionKeySet的遍歷,一次select操作獲取到的所有的SelectionKey處理完后的掃尾工作,體現出Selector的工作邏輯,若寫過C程序實現過TCP server/client程序,對事件選擇的過程應該就更清楚了。
最后,總結一下Java NIO TCP協議下的C/S結構程序流程圖,為徹底理解Java NIO服務。

基於這個例子引出的Java NIO的邏輯過程和思想,再去研讀Netty的代碼,相信會容易理解Netty的核心reactor模型工作原理。
