深入理解NIO(一)—— NIO的簡單使用及其三大組件介紹


深入理解NIO(一)—— NIO的簡單使用及其三大組件介紹

深入理解NIO系列分為四個部分

  • 第一個部分也就是本節為NIO的簡單使用(我很少寫這種新手教程,所以如果你是復習還好,應該不難理解這篇,但如果你真的是入門而且不常閱讀這種文字教程可能會看不懂,我的鍋,別擔心,建議找點簡單的視頻教程什么的先看看)
  • 第二個部分為Tomcat中對NIO的應用(本篇雖然講Tomcat源碼,但是主要講其中NIO的部分,其他部分請移步)(如果對NIO簡單使用有把握的話可以直接先看這篇)
  • 第三個部分為NIO原理及部分源碼的解析
  • 第四個部分為剖析NIO的底層epoll的實現原理

 (老哥行行好,轉載和我說一聲好嗎,我不介意轉載的,但是請把原文鏈接貼大點好嗎)

 

從BIO到NIO

無論是BIO還是NIO,其實都算是一種IO模型,都是基於socket的編程,

而socket又分為兩種:文件型網絡型(OS的知識,Linux的進程通訊就是socket實現的)

文件型可以簡單說成是本機的通訊,也就是本地進程間的通訊(我們訪問localhost應該算一個)

網絡型的話就是Client-Server了,例如瀏覽器訪問其他服務器上的網頁這種。

聊天室屬於既可以在本機開兩個窗口聊天,也可以和互聯網上的其他主機進行聊天的那種。

所以接下來我們講的無論是BIO還是NIO,都可以當做一個聊天室這樣子去理解會簡單些。

 

BIO模型

首先我們先看一下BIO的網絡模型

可以看到,BIO屬於來一個新的連接,我們就新開一個線程來處理這個連接,之后的操作全部由那個線程來完成的那種。

那么,這個模式下的性能瓶頸在哪里呢?

  • 首先,每次來一個連接都開一個新的線程這肯定是不合適的。當活躍連接數在幾十幾百的時候當然是可以這樣做的,但如果活躍連接數是幾萬幾十萬的時候,這么多線程明顯就不行了。每個線程都需要一部分內存,內存會被迅速消耗,同時,線程切換的開銷非常大。
  • 其次,假如一個用戶只是登錄了聊天室,之后便不再做任何操作,而這個線程卻一直在那里循環等待用戶發送消息,等待write(),這顯然是非常耗費資源的。

因此人們便提出了NIO

 

NIO模型

 

非阻塞 IO 的核心在於使用一個 Selector 來管理多個通道,可以是 SocketChannel,也可以是 ServerSocketChannel,將各個通道注冊到 Selector 上,指定監聽的事件。

之后可以只用一個線程來輪詢這個 Selector,看看上面是否有通道是准備好的,當通道准備好可讀或可寫,然后才去開始真正的讀寫,這樣速度就很快了。我們就完全沒有必要給每個通道都起一個線程。

 


 

簡單例子介紹NIO的使用

這里只給出服務端的實現,代碼不難,建議貼到ide里面好好過一遍,也方便后續閱讀。

/**
 * NIO服務器端
 */
public class NioServer {

    private void start() throws IOException {
        // 1. 創建Selector
        Selector selector = Selector.open();

        // 2. 通過ServerSocketChannel創建channel通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 3. 為channel通道綁定監聽端口
        serverSocketChannel.bind(new InetSocketAddress(8000));

        // 4. 設置channel為非阻塞模式
        serverSocketChannel.configureBlocking(false);

        // 5. 將channel注冊到selector上,監聽連接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服務器啟動成功!");

        // 6. 循環等待新接入的連接
        for (;;) {
            // 獲取可用channel數量
            int readyChannels = selector.select();

            if (readyChannels == 0) continue;

            // 獲取可用channel的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            Iterator iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                // selectionKey實例
                SelectionKey selectionKey = (SelectionKey) iterator.next();

                iterator.remove();

                // 如果是 接入事件
                if (selectionKey.isAcceptable()) {
                    acceptHandler(serverSocketChannel, selector);
                }

                // 如果是 可讀事件
                if (selectionKey.isReadable()) {
                    readHandler(selectionKey, selector);
                }
            }
        }
    }

    /**
     * 接入事件處理器
     */
    private void acceptHandler(ServerSocketChannel serverSocketChannel,
                               Selector selector)
            throws IOException {
        // 如果要是接入事件,創建socketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();

        // 將socketChannel設置為非阻塞工作模式
        socketChannel.configureBlocking(false);

        // 將channel注冊到selector上,監聽 可讀事件
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 回復客戶端提示信息
        socketChannel.write(Charset.forName("UTF-8")
                .encode("你與聊天室里其他人都不是朋友關系,請注意隱私安全"));
    }

    /**
     * 可讀事件處理器
     */
    private void readHandler(SelectionKey selectionKey, Selector selector)
            throws IOException {
        // 要從 selectionKey 中獲取到已經就緒的channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        // 創建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        // 循環讀取客戶端請求信息
        String request = "";
        while (socketChannel.read(byteBuffer) > 0) {

            // 切換buffer為讀模式
            byteBuffer.flip();

            // 讀取buffer中的內容
            request += Charset.forName("UTF-8").decode(byteBuffer);
        }

        // 將channel再次注冊到selector上,監聽他的可讀事件
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 將客戶端發送的請求信息 廣播給其他客戶端
        if (request.length() > 0) {
            // 廣播給其他客戶端
            broadCast(selector, socketChannel, request);
        }
    }

    /**
     * 廣播給其他客戶端
     */
    private void broadCast(Selector selector,
                           SocketChannel sourceChannel, String request) {
        // 獲取到所有已接入的客戶端channel
        Set<SelectionKey> selectionKeySet = selector.keys();

        // 循環向所有channel廣播信息
        selectionKeySet.forEach(selectionKey -> {
            Channel targetChannel = selectionKey.channel();

            // 剔除發消息的客戶端
            if (targetChannel instanceof SocketChannel
                    && targetChannel != sourceChannel) {
                try {
                    // 將信息發送到targetChannel客戶端
                    ((SocketChannel) targetChannel).write(
                            Charset.forName("UTF-8").encode(request));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

和上面的代碼一模一樣,但是這個有行號,方便閱讀:

  1 /**
  2  * NIO服務器端
  3  */
  4 public class NioServer {
  5     
  6     private void start() throws IOException {
  7         // 1. 創建Selector
  8         Selector selector = Selector.open();
  9 
 10         // 2. 通過ServerSocketChannel創建channel通道
 11         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 12 
 13         // 3. 為channel通道綁定監聽端口
 14         serverSocketChannel.bind(new InetSocketAddress(8000));
 15 
 16         // 4. 設置channel為非阻塞模式
 17         serverSocketChannel.configureBlocking(false);
 18 
 19         // 5. 將channel注冊到selector上,監聽連接事件
 20         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 21         System.out.println("服務器啟動成功!");
 22 
 23         // 6. 循環等待新接入的連接
 24         for (;;) {
 25             // 獲取可用channel數量
 26             int readyChannels = selector.select();
 27 
 28             if (readyChannels == 0) continue;
 29             
 30             // 獲取可用channel的集合
 31             Set<SelectionKey> selectionKeys = selector.selectedKeys();
 32 
 33             Iterator iterator = selectionKeys.iterator();
 34 
 35             while (iterator.hasNext()) {
 36                 // selectionKey實例
 37                 SelectionKey selectionKey = (SelectionKey) iterator.next();
 38                 
 39                 iterator.remove();
 40                 
 41                 // 如果是 接入事件
 42                 if (selectionKey.isAcceptable()) {
 43                     acceptHandler(serverSocketChannel, selector);
 44                 }
 45 
 46                 // 如果是 可讀事件
 47                 if (selectionKey.isReadable()) {
 48                     readHandler(selectionKey, selector);
 49                 }
 50             }
 51         }
 52     }
 53 
 54     /**
 55      * 接入事件處理器
 56      */
 57     private void acceptHandler(ServerSocketChannel serverSocketChannel,
 58                                Selector selector)
 59             throws IOException {
 60         // 如果要是接入事件,創建socketChannel
 61         SocketChannel socketChannel = serverSocketChannel.accept();
 62 
 63         // 將socketChannel設置為非阻塞工作模式
 64         socketChannel.configureBlocking(false);
 65 
 66         // 將channel注冊到selector上,監聽 可讀事件
 67         socketChannel.register(selector, SelectionKey.OP_READ);
 68 
 69         // 回復客戶端提示信息
 70         socketChannel.write(Charset.forName("UTF-8")
 71                 .encode("你與聊天室里其他人都不是朋友關系,請注意隱私安全"));
 72     }
 73 
 74     /**
 75      * 可讀事件處理器
 76      */
 77     private void readHandler(SelectionKey selectionKey, Selector selector)
 78             throws IOException {
 79         // 要從 selectionKey 中獲取到已經就緒的channel
 80         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
 81 
 82         // 創建buffer
 83         ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
 84 
 85         // 循環讀取客戶端請求信息
 86         String request = "";
 87         while (socketChannel.read(byteBuffer) > 0) {
 88             
 89             // 切換buffer為讀模式
 90             byteBuffer.flip();
 91 
 92             // 讀取buffer中的內容
 93             request += Charset.forName("UTF-8").decode(byteBuffer);
 94         }
 95 
 96         // 將channel再次注冊到selector上,監聽他的可讀事件
 97         socketChannel.register(selector, SelectionKey.OP_READ);
 98 
 99         // 將客戶端發送的請求信息 廣播給其他客戶端
100         if (request.length() > 0) {
101             // 廣播給其他客戶端
102             broadCast(selector, socketChannel, request);
103         }
104     }
105 
106     /**
107      * 廣播給其他客戶端
108      */
109     private void broadCast(Selector selector,
110                            SocketChannel sourceChannel, String request) {
111         // 獲取到所有已接入的客戶端channel
112         Set<SelectionKey> selectionKeySet = selector.keys();
113 
114         // 循環向所有channel廣播信息
115         selectionKeySet.forEach(selectionKey -> {
116             Channel targetChannel = selectionKey.channel();
117 
118             // 剔除發消息的客戶端
119             if (targetChannel instanceof SocketChannel
120                     && targetChannel != sourceChannel) {
121                 try {
122                     // 將信息發送到targetChannel客戶端
123                     ((SocketChannel) targetChannel).write(
124                             Charset.forName("UTF-8").encode(request));
125                 } catch (IOException e) {
126                     e.printStackTrace();
127                 }
128             }
129         });
130     }
131 }

 


 

NIO的三大組件

 

 通過1.2的NIO部分的那張圖和2.0的代碼,你應該大致知道NIO的其中兩大組件:SelectorChannel

 

這里這張圖隨手也把第三大組件Buffer也給了,接下來我們就先來聊一下這個Buffer

 


 

Buffer組件

首先看一眼Buffer種類(大同小異,大同小異)

 接下來講一下它的參數:

 

  • capacity,它代表這個緩沖區的容量,一旦設定就不可以更改。比如 capacity 為 1024 的 IntBuffer,代表其一次可以存放 1024 個 int 類型的值。一旦 Buffer 的容量達到 capacity,需要清空 Buffer,才能重新寫入值。
  • position 的初始值是 0,每往 Buffer 中寫入一個值,position 就自動加 1,代表下一次的寫入位置。讀操作的時候也是類似的,每讀一個值,position 就自動加 1。
  • 從寫操作模式到讀操作模式切換的時候(flip),position 都會歸零,這樣就可以從頭開始讀寫了。
  • Limit:寫操作模式下,limit 代表的是最大能寫入的數據,這個時候 limit 等於 capacity。寫結束后,切換到讀模式,此時的 limit 等於 Buffer 中實際的數據大小,因為 Buffer 不一定被寫滿了。

 

看一下剛剛例子中對Buffer的使用(82~94行):

// 創建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 循環讀取客戶端請求信息
String request = "";
while (socketChannel.read(byteBuffer) > 0) {

    // 切換buffer為讀模式
    byteBuffer.flip();

    // 讀取buffer中的內容
    request += Charset.forName("UTF-8").decode(byteBuffer);
}

其中的flip方法,其實也就是設置了一下 position 和 limit 值罷了。

public final Buffer flip() {
    limit = position; // 將 limit 設置為實際寫入的數據數量
    position = 0; // 重置 position 為 0
    mark = -1; // mark 之后再說
    return this;
}

其他的read和write方法也不過是對三個參數的操作和讀取寫入buffer數組的綜合而已,這里就不一一分析(大同小異,大同小異)

其它的方法我也就不介紹了,要用的時候自己去查api就是了。

 

Channel組件

 

  • FileChannel:文件通道,用於文件的讀和寫
  • DatagramChannel:用於 UDP 連接的接收和發送
  • SocketChannel:把它理解為 TCP 連接通道,簡單理解就是 TCP 客戶端
  • ServerSocketChannel:TCP 對應的服務端,用於監聽某個端口進來的請求

 

Channel 經常翻譯為通道,類似 IO 中的流,用於讀取和寫入。它與前面介紹的 Buffer 打交道,讀操作的時候將 Channel 中的數據填充到 Buffer 中,而寫操作時將 Buffer 中的數據寫入到 Channel 中。 

 

這里是例子中對ServerSocketChannel的應用(10~17行)

// 2. 通過ServerSocketChannel創建channel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 3. 為channel通道綁定監聽端口
serverSocketChannel.bind(new InetSocketAddress(8000));

// 4. 設置channel為非阻塞模式
serverSocketChannel.configureBlocking(false);

還有就是對SocketChannel的應用(60~64行)

// 如果要是接入事件,創建socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();

// 將socketChannel設置為非阻塞工作模式
socketChannel.configureBlocking(false);

到這里,我們應該能理解 SocketChannel 了,它不僅僅是 TCP 客戶端,它代表的是一個網絡通道,可讀可寫。

而ServerSocketChannel 不和 Buffer 打交道了,因為它並不實際處理數據,它一旦接收到請求后,實例化 SocketChannel,之后在這個連接通道上的數據傳遞它就不管了,因為它需要繼續監聽端口,等待下一個連接。

 

Selector組件

那么,整出Channel后該怎么辦呢?當然是把它注冊到Selector上了。

我們先整一個Selector出來(7~8行):

// 1. 創建Selector
Selector selector = Selector.open();

然后把ServerSocketChannel注冊上去(16~21行):

// 4. 設置channel為非阻塞模式
serverSocketChannel.configureBlocking(false);

// 5. 將channel注冊到selector上,監聽連接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務器啟動成功!");

這里可以看到注冊的另一個參數  SelectionKey.OP_ACCEPT :

register 方法的第二個 int 型參數(使用二進制的標記位)用於表明需要監聽哪些感興趣的事件,共以下四種事件:

  •  SelectionKey.OP_READ   對應 00000001,通道中有數據可以進行讀取

  •  SelectionKey.OP_WRITE   對應 00000100,可以往通道中寫入數據

  •  SelectionKey.OP_CONNECT   對應 00001000,成功建立 TCP 連接

  •  SelectionKey.OP_ACCEPT   對應 00010000,接受 TCP 連接

 

 SocketChannel 同理:

// 如果要是接入事件,創建socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();

// 將socketChannel設置為非阻塞工作模式
socketChannel.configureBlocking(false);

// 將channel注冊到selector上,監聽 可讀事件
socketChannel.register(selector, SelectionKey.OP_READ);

接下來就是循環檢測selector中有沒有准備好的channel了(23~31行):

// 6. 循環等待新接入的連接
for (;;) {
    // 獲取可用channel數量
    int readyChannels = selector.select();

    if (readyChannels == 0) continue;

    // 獲取可用channel的集合
    Set<SelectionKey> selectionKeys = selector.selectedKeys();

這里只提一下select()方法

調用此方法,會將上次 select 之后的准備好的 channel 對應的 SelectionKey 復制到 selected set 中。如果沒有任何通道准備好,這個方法會阻塞,直到至少有一個通道准備好。

 下一篇:深入理解NIO(二)——  Tomcat中對NIO的應用

 


 

參考資料:

https://javadoop.com/post/java-nio  參考組件部分

https://www.imooc.com/learn/1118  參考圖片部分

http://www.mamicode.com/info-detail-2461800.html  參考圖片部分

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM