這次我們開講非阻塞I/O中的Selector,它需要配合非阻塞的TCP和UDP來使用。首先我們先簡單講一下TCP和UDP的非阻塞通道。
非阻塞I/O通道
在上代碼前我們先講解一些最基本的知識。TCP和UDP共對應着三種通道,分別是:SocketChannel、ServerSocketChannel、DatagramChannel 。它們都可以通過channel.open()方法來初始化;同時對於SocketChannel來說,當一個新連接到達ServerSocketChannel時,也會被創建(在代碼中會有說明)。而且它們使用結束后都需要被關閉。
首先讓我們來看看SocketChannel的基本操作
//通過open()打開SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//綁定主機端口
socketChannel.connect(new InetSocketAddress("127.0.0.1", 18888));
//設置成非阻塞模式
socketChannel.configureBlocking(false);
while(! socketChannel.finishConnect() ){
//做點其他事
}
// 利用SocketChannel進行數據操作
下面再來說說,如何用SocketChannel進行數據操作。它的數據讀寫和其他通道的讀寫方式是完全一致的,只是要注意的是,在非阻塞模式下,read()和write()沒有進行任何操作就返回了,所以要在循環中調用,並注意返回值。
ByteBuffer buf = ByteBuffer.allocate(48);
while(socketChannel.read(buf)!=-1) {
buf.flip();
while(buf.hasRemaining()) {
socketChannel.write(buf);
}
buf.clear();
}
SocketChannel相當於傳統I/O中的Socket,而ServerSocketChannel相當於ServerSocket;而且整體形式都是一致的,都是利用多路復用思想,在服務器端收到連接后,產生一個專門的Socket,與客戶端進行數據傳輸。具體形式就是"serverSocketChannel.accept()"在收到連接后,會返回一個SocketChannel,具體形式見代碼
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//綁定主機端口
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while (true) {
//accept()在非阻塞模式中,若建立連接,則返回SocketChannel;否則返回null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// 利用SocketChannel進行數據操作
}
}
而DatagramChannel則是跟DatagramPacket十分相似的,只不過數據包由當初的byte數組換成了現在的ByteBuffer
DatagramChannel channel = DatagramChannel.open();
//綁定主機端口
channel.socket().bind(new InetSocketAddress(9999));
channel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(48);
/*
* 1.因為UDP是無連接的網絡協議,所以不能像TCP那樣讀取和寫入,它是發送和接收數據包。
* 2.receive()在非阻塞模式中,若沒有收到數據包,則返回null;
* 若收到了,則將內容寫入byteBuffer,將發送方的SocketAddress返回(其中包含IP和端口)
* 3.如果Buffer容不下收到的數據,多出的數據將被丟棄
*/
while(channel.receive(buf)==null){
//做點其他事
}
buf.flip();
//指定接收方的SocketAddress
channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));
DatagramChannel還有一個特殊的地方,就是它可以“連接”到網絡中的特定地址的,十分類似於一個TCP連接。但由於UDP是無連接的,連接到特定地址並不會像TCP通道那樣創建一個真正的連接。而是鎖住DatagramChannel ,讓其只能從特定地址收發數據。想實現這種功能,編寫方式和TCP十分類似,就不寫了,去看文檔吧,講解的十分清楚。
Selector
現在開始進入我們今天的主題Selector
其實前言中已經簡單的講解過什么是Selector以及為什么要使用Selector了。這里就不再重復了(我猜你已經忘了,回去再看一眼吧),咱們還是從最基礎的創建開講。
Selector的創建是通過調用Selector.open()方法完成的(這部分都是用open()創建的)
Selector注冊
說完創建,就得說說如何讓Channel和Selector配合使用了?一句話:“將channel注冊到selector上”這個動作是通過SelectionKey channel.register(Selector sel,int ops,Object att)方法完成的。
這里要強調一點,就是調用register的channel必須是非阻塞的。這就將FileChannel排除在外(充話費送的就是不行)。
現在講解register()中每一個參數的含義。第一個參數,就是要將channel注冊到哪個Selector。第二個參數,它是一個“interest集合”,意思是在通過Selector監聽Channel時對什么事件感興趣,可以監聽四種不同類型的事件,分別是Connect、Accept、Read和Write;它們四個分別代表的含義是:
- Connect(SelectionKey.OP_CONNECT):一個channel成功連接到另一個服務器——“連接就緒”
- Accept(SelectionKey.OP_ACCEPT):一個ServerSocketchannel准備好接收新進入的連接——“接收就緒”
- Read(SelectionKey.OP_READ):一個通道的可讀數據已准備好——“讀就緒”
- Write(SelectionKey.OP_WRITE):一個通道的可寫數據已准備好——“寫就緒”
P.S:圓括號中的是要填在第二個參數ops位置上的int常量。我們把這四種叫做“感興趣事件”,后文會多次提到這個概念
如果你對不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來,如下:
int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
register()方法的第三個參數為附加對象,它可有可無,是一個Object對象,它可以作為每個通道的標識符,用以區別注冊在同一個Selector上的其他通道;也可以附加其他對象。
最后再來看看register()方法的返回值。返回值為SelectionKey對象,這是一個重要的對象,接下來我們就主要講解SelectionKey。
SelectionKey
當Selector發現某些channel中的感興趣事件發生了,就會返回相對應channel的SelectionKey對象。
SelectionKey對象包含着許多信息。比如所屬通道的channel對象,通過selectionKey.channel()方法就可以得到;還有通道的附加對象,通過selectionKey.attachment()方法就可以得到;還可以得到通道那個感興趣時間發生了通過下面四種方法獲得:
- boolean selectionKey.isAcceptable()
- boolean selectionKey.isConnectable()
- boolean selectionKey.isReadable()
- boolean selectionKey.isWritable()
還可以獲得更多信息,具體內容可以去看文檔。
Selector.select()
之前的創建、注冊等准備都完成之后,就可以坐等准備好的數據到來了。這時候需要知道有多少個通道感興趣事件已經准備好了。這時候有下面三個方法幫你完成這項任務,分別是
- int selector.select()
- int selector.select(long timeout)
- int selector.selectNow()
首先講一下這三個方法准確的作用,它們都是返回有多少個通道已經變成就緒狀態。它們的區別是:
- select()是阻塞的,它會一直等到有通道准備就緒、
- select(long timeout)也是阻塞的,它會一直等到有通道准備就緒或者已經超出給定的timeout時間並返回0。
- selectNow()是非阻塞的,如果沒有通道就緒就直接返回0。
Selector.selectedKeys()
通過select()方法知道有若干個通道准備就緒,就可以調用下面的方法來返回相應若干個通道的selectedKey了
Set<SelectionKey> selectedKeys = selector.selectedKeys()
獲得selectedKeys后,你就可以進行相應的處理了。需要強調的是,每次處理完一個selectionKey之后需要將它在Set中刪除,這樣下次它准備好以后就可以再次添加到Set中來。
現在關於Selector的知識基本上就講解完了,讓我們在一個服務器端、客戶端收發字符串的例子中結束本次的講解吧。
客戶端
public class HansClient {
// 定義檢測SocketChannel的Selector對象
private Selector selector = null;
// 客戶端SocketChannel
private SocketChannel sc = null;
public void init() throws IOException {
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
// 調用open靜態方法創建連接到指定主機的SocketChannel
sc = SocketChannel.open(isa);
// 設置該sc以非阻塞方式工作
sc.configureBlocking(false);
// 將SocketChannel對象注冊到指定Selector
sc.register(selector, SelectionKey.OP_READ);
// 啟動讀取服務器端數據的線程
new ClientThread().start();
// 創建鍵盤輸入流
Scanner scan = new Scanner(System.in);
while (scan.hasNextLine()) {
// 讀取鍵盤輸入
String line = scan.nextLine();
// 將鍵盤輸入的內容輸出到SocketChannel中
sc.write(StandardCharsets.UTF_8.encode(line));
}
}
// 定義讀取服務器數據的線程
private class ClientThread extends Thread {
public void run() {
try {
while (selector.select() > 0) {
// 遍歷每個有可用IO操作Channel對應的SelectionKey
for (SelectionKey sk : selector.selectedKeys()) {
// 刪除正在處理的SelectionKey
selector.selectedKeys().remove(sk);
// 如果該SelectionKey對應的Channel中有可讀的數據
if (sk.isReadable()) {
// 使用NIO讀取Channel中的數據
SocketChannel sc = (SocketChannel) sk.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
while (sc.read(buff) > 0) {
sc.read(buff);
buff.flip();
content += StandardCharsets.UTF_8.decode(buff);
}
// 打印輸出讀取的內容
System.out.println("聊天信息:" + content);
}
}
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new HansClient().init();
}
}
服務器端
public class HansServer {
// 用於檢測所有Channel狀態的Selector
private Selector selector = null;
public void init() throws IOException {
selector = Selector.open();
// 通過open方法來打開一個未綁定的ServerSocketChannel實例
ServerSocketChannel server = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
// 將該ServerSocketChannel綁定到指定IP地址
server.socket().bind(isa);
// 設置ServerSocket以非阻塞方式工作
server.configureBlocking(false);
// 將server注冊到指定Selector對象
server.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
// 依次處理selector上的每個已選擇的SelectionKey
for (SelectionKey sk : selector.selectedKeys()) {
// 從selector上的已選擇Key集中刪除正在處理的SelectionKey
selector.selectedKeys().remove(sk);
// 如果sk對應的通道包含客戶端的連接請求
if (sk.isAcceptable()) {
// 調用accept方法接受連接,產生服務器端對應的SocketChannel
SocketChannel sc = server.accept();
// 設置采用非阻塞模式
sc.configureBlocking(false);
// 將該SocketChannel也注冊到selector
sc.register(selector, SelectionKey.OP_READ);
}
// 如果sk對應的通道有數據需要讀取
if (sk.isReadable()) {
// 獲取該SelectionKey對應的Channel,該Channel中有可讀的數據
SocketChannel sc = (SocketChannel) sk.channel();
// 定義准備執行讀取數據的ByteBuffer
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
// 開始讀取數據
try {
while (sc.read(buff) > 0) {
buff.flip();
content += StandardCharsets.UTF_8.decode(buff);
}
// 打印從該sk對應的Channel里讀取到的數據
System.out.println("=====" + content);
}
// 如果捕捉到該sk對應的Channel出現了異常,即表明該Channel
// 對應的Client出現了問題,所以從Selector中取消sk的注冊
catch (IOException ex) {
// 從Selector中刪除指定的SelectionKey
sk.cancel();
if (sk.channel() != null) {
sk.channel().close();
}
}
// 如果content的長度大於0,即聊天信息不為空
if (content.length() > 0) {
// 遍歷該selector里注冊的所有SelectKey
for (SelectionKey key : selector.keys()) {
// 獲取該key對應的Channel
Channel targetChannel = key.channel();
// 如果該channel是SocketChannel對象
if (targetChannel instanceof SocketChannel) {
// 將讀到的內容寫入該Channel中
SocketChannel dest = (SocketChannel) targetChannel;
dest.write(StandardCharsets.UTF_8.encode(content));
}
}
}
}
}
}
}
public static void main(String[] args) throws IOException {
new HansServer().init();
}
}
本次講解就到這里了,本系列的講解也就到這里了。如果你能看到這里我真的很開心。有任何事都可以與我討論。
