概述
redis 內部有一個小型的事件驅動,它和 libevent 網絡庫的事件驅動一樣,都是依托 I/O 多路復用技術支撐起來的。
利用 I/O 多路復用技術,監聽感興趣的文件 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符為主鍵,數據為某個預設函數的事件表,這里其實就是一個數組或者鏈表 。當事件觸發時,比如某個文件描述符可讀,系統會返回文件描述符值,用這個值在事件表中找到相應的數據項,從而實現回調。同樣的,定時事件也是可以實現的,因為系統提供的 I/O 多路復用技術中的函數允許我們設定時間值。
上面一段話比較綜合,可能需要一些 linux 系統編程和網絡編程的基礎,但你會看到多數事件驅動程序都是這么實現的(?)。
redis 事件驅動數據結構
redis 事件驅動內部有四個主要的數據結構,分別是:事件循環結構體,文件事件結構體,時間事件結構體和觸發事件結構體。
// 文件事件結構體 /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ // 回調函數指針 aeFileProc *rfileProc; aeFileProc *wfileProc; // clientData 參數一般是指向 redisClient 的指針 void *clientData; } aeFileEvent; // 時間事件結構體 /* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ // 定時回調函數指針 aeTimeProc *timeProc; // 定時事件清理函數,當刪除定時事件的時候會被調用 aeEventFinalizerProc *finalizerProc; // clientData 參數一般是指向 redisClient 的指針 void *clientData; // 定時事件表采用鏈表來維護 struct aeTimeEvent *next; } aeTimeEvent; // 觸發事件 /* A fired event */ typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent; // 事件循環結構體 /* State of an event based program */ typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ // 記錄最大的定時事件 id + 1 long long timeEventNextId; // 用於系統時間的矯正 time_t lastTime; /* Used to detect system clock skew */ // I/O 事件表 aeFileEvent *events; /* Registered events */ // 被觸發的事件 aeFiredEvent *fired; /* Fired events */ // 定時事件表 aeTimeEvent *timeEventHead; // 事件循環結束標識 int stop; // 對於不同的 I/O 多路復用技術,有不同的數據,詳見各自實現 void *apidata; /* This is used for polling API specific data */ // 新的循環前需要執行的操作 aeBeforeSleepProc *beforesleep; } aeEventLoop;
上面的數據結構能給我們很好的提示:事件循環結構體維護 I/O 事件表,定時事件表和觸發事件表。
事件循環中心
redis 的主函數中調用 initServer() 函數從而初始化事件循環中心(EventLoop),它的主要工作是在 aeCreateEventLoop() 中完成的。
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; // 分配空間 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; // 分配文件事件結構體空間 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); // 分配已觸發事件結構體空間 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); // 時間事件鏈表頭 eventLoop->timeEventHead = NULL; // 后續提到 eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; // 進入事件循環前需要執行的操作,此項會在 redis main() 函數中設置 eventLoop->beforesleep = NULL; // 在這里,aeApiCreate() 函數對於每個 IO 多路復用模型的實現都有不同,具體參見源代碼,因為每種 IO 多路復用模型的初始化都不同 if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ // 初始化事件類型掩碼為無事件狀態 for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
有上面初始化工作只是完成了一個空空的事件中心而已。要想驅動事件循環,還需要下面的工作。
事件注冊詳解
文件 I/O 事件注冊主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據文件描述符的數值大小在事件循環結構體的 I/O 事件表中取一個數據空間,利用系統提供的 I/O 多路復用技術監聽感興趣的 I/O 事件,並設置回調函數。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } // 在 I/O 事件表中選擇一個空間 aeFileEvent *fe = &eventLoop->events[fd]; // aeApiAddEvent() 只在此函數中調用,對於不同 IO 多路復用實現,會有所不同 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; // 設置回調函數 if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
對於不同版本的 I/O 多路復用,比如 epoll,select,kqueue 等,redis 有各自的版本,但接口統一,譬如 aeApiAddEvent()。
之於定時事件,在事件循環結構體中用鏈表來維護。定時事件操作在 aeCreateTimeEvent() 中完成:分配定時事件結構體,設置觸發時間和回調函數,插入到定時事件表中。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { /* 自增 timeEventNextId 會在處理執行定時事件時會用到,用於防止出現死循環。 如果超過了最大 id,則跳過這個定時事件,為的是避免死循環,即: 如果事件一執行的時候注冊了事件二,事件一執行完畢后事件二得到執行,緊接着如果事件一有得到執行就會成為循環,因此維護了 timeEventNextId 。*/ long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; // 分配空間 te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 填充時間事件結構體 te->id = id; // 計算超時時間 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); // proc == serverCorn te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; // 頭插法 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
准備監聽工作
initServer() 中調用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還做了監聽的准備。
/* Open the TCP listening socket for the user commands. */ // listenToPort() 中有調用 listen() if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); // UNIX 域套接字 /* Open the listening Unix domain socket. */ if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } }
從上面可以看出,redis 提供了 TCP 和 UNIX 域套接字兩種工作方式。以 TCP 工作方式為例,listenPort() 創建綁定了套接字並啟動了監聽。
為監聽套接字注冊事件
在進入事件循環前還需要做一些准備工作。緊接着,initServer() 為所有的監聽套接字注冊了讀事件,響應函數為 acceptTcpHandler() 或者 acceptUnixHandler()。
// 創建接收 TCP 或者 UNIX 域套接字的事件處理 // TCP /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { // acceptTcpHandler() tcp 連接接受處理函數 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // UNIX 域套接字 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
來看看acceptTcpHandler() 做了什么:
// 用於 TCP 接收請求的處理函數 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); // 接收客戶端請求 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 出錯 if (cfd == AE_ERR) { redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); return; } // 記錄 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); // 真正有意思的地方 acceptCommonHandler(cfd,0); }
接收套接字與客戶端建立連接后,調用 acceptCommonHandler()。acceptCommonHandler() 主要工作就是:
- 建立並保存服務端與客戶端的連接信息,這些信息保存在一個 struct redisClient 結構體中;
- 為與客戶端連接的套接字注冊讀事件,相應的回調函數為 readQueryFromClient(),readQueryFromClient() 作用是從套接字讀取數據,執行相應操作並回復客戶端。
redis 事件循環
以上做好了准備工作,可以進入事件循環。跳出 initServer() 回到 main() 中,main() 會調用 aeMain()。進入事件循環發生在 aeProcessEvents() 中:
- 根據定時事件表計算需要等待的最短時間;
- 調用 redis api aeApiPoll() 進入監聽輪詢,如果沒有事件發生就會進入睡眠狀態,其實就是 I/O 多路復用 select() epoll() 等的調用;
- 有事件發生會被喚醒,處理已觸發的 I/O 事件和定時事件。
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 進入事件循環可能會進入睡眠狀態。在睡眠之前,執行預設置的函數 aeSetBeforeSleepProc()。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // AE_ALL_EVENTS 表示處理所有的事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } // 先處理定時事件,然后處理套接字事件 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; // tvp 會在 IO 多路復用的函數調用中用到,表示超時時間 struct timeval tv, *tvp; // 得到最短將來會發生的定時事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); // 計算睡眠的最短時間 if (shortest) { // 存在定時事件 long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ // 得到當前時間 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { // 需要借位 // 減法中的借位,毫秒向秒借位 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { // 不需要借位,直接減 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 當前系統時間已經超過定時事件設定的時間 if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ // 如果沒有定時事件,見機行事 if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 調用 IO 多路復用函數阻塞監聽 numevents = aeApiPoll(eventLoop, tvp); // 處理已經觸發的事件 for (j = 0; j < numevents; j++) { // 找到 I/O 事件表中存儲的數據 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 讀事件 if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } // 處理定時事件 /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
事件觸發
這里以 select 版本的 redis api 實現作為講解,aeApiPoll() 調用了 select() 進入了監聽輪詢。aeApiPoll() 的 tvp 參數是最小等待時間,它會被預先計算出來,它主要完成:
- 拷貝讀寫的 fdset。select() 的調用會破壞傳入的 fdset,實際上有兩份 fdset,一份作為備份,另一份用作調用。每次調用 select() 之前都從備份中直接拷貝一份;
- 調用 select();
- 被喚醒后,檢查 fdset 中的每一個文件描述符,並將可讀或者可寫的描述符記錄到觸發表當中。
接下來的操作便是執行相應的回調函數,代碼在上一段中已經貼出:先處理 I/O 事件,再處理定時事件。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /* 真有意思,在 aeApiState 結構中: typedef struct aeApiState { fd_set rfds, wfds; fd_set _rfds, _wfds; } aeApiState; 在調用 select() 的時候傳入的是 _rfds 和 _wfds,所有監聽的數據在 rfds 和 wfds 中。 在下次需要調用 selec() 的時候,會將 rfds 和 wfds 中的數據拷貝進 _rfds 和 _wfds 中。*/ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { // 輪詢 for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 添加到觸發事件表中 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
總結
redis 的事件驅動總結如下:
- 初始化事件循環結構體
- 注冊監聽套接字的讀事件
- 注冊定時事件
- 進入事件循環
- 如果監聽套接字變為可讀,會接收客戶端請求,並為對應的套接字注冊讀事件
- 如果與客戶端連接的套接字變為可讀,執行相應的操作
后續分享更多內容。
----
搗亂 2014-3-9
http://daoluan.net