Redis的I/O多路復用


幾種 I/O 模型

為什么 Redis 中要使用 I/O 多路復用這種技術呢?

首先,Redis 是跑在單線程中的,所有的操作都是按照順序線性執行的,但是由於讀寫操作等待用戶輸入或輸出都是阻塞的,所以 I/O 操作在一般情況下往往不能直接返回

這會導致某一文件的 I/O 阻塞導致整個進程無法對其它客戶提供服務,而 I/O 多路復用就是為了解決這個問題而出現的。

Blocking I/O

先來看一下傳統的阻塞 I/O 模型到底是如何工作的

當使用 read 或者 write 對某一個文件描述符(File Descriptor 以下簡稱 FD)進行讀寫時,如果當前 FD 不可讀或不可寫,整個 Redis 服務就不會對其它的操作作出響應,導致整個服務不可用。

這也就是傳統意義上的,也就是我們在編程中使用最多的阻塞模型:

 

 阻塞模型雖然開發中非常常見也非常易於理解,但是由於它會影響其他 FD 對應的服務,所以在需要處理多個客戶端任務的時候,往往都不會使用阻塞模型。

I/O 多路復用

阻塞式的 I/O 模型並不能滿足這里的需求,我們需要一種效率更高的 I/O 模型來支撐 Redis 的多個客戶(redis-cli),這里涉及的就是 I/O 多路復用模型了

在 I/O 多路復用模型中,最重要的函數調用就是 select,
該方法的能夠同時監控多個文件描述符的可讀可寫情況,當其中的某些文件描述符可讀或者可寫時,select 方法就會返回可讀以及可寫的文件描述符個數
與此同時也有 其它的 I/O 多路復用函數 epoll/kqueue/evport,它們相比 select 性能更優秀,同時也能支撐更多的服務。

當如下任一情況發生時,會產生套接字的可讀事件:

  • 該套接字的接收緩沖區中的數據字節數大於等於套接字接收緩沖區低水位標記的大小;
  • 該套接字的讀半部關閉(也就是收到了FIN),對這樣的套接字的讀操作將返回0(也就是返回EOF);
  • 該套接字是一個監聽套接字且已完成的連接數不為0;
  • 該套接字有錯誤待處理,對這樣的套接字的讀操作將返回-1。

當如下任一情況發生時,會產生套接字的可寫事件:

  • 該套接字的發送緩沖區中的可用空間字節數大於等於套接字發送緩沖區低水位標記的大小;
  • 該套接字的寫半部關閉,繼續寫會產生SIGPIPE信號;
  • 非阻塞模式下,connect返回之后,該套接字連接成功或失敗;
  • 該套接字有錯誤待處理,對這樣的套接字的寫操作將返回-1。
此外, 在UNIX系統上,一切皆文件套接字也不例外,每一個套接字都有對應的fd(即文件描述符)。
我們簡單看看這幾個系統調用的原型。
select(int nfds, fd_set *r, fd_set *w,fd_set *e, struct timeval *timeout)

對於select(),我們需要傳3個集合,r(讀),w(寫)和e。其中,r表示我們對哪些fd的可讀事件感興趣,w表示我們對哪些fd的可寫事件感興趣,每個集合其實是一個bitmap,通過0/1表示我們感興趣的fd例如,

如:我們對於fd為6的可讀事件感興趣,那么r集合的第6個bit需要被設置為1這個系統調用會阻塞,直到我們感興趣的事件(至少一個)發生調用返回時,

內核同樣使用這3個集合來存放fd實際發生的事件信息。也就是說,調用前這3個集合表示我們感興趣的事件,調用后這3個集合表示實際發生的事件

select為最早期的UNIX系統調用,它存在4個問題:

1)這3個bitmap有大小限制(FD_SETSIZE,通常為1024);

2)由於這3個集合在返回時會被內核修改,因此我們每次調用時都需要重新設置;

3)我們在調用完成后需要掃描這3個集合才能知道哪些fd的讀/寫事件發生了,一般情況下全量集合比較大而實際發生讀/寫事件的fd比較少,效率比較低下;

4)內核在每次調用都需要掃描這3個fd集合,然后查看哪些fd的事件實際發生,在讀/寫比較稀疏的情況下同樣存在效率問題

由於存在這些問題,於是人們對select進行了改進,從而有了poll

poll(struct pollfd *fds, int nfds, inttimeout)

struct pollfd {int fd;short events;short revents;}

poll調用需要傳遞的是一個pollfd結構的數組,調用返回時結果信息也存放在這個數組里面。pollfd的結構中存放着fd,我們對該fd感興趣的事件(events)以及該fd實際發生的事件(revents),poll傳遞的不是固定大小的bitmap,因此select的問題1解決了

poll將感興趣事件和實際發生事件分開了,因此select的問題2也解決了,但select的問題3和問題4仍然沒有解決。

select問題3比較容易解決,只要系統調用返回的是實際發生相應事件的fd集合,我們便不需要掃描全量的fd集合。

對於select的問題4,我們為什么需要每次調用都傳遞全量的fd呢?內核可不可以在第一次調用的時候記錄這些fd,然后我們在以后的調用中不需要再傳這些fd呢?

問題的關鍵在於無狀態對於每一次系統調用,內核不會記錄下任何信息,所以每次調用都需要重復傳遞相同信息。

上帝說要有狀態,所以我們有了epoll和kqueue
int epoll_create(int size);

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);

epoll_create的作用是創建一個context,這個context相當於狀態保存者的概念

epoll_ctl的作用是,當你對一個新的fd的讀/寫事件感興趣時,通過該調用將fd與相應的感興趣事件更新到context中

epoll_wait的作用是,等待context中fd的事件發生

epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)並為每個fd指定一個回調函數當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd。

Reactor 設計模式

Redis 服務采用 Reactor 的方式來實現文件事件處理器每一個網絡連接其實都對應一個文件描述符

 

文件事件處理器使用 I/O 多路復用模塊同時監聽多個 FD,當 accept、read、write 和 close 文件事件產生時,文件事件處理器就會回調 FD 綁定的事件處理器

雖然整個文件事件處理器是在單線程上運行的,但是通過 I/O 多路復用模塊的引入,實現了同時對多個 FD 讀寫的監控,提高了網絡通信模型的性能,同時也可以保證整個 Redis 服務實現的簡單。

I/O 多路復用模塊

I/O 多路復用模塊封裝了底層的 select、epoll、avport 以及 kqueue 這些 I/O 多路復用函數,為上層提供了相同的接口。

 

 在這里我們簡單介紹 Redis 是如何包裝 select 和 epoll 的,簡要了解該模塊的功能,整個 I/O 多路復用模塊抹平了不同平台上 I/O 多路復用函數的差異性,提供了相同的接口:

  • static int aeApiCreate(aeEventLoop *eventLoop)
  • static int aeApiResize(aeEventLoop *eventLoop, int setsize)
  • static void aeApiFree(aeEventLoop *eventLoop)
  • static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
  • static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
  • static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
同時, 因為各個函數所需要的參數不同,我們在每一個子模塊內部通過一個 aeApiState 來存儲需要的上下文信息
// select
typedef struct aeApiState {
    fd_set rfds, wfds;
    fd_set _rfds, _wfds;
} aeApiState;

// epoll
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

這些上下文信息會存儲在 eventLoop 的 void *state 中,不會暴露到上層,只在當前子模塊中使用。

封裝 select 函數

select 可以監控 FD 的可讀、可寫以及出現錯誤的情況。
在介紹 I/O 多路復用模塊如何對 select 函數封裝之前,先來看一下 select 函數使用的大致流程:
int fd = /* file descriptor */

fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds)

for ( ; ; ) {
    select(fd+1, &rfds, NULL, NULL, NULL);
    if (FD_ISSET(fd, &rfds)) {
        /* file descriptor `fd` becomes readable */
    }
}
  1. 初始化一個可讀的 fd_set 集合,保存需要監控可讀性的 FD;

  2. 使用 FD_SET 將 fd 加入 rfds;

  3. 調用 select 方法監控 rfds 中的 FD 是否可讀;

  4. 當 select 返回時,檢查 FD 的狀態並完成對應的操作。

而在 Redis 的 ae_select 文件中代碼的組織順序也是差不多的,首先在 aeApiCreate 函數中初始化 rfds 和 wfds:

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    eventLoop->apidata = state;
    return 0;
}

而 aeApiAddEvent 和 aeApiDelEvent 會通過 FD_SET 和 FD_CLR 修改 fd_set 中對應 FD 的標志位:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
    return 0;
}
整個 ae_select 子模塊中最重要的函數就是 aeApiPoll,它是實際 調用 select 函數的部分,其作用就是在 I/O 多路復用函數返回時,將對應的 FD 加入 aeEventLoop 的 fired 數組中,並返回事件的個數
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

封裝 epoll 函數

Redis 對 epoll 的封裝其實也是類似的,使用 epoll_create 創建 epoll 中使用的 epfd:
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

在 aeApiAddEvent 中使用 epoll_ctl 向 epfd 中添加需要監控的 FD 以及監聽的事件:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
由於 epoll 相比 select 機制略有不同,在 epoll_wait 函數返回時並不需要遍歷所有的 FD 查看讀寫情況;在 epoll_wait 函數返回時會提供一個 epoll_event 數組:
typedef union epoll_data {
    void    *ptr;
    int      fd; /* 文件描述符 */
    uint32_t u32;
    uint64_t u64;} epoll_data_t;

struct epoll_event {
    uint32_t     events; /* Epoll 事件 */
    epoll_data_t data;
};

其中保存了發生的 epoll 事件(EPOLLIN、EPOLLOUT、EPOLLERR 和 EPOLLHUP)以及發生該事件的 FD。

aeApiPoll 函數只需要將 epoll_event 數組中存儲的信息加入 eventLoop 的 fired 數組中,將信息傳遞給上層模塊
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

子模塊的選擇

因為 Redis 需要在多個平台上運行,同時為了最大化執行的效率與性能,
所以會根據編譯平台的不同選擇不同的 I/O 多路復用函數作為子模塊,提供給上層統一的接口;
在 Redis 中,我們通過宏定義的使用,合理的選擇不同的子模塊:
#ifdef HAVE_EVPORT#include "ae_evport.c"#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

因為 select 函數是作為 POSIX 標准中的系統調用,在不同版本的操作系統上都會實現,所以將其作為保底方案

Redis 會優先選擇時間復雜度為 O(1)的 I/O 多路復用函數作為底層實現,包括 Solaries 10 中的 evport、Linux 中的 epoll 和 macOS/FreeBSD 中的 kqueue,

上述的這些函數都使用了內核內部的結構,並且能夠服務幾十萬的文件描述符

但是如果當前編譯環境沒有上述函數,就會選擇 select 作為備選方案,由於其在使用時會掃描全部監聽的描述符,所以其時間復雜度較差 O(n),並且只能同時服務 1024 個文件描述符,所以一般並不會以 select 作為第一方案使用。

總結

Redis 對於 I/O 多路復用模塊的設計非常簡潔,通過宏保證了 I/O 多路復用模塊在不同平台上都有着優異的性能將不同的 I/O 多路復用函數封裝成相同的 API 提供給上層使用

整個模塊使 Redis 能以單進程運行的同時服務成千上萬個文件描述符,避免了由於多進程應用的引入導致代碼實現復雜度的提升,減少了出錯的可能性


參考:
鏈接:https://www.jianshu.com/p/311f9d276b2a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM