選擇器服務器端代碼
上一篇文章毫無條理地講了很多和選擇器相關的知識點,下面進入實戰,看一下如何寫和使用選擇器實現服務端Socket數據接收的程序,這也是NIO中最核心、最精華的部分。
看一下代碼:
1 public class SelectorServer 2 { 3 private static int PORT = 1234; 4 5 public static void main(String[] args) throws Exception 6 { 7 // 先確定端口號 8 int port = PORT; 9 if (args != null && args.length > 0) 10 { 11 port = Integer.parseInt(args[0]); 12 } 13 // 打開一個ServerSocketChannel 14 ServerSocketChannel ssc = ServerSocketChannel.open(); 15 // 獲取ServerSocketChannel綁定的Socket 16 ServerSocket ss = ssc.socket(); 17 // 設置ServerSocket監聽的端口 18 ss.bind(new InetSocketAddress(port)); 19 // 設置ServerSocketChannel為非阻塞模式 20 ssc.configureBlocking(false); 21 // 打開一個選擇器 22 Selector selector = Selector.open(); 23 // 將ServerSocketChannel注冊到選擇器上去並監聽accept事件 24 ssc.register(selector, SelectionKey.OP_ACCEPT); 25 while (true) 26 { 27 // 這里會發生阻塞,等待就緒的通道 28 int n = selector.select(); 29 // 沒有就緒的通道則什么也不做 30 if (n == 0) 31 { 32 continue; 33 } 34 // 獲取SelectionKeys上已經就緒的通道的集合 35 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 36 // 遍歷每一個Key 37 while (iterator.hasNext()) 38 { 39 SelectionKey sk = iterator.next(); 40 // 通道上是否有可接受的連接 41 if (sk.isAcceptable()) 42 { 43 ServerSocketChannel ssc1 = (ServerSocketChannel)sk.channel(); 44 SocketChannel sc = ssc1.accept(); 45 sc.configureBlocking(false); 46 sc.register(selector, SelectionKey.OP_READ); 47 } 48 // 通道上是否有數據可讀 49 else if (sk.isReadable()) 50 { 51 readDataFromSocket(sk); 52 } 53 iterator.remove(); 54 } 55 } 56 } 57 58 private static ByteBuffer bb = ByteBuffer.allocate(1024); 59 60 // 從通道中讀取數據 61 protected static void readDataFromSocket(SelectionKey sk) throws Exception 62 { 63 SocketChannel sc = (SocketChannel)sk.channel(); 64 bb.clear(); 65 while (sc.read(bb) > 0) 66 { 67 bb.flip(); 68 while (bb.hasRemaining()) 69 { 70 System.out.print((char)bb.get()); 71 } 72 System.out.println(); 73 bb.clear(); 74 } 75 } 76 }
代碼中已經有了相關的注釋,這里繼續解釋一下:
(1)第8行~第12行,確定要監聽的端口號,這里是1234
(2)第13行~第20行,由於選擇器管理的是通道(Channel),因此首先要有通道。這里是服務器的程序,因此獲取ServerSocketChannel,同時獲取它所對應的ServerSocket,設置服務端的Channel為非阻塞模式,並綁定之前確定的端口號1234
(3)第21行~第24行,打開一個選擇器,並注冊當前通道感興趣的時間為accept時間,即監聽來自客戶端的Socket數據
(4)第25行~第28行,調用select()方法等待來自客戶端的Socket數據。程序會阻塞在這兒不會往下走,直到客戶端有Socket數據的到來為止,所以嚴格意義上來說,NIO並不是一種非阻塞IO,因為NIO會阻塞在Selector的select()方法上
(5)第29行~第33行,沒有什么好說的,如果select()方法獲取的數據是0的話,下面的代碼都沒必要走,當然這是有可能發生的
(6)第34行~第39行,獲取到已經就緒的通道的迭代器進行迭代,泛型是選擇鍵SelectionKey,前文講過,選擇鍵用於封裝特定的通道
(7)第40行~第52行,這里是一個關鍵點、核心點,這里做了兩件事情:
a)滿足isAcceptable()則表示該通道上有數據到來了,此時我們做的事情不是獲取該通道->創建一個線程來讀取該通道上的數據,這么做就和前面一直講的阻塞IO沒有區別了,也無法發揮出NIO的優勢來。我們做的事情只是簡單地將對應的SocketChannel注冊到選擇器上,通過傳入OP_READ標記,告訴選擇器我們關心新的Socket通道什么時候可以准備好讀數據
b)滿足isReadable()則表示新注冊的Socket通道已經可以讀取數據了,此時調用readDataFromSocket方法讀取SocketChannel中的數據,讀取數據的方法前面通道的文章中已經詳細講過了,就不講了
(8)第53行,將鍵移除,這一行很重要也是容易忘記的一步操作。加入不remove,將會導致45行中出現空指針異常,原因不難理解,可以自己思考一下。
選擇器客戶端代碼
選擇器客戶端的代碼,沒什么要求,只要向服務器端發送數據就可以了。這里選用的是Java NIO4:Socket通道一文中,最后一部分開五個線程向服務端發送數據的程序:
1 public class SelectorClient 2 { 3 private static final String STR = "Hello World!"; 4 private static final String REMOTE_IP = "127.0.0.1"; 5 private static final int THREAD_COUNT = 5; 6 7 private static class NonBlockingSocketThread extends Thread 8 { 9 public void run() 10 { 11 try 12 { 13 int port = 1234; 14 SocketChannel sc = SocketChannel.open(); 15 sc.configureBlocking(false); 16 sc.connect(new InetSocketAddress(REMOTE_IP, port)); 17 while (!sc.finishConnect()) 18 { 19 System.out.println("同" + REMOTE_IP + "的連接正在建立,請稍等!"); 20 Thread.sleep(10); 21 } 22 System.out.println("連接已建立,待寫入內容至指定ip+端口!時間為" + System.currentTimeMillis()); 23 String writeStr = STR + this.getName(); 24 ByteBuffer bb = ByteBuffer.allocate(writeStr.length()); 25 bb.put(writeStr.getBytes()); 26 bb.flip(); // 寫緩沖區的數據之前一定要先反轉(flip) 27 sc.write(bb); 28 bb.clear(); 29 sc.close(); 30 } 31 catch (IOException e) 32 { 33 e.printStackTrace(); 34 } 35 catch (InterruptedException e) 36 { 37 e.printStackTrace(); 38 } 39 } 40 } 41 42 public static void main(String[] args) throws Exception 43 { 44 NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT]; 45 for (int i = 0; i < THREAD_COUNT; i++) 46 nbsts[i] = new NonBlockingSocketThread(); 47 for (int i = 0; i < THREAD_COUNT; i++) 48 nbsts[i].start(); 49 // 一定要join保證線程代碼先於sc.close()運行,否則會有AsynchronousCloseException 50 for (int i = 0; i < THREAD_COUNT; i++) 51 nbsts[i].join(); 52 } 53 }
代碼執行結果
先運行服務端程序:

空白,很正常,因為在監聽客戶端數據的到來,此時並沒有數據。接着運行客戶端程序:

看到5個線程的數據已經發送,此時服務端的執行情況是:

數據全部接收到並打印,看到右邊的方框還是紅色的,說明這5個線程的數據接收、打印完畢之后,再繼續等待着客戶端的數據的到來。
總結一下Selector的執行兩個關鍵點:
1、注冊一個ServerSocketChannel到selector中,這個通道的作用只是為了監聽客戶端是否有數據到來(這里注意一下有數據到來,意思是假如需要接收100個字節,如果到來了1個字節就算數據到來了),只要有數據到來,就把特定通道注冊到selector中,並指定其事件為讀事件。
2、ServerSocketChannel和SocketChannel(通道里面的是客戶端的數據)共同存在在Selector中,只要有注冊的事件到來,Selector取消阻塞狀態,遍歷SelectionKey集合,繼續注冊讀取數據的通道,或者是從通道中讀取數據。
選擇過程的可擴展性
從上面的代碼以及之前對於Selector的解讀可以看到,Selector可以簡化用單線程同時管理多個可選擇通道的實現。使用一個線程來為多個通道提供服務,通過消除管理各個線程的額外開銷,可能會降低復雜性並可能大幅提升性能。但只使用一個線程來服務所有可選擇的通道是不是一個好主意呢?這要看情況。
對單核CPU的系統而言這可能是一個好主意,因為在任何情況下都只有一個線程能夠運行。通過消除在線程之間進行上下文切換帶來的額外開銷,總吞吐量可以提高。但對於一個多核CPU的系統而言呢?字啊一個有n個CPU的系統上,當一個單一的線程線性輪流地處理每一個線程時,可能有(n-1)個CPU處於空閑狀態。
一種可行的解決辦法是使用多個選擇器。但是請盡量不要這么做,在大量通道上執行就緒選擇並不會有很大的開銷,大多數工作是由底層操作系統完成的,管理多個選擇器並隨機地將通道分派給它們當中的一個並不是這個問題的合理的解決方案。
一種更好的解決方案是對所有的可選擇通道使用同一個選擇器,並將對就緒選擇通道的服務委托給其他線程。開發者只使用一個線程監控通道的就緒狀態,至於通道處於就緒狀態之后又如何做,有兩種可行的做法:
1、使用一個協調好的工作線程池來處理接收到的數據,當然線程池的大小是可以調整的
2、通道根據功能由不同的工作線程來處理,它們可能是日志線程、命令/控制線程、狀態請求線程等
