概述
redis 內部有一個小型的事件驅動,它和 libevent 網絡庫的事件驅動一樣,都是依托 I/O 多路復用技術支撐起來的。
利用 I/O 多路復用技術,監聽感興趣的文件 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符為主鍵,數據為某個預設函數的事件表,這里其實就是一個數組或者鏈表 。當事件觸發時,比如某個文件描述符可讀,系統會返回文件描述符值,用這個值在事件表中找到相應的數據項,從而實現回調。同樣的,定時事件也是可以實現的,因為系統提供的 I/O 多路復用技術中的函數允許我們設定時間值。
上面一段話比較綜合,可能需要一些 linux 系統編程和網絡編程的基礎,但你會看到多數事件驅動程序都是這么實現的(?)。
redis 事件驅動數據結構
redis 事件驅動內部有四個主要的數據結構,分別是:事件循環結構體,文件事件結構體,時間事件結構體和觸發事件結構體。
// 文件事件結構體
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
// 回調函數指針
aeFileProc *rfileProc;
aeFileProc *wfileProc;
// clientData 參數一般是指向 redisClient 的指針
void *clientData;
} aeFileEvent;
// 時間事件結構體
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 定時回調函數指針
aeTimeProc *timeProc;
// 定時事件清理函數,當刪除定時事件的時候會被調用
aeEventFinalizerProc *finalizerProc;
// clientData 參數一般是指向 redisClient 的指針
void *clientData;
// 定時事件表采用鏈表來維護
struct aeTimeEvent *next;
} aeTimeEvent;
// 觸發事件
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
// 事件循環結構體
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
// 記錄最大的定時事件 id + 1
long long timeEventNextId;
// 用於系統時間的矯正
time_t lastTime; /* Used to detect system clock skew */
// I/O 事件表
aeFileEvent *events; /* Registered events */
// 被觸發的事件
aeFiredEvent *fired; /* Fired events */
// 定時事件表
aeTimeEvent *timeEventHead;
// 事件循環結束標識
int stop;
// 對於不同的 I/O 多路復用技術,有不同的數據,詳見各自實現
void *apidata; /* This is used for polling API specific data */
// 新的循環前需要執行的操作
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
上面的數據結構能給我們很好的提示:事件循環結構體維護 I/O 事件表,定時事件表和觸發事件表。
事件循環中心
redis 的主函數中調用 initServer() 函數從而初始化事件循環中心(EventLoop),它的主要工作是在 aeCreateEventLoop() 中完成的。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 分配空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 分配文件事件結構體空間
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
// 分配已觸發事件結構體空間
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
// 時間事件鏈表頭
eventLoop->timeEventHead = NULL;
// 后續提到
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
// 進入事件循環前需要執行的操作,此項會在 redis main() 函數中設置
eventLoop->beforesleep = NULL;
// 在這里,aeApiCreate() 函數對於每個 IO 多路復用模型的實現都有不同,具體參見源代碼,因為每種 IO 多路復用模型的初始化都不同
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化事件類型掩碼為無事件狀態
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
有上面初始化工作只是完成了一個空空的事件中心而已。要想驅動事件循環,還需要下面的工作。
事件注冊詳解
文件 I/O 事件注冊主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據文件描述符的數值大小在事件循環結構體的 I/O 事件表中取一個數據空間,利用系統提供的 I/O 多路復用技術監聽感興趣的 I/O 事件,並設置回調函數。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 在 I/O 事件表中選擇一個空間
aeFileEvent *fe = &eventLoop->events[fd];
// aeApiAddEvent() 只在此函數中調用,對於不同 IO 多路復用實現,會有所不同
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 設置回調函數
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
對於不同版本的 I/O 多路復用,比如 epoll,select,kqueue 等,redis 有各自的版本,但接口統一,譬如 aeApiAddEvent()。
之於定時事件,在事件循環結構體中用鏈表來維護。定時事件操作在 aeCreateTimeEvent() 中完成:分配定時事件結構體,設置觸發時間和回調函數,插入到定時事件表中。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
/* 自增
timeEventNextId 會在處理執行定時事件時會用到,用於防止出現死循環。
如果超過了最大 id,則跳過這個定時事件,為的是避免死循環,即:
如果事件一執行的時候注冊了事件二,事件一執行完畢后事件二得到執行,緊接着如果事件一有得到執行就會成為循環,因此維護了 timeEventNextId 。*/
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// 分配空間
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
// 填充時間事件結構體
te->id = id;
// 計算超時時間
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
// proc == serverCorn
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// 頭插法
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
准備監聽工作
initServer() 中調用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還做了監聽的准備。
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調用 listen()
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
// UNIX 域套接字
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
}
從上面可以看出,redis 提供了 TCP 和 UNIX 域套接字兩種工作方式。以 TCP 工作方式為例,listenPort() 創建綁定了套接字並啟動了監聽。
為監聽套接字注冊事件
在進入事件循環前還需要做一些准備工作。緊接着,initServer() 為所有的監聽套接字注冊了讀事件,響應函數為 acceptTcpHandler() 或者 acceptUnixHandler()。
// 創建接收 TCP 或者 UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler() tcp 連接接受處理函數
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// UNIX 域套接字
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
來看看acceptTcpHandler() 做了什么:
// 用於 TCP 接收請求的處理函數
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// 接收客戶端請求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出錯
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 記錄
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 真正有意思的地方
acceptCommonHandler(cfd,0);
}
接收套接字與客戶端建立連接后,調用 acceptCommonHandler()。acceptCommonHandler() 主要工作就是:
- 建立並保存服務端與客戶端的連接信息,這些信息保存在一個 struct redisClient 結構體中;
- 為與客戶端連接的套接字注冊讀事件,相應的回調函數為 readQueryFromClient(),readQueryFromClient() 作用是從套接字讀取數據,執行相應操作並回復客戶端。
redis 事件循環
以上做好了准備工作,可以進入事件循環。跳出 initServer() 回到 main() 中,main() 會調用 aeMain()。進入事件循環發生在 aeProcessEvents() 中:
- 根據定時事件表計算需要等待的最短時間;
- 調用 redis api aeApiPoll() 進入監聽輪詢,如果沒有事件發生就會進入睡眠狀態,其實就是 I/O 多路復用 select() epoll() 等的調用;
- 有事件發生會被喚醒,處理已觸發的 I/O 事件和定時事件。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 進入事件循環可能會進入睡眠狀態。在睡眠之前,執行預設置的函數 aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示處理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
// 先處理定時事件,然后處理套接字事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
// tvp 會在 IO 多路復用的函數調用中用到,表示超時時間
struct timeval tv, *tvp;
// 得到最短將來會發生的定時事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
// 計算睡眠的最短時間
if (shortest) { // 存在定時事件
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 得到當前時間
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) { // 需要借位
// 減法中的借位,毫秒向秒借位
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else { // 不需要借位,直接減
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 當前系統時間已經超過定時事件設定的時間
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
// 如果沒有定時事件,見機行事
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 調用 IO 多路復用函數阻塞監聽
numevents = aeApiPoll(eventLoop, tvp);
// 處理已經觸發的事件
for (j = 0; j < numevents; j++) {
// 找到 I/O 事件表中存儲的數據
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 讀事件
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 處理定時事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
事件觸發
這里以 select 版本的 redis api 實現作為講解,aeApiPoll() 調用了 select() 進入了監聽輪詢。aeApiPoll() 的 tvp 參數是最小等待時間,它會被預先計算出來,它主要完成:
- 拷貝讀寫的 fdset。select() 的調用會破壞傳入的 fdset,實際上有兩份 fdset,一份作為備份,另一份用作調用。每次調用 select() 之前都從備份中直接拷貝一份;
- 調用 select();
- 被喚醒后,檢查 fdset 中的每一個文件描述符,並將可讀或者可寫的描述符記錄到觸發表當中。
接下來的操作便是執行相應的回調函數,代碼在上一段中已經貼出:先處理 I/O 事件,再處理定時事件。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
/*
真有意思,在 aeApiState 結構中:
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
在調用 select() 的時候傳入的是 _rfds 和 _wfds,所有監聽的數據在 rfds 和 wfds 中。
在下次需要調用 selec() 的時候,會將 rfds 和 wfds 中的數據拷貝進 _rfds 和 _wfds 中。*/
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
// 輪詢
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
// 添加到觸發事件表中
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
總結
redis 的事件驅動總結如下:
- 初始化事件循環結構體
- 注冊監聽套接字的讀事件
- 注冊定時事件
- 進入事件循環
- 如果監聽套接字變為可讀,會接收客戶端請求,並為對應的套接字注冊讀事件
- 如果與客戶端連接的套接字變為可讀,執行相應的操作

后續分享更多內容。
----
搗亂 2014-3-9
http://daoluan.net



