多路復用器Selector是Java NIO編程的基礎,熟練地掌握Selector對於掌握NIO編程至關重要。多路復用器提供選擇已經就緒的任務的能力。簡單來講,Selector會不斷地輪詢注冊在其上的Channel,如果某個Channel上面有新的TCP連接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進行后續的I/O操作。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll代替傳統的select實現,所以它並沒有最大連接句柄1024/2048的限制。這也就意味着只需要一個線程負責Selector的輪詢,就可以介入成千上萬的客戶端。
Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好准備的組件。這樣,一個單獨的線程可以管理多個channel,從而管理多個網絡連接。
一、為什么使用Selector?
僅用單個線程來處理多個Channels的好處是,只需要更少的線程來處理通道。事實上,可以只用一個線程處理所有的通道。對於操作系統來說,線程之間上下文切換的開銷很大,而且每個線程都要占用系統的一些資源(如內存)。因此,使用的線程越少越好。
但是,需要記住,現代的操作系統和CPU在多任務方面表現的越來越好,所以多線程的開銷隨着時間的推移,變得越來越小了。實際上,如果一個CPU有多個內核,不使用多任務可能是在浪費CPU能力。不管怎么說,關於那種設計的討論應該放在另一篇不同的文章中。在這里,只要知道使用Selector能夠處理多個通道就足夠了。
下面是單線程使用一個Selector處理3個channel的示例圖:
二、Selector簡介
選擇器提供選擇執行已經就緒的任務的能力,從底層來看,Selector提供了詢問通道是否已經准備好執行每個I/O操作的能力。Selector 允許單線程處理多個Channel。僅用單個線程來處理多個Channels的好處上面已經解釋了。
在開始之前,需要回顧一下Selector、SelectableChannel和SelectionKey:
選擇器(Selector)
Selector選擇器類管理着一個被注冊的通道集合的信息和它們的就緒狀態。通道是和選擇器一起被注冊的,並且使用選擇器來更新通道的就緒狀態。當這么做的時候,可以選擇將被激發的線程掛起,直到有就緒的的通道。
可選擇通道(SelectableChannel)
SelectableChannel這個抽象類提供了實現通道的可選擇性所需要的公共方法。它是所有支持就緒檢查的通道類的父類。因為FileChannel類沒有繼承SelectableChannel因此是不是可選通道,而所有socket通道都是可選擇的,包括從管道(Pipe)對象的中獲得的通道。SelectableChannel可以被注冊到Selector對象上,同時可以指定對那個選擇器而言,那種操作是感興趣的。一個通道可以被注冊到多個選擇器上,但對每個選擇器而言只能被注冊一次。
選擇鍵(SelectionKey)
選擇鍵封裝了特定的通道與特定的選擇器的注冊關系。選擇鍵對象被SelectableChannel.register()返回並提供一個表示這種注冊關系的標記。選擇鍵包含了兩個比特集(以整數的形式進行編碼),指示了該注冊關系所關心的通道操作,以及通道已經准備好的操作。
三、Selector的使用
3.1、Selector的創建
通過調用Selector.open()方法創建一個Selector,通過Selector的靜態方法open()創建,如下:
Selector selector = Selector.open();
3.2、將channel注冊到Selector中
為了將Channel和Selector配合使用,必須將channel注冊到selector上。通過SelectableChannel.register()方法來實現,如下:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
與Selector一起使用時,Channel必須處於非阻塞模式下。這意味着不能將FileChannel與Selector一起使用,因為FileChannel不能切換到非阻塞模式。而套接字通道都可以。
注意register()方法的第二個參數。這是一個“interest集合”,意思是在通過Selector監聽Channel時對什么事件感興趣。可以監聽四種不同類型的事件:
- Connect
- Accept
- Read
- Write
通道觸發了一個事件意思是該事件已經就緒。所以,某個channel成功連接到另一個服務器稱為“連接就緒”。一個server socket channel准備好接收新進入的連接稱為“接收就緒”。一個有數據可讀的通道可以說是“讀就緒”。等待寫數據的通道可以說是“寫就緒”。
這四種事件用SelectionKey的四個常量來表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果你對不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
在下面還會繼續提到interest集合。
SelectionKey
在上一小節中,當向Selector注冊Channel時,register()方法會返回一個SelectionKey對象。這個對象包含了一些你感興趣的屬性:
- interest集合
- ready集合
- Channel
- Selector
- 附加的對象(可選)
下面我會描述這些屬性。
interest集合
就像向Selector注冊通道一節中所描述的,interest集合是你所選擇的感興趣的事件集合。可以通過SelectionKey讀寫interest集合,像這樣:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以看到,用“位與”操作interest 集合和給定的SelectionKey常量,可以確定某個確定的事件是否在interest 集合中。
ready集合
ready 集合是通道已經准備就緒的操作的集合。在一次選擇(Selection)之后,你會首先訪問這個ready set。Selection將在下一小節進行解釋。可以這樣訪問ready集合:
int readySet = selectionKey.readyOps();
可以用像檢測interest集合那樣的方法,來檢測channel中什么事件或操作已經就緒。但是,也可以使用以下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable();//等價於selectionKey.readyOps()&SelectionKey.OP_ACCEPT
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
需要注意的是,通過相關的選擇鍵的readyOps()方法返回的就緒狀態指示只是一個提示,底層的通道在任何時候都會不斷改變,而其他線程也可能在通道上執行操作並影響到它的就緒狀態。另外,我們不能直接修改read集合。
取出SelectionKey所關聯的Selector和Channel
從SelectionKey訪問Channel和Selector很簡單。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
關於取消SelectionKey對象
我們可以通過SelectionKey對象的cancel()
方法來取消特定的注冊關系。該方法調用之后,該SelectionKey對象將會被”拷貝”至已取消鍵的集合中,該鍵此時已經失效,但是該注冊關系並不會立刻終結。在下一次select()
時,已取消鍵的集合中的元素會被清除,相應的注冊關系也真正終結。
為SelectionKey綁定附加對象
可以將一個對象或者更多信息附着到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,可以附加 與通道一起使用的Buffer,或是包含聚集數據的某個對象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
還可以在用register()方法向Selector注冊Channel的時候附加對象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
需要注意的是如果附加的對象不再使用,一定要人為清除,因為垃圾回收器不會回收該對象,若不清除的話會成內存泄漏。
一個單獨的通道可被注冊到多個選擇器中,有些時候我們需要通過isRegistered()
方法來檢查一個通道是否已經被注冊到任何一個選擇器上。 通常來說,我們並不會這么做。
3.3、通過Selector選擇通道
我們可以通過 Selector.select()方法獲取對某件事件准備好了的 Channel, 即如果我們在注冊 Channel 時, 對其的可寫事件感興趣, 那么當 select()返回時, 我們就可以獲取 Channel 了.
注意
, select()方法返回的值表示有多少個 Channel 可操作.
我們知道選擇器維護注冊過的通道的集合,並且這種注冊關系都被封裝在SelectionKey當中。接下來我們簡單的了解一下Selector維護的三種類型SelectionKey集合:
已注冊的鍵的集合(Registered key set)
所有與選擇器關聯的通道所生成的鍵的集合稱為已經注冊的鍵的集合。並不是所有注冊過的鍵都仍然有效。這個集合通過keys()方法返回,並且可能是空的。這個已注冊的鍵的集合不是可以直接修改的;試圖這么做的話將引發java.lang.UnsupportedOperationException。
已選擇的鍵的集合(Selected key set)
已注冊的鍵的集合的子集。這個集合的每個成員都是相關的通道被選擇器(在前一個選擇操作中)判斷為已經准備好的,並且包含於鍵的interest集合中的操作。這個集合通過selectedKeys()方法返回(並有可能是空的)。
不要將已選擇的鍵的集合與ready集合弄混了。這是一個鍵的集合,每個鍵都關聯一個已經准備好至少一種操作的通道。每個鍵都有一個內嵌的ready集合,指示了所關聯的通道已經准備好的操作。鍵可以直接從這個集合中移除,但不能添加。試圖向已選擇的鍵的集合中添加元素將拋出java.lang.UnsupportedOperationException。已取消的鍵的集合(Cancelled key set)
已注冊的鍵的集合的子集,這個集合包含了cancel()方法被調用過的鍵(這個鍵已經被無效化),但它們還沒有被注銷。這個集合是選擇器對象的私有成員,因而無法直接訪問。
在剛初始化的Selector對象中,這三個集合都是空的。通過Selector的select()方法可以選擇已經准備就緒的通道(這些通道包含你感興趣的的事件)。比如你對讀就緒的通道感興趣,那么select()方法就會返回讀事件已經就緒的那些通道。下面是Selector幾個重載的select()方法:
- select():阻塞到至少有一個通道在你注冊的事件上就緒了。
- select(long timeout):和select()一樣,但最長阻塞事件為timeout毫秒。
- selectNow():非阻塞,只要有通道就緒就立刻返回。
select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法后有多少通道變成就緒狀態。如果調用select()方法,因為有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道就緒了。
selectedKeys()
一旦調用了select()方法,並且返回值表明有一個或更多個通道就緒了,然后可以通過調用selector的selectedKeys()方法,訪問“已選擇鍵集(selected key set)”中的就緒通道。如下所示:
Set selectedKeys = selector.selectedKeys();
當像Selector注冊Channel時,Channel.register()方法會返回一個SelectionKey 對象。這個對象代表了注冊到該Selector的通道。可以通過SelectionKey的selectedKeySet()方法訪問這些對象。
可以遍歷這個已選擇的鍵集合來訪問就緒的通道。如下:
Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
這個循環遍歷已選擇鍵集中的每個鍵,並檢測各個鍵所對應的通道的就緒事件。
注意, 在每次迭代時, 我們都調用 "keyIterator.remove()" 將這個 key 從迭代器中刪除, 因為 select() 方法僅僅是簡單地將就緒的 IO 操作放到 selectedKeys 集合中, 因此如果我們從 selectedKeys 獲取到一個 key, 但是沒有將它刪除, 那么下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.
例如此時我們收到 OP_ACCEPT 通知, 然后我們進行相關處理, 但是並沒有將這個 Key 從 SelectedKeys 中刪除, 那么下一次 select() 返回時 我們還可以在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.注意, 我們可以動態更改 SekectedKeys 中的 key 的 interest set.
例如在 OP_ACCEPT 中, 我們可以將 interest set 更新為 OP_READ, 這樣 Selector 就會將這個 Channel 的 讀 IO 就緒事件包含進來了。
SelectionKey.channel()方法返回的通道需要轉型成你要處理的類型,如ServerSocketChannel或SocketChannel等。
關於Selector執行選擇的過程
我們知道調用
select()
方法進行選擇通道,現在我們再來深入一下選擇的過程,也就是select()
執行過程。當select()
被調用時將執行以下幾步:
- 首先檢查已取消鍵集合,也就是通過
cancle()
取消的鍵。如果該集合不為空,則清空該集合里的鍵,同時該集合中每個取消的鍵也將從已注冊鍵集合和已選擇鍵集合中移除。(一個鍵被取消時,並不會立刻從集合中移除,而是將該鍵“拷貝”至已取消鍵集合中,這種取消策略就是我們常提到的“延遲取消”。)- 再次檢查已注冊鍵集合(准確說是該集合中每個鍵的interest集合)。系統底層會依次詢問每個已經注冊的通道是否准備好選擇器所感興趣的某種操作,一旦發現某個通道已經就緒了,則會首先判斷該通道是否已經存在在已選擇鍵集合當中,如果已經存在,則更新該通道在已注冊鍵集合中對應的鍵的ready集合,如果不存在,則首先清空該通道的對應的鍵的ready集合,然后重設ready集合,最后將該鍵存至已注冊鍵集合中。這里需要明白,當更新ready集合時,在上次
select()
中已經就緒的操作不會被刪除,也就是ready集合中的元素是累積的,比如在第一次的selector對某個通道的read和write操作感興趣,在第一次執行select()
時,該通道的read操作就緒,此時該通道對應的鍵中的ready集合存有read元素,在第二次執行select()
時,該通道的write操作也就緒了,此時該通道對應的ready集合中將同時有read和write元素。
深入已注冊鍵集合的管理
到現在我們已經知道一個通道的的鍵是如何被添加到已選擇鍵集合中的,下面我們來繼續了解對已選擇鍵集合的管理 。首先要記住:選擇器不會主動刪除被添加到已選擇鍵集合中的鍵,而且被添加到已選擇鍵集合中的鍵的ready集合只能被設置,而不能被清理。如果我們希望清空已選擇鍵集合中某個鍵的ready集合該怎么辦?我們知道一個鍵在新加入已選擇鍵集合之前會首先置空該鍵的ready集合,這樣的話我們可以人為的將某個鍵從已注冊鍵集合中移除最終實現置空某個鍵的ready集合。被移除的鍵如果在下一次的select()中再次就緒,它將會重新被添加到已選擇的鍵的集合中。這就是為什么要在每次迭代的末尾調用
keyIterator.remove()
。
停止選擇
選擇器執行選擇的過程,系統底層會依次詢問每個通道是否已經就緒,這個過程可能會造成調用線程進入阻塞狀態,那么我們有以下三種方式可以喚醒在select()方法中阻塞的線程。
通過調用Selector對象的wakeup()方法讓處在阻塞狀態的select()方法立刻返回
該方法使得選擇器上的第一個還沒有返回的選擇操作立即返回。如果當前沒有進行中的選擇操作,那么下一次對select()方法的一次調用將立即返回。通過
close()
方法關閉Selector**
該方法使得任何一個在選擇操作中阻塞的線程都被喚醒(類似wakeup()
),同時使得注冊到該Selector的所有Channel被注銷,所有的鍵將被取消,但是Channel本身並不會關閉。調用
interrupt()
調用該方法會使睡眠的線程拋出InterruptException異常,捕獲該異常並在調用wakeup()
上面有些人看到“系統底層會依次詢問每個通道”時可能在想如果已選擇鍵非常多是,會不會耗時較長?答案是肯定的。但是我想說的是通常你可以選擇忽略該過程,至於為什么,后面再說。
Selector 的基本使用流程
-
通過 Selector.open() 打開一個 Selector.
-
將 Channel 注冊到 Selector 中, 並設置需要監聽的事件(interest set)
-
不斷重復:
-
調用 select() 方法
-
調用 selector.selectedKeys() 獲取 selected keys
-
迭代每個 selected key:
-
*從 selected key 中獲取 對應的 Channel 和附加信息(如果有的話)
-
*判斷是哪些 IO 事件已經就緒了, 然后處理它們.
如果是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置為 非阻塞的, 然后將這個 Channel 注冊到 Selector 中.
-
*根據需要更改 selected key 的監聽事件.
-
*將已經處理過的 key 從 selected keys 集合中刪除.
-
-
完整的示例
這里有一個完整的示例,打開一個Selector,注冊一個通道注冊到這個Selector上(通道的初始化過程略去),然后持續監控這個Selector的四種事件(接受,連接,讀,寫)是否就緒。
package com.dxz.springsession.nio.demo4; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class ServerSocketChannelTest { private int size = 1024; private ServerSocketChannel socketChannel; private ByteBuffer byteBuffer; private Selector selector; private final int port = 8998; private int remoteClientNum=0; public ServerSocketChannelTest() { try { initChannel(); } catch (Exception e) { e.printStackTrace(); System.exit(-1); } } public void initChannel() throws Exception { socketChannel = ServerSocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.bind(new InetSocketAddress(port)); System.out.println("listener on port:" + port); selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_ACCEPT); byteBuffer = ByteBuffer.allocateDirect(size); byteBuffer.order(ByteOrder.BIG_ENDIAN); } private void listener() throws Exception { while (true) { int n = selector.select(); if (n == 0) { continue; } Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = ite.next(); //a connection was accepted by a ServerSocketChannel. if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); registerChannel(selector, channel, SelectionKey.OP_READ); remoteClientNum++; System.out.println("online client num="+remoteClientNum); replyClient(channel); } //a channel is ready for reading if (key.isReadable()) { readDataFromSocket(key); } ite.remove();//must } } } protected void readDataFromSocket(SelectionKey key) throws Exception { SocketChannel socketChannel = (SocketChannel) key.channel(); int count; byteBuffer.clear(); while ((count = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); // Make buffer readable // Send the data; don't assume it goes all at once while (byteBuffer.hasRemaining()) { socketChannel.write(byteBuffer); } byteBuffer.clear(); // Empty buffer } if (count < 0) { socketChannel.close(); } } private void replyClient(SocketChannel channel) throws IOException { byteBuffer.clear(); byteBuffer.put("hello client!\r\n".getBytes()); byteBuffer.flip(); channel.write(byteBuffer); } private void registerChannel(Selector selector, SocketChannel channel, int ops) throws Exception { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, ops); } public static void main(String[] args) { try { new ServerSocketChannelTest().listener(); } catch (Exception e) { e.printStackTrace(); } } } //客戶端 package com.dxz.springsession.nio.demo4; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; public class SocketChannelTest { private int size = 1024; private ByteBuffer byteBuffer; private SocketChannel socketChannel; public void connectServer() throws IOException { socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8998)); byteBuffer = ByteBuffer.allocate(size); byteBuffer.order(ByteOrder.BIG_ENDIAN); receive(); } private void receive() throws IOException { while (true) { int count; byteBuffer.clear(); while ((count = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } //send("send data to server\r\n".getBytes()); byteBuffer.clear(); } } } private void send(byte[] data) throws IOException { byteBuffer.clear(); byteBuffer.put(data); byteBuffer.flip(); socketChannel.write(byteBuffer); } public static void main(String[] args) throws IOException { new SocketChannelTest().connectServer(); } }