淺析redis中的IO多路復用與事件機制


 

引入

讀這篇文章之前請先閱讀:淺析服務器並發IO性能提升之路—從網絡編程基礎到epoll,以更好的理解本文的內容,謝謝。
我們知道,我們在使用redis的時候,通過客戶端發送一個get命令,就能夠得到redis服務端返回的數據。redis是基於傳統的C/S架構實現的。它通過監聽一個TCP端口(6379)的方式來接收來自客戶端的連接,從而進行后續命令的執行,並把執行結果返回給客戶端。

redis是一個合格的服務端程序

我們先思考一個問題:作為一個合格的服務端程序,我們在命令行輸入一個get命令之后,redis服務端是怎么處理這個命令,並把結果返回給客戶端的呢?
要回答這個問題,我們先回顧上一篇文章中講過的,客戶端與服務器需要分別創建一個套接字表明自己所在的網絡地址與端口號,然后基於TCP協議來進行套接字之間的通信。通常情況下,一個服務端程序的socket通信流程如下:

int main(int argc, char *argv[]) { listenSocket = socket(); //調用socket()系統調用創建一個監聽套接字描述符 bind(listenSocket); //綁定地址與端口 listen(listenSocket); //由默認的主動套接字轉換為服務器適用的被動套接字 while (1) { //不斷循環去監聽是否有客戶端連接事件到來 connSocket = accept($listenSocket); //接受客戶端連接 read(connsocket); //從客戶端讀取數據,只能同時處理一個客戶端 write(connsocket); //返回給客戶端數據,只能同時處理一個客戶端 } return 0; }

在redis中,同樣要經過以上幾個步驟。與客戶端建立連接之后,就會讀取客戶端發來的命令,然后執行命令,最后通過調用write系統調用,將命令的執行結果返回給客戶端。
但是這樣一個進程只能同時處理一個客戶端的連接與讀寫事件。為了讓單進程的服務端應用同時處理多個客戶端的事件,我們采用了IO多路復用機制。目前最好的IO多路復用機制就是epoll。回顧我們上一篇文章中最終使用epoll創建的服務器代碼:

int main(int argc, char *argv[]) { listenSocket = socket(AF_INET, SOCK_STREAM, 0); //同上,創建一個監聽套接字描述符 bind(listenSocket) //同上,綁定地址與端口 listen(listenSocket) //同上,由默認的主動套接字轉換為服務器適用的被動套接字 epfd = epoll_create(EPOLL_SIZE); //創建一個epoll實例 ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //創建一個epoll_event結構存儲套接字集合 event.events = EPOLLIN; event.data.fd = listenSocket; epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //將監聽套接字加入到監聽列表中 while (1) { event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已經就緒的套接字描述符們 for (int i = 0; i < event_cnt; ++i) { //遍歷所有就緒的套接字描述符 if (ep_events[i].data.fd == listenSocket) { //如果是監聽套接字描述符就緒了,說明有一個新客戶端連接到來 connSocket = accept(listenSocket); //調用accept()建立連接 event.events = EPOLLIN; event.data.fd = connSocket; epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加對新建立的連接套接字描述符的監聽,以監聽后續在連接描述符上的讀寫事件 } else { //如果是連接套接字描述符事件就緒,則可以進行讀寫 strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //從連接套接字描述符中讀取數據, 此時一定會讀到數據,不會產生阻塞 if (strlen == 0) { //已經無法從連接套接字中讀到數據,需要移除對該socket的監聽 epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //刪除對這個描述符的監聽 close(ep_events[i].data.fd); } else { write(ep_events[i].data.fd, buf, str_len); //如果該客戶端可寫 把數據寫回到客戶端 } } } } close(listenSocket); close(epfd); return 0; }

redis基於原有的select、poll與epoll機制,結合自己獨特的業務需求,封裝了自己的一套事件處理函數,我們把它叫做ae(a simple event-driven programming library)。而redis具體使用select、epoll還是mac上的kqueue技術,redis會首先進行判斷,然后選擇性能最優的那個:

/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif

因為 select 函數是作為 POSIX 標准中的系統調用,在不同版本的操作系統上都會實現,所以將其作為兜底方案。為了講述方便,后面的文章均使用epoll機制來講解。

redis中的IO多路復用

當我們在命令行中啟動一個redis-server的時候,redis其實做了和我們之前寫的epoll服務器類似的操作,重點的函數調用有以下三個:

int main(int argc, char **argv) { ... initServerConfig(); //初始化存儲服務端信息的結構體 ... initServer(); //初始化redis事件循環並調用epoll_create與epoll_ctl。創建socket、bind、listen、accept都在這個函數中進行調用,並注冊調用后返回的監聽描述符和連接描述符 ... aeMain(); //執行while(1)事件循環,並調用epoll_wait獲取已就緒的描述符,並調用對應的handler ... }

接下來我們一個一個來看:

initServerConfig()

redis服務端的所有信息都存儲在一個redisServer結構體中,這個結構體字段非常多,比如服務端的套接字信息(如地址和端口),還有很多支持redis其他功能如集群、持久化等的配置信息都存儲在這個結構體中。這個函數調用就是對redisServer結構體的所有字段進行初始化並賦一個初始值。由於我們這次講解的是事件與IO多路復用機制在redis中的應用,所以我們只關注其中的幾個字段即可。

initServer()

這個函數調用是我們的重中之重。初始化完服務器的相關信息之后,就需要進行套接字的創建、綁定、監聽並與客戶端建立連接了。在這個函數中,進行了我們常說的創建socket、bind、listen、accept、epoll_create、epoll_ctl調用,我們可以對照上文的epoll服務器,逐步了解redis的事件機制。initServer()的主要函數調用如下:

void initServer(void) { ... server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); ... if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); ... for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){ serverPanic("Unrecoverable error creating server.ipfd file event."); } } ... }

我們按照從上到下的順序解讀這幾行關鍵代碼:

aeCreateEventLoop()

在redis中,有一個aeEventLoop的概念,它來管理所有相關的事件描述字段、存儲已注冊的事件、已就緒的事件:

typedef struct aeEventLoop { int stop; //標識事件循環(即while(1))是否結束 aeFileEvent *events; //存儲已經注冊的文件事件(文件事件即客戶端連接與讀寫事件) aeFiredEvent *fired; //存儲已就緒的文件事件 aeTimeEvent *timeEventHead; //存儲時間事件(時間事件后面再講) void *apidata; /* 存儲epoll相關信息 */ aeBeforeSleepProc *beforesleep; //事件發生前需要調用的函數 aeBeforeSleepProc *aftersleep; //事件發生后需要調用的函數 } aeEventLoop;

redis將所有通過epoll_wait()返回的就緒描述符都存儲在fired數組中,然后遍歷這個數組,並調用對應的事件處理函數,一次性處理完所有事件。在aeCreateEventLoop()函數中,對這個管理所有事件信息的結構體字段進行了初始化,這里面也包括調用epoll_create(),對epoll的epfd進行初始化:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; if (aeApiCreate(eventLoop) == -1) goto err; //調用aeApiCreate(),內部會調用epoll_create() for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; }

在aeApiCreate()函數中,調用了epoll_create(),並將創建好的epfd放到eventLoop結構體的apidata字段保管:

typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->epfd = epoll_create(1024); /* 調用epoll_create初始化epoll的epfd */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; //將創建好的epfd放到eventLoop結構體的apidata字段保管 return 0; }

listenToPort()

在創建完epfd之后,我們就要進行socket創建、綁定、監聽的操作了,這幾步在listenToPort()函數來進行:

int listenToPort(int port, int *fds, int *count) { if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; for (j = 0; j < server.bindaddr_count || j == 0; j++) { //遍歷所有的ip地址 if (server.bindaddr[j] == NULL) { //還沒有綁定地址 ... } else if (strchr(server.bindaddr[j],':')) { //綁定IPv6地址 ... } else { //綁定IPv4地址,一般會進到這個if分支中 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog); //真正的綁定邏輯 } ... } return C_OK; }

redis會先進行綁定ip地址類型的判斷,我們一般是IPv4,所以一般會走到第三個分支,調用anetTcpServer()函數來進行具體的綁定邏輯:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) { ... if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) { anetSetError(err, "%s", gai_strerror(rv)); return ANET_ERR; } for (p = servinfo; p != NULL; p = p->ai_next) { if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) //調用socket()創建一個監聽套接字 continue; if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error; if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; //調用bind()與listen()綁定端口並轉化為服務端被動套接字 goto end; } }

在調用socket()系統調用創建了套接字之后,需要進一步調用bind()與listen(),這兩步是在anetListen()函數內部實現的:

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) { if (bind(s,sa,len) == -1) { //調用bind()綁定端口 anetSetError(err, "bind: %s", strerror(errno)); close(s); return ANET_ERR; } if (listen(s, backlog) == -1) { //調用listen()將主動套接字轉換為被動監聽套接字 anetSetError(err, "listen: %s", strerror(errno)); close(s); return ANET_ERR; } return ANET_OK; }

看到這里,我們知道redis和我們寫過的epoll服務器一樣,都是需要進行套接字創建、綁定、監聽的過程。

aeCreateFileEvent

在redis中,把客戶端連接事件、讀寫事件統稱為文件事件。我們剛才完成了socket創建、bind、listen的過程。目前我們已經有了一個監聽描述符,那么我們需要首先將監聽描述符添加到epoll的監聽列表,以監聽客戶端的連接事件。在initServer()中,通過調用aeCreateFileEvent(),同時指定了它的事件處理函數acceptTcpHandler()來實現對客戶端連接事件的處理:

    for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){ serverPanic("Unrecoverable error creating server.ipfd file event."); } }

跟進aeCreateFileEvent()函數,發現其內部進一步調用了aeApiAddEvent()函數:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; //調用epoll_ctl添加客戶端連接事件 return 0; }

aeApiAddEvent函數會調用epoll_ctl(),將客戶端連接事件添加到監聽列表。同時,redis會將該事件的處理函數放到aeFileEvent結構體中進行存儲:

typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; //讀事件處理程序 aeFileProc *wfileProc; //寫事件處理程序 void *clientData; //客戶端數據 } aeFileEvent;

對照之前我們寫過的epoll服務端程序,我們已經實現了以下幾個步驟:

int main(int argc, char *argv[]) { listenSocket = socket(AF_INET, SOCK_STREAM, 0); //創建一個監聽套接字描述符 bind(listenSocket) //綁定地址與端口 listen(listenSocket) //由默認的主動套接字轉換為服務器適用的被動套接字 epfd = epoll_create(EPOLL_SIZE); //創建一個epoll實例 ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //創建一個epoll_event結構存儲套接字集合 event.events = EPOLLIN; event.data.fd = listenSocket; epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //將監聽套接字加入到監聽列表中 ... }

我們已經實現了對套接字的創建、bind、listen,已通過epoll_create()實現了epfd的創建,並將初始的監聽套接字描述符事件添加到了epoll的監聽列表中,並為他指定了事件處理函數。下一步,就應該到了while(1)循環調用epoll_wait()的階段了。通過阻塞調用epoll_wait(),返回所有已經就緒的套接字描述符,觸發相應事件,然后對事件進行處理。

aeMain()

最后就是通過while(1)循環,等待客戶端連接事件的到來啦:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }

在eventLoop中,采用stop標志來判定循環是否結束。如果沒有結束,那么循環調用aeProcessEvents()。我們猜測,這里面就調用了epoll_wait(),阻塞等待事件的到來,然后遍歷所有就緒的套接字描述符,然后調用對應的事件處理函數即可:

int aeProcessEvents(aeEventLoop *eventLoop, int flags) { numevents = aeApiPoll(eventLoop, tvp); //調用epoll_wait() ... }

我們跟進aeApiPoll,來看看epoll_wait()是如何調用的:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; // int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }

首先從eventLoop中拿出之前在aeApiCreate()中創建的epfd與已經注冊的事件集合,調用epoll_wait()等待事件們的到來,並返回所有就緒事件的描述符集合。隨后,遍歷所有就緒的描述符集合,判斷它是什么類型的描述符,是可讀還是可寫的,隨后將所有就緒可處理的事件存儲到eventLoop中的fired數組中,並把相應數組位置上的可讀還是可寫標記也一並存儲。
回到外部調用處,我們現在已經把所有能夠處理的事件都放到了fired數組中,那么我們就可以通過遍歷這個數組,拿到所有可以處理的事件,然后調用對應的事件處理函數:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        numevents = aeApiPoll(eventLoop, tvp); //調用epoll_wait() for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; //循環拿出所有就緒的事件 int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); //如果該事件是讀事件,調用讀事件處理函數 fired++; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); //如果該事件是寫事件,調用寫事件處理函數 fired++; } } } } ... }

至於如何區分是客戶端連接事件以及讀寫事件,redis通過指定不同的事件處理函數(如accept事件是acceptTcpHandler事件處理函數),讀或寫事件又是其他的事件處理函數。通過這層封裝,免去了判斷套接字描述符類型的步驟,直接調用之前注冊的事件處理函數即可、
回顧我們之前寫過的的epoll服務器,是不是和這一段代碼很相似呢?

    while (1) { event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已經就緒的套接字描述符們 for (int i = 0; i < event_cnt; ++i) { //遍歷所有就緒的套接字描述符 if (ep_events[i].data.fd == listenSocket) { //如果是監聽套接字描述符就緒了,說明有一個新客戶端連接到來 connSocket = accept(listenSocket); //調用accept()建立連接 event.events = EPOLLIN; event.data.fd = connSocket; epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加對新建立的連接套接字描述符的監聽,以監聽后續在連接描述符上的讀寫事件 } else { //如果是連接套接字描述符事件就緒,則可以進行讀寫 strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //從連接套接字描述符中讀取數據, 此時一定會讀到數據,不會產生阻塞 if (strlen == 0) { //已經無法從連接套接字中讀到數據,需要移除對該socket的監聽 epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //刪除對這個描述符的監聽 close(ep_events[i].data.fd); } else { write(ep_events[i].data.fd, buf, str_len); //如果該客戶端可寫 把數據寫回到客戶端 } } } }

總結

至此,我們就掌握了redis中的IO多路復用場景。redis把所有連接與讀寫事件、還有我們沒提到的時間事件一起集中管理,並對底層IO多路復用機制進行了封裝,最終實現了單進程能夠處理多個連接以及讀寫事件。這就是IO多路復用在redis中的應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM