異步IO
異步 I/O 是一種沒有阻塞地讀寫數據的方法。通常,在代碼進行 read() 調用時,代碼會阻塞直至有可供讀取的數據。同樣, write()調用將會阻塞直至數據能夠寫入,關於同步的IO請參考另一篇文章Java IO。
另一方面,異步 I/O 調用不但不會阻塞,相反,您可以注冊對特定 I/O 事件諸如數據可讀、新連接到來等等,而在發生這樣感興趣的事件時,系統將會告訴您。
異步 I/O 的一個優勢在於,它允許您同時根據大量的輸入和輸出執行 I/O。同步程序常常要求助於輪詢,或者創建許許多多的線程以處理大量的連接。使用異步 I/O,您可以監聽任何數量的通道上的事件,不用輪詢,也不用額外的線程。
Selector
在我的JavaNIO詳解(一)中已經詳細介紹了Java NIO三個核心對象中的Buffer和Channel,現在我們就重點介紹一下第三個核心對象Selector。Selector是一個對象,它可以注冊到很多個Channel上,監聽各個Channel上發生的事件,並且能夠根據事件情況決定Channel讀寫。這樣,通過一個線程管理多個Channel,就可以處理大量網絡連接了。
采用Selector模式的的好處
有了Selector,我們就可以利用一個線程來處理所有的channels。線程之間的切換對操作系統來說代價是很高的,並且每個線程也會占用一定的系統資源。所以,對系統來說使用的線程越少越好。
但是,需要記住,現代的操作系統和CPU在多任務方面表現的越來越好,所以多線程的開銷隨着時間的推移,變得越來越小了。實際上,如果一個CPU有多個內核,不使用多任務可能是在浪費CPU能力。不管怎么說,關於那種設計的討論應該放在另一篇不同的文章中。在這里,只要知道使用Selector能夠處理多個通道就足夠了。
下面這幅圖展示了一個線程處理3個 Channel的情況:
如何創建一個Selector
異步 I/O 中的核心對象名為 Selector。Selector 就是您注冊對各種 I/O 事件興趣的地方,而且當那些事件發生時,就是這個對象告訴您所發生的事件。
Selector selector = Selector.open();
然后,就需要注冊Channel到Selector了。
如何注冊Channel到Selector
為了能讓Channel和Selector配合使用,我們需要把Channel注冊到Selector上。通過調用 channel.register()
方法來實現注冊:
channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
注意,注冊的Channel 必須設置成異步模式 才可以,,否則異步IO就無法工作,這就意味着我們不能把一個FileChannel注冊到Selector,因為FileChannel沒有異步模式,但是網絡編程中的SocketChannel是可以的。
需要注意register()方法的第二個參數,它是一個“interest set”,意思是注冊的Selector對Channel中的哪些時間感興趣,事件類型有四種:
- Connect
- Accept
- Read
- Write
通道觸發了一個事件意思是該事件已經 Ready(就緒)。所以,某個Channel成功連接到另一個服務器稱為 Connect Ready
。一個ServerSocketChannel准備好接收新連接稱為 Accept Ready
,一個有數據可讀的通道可以說是 Read Ready
,等待寫數據的通道可以說是Write Ready
。
上面這四個事件對應到SelectionKey中的四個常量:
1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE
如果你對多個事件感興趣,可以通過or操作符來連接這些常量:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
關於SelectionKey
請注意對register()
的調用的返回值是一個SelectionKey。 SelectionKey 代表這個通道在此 Selector 上的這個注冊。當某個 Selector 通知您某個傳入事件時,它是通過提供對應於該事件的 SelectionKey 來進行的。SelectionKey 還可以用於取消通道的注冊。SelectionKey中包含如下屬性:
- The interest set
- The ready set
- The Channel
- The Selector
- An attached object (optional)
Interest Set
就像我們在前面講到的把Channel注冊到Selector來監聽感興趣的事件,interest set就是你要選擇的感興趣的事件的集合。你可以通過SelectionKey對象來讀寫interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
通過上面例子可以看到,我們可以通過用AND 和SelectionKey 中的常量做運算,從SelectionKey中找到我們感興趣的事件。
Ready Set
ready set 是通道已經准備就緒的操作的集合。在一次選Selection之后,你應該會首先訪問這個ready set。Selection將在下一小節進行解釋。可以這樣訪問ready集合:
int readySet = selectionKey.readyOps();
12
可以用像檢測interest集合那樣的方法,來檢測Channel中什么事件或操作已經就緒。但是,也可以使用以下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和Selector
我們可以通過SelectionKey獲得Selector和注冊的Channel:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attach 一個對象
可以將一個對象或者更多信息attach 到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,可以附加 與通道一起使用的Buffer,或是包含聚集數據的某個對象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
還可以在用register()方法向Selector注冊Channel的時候附加對象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通過Selector選擇通道
一旦向Selector注冊了一或多個通道,就可以調用幾個重載的select()
方法。這些方法返回你所感興趣的事件(如連接、接受、讀或寫)已經准備就緒的那些通道。換句話說,如果你對“Read Ready”的通道感興趣,select()方法會返回讀事件已經就緒的那些通道:
- int select(): 阻塞到至少有一個通道在你注冊的事件上就緒
- int select(long timeout):select()一樣,除了最長會阻塞timeout毫秒(參數)
- int selectNow(): 不會阻塞,不管什么通道就緒都立刻返回,此方法執行非阻塞的選擇操作。如果自從前一次選擇操作后,沒有通道變成可選擇的,則此方法直接返回零。
select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法后有多少通道變成就緒狀態。如果調用select()方法,因為有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道處於就緒狀態。
selectedKeys()
一旦調用了select()
方法,它就會返回一個數值,表示一個或多個通道已經就緒,然后你就可以通過調用selector.selectedKeys()
方法返回的SelectionKey集合來獲得就緒的Channel。請看演示方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
當你通過Selector注冊一個Channel時,channel.register()
方法會返回一個SelectionKey對象,這個對象就代表了你注冊的Channel。這些對象可以通過selectedKeys()
方法獲得。你可以通過迭代這些selected key來獲得就緒的Channel,下面是演示代碼:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
這個循環遍歷selected key的集合中的每個key,並對每個key做測試來判斷哪個Channel已經就緒。
請注意循環中最后的keyIterator.remove()
方法。Selector對象並不會從自己的selected key集合中自動移除SelectionKey實例。我們需要在處理完一個Channel的時候自己去移除。當下一次Channel就緒的時候,Selector會再次把它添加到selected key集合中。
SelectionKey.channel()
方法返回的Channel需要轉換成你具體要處理的類型,比如是ServerSocketChannel或者SocketChannel等等。
WakeUp()和Close()
某個線程調用select()方法后阻塞了,即使沒有通道就緒,也有辦法讓其從select()方法返回。只要讓其它線程在第一個線程調用select()方法的那個對象上調用Selector.wakeup()
方法即可。阻塞在select()方法上的線程會立馬返回。
如果有其它線程調用了wakeup()方法,但當前沒有線程阻塞在select()方法上,下個調用select()方法的線程會立即“醒來(wake up)”
當用完Selector后調應道掉用close()
方法,它將關閉Selector並且使注冊到該Selector上的所有SelectionKey實例無效。通道本身並不會關閉。
一個完整的例子
下面通過一個MultiPortEcho的例子來演示一下上面整個過程。
public class MultiPortEcho {
private int ports[];
private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
public MultiPortEcho(int ports[]) throws IOException {
this.ports = ports;
go();
}
private void go() throws IOException {
// 1. 創建一個selector,select是NIO中的核心對象
// 它用來監聽各種感興趣的IO事件
Selector selector = Selector.open();
// 為每個端口打開一個監聽, 並把這些監聽注冊到selector中
for (int i = 0; i < ports.length; ++i) {
//2. 打開一個ServerSocketChannel
//其實我們沒監聽一個端口就需要一個channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//設置為非阻塞
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
ss.bind(address);//監聽一個端口
//3. 注冊到selector
//register的第一個參數永遠都是selector
//第二個參數是我們要監聽的事件
//OP_ACCEPT是新建立連接的事件
//也是適用於ServerSocketChannel的唯一事件類型
SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Going to listen on " + ports[i]);
}
//4. 開始循環,我們已經注冊了一些IO興趣事件
while (true) {
//這個方法會阻塞,直到至少有一個已注冊的事件發生。當一個或者更多的事件發生時
// select() 方法將返回所發生的事件的數量。
int num = selector.select();
//返回發生了事件的 SelectionKey 對象的一個 集合
Set selectedKeys = selector.selectedKeys();
//我們通過迭代 SelectionKeys 並依次處理每個 SelectionKey 來處理事件
//對於每一個 SelectionKey,您必須確定發生的是什么 I/O 事件,以及這個事件影響哪些 I/O 對象。
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
//5. 監聽新連接。程序執行到這里,我們僅注冊了 ServerSocketChannel
//並且僅注冊它們“接收”事件。為確認這一點
//我們對 SelectionKey 調用 readyOps() 方法,並檢查發生了什么類型的事件
if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
//6. 接收了一個新連接。因為我們知道這個服務器套接字上有一個傳入連接在等待
//所以可以安全地接受它;也就是說,不用擔心 accept() 操作會阻塞
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 7. 講新連接注冊到selector。將新連接的 SocketChannel 配置為非阻塞的
//而且由於接受這個連接的目的是為了讀取來自套接字的數據,所以我們還必須將 SocketChannel 注冊到 Selector上
SelectionKey newKey = sc.register(selector,SelectionKey.OP_READ);
it.remove();
System.out.println("Got connection from " + sc);
} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
// Echo data
int bytesEchoed = 0;
while (true) {
echoBuffer.clear();
int r = sc.read(echoBuffer);
if (r <= 0) {
break;
}
echoBuffer.flip();
sc.write(echoBuffer);
bytesEchoed += r;
}
System.out.println("Echoed " + bytesEchoed + " from " + sc);
it.remove();
}
}
// System.out.println( "going to clear" );
// selectedKeys.clear();
// System.out.println( "cleared" );
}
}
static public void main(String args2[]) throws Exception {
String args[]={"9001","9002","9003"};
if (args.length <= 0) {
System.err.println("Usage: java MultiPortEcho port [port port ...]");
System.exit(1);
}
int ports[] = new int[args.length];
for (int i = 0; i < args.length; ++i) {
ports[i] = Integer.parseInt(args[i]);
}
new MultiPortEcho(ports);
}
}