redis-10 redis 和 I/O多路復用


背景

  要想完全徹底的搞清楚 epoll 底層原理,我們有必要了解一下整個 IO 的發展歷程,計算機內核的每一次的優化升級都是由於其自身的缺點進而發展出來的,從而促使底層系統函數的迭代升級,進而才會促使整個 IT 技術的升級迭代。這里不會講太細節性的東西,例如:網絡通信、CPU 中斷等,這個有興趣的同學可以下來更加細致的去了解,可以參考 epoll 三部曲來一步一步的了解:epoll 本質

  從以下的各個階段會通過圖文解釋來講清楚整個 IO 歷程,盡可能用簡單易懂的方式來循序漸進的引導,從而是大家對 epoll 有個深刻的認知。

一、阻塞 IO 之 BIO

  

 

  解釋:fd 表示的是文件描述符,因為 linux 環境下一切結尾文件,它可以理解為 java 對象的引用,當 redis 服務啟動的時候,會建立 socket 鏈接,如圖中的 fd6,假設外部有兩個客戶端來連接 redis,則會與 redis 建立 socket 連接,分別為 fd7 和 fd8,當其中一個客戶端發起讀取操作,會將讀取操作交給內核 kernel 來發起系統調用命令,系統調用命令 read 發起 read 請求將數據讀取返后返回給客戶端。后續基本上都是這個邏輯,就不會再重復解釋了。

  在這種 IO 模型的場景下,我們是給每一個客戶端連接創建一個線程去處理它。不管這個客戶端建立了連接有沒有在做事,都要去維護這個連接,直到連接斷開為止。創建過多的線程就會消耗過高的資源,以 Java BIO 為例

  • BIO 是一個同步阻塞 IO
  • 一個線程映射到一個輕量級進程(用戶態中)然后去調用內核線程執行操作
  • 對線程的調度,用戶態和內核態切換以及上下文和現場存儲等等都要消耗很多 CPU 和緩存資源
  • 同步:客戶端請求服務端后,服務端開始處理假設處理1秒鍾,這一秒鍾就算客戶端再發送很多請求過來,服務端也忙不過來,它必須等到之前的請求處理完畢后再去處理下一個請求,當然我們可以使用偽異步 IO 來實現,也就是實現一個線程池,客戶端請求過來后就丟給線程池處理,那么就能夠繼續處理下一個請求了
  • 阻塞:inputStream.read(data) 會通過 recvfrom 去接收數據,如果內核數據還沒有准備好就會一直處於阻塞狀態

  由此可見阻塞 I/O 難以支持高並發的場景,具體代碼如下:

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9999);
        // 新建一個線程用於接收客戶端連接。偽異步 IO
        new Thread(() -> {
            while (true) {
                System.out.println("開始阻塞, 等待客戶端連接");
                try {
                    Socket socket = serverSocket.accept();
                    // 每一個新來的連接給其創建一個線程去處理
                    new Thread(() -> {
                        byte[] data = new byte[1024];
                        int len = 0;
                        System.out.println("客戶端連接成功,阻塞等待客戶端傳入數據");
                        try {
                            InputStream inputStream = socket.getInputStream();
                            // 阻塞式獲取數據直到客戶端斷開連接
                            while ((len = inputStream.read(data)) != -1) {
                                // 或取到數據
                                System.out.println(new String(data, 0, len));
                                // 處理數據
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }).start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

 見最上面的圖,相應的問題就顯而易見了:

  文件描述符 fd7 讀取的時候 fd8 一直在那等着,什么時候fd7 處理完了 fd8 才能開始執行。那如果 fd7 耗時很長,那后面即使耗時很短的指令也無法執行了, 如何解?

二、非阻塞 IO 之 NIO

  

 

  在 BIO 中只能監控一個 socket 且只能阻塞的問題,如何解決?簡單易想的方案就是:我們改寫一下,不讓整個鏈路阻塞,那怎么實現呢?

  讓所有客戶端排好隊,內核來逐個循環來取,有指令執行了就直接執行就可以了,沒有就接着下一個掃描,這樣看起來就解決了每個客戶端需要單獨一個線程來阻塞等待執行了。

  那么問題由來了:

  雖然 NIO 解決了 BIO 中的阻塞問題,但是如果有 1000 個客戶端連接,那么 NIO 采用輪詢的方式就有問題了。因為從 用戶態 到 內核態 頻繁的切換,也會很耗性能,如果第一次循環第一個連接沒有相應的讀取指令,過去之后讀取指令馬上就來了,那還需要等到剩余的 999 次循環以及用戶態和內核態的切換才能再次來到當前連接執行。如何才能降低這種資源的浪費與性能的提升呢?

 

三、多路復用 IO 之 select

  

 

 

   NIO 中我們知道頻繁的 用戶態 到 內核態 切換,導致性能和資源浪費嚴重,那我們是否可以不這么來回切換狀態,那就有了多路復用之 select 了,即:

  還是原來的 1000 個客戶端連接,我們將用戶態所有的連接打包統一交給內核,之后再由內核來統一循環,內核循環一圈后得到 要執行的 fds,再發送給 server,由 server 發起 用戶態-->內核態的調用。這就是基本的多路復用策略。用一段偽代碼來實現:

// 假設現目前獲得了很多 serverSocket.accept(); 后的客戶端連接 List<Socket> sockets;
sockets = getSockets(); 
while (true) {
    // 阻塞,將所有的 sockets 傳入內核讓它幫我們檢測是否有數據准備就緒
    // n 表示有多少個 socket 准備就緒了
    int n = select(sockets);
    for (int i = 0; i < sockets.length; i++) {
        // FD_ISSET 挨個檢查 sockets 查看下內核數據是否准備就緒
        if (FD_ISSET(sockets[i]) {
            // 准備就緒了,挨個處理就緒的 socket
            doSomething();
        }
    }
}

  由此也能看出 select 的一些缺陷:

  • 單進程能打開的最大文件描述符為 1024
  • 監視 sockets 的時候需要將所有的 sockets 的文件描述符傳入內核並且設置對應的進程,傳入的東西太大了
  • 內核同樣也會一直在重復循環,然而其中也可能只有幾個 fds 可用,導致內核一直很忙,這樣的效率仍然不高。  

四、多路復用 IO 之 poll

  poll 跟 select 相似對其進行了部分優化,比如單進程能打開的文件描述符不受限制,底層是采用的鏈表實現。

五、多路復用 IO 終極 epoll

  epoll 的出現相較於 select 晚了幾年,它對 select,poll 進行了大幅度的優化。

     

 

   就上圖說明,相較於 select 可以發現主要是多了一個 eventpoll(rdlist),之前的需要監視的 socket 都需要綁定一個進程,現在都改為指向了 eventpoll

  它是什么呢,我們看下 epoll 實現的偽代碼:

// 假設現目前獲得了很多 serverSocket.accept(); 后的客戶端連接 List<Socket> sockets;
sockets = getSockets(); 
// 這里就是在創建 eventpoll
int epfd = epoll_create();
// 將所有需要監視的 socket 都加入到 eventpoll 中
epoll_ctl(epfd, sockets);
while (true) {
    // 阻塞返回准備好了的 sockets
    int n = epoll_wait();
    // 這里就直接對收到數據的 socket 進行遍歷不需要再遍歷所有的 sockets
    // 是怎么做到的呢,下面繼續分析
    for (遍歷接收到數據的 socket) {
        
    }
}

  解釋:

  epoll_create:當某個進程調用 epoll_create 方法時,內核會創建一個 eventpoll 對象。eventpoll 對象也是文件系統中的一員,和 socket 一樣,它也會有等待隊列。

    創建一個代表該epoll的eventpoll對象是必須的,因為內核要維護“就緒列表”等數據,“就緒列表”可以作為eventpoll的成員。

  epoll_ctl:創建epoll對象后,可以用 epoll_ctl 添加或刪除所要監聽的socket,內核會將 eventpoll 添加到這些 socket 的等待隊列中。

    當socket收到數據后,中斷程序會操作eventpoll對象,而不是直接操作進程。eventpoll對象相當於是socket和進程之間的中介,socket的數據接收並不直接影響進程,而是通過改變eventpoll的就緒列表來改變進程狀態。

  epoll_wait:阻塞返回准備好了的 sockets

六、就緒隊列

   就緒隊列就是下圖的 rdlist 它是 eventpoll 的一個成員,指的是內核中有哪些數據已經准備就緒。這個是怎么做到的呢,當我們調用 epoll_ctl() 的時候會為每一個 socket 注冊一個 回調函數,當某個 socket 准備好了就會 回調 然后加入 rdlist 中的,rdlist 的數據結構是一個雙向鏈表。

  
  這下我們就可以直接從 rdlist 中通過一次系統調用直接獲取數據了而不需要再去遍歷所有的 sockets 了。

總結

  epoll 提升了系統的並發,有限的資源提供更多的服務較於 select、poll 優勢總結如下:

  • 內核監視 sockets 的時候不再需要每次傳入所有的 sockets 文件描述符,然后又全部斷開(反復)的操作了,它只需通過一次 epoll_ctl 即可
  • select、poll 模型下進程收到了 sockets 准備就緒的指令執行后,它不知道到底是哪個 socket 就緒了,需要去遍歷所有的 sockets,而 epoll 維護了一個 rdlist 通過回調的方式將就緒的 socket 插入到 rdlist 鏈表中,我們可以直接獲取 rdlist 即可,無需遍歷其它的 socket 提升效率(注意需記住:多路復用 + 消息回調 的方式)

  最后我們考慮下 epoll 的適用場景:

    只要同一時間就緒列表不要太長都適合。比如 Nginx 它的處理都是及其快速的,如果它為每一個請求還創建一個線程,這個開銷情況下它還如何支持高並發。

netty

  最后我們來看下 netty

   netty 也是采用的多路復用模型我們討論在 linux 情況下的 epoll 使用情況,netty 要如何使用才能更加高效呢?

   如果某一個 socket 請求時間相對較長比如 100MS 會大幅度降低模型對應的並發性,該如何處理呢,java 代碼如下。

public class NIOServer {
    public static void main(String[] args) throws IOException {
        Selector serverSelector = Selector.open();
        Selector clientSelector = Selector.open();

        new Thread(() -> {
            try {
                // 對應IO編程中服務端啟動
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                listenerChannel.socket().bind(new InetSocketAddress(8000));
                listenerChannel.configureBlocking(false);
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

                while (true) {
                    // 一致處於阻塞直到有 socket 數據准備就緒
                    if (serverSelector.select() > 0) {
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isAcceptable()) {
                                try {
                                    // (1) 每來一個新連接,不需要創建一個線程,而是直接注冊到clientSelector
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                } finally {
                                    keyIterator.remove();
                                }
                            }
                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();

        new Thread(() -> {
            try {
                while (true) {
                    // 阻塞等待讀事件准備就緒
                    if (clientSelector.select() > 0) {
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isReadable()) {
                                try {
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    // (3) 面向 Buffer
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
       System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                                            .toString());
                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }
                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();
    }
}

來分析下上面這段代碼

  • 用 serverSelector 來處理所有客戶端的連接請求

  • 用 clientSelector 來處理所有客戶端連接成功后的讀操作

  • 1. 將 SelectionKey.OP_ACCEPT 這個操作注冊到了 serverSelector 上面

  相當於上述將的將我們去創建 eventpoll 並且將當前 serverSocket 進行監視並且注冊的是 ACCEPT 建立連接這個事件,將當前 Thread 移除工作隊列掛入 eventpoll 的等待隊列

  • 2. serverSelector.select() > 0 就是有 socket 數據准備就緒這里也就是有連接建立准備就緒

  相當於 epoll_wait 返回了可讀數量(建立連接的數量),然后我們通過 clientSelector.selectedKeys(); 拿到了就緒隊列里面的 socket

  • 3. 我們知道建立連接這個操作是很快的,建立成功后給 socket 注冊到 clientSelector 上並且注冊 READ 事件

就相當於我們又建立了一個 eventpoll 傳入的就是需要監視讀取事件的 socket(這其實就是之前講的列子 sockets = getSockets()),然后 eventpoll 從工作隊列中移除,需要監視的 sockets 全部指向 eventpoll ,eventpoll 的等待隊列就是當前 new Thread 這個線程。

  • 4. 一旦某個 socket 讀准備就緒,那么 eventpoll 的 rdlist 數據就會准備好,同時會喚醒當前等待的線程來處理數據

  這里思考下由於建立連接的那個線程非常快速只有綁定讀取事件給 clientSelector,所以時間可以忽略。但是在 clientSelector 中獲取到數據后一般需要進行業務邏輯操作,可能耗時會比較長。

  如果出現這種情況由於是單線程的,那么其它 socket 的讀就緒事件可能就無法得到及時的響應,所以一般的做法是,不要在這個線程中處理過於耗時的操作,因為會極大的降低其並發性,對於那種可能相對較慢的操作我們就丟給線程池去處理。

if (key.isReadable()) {
    // 耗時就扔進線程池中
    executor.execute(task);
}

  其實這也就是 netty 的處理方式,我們默認使用 netty 的時候,會創建 serverBootstrap.group(boosGroup, workerGroup) 其中默認情況 boosGroup 是一個線程在處理,workerGroup 是 n * cup 個線程在處理這樣就能大幅度的提升並發性了。

  另外有的小伙伴會說,netty 這樣處理,最終又將客戶端的操作去建立一個線程又丟給線程池了,這和我們使用阻塞式 I/O 每個請求建立一個連接一樣扔進線程池有撒區別。

區別就在於,對於阻塞I/O每一個請求過來會創建一個連接(就算有線程池一樣有很多線程創建維護的開銷),而對於多路復用來說建立連接只是一個線程在處理,並且它會將對於的 read 事件注入到其它 selector 中,對於用戶來說,肯定不會建立了連接那我就時時刻刻我不停的在發送請求了,多路復用的好處就體現出來了,連接你建立 OK linux 內核維護,我不去創建線程開銷。當你真正有讀的請求來的時候,我再給你取分配資源執行(如果耗時就走線程池),這里真正的請求過來的數量是遠遠低於建立成功的 sockets 數目的。那么對於的線程池線程開銷也會遠遠低於每個請求建立一個線程的開銷。

  但是如果對於那種每次獲取就緒隊列的時候都是接近滿負荷的話就不太適用於了多路復用的場景了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM