Java IO學習筆記六:NIO到多路復用


作者:Grey

原文地址:Java IO學習筆記六:NIO到多路復用

雖然NIO性能上比BIO要好,參考:Java IO學習筆記五:BIO到NIO

但是NIO也有問題,NIO服務端的示例代碼中往往會包括如下代碼:

....
//遍歷已經鏈接進來的客戶端能不能讀寫數據
            for (SocketChannel c : clients) {  
                int num = c.read(buffer); 
                if (num > 0) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);

                    String b = new String(aaa);
                    System.out.println(c.socket().getPort() + " : " + b);
                    buffer.clear();
                }
            }
...

即:遍歷所有的SocketChannel,獲取能讀寫數據的客戶端,當客戶端數量非常多的時候,服務端要輪詢所有連接的客戶端拿數據(recv調用),很多調用是無意義的,這樣會導致頻繁的用戶態切換成內核態,導致性能變差。

多路復用技術可以解決NIO的這個問題,多個IO通過一個系統調用獲得其中的IO狀態,然后由程序對有狀態的IO進行讀寫操作。在Linux系統中,多路復用的實現有:

  • 基於POSIX標准的SELECT
  • POLL (select只支持最大fd < 1024,如果單個進程的文件句柄數超過1024,select就不能用了。poll在接口上無限制)
  • EPOLL

其中SELECT和POLL類似,但是有一些區別,參考select和poll的區別

無論NIO,SELECT還是POLL,都是要遍歷所有IO,詢問狀態,只不過遍歷這件事到底是內核來做還是應用程序來做而已。

而epoll,可以看成是SELECT和POLL的增強,在調用select/poll時候,都需要把fd集合從用戶態拷貝到內核態,但是epoll調用epoll_ctl時拷貝進內核並保存,之后每次epoll_wait不做拷貝,而且epoll采用的是事件通知方式,每當fd就緒,系統注冊的回調函數就會被調用,將就緒fd放到rdllist里面。時間復雜度O(1)。

更多內容可以參考:

Java的Selector封裝了底層epoll和poll的API,可以通過指定如下參數來調用執行的內核調用, 在Linux平台,如果指定

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

則底層調用poll,

指定為:

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider

或者不指定,則底層調用epoll。

源碼參考:jdk8u-jdk

image

接下來,我們使用一套服務端代碼,在Linux服務器上運行,分別指定底層用epoll和poll,並用strace工具來追蹤其內核調用。

准備服務端代碼:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class SocketMultiplexingV1 {

    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服務器啟動了。。。。。");
        try {
            while (true) {
                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size() + "   size");
                while (selector.select() > 0) {
                    //返回的有狀態的fd集合
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客戶端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingV1 service = new SocketMultiplexingV1();
        service.start();
    }
}

和服務端代碼在同一目錄下准備一個腳本SocketMultiplexingV1.sh

rm -rf ${1}*
/usr/local/jdk/bin/javac SocketMultiplexingV1.java
strace -ff -o $1 /usr/local/jdk/bin/java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.${1}SelectorProvider SocketMultiplexingV1

執行

./SocketMultiplexingV1 Poll

底層調用Poll

重新打開一個控制台,通過nc工具連接這個服務端

nc localhost 9090

服務端可以正常接收到連接

[root@io io]# ./SocketMultiplexingV1.sh Poll
服務器啟動了。。。。。
1   size
-------------------------------------------
新客戶端:/0:0:0:0:0:0:0:1:39724
-------------------------------------------

暫時先不要發送數據,此時,查看服務端的進程:

[root@io io]# jps
1712 Jps
1659 SocketMultiplexingV1

查看服務端目前關聯的文件描述符

[root@io io]# lsof -p 1659
...
java    1659 root    4u  IPv6  25831       0t0       TCP *:websm (LISTEN)
...
java    1659 root    7u  IPv6  22508       0t0       TCP localhost:websm->localhost:39724 (ESTABLISHED)

其中4u為服務端監聽的Socket文件描述符,7u為新連接進來的客戶端Socket文件描述符。

通過nc客戶端給服務端發送一些數據,客戶端也可以正常收到服務端返回的數據

[root@io io]# nc localhost 9090
sdfasdfasd
sdfasdfasd

接下來停掉服務端和客戶端, 查看追蹤日志

[root@io io]# ll
total 2444
-rwxr-xr-x. 1 root root     106 Jun 10 19:25 mysh.sh
-rw-r--r--. 1 root root    1714 Jun 12 16:35 OSFileIO.java
-rw-r--r--. 1 root root    9572 Jun 17 19:58 Poll.1659
-rw-r--r--. 1 root root  215792 Jun 17 19:58 Poll.1660
-rw-r--r--. 1 root root    1076 Jun 17 19:58 Poll.1661
-rw-r--r--. 1 root root     983 Jun 17 19:58 Poll.1662
-rw-r--r--. 1 root root     850 Jun 17 19:58 Poll.1663
-rw-r--r--. 1 root root     940 Jun 17 19:58 Poll.1664
-rw-r--r--. 1 root root     948 Jun 17 19:58 Poll.1665
-rw-r--r--. 1 root root     885 Jun 17 19:58 Poll.1666
-rw-r--r--. 1 root root     948 Jun 17 19:58 Poll.1667
-rw-r--r--. 1 root root    1080 Jun 17 19:58 Poll.1668
-rw-r--r--. 1 root root  124751 Jun 17 19:58 Poll.1669
-rw-r--r--. 1 root root    1245 Jun 17 19:58 Poll.1670
-rw-r--r--. 1 root root    1210 Jun 17 19:58 Poll.1671
-rw-r--r--. 1 root root    2416 Jun 17 19:58 Poll.1672
-rw-r--r--. 1 root root   27498 Jun 17 19:58 Poll.1673
-rw-r--r--. 1 root root   27326 Jun 17 19:58 Poll.1674
-rw-r--r--. 1 root root   27602 Jun 17 19:58 Poll.1675
-rw-r--r--. 1 root root   26866 Jun 17 19:58 Poll.1676
-rw-r--r--. 1 root root    1141 Jun 17 19:58 Poll.1677
-rw-r--r--. 1 root root 1953818 Jun 17 19:58 Poll.1678
-rw-r--r--. 1 root root    2204 Jun 17 19:58 Poll.1831
-rw-r--r--. 1 root root    3440 Jun 17 19:48 SocketMultiplexingV1.class
-rw-r--r--. 1 root root    3315 Jun 17 19:13 SocketMultiplexingV1.java
-rwxr-xr-x. 1 root root     199 Jun 17 19:19 SocketMultiplexingV1.sh

其中Poll.1678為主線程日志, 我們一一看下整個調用過程

...
2535 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
...
2793 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE        T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28)         = 0
2794 listen(4, 50)                           = 0
...

以上兩個調用對應了代碼中建立Socket並綁定9090端口進行監聽這個邏輯。

...
2772 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
...
2883 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{f        d=4, revents=POLLIN}])

以上調用對應了:

server.configureBlocking(false);

調用的poll方法表示一個新的文件描述符4uPOLLIN(POLLIN:There is data to read)的事件

...
2893 accept(4, {sa_family=AF_INET6, sin6_port=htons(39724), inet_pton(AF_        INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0},         [28]) = 7

...

2935 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=PO        LLIN}], 3, -1) = 1 ([{fd=7, revents=POLLIN}])

這里說明接收了一個新的Socket連接,就是我們剛才用lsof看到的7u這個文件描述符。,調用了poll方法,說明一個新的文件描述符7uPOLLIN(POLLIN:There is data to read)的事件。

我們的代碼中對於每次接收的客戶端,也會把客戶端設置為非阻塞,即:

client.configureBlocking(false);

對應的內核調用就是:

2926 fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0

以上就是poll調用對應內核函數的調用。

接下來切換成epoll模式,重新執行腳本

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服務器啟動了。。。。。
1   size

用nc連接服務端

nc localhost 9090

服務端響應正常

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服務器啟動了。。。。。
1   size
-------------------------------------------
新客戶端:/0:0:0:0:0:0:0:1:39726
-------------------------------------------

通過nc發送一些數據

[root@io io]# nc localhost 9090
asdfasd
asdfasd

也可以正常接收

接下來停掉服務端和客戶端,查看主線程調用情況

[root@io io]# ll -h EPoll.*
-rw-r--r--. 1 root root 9.4K Jun 17 20:33 EPoll.2067
-rw-r--r--. 1 root root 212K Jun 17 20:33 EPoll.2068
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2069
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2070
-rw-r--r--. 1 root root  850 Jun 17 20:33 EPoll.2071
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2072
-rw-r--r--. 1 root root  948 Jun 17 20:33 EPoll.2073
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2074
-rw-r--r--. 1 root root  850 Jun 17 20:33 EPoll.2075
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2076
-rw-r--r--. 1 root root  31K Jun 17 20:33 EPoll.2077
-rw-r--r--. 1 root root 1.4K Jun 17 20:33 EPoll.2078
-rw-r--r--. 1 root root 1.3K Jun 17 20:33 EPoll.2079
-rw-r--r--. 1 root root 2.4K Jun 17 20:33 EPoll.2080
-rw-r--r--. 1 root root 9.0K Jun 17 20:33 EPoll.2081
-rw-r--r--. 1 root root 8.7K Jun 17 20:33 EPoll.2082
-rw-r--r--. 1 root root 8.6K Jun 17 20:33 EPoll.2083
-rw-r--r--. 1 root root 8.2K Jun 17 20:33 EPoll.2084
-rw-r--r--. 1 root root 1.2K Jun 17 20:33 EPoll.2085
-rw-r--r--. 1 root root 400K Jun 17 20:33 EPoll.2086
-rw-r--r--. 1 root root 2.2K Jun 17 20:33 EPoll.2109
vi EPoll.2068

其中新建Socket,Bind 9090端口,設置非阻塞和Poll都是相同的調用

...
  2539 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
....
 2776 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
 ....
 2797 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE        T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28)         = 0
 2798 listen(4, 50)                           = 0
....

但是一旦有新的連接進來

···
2852 epoll_create(256)                       = 7

···

2862 epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=140462610448389        }}) = 0

···
2888 epoll_wait(7, [{EPOLLIN, {u32=4, u64=140462610448388}}], 4096, -1) =         1

···

epoll_create: 創建一個epoll實例,文件描述符

epoll_ctl: 將監聽的文件描述符添加到epoll實例中,實例代碼為將標准輸入文件描述符添加到epoll中

epoll_wait: 等待epoll事件從epoll實例中發生, 並返回事件以及對應文件描述符

調用epoll_create時,內核除了幫我們在epoll文件系統里建了個file結點,在內核cache里建了個紅黑樹用於存儲以后epoll_ctl傳來的socket外,還會再建立一個list鏈表,用於存儲准備就緒的事件。

當epoll_wait調用時,僅僅觀察這個list鏈表里有沒有數據即可。有數據就返回,沒有數據就sleep,等到timeout時間到后即使鏈表沒數據也返回。所以,epoll_wait非常高效。

源碼:Github

參考資料:

深入理解 Epoll

Select、Poll、Epoll詳解

select和poll的區別


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM