aeEventLoop
Redis 事件驅動框架對應的數據結構,在 ae.h
中定義,記錄了運行過程信息,有 2 個記錄事件的變量:
IO 事件
:aeFileEvent 類型的指針 *events時間事件
:aeTimeEvent 類型的指針 *timeEventHead,按照一定時間周期觸發的事件
/* State of an event based program */
typedef struct aeEventLoop {
……
// IO 事件數組
aeFileEvent *events;
// 已觸發事件數組
aeFiredEvent *fired;
// 時間事件的鏈表投
aeTimeEvent *timeEventHead;
// polling api 相關數據
void *apidata;
// 進入事件循環流程前執行的函數
aeBeforeSleepProc *beforesleep;
// 進入事件循環流程后執行的函數
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
在 server.c
的 initServer 函數中調用 aeCreateEventLoop
進行初始化。
// 創建事件循環框架
server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
monotonicInit(); /* just in case the calling app didn't initialize */
// 創建 eventLoop 並分配內存空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
……
// 調用 aeApiCreate 函數
if (aeApiCreate(eventLoop) == -1) goto err;
// 把所有網絡 IO 事件對應文件描述符的掩碼,初始化為 AE_NONE,暫時不對任何事件進行監聽
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
……
return NULL;
}
核心是調用 aeApiCreate
函數。aeApiCreate 函數封裝了操作系統提供的 IO 多路復用函數,假設 Redis 運行在 Linux 操作系統上,並且 IO 多路復用機制是 epoll,此時會調用 epoll_create
創建 epoll 實例,同時會創建 epoll_event 結構的數組,數組大小等於參數 setsize。
typedef struct aeApiState {
// epoll 實例的描述符
int epfd;
// epoll_event 結構體數組,記錄監聽事件
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 將 epoll_event 數組保存在 aeApiState 中
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
……
// 將 epoll 實例描述符保存在 aeApiState 中
state->epfd = epoll_create(1024);
……
// 將 aeApiState 變量賦值給 eventLoop 的 apidata
eventLoop->apidata = state;
}
aeApiCreate 函數最后將創建好的 aeApiState 變量賦值給 eventLoop 的 apidata,之后 eventLoop 結構體中就有了 epoll 實例
和 epoll_event 數組
信息,可以基於 epoll 創建和處理事件了。
// 將 aeApiState 變量賦值給 eventLoop 的 apidata
eventLoop->apidata = state;
IO 事件處理
Redis 的 IO 事件分 3 類:
可讀事件
可寫事件
屏障事件
:反轉事件的處理順序。
IO 事件的數據結構是 aeFileEvent
結構體,IO 事件的創建是通過 aeCreateFileEvent
函數來完成的。
typedef struct aeFileEvent {
// 事件類型的掩碼,AE_(READABLE|WRITABLE|BARRIER)
int mask;
// AE_READABLE 事件的處理函數
aeFileProc *rfileProc;
// AE_WRITABLE 事件的處理函數
aeFileProc *wfileProc;
// 指向客戶端私有數據
void *clientData;
} aeFileEvent;
IO 事件創建
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 錯誤處理
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
// 核心
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
入參有 5 個:
*eventLoop
:循環流程結構體fd
:IO 事件對應的文件描述符mask
:事件類型掩碼*proc
:事件處理回調函數*clientData
:事件私有數據
aeCreateFileEvent 函數會先根據傳入的文件描述符 fd,在 eventLoop 的 IO 事件數組中,獲取該描述符關聯的 IO 事件指針變量* fe,如下所示:
aeFileEvent *fe = &eventLoop->events[fd];
之后 aeCreateFileEvent 函數會調用 aeApiAddEvent 函數,添加要監聽的事件:
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
aeApiAddEvent 函數實際上會調用操作系統提供的 IO 多路復用
函數,來完成事件的添加。我們還是假設 Redis 實例運行在使用 epoll 機制的 Linux 上,那么 aeApiAddEvent 函數就會調用 epoll_ctl
函數,添加要監聽的事件。aeApiAddEvent 函數源碼如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask;
// 將可讀或可寫 IO 事件類型轉換為 epoll 監聽的類型 EPOLLIN 或 EPOLLOUT
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
// 將要監聽的文件描述符賦值給 epoll_event
ee.data.fd = fd;
// 增加新的觀察事件
if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
return 0;
}
至此事件驅動框架已經基於 epoll,封裝了 IO 事件的創建。
讀事件處理
Redis server 接收到客戶端的連接請求時,會使用注冊好的 acceptTcpHandler
函數進行處理。acceptTcpHandler 函數是在 networking.c
文件中,接受客戶端連接並創建已連接套接字 cfd
。
最終會調用 acceptCommonHandler
函數,其會調用 createClient 函數,最終會調用到 aeCreateFileEvent
函數,創建 AE_READABLE
的監聽事件,回調函數是 readQueryFromClient
。
至此事件驅動框架就增加了一個對客戶端已連接套接字的監聽。之后客戶端有請求發送到 Redis server,框架就會回調 readQueryFromClient 函數處理請求。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
……
// 每次處理 1000 個
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
……
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
acceptCommonHandler 函數會調用到 createClient:
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
……
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
……
connClose(conn); /* May be already closed, just ignore errors */
return;
}
}
createClient 函數會創建監聽事件:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
……
}
寫事件處理
readQueryFromClient 函數在 networking.c
中,收到客戶端請求后,處理客戶端命令,並將返回的數據寫入客戶端輸出緩沖區。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 循環調用
while (!eventLoop->stop) {
// 核心函數,處理事件的邏輯
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
在 aeProcessEvents 函數中,有 IO 事件發生時,會先判斷是否有 beforesleep
函數:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
……
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
……
beforeSleep 函數調用的 handleClientsWithPendingWrites
函數,會遍歷每一個待寫回數據的客戶端,然后調用 writeToClient
函數,將客戶端輸出緩沖區中的數據寫回。
從 aeProcessEvents 函數的代碼中,我們可以看到該函數會調用 aeApiPoll 函數,查詢監聽的文件描述符中,有哪些已經就緒。一旦有描述符就緒,aeProcessEvents 函數就會根據事件的可讀或可寫類型,調用相應的回調函數進行處理。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
……
// 有 IO 事件發生 || 緊急時間事件發生
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
……
// 調用 aeApiPoll 獲取就緒的描述符
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
……
// 如果觸發的是可讀事件,調用事件注冊時設置的讀事件回調處理函數
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// 如果觸發的是可寫事件,調用事件注冊時設置的寫事件回調處理函數
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
整個流程就完成了。
時間事件處理
時間事件定義
/* Time event structure */
typedef struct aeTimeEvent {
// 時間事件 ID
long long id;
// 事件到達的時間戳
monotime when;
// 事件到達后的處理函數
aeTimeProc *timeProc;
// 事件結束后的處理函數
aeEventFinalizerProc *finalizerProc;
// 事件相關的私有數據
void *clientData;
// 鏈表前向指針
struct aeTimeEvent *prev;
// 鏈表后向指針
struct aeTimeEvent *next;
int refcount;
} aeTimeEvent;
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
時間事件創建
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
核心就是創建 aeTimeEvent 指針 te,並將 te 放入 eventLoop 的時間事件的鏈表頭:
eventLoop->timeEventHead = te;
aeCreateTimeEvent
函數是在 server.c
文件中的 initServer
函數中調用的:
// 為 server 后台任務創建定時事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
時間事件回調函數
serverCron
在 server.c
中:
- 調用
后台任務函數
- 調用
databaseCron 函數
,處理過期 key 或 rehash
/* We need to do a few operations on clients asynchronously. */
// 執行客戶端的異步操作
clientsCron();
/* Handle background operations on Redis databases. */
// 執行數據庫的后台操作
databasesCron();
時間事件的觸發處理
事件驅動框架的 aeMain 函數會循環調用 aeProcessEvents 函數,來處理各種事件。aeProcessEvents 函數的最后,會調用 processTimeEvents
函數處理時間任務。
// 檢查是否有時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
processTimeEvents 函數的主體邏輯,就是從 eventLoop 的時間事件的鏈表逐一取出每個事件,根據當前時間判斷該事件的時間是否滿足觸發條件。如果滿足就處理。
static int processTimeEvents(aeEventLoop *eventLoop) {
……
// 從時間事件鏈表中,取出事件
te = eventLoop->timeEventHead;
……
while(te) {
……
// 當前時間已經滿足事件的觸發時間戳
if (te->when <= now) {
……
// 調用回調函數
retval = te->timeProc(eventLoop, id, te->clientData);
……
now = getMonotonicUs();
if (retval != AE_NOMORE) {
// 處理后,再次更新時間
te->when = now + retval * 1000;
}
……
}
// 獲取下一個事件
te = te->next;
}
return processed;
}
參考鏈接
Redis 源碼簡潔剖析系列
Java 編程思想-最全思維導圖-GitHub 下載鏈接,需要的小伙伴可以自取~
原創不易,希望大家轉載時請先聯系我,並標注原文鏈接。