Java NIO 選擇器 Selector


選擇器 Selector 是 I/O 多路復用模型的核心組件,它可以監控實現了 SelectableChannel 接口的通道的就緒情況。基於多路復用(multiplexing) I/O 模型,單線程的 Java 程序能夠處理數萬個連接,極大提高了系統的並發數。

1. 多路復用 I/O 模型

I/O 多路復用模型是操作系統提供給應用程序的一種進行 I/O 操作的模型。應用程序通過 select/poll 系統調用監控多個 I/O 設備,一旦某個或者多個 I/O 設備的處於就緒狀態(例如:可讀)則返回,應用程序隨后可對就緒的設備進行操作。

應用程序 內核 select 數據未准備好 阻塞等待 等待數據 數據已准備好 復制完成 復制數據到 用戶空間 read 處理數據 阻塞等待 系統調用 返回可讀 系統調用 返回 OK

大致流程如下:

1)應用程序向內核發起 select 系統調用,該調用會阻塞應用程序。

2)內核等待數據到達。數據可能由 DMA 復制到內核緩沖區,也有可能是 CPU 進行復制。

3)數據准備完畢,select 調用返回。select 返回的是一個集和,可能有多個連接都已經就緒。

4)應用程序發起 read 系統調用。

5)操作系統將數據有內核緩沖區復制到用戶緩沖區。

6)read 調用返回。

I/O 多路復用模型本質上是一種阻塞 I/O,進行讀操作的 read 系統調用是阻塞的,select 的時候也是阻塞的。不過 I/O 多路復用模型的優勢在於阻塞時可以等待多路 I/O 就緒,然后一並處理。與多線程處理多路 I/O 相比,它是單線程的,沒有線程切換的開銷,單位時間內能夠處理多的連接數。

2. 選擇器與通道關系

在 Java 中,通道 Channel 可以表示 I/O 連接,而選擇器可以監控某些 I/O 事件就緒的通道,選擇通道中就緒的 I/O 事件。這里的通道必須是實現了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 沒有實現該接口,所以不支持選擇器。

Selector 選擇器 SelectableChannel 可選擇通道 SelectableChannel 可選擇通道 SelectableChannel 可選擇通道 SelectableChannel 可選擇通道

3. 選擇鍵 SelectionKey

選擇器 Selector 監控通道時監控的是通道中的事件,選擇鍵 SelectionKey 就代表着 I/O 事件。程序通過調用 Selector.select() 方法來選中選擇器所監控的通道中的就緒的 I/O 事件的集合,然后遍歷集合,對事件作出相應的處理。

選擇鍵 SelectionKey 可以表示 4 種事件,這 4 種事件使用 int 類型的常量來表示。

1)SelectionKey.OP_ACCEPT 表示 accept 事件就緒。例如:對於 ServerSocketChannel 來說,該事件就緒表示可以調用 accept() 方法來獲得與客戶端連接的通道 SocketChannel。

2)SelectionKey.OP_CONNECT 表示客戶端與服務端連接成功。

3)SelectionKey.OP_READ 表示通道中已經有了可讀數據,可以調用 read() 方法從通道中讀取數據。

4)SelectionKey.OP_WRITE 表示寫事件就緒,可以調用 write() 方法往通道中寫入數據。

不同的通道所能夠支持的 I/O 事件不同,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,可以查看通道的 javadoc 文檔,或者調用通道的 validOps() 方法來進行判斷。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持讀事件。

4. 選擇器使用步驟

4.1 獲取選擇器

與通道和緩沖區的獲取類似,選擇器的獲取也是通過靜態工廠方法 open() 來得到的。

Selector selector = Selector.open();    // 獲取一個選擇器實例

4.2 獲取可選擇通道

能夠被選擇器監控的通道必須實現了 SelectableChannel 接口,並且需要將通道配置成非阻塞模式,否則后續的注冊步驟會拋出 IllegalBlockingModeException。

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打開 SocketChannel 並連接到本機 9090 端口
socketChannel.configureBlocking(false); // 配置通道為非阻塞模式

4.3 將通道注冊到選擇器

通道在被指定的選擇器監控之前,應該先告訴選擇器,並且告知監控的事件,即:將通道注冊到選擇器。

通道的注冊通過 SelectableChannel.register(Selector selector, int ops) 來完成,ops 表示關注的事件,如果需要關注該通道的多個 I/O 事件,可以傳入這些事件類型或運算之后的結果。這些事件必須是通道所支持的,否則拋出 IllegalArgumentException。

socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);   // 將套接字通過到注冊到選擇器,關注 read 和 write 事件

4.4 輪詢 select 就緒事件

通過調用選擇器的 Selector.select() 方法可以獲取就緒事件,該方法會將就緒事件放到一個 SelectionKey 集合中,然后返回就緒的事件的個數。這個方法映射多路復用 I/O 模型中的 select 系統調用,它是一個阻塞方法。正常情況下,直到至少有一個就緒事件,或者其它線程調用了當前 Selector 對象的 wakeup() 方法,或者當前線程被中斷時返回。

while (selector.select() > 0){ // 輪詢,且返回時有就緒事件
    Set<SelectionKey> keys = selector.selectedKeys(); // 獲取就緒事件集合
    .......
}

有 3 種方式可以 select 就緒事件:

1)select() 阻塞方法,有一個就緒事件,或者其它線程調用了 wakeup() 或者當前線程被中斷時返回。

2)select(long timeout) 阻塞方法,有一個就緒事件,或者其它線程調用了 wakeup(),或者當前線程被中斷,或者阻塞時長達到了 timeout 時返回。不拋出超時異常。

3)selectNode() 不阻塞,如果無就緒事件,則返回 0;如果有就緒事件,則將就緒事件放到一個集合,返回就緒事件的數量。

4.5 處理就緒事件

每次可以 select 出一批就緒的事件,所以需要對這些事件進行迭代。從一個 SelectionKey 對象可以得到:1)就緒事件的對應的通道;2)就緒的事件。通過這些信息,就可以很方便地進行 I/O 操作。

for(SelectionKey key : keys){
    if(key.isWritable()){ // 可寫事件
        if("Bye".equals( (line = scanner.nextLine()) )){
            socketChannel.shutdownOutput();
            socketChannel.close();
            break;
        }
        buf.put(line.getBytes());
        buf.flip();
        socketChannel.write(buf);
        buf.compact();
    }
}
keys.clear(); // 清除選擇鍵(事件)集,避免下次循環的時候重復處理。

需要注意的是,處理完 I/O 事件之后,需要清除選擇鍵集合,避免下一輪循環的時候對同一事件重復處理。

5. 完整示例

下面給出一個完整的實例,實例中包含 TCP 客戶端 TcpClient, UDP 客戶端 UdpClient 和服務端 EchoServer。服務端 EchoServer 可以同時處理 UDP 請求和 TCP 請求,用戶可以在客戶端控制台輸入內容,按回車發送給服務端,服務端打印客戶端發送過來的內容。完整代碼:https://github.com/Robothy/java-experiments/tree/main/nio/Selector

5.1 服務端

public class EchoServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();    // 獲取選擇器

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開服務器通道
        serverSocketChannel.configureBlocking(false);                         // 服務器通道配置為非阻塞模式
        serverSocketChannel.bind(new InetSocketAddress(9090));           // 綁定 TCP 端口 9090
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);       // 將服務器通道注冊到選擇器 selector 中,注冊事件為 ACCEPT

        DatagramChannel datagramChannel = DatagramChannel.open();             // 打開套接字通道
        datagramChannel.configureBlocking(false);                             // 配置通道為非阻塞模式
        datagramChannel.bind(new InetSocketAddress(9090));               // 綁定 UDP 端口 9090
        datagramChannel.register(selector, SelectionKey.OP_READ);             // 將通道注冊到選擇器 selector 中,注冊事件為讀取數據

        ByteBuffer buf = ByteBuffer.allocate(1024);                           // 分配一個 1024 字節的堆字節緩沖區

        while (selector.select() > 0){                                        // 輪詢已經就緒的注冊的通道的 I/O 事件
            Set<SelectionKey> keys = selector.selectedKeys();                 // 獲取就緒的 I/O 事件,即選擇器鍵集合
            for (SelectionKey key : keys){                                    // 遍歷選擇鍵,處理就緒事件
                if(key.isAcceptable()){                                       // 選擇鍵的事件的是 I/O 連接事件
                    SocketChannel socketChannel = serverSocketChannel.accept(); // 執行 I/O 操作,獲取套接字連接通道
                    socketChannel.configureBlocking(false);                   // 配置為套接字通道為非阻塞模式
                    socketChannel.register(selector, SelectionKey.OP_READ);   // 將套接字通過到注冊到選擇器,關注 READ 事件
                }else if(key.isReadable()){                        // 選擇鍵的事件是 READ
                    StringBuilder sb = new StringBuilder();
                    if(key.channel() instanceof DatagramChannel){  // 選擇的通道為數據報通道,客戶端是通過 UDP 連接過來的
                        sb.append("UDP Client: ");
                        datagramChannel.receive(buf);              // 最多讀取 1024 字節,數據報多出的部分自動丟棄
                        buf.flip();
                        while(buf.position() < buf.limit()) {
                            sb.append((char)buf.get());
                        }
                        buf.clear();
                    }else{                                          // 選擇的通道為套接字通道,客戶端時通過 TCP 連接過來的
                        sb.append("TCP Client: ");
                        ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 獲取通道
                        int size;
                        while ( (size = channel.read(buf))>0){
                            buf.flip();
                            while (buf.position() < buf.limit()) {
                                sb.append((char)buf.get());
                            }
                            buf.clear();
                        }

                        if (size == -1) {
                            sb.append("Exit");
                            channel.close();
                        }
                    }
                    System.out.println(sb);
                }
            }
            keys.clear();  // 將選擇鍵清空,防止下次循環時被重復處理
        }
    }
}

5.2 TCP 客戶端

public class TcpClient {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_WRITE);

        Scanner scanner = new Scanner(System.in);
        String line;
        ByteBuffer buf = ByteBuffer.allocate(1024);

        while (selector.select() > 0){
            Set<SelectionKey> keys = selector.selectedKeys();
            for(SelectionKey key : keys){
                if(key.isWritable()){
                    if("Bye".equals( (line = scanner.nextLine()) )){
                        socketChannel.shutdownOutput();
                        socketChannel.close();
                        break;
                    }
                    buf.put(line.getBytes());
                    buf.flip();
                    socketChannel.write(buf);
                    buf.compact();
                }
            }
            keys.clear();
            if(!socketChannel.isOpen()) break;
        }
    }
}

5.3 UDP 客戶端

public class UdpClient {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();                        // 獲取選擇器
        DatagramChannel datagramChannel = DatagramChannel.open();   // 打開一個數據報通道
        datagramChannel.configureBlocking(false);                   // 配置通道為非阻塞模式
        datagramChannel.register(selector, SelectionKey.OP_WRITE);  // 將通道的寫事件注冊到選擇器
        ByteBuffer buff = ByteBuffer.allocate(1024);                // 分配字節緩沖區
        Scanner scanner = new Scanner(System.in);                   // 創建掃描器,掃描控制台輸入流
        InetSocketAddress server = new InetSocketAddress("localhost", 9090);
        while (selector.select() > 0){                              // 有就緒事件
            Set<SelectionKey> keys = selector.selectedKeys();       // 獲取選擇鍵,即就緒的事件
            for(SelectionKey key : keys){                           // 遍歷選擇鍵
                if(key.isWritable()){                               // 如果當前選擇鍵是讀就緒
                    String line;
                    if("Bye".equals( line = scanner.nextLine() )) { // 從控制台獲取 1 行輸入,並檢查輸入的是不是 Bye
                        System.exit(0);                 // 正常退出
                    }
                    buff.put(line.getBytes());          // 放入緩沖區
                    buff.flip();                        // 將緩沖區置為讀狀態
                    datagramChannel.send(buff, server); // 往 I/O 寫數據
                    buff.compact();                     // 壓縮緩沖區,保留沒發送完的數據
                }
            }
            keys.clear();
        }
    }
}

6. 小結

Selector 作為多路復用 I/O 模型的核心組件,能夠同時監控多路 I/O 通道。選擇器在 select 就緒事件地時候會阻塞,在處理 I/O 事件的時候也會阻塞,它的優勢在於在阻塞的時候可以等待多路 I/O 就緒,是一種異步阻塞 I/O 模型。與多線程處理多路 I/O 相比,多路復用模型只需要單個線程即可處理萬級連接,沒有線程切換的開銷。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM