Redis程序的運行過程是一個處理事件的過程,也稱Redis是一個事件驅動的服務。Redis中的事件分兩類:文件事件(File Event)、時間事件(Time Event)。文件事件處理文件的讀寫操作,特別是與客戶端通信的Socket文件描述符的讀寫操作;時間事件主要用於處理一些定時處理的任務。
本文首先介紹Redis的運行過程,闡明Redis程序是一個事件驅動的程序;接着介紹事件機制實現中涉及的數據結構以及事件的注冊;最后介紹了處理客戶端中涉及到的套接字文件讀寫事件。
一、Redis的運行過程
Redis的運行過程是一個事件處理的過程,可以通過下圖反映出來:
圖1 Redis的事件處理過程
從上圖可以看出:Redis服務器的運行過程就是循環等待並處理事件的過程。通過時間事件將運行事件分成一個個的時間分片,如圖1的右半部分所示。如果在指定的時間分片中,有文件事件發生,如:讀文件描述符可讀、寫文件描述符可寫,則調用相應的處理函數進行文件的讀寫處理。文件事件處理完成之后,處理期望發生時間在當前時間之前或正好是當前時刻的時間事件。然后再進入下一次循環迭代處理。
如果在指定的事件間隔中,沒有文件事件發生,則不需要處理,直接進行時間事件的處理,如下圖所示。
圖2 Redis的事件處理過程(無文件事件發生)
二、事件數據結構
2.1 文件事件數據結構
Redis用如下結構體來記錄一個文件事件:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
通過mask來描述發生了什么事件:
- AE_READABLE:文件描述符可讀;
- AE_WRITABLE:文件描述符可寫;
- AE_BARRIER:文件描述符阻塞
rfileProc和wfileProc分別為讀事件和寫事件發生時的回調函數,其函數簽名如下:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
2.2 事件事件數據結構
Redis用如下結構體來記錄一個時間事件:
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
when_sec和when_ms指定時間事件發生的時間,timeProc為時間事件發生時的處理函數,簽名如下:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
prev和next表明時間事件構成了一個雙向鏈表。
3.3 事件循環
Redis用如下結構體來記錄系統中注冊的事件及其狀態:
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
這一結構體中,最主要的就是文件事件指針events和時間事件頭指針timeEventHead。文件事件指針event指向一個固定大小(可配置)數組,通過文件描述符作為下標,可以獲取文件對應的事件對象。
三、事件的注冊過程
事件驅動的程序實際上就是在事件發生時,調用相應的處理函數(即:回調函數)進行邏輯處理。因此關於事件,程序需要知道:①事件的發生;② 回調函數。事件的注冊過程就是告訴程序這兩。下面我們分別從文件事件、時間事件的注冊過程進行闡述。
3.1 文件事件的注冊過程
對於文件事件:
- 事件的發生:應用程序需要知道哪些文件描述符發生了哪些事件。感知文件描述符上有事件發生是由操作系統的職責,應用程序需要告訴操作系統,它關心哪些文件描述符的哪些事件,這樣通過相應的系統API就會返回發生了事件的文件描述符。
- 回調函數:應用程序知道了文件描述符發生了事件之后,需要調用相應回調函數進行處理,因而需要在事件發生之前將相應的回調函數准備好。
這就是文件事件的注冊過程,函數的實現如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
這段代碼邏輯非常清晰:首先根據文件描述符獲得文件事件對象,接着在操作系統中添加自己關心的文件描述符(addApiAddEvent
),最后將回調函數記錄到文件事件對象中。因此,一個線程就可以同時監聽多個文件事件,這就是IO多路復用。操作系統提供多種IO多路復用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有這些模型,用戶可以根據需要進行選擇。不同的模型,向操作系統添加文件描述符方式也不同,Redis將這部分邏輯封裝在aeApiAddEvent
中,下面代碼是EPOLL模型的實現:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
這段代碼就是對操作系統調用epoll_ctl()
的封裝,EPOLLIN
對應的是讀(輸入)事件,EPOLLOUT對應的是寫(輸出)事件。
3.2 時間事件的注冊過程
對於時間事件:
- 事件的發生:當前時刻正好是事件期望發生的時刻,或者是晚於事件期望發生的時刻,所以需要讓程序知道事件期望發生的時刻;
- 回調函數:此時調用回調函數進行處理,所以需要讓程序知道事件的回調函數。
對應的事件事件注冊函數如下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
這段代碼邏輯也是非常簡單:首先創建時間事件對象,接着設置事件,設置回調函數,最后將事件事件對象插入到時間事件鏈表中。設置時間事件期望發生的時間比較簡單:
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}
當前時間加上期望的時間間隔,作為事件期望發生的時刻。
四、套接字文件事件
Redis為客戶端提供存儲數據和獲取數據的緩存服務,監聽並處理來自請求,將結果返回給客戶端,這一過程將會發生以下文件事件:
與上圖相對應,對於一個請求,Redis會注冊三個文件事件:
4.1 TCP連接建立事件
服務器初始化時,在服務器套接字上注冊TCP連接建立的事件。
void initServer(void) {
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
}
回調函數為acceptTcpHandler,該函數最重要的職責是創建客戶端結構。
4.2 客戶端套接字讀事件
創建客戶端:在客戶端套接字上注冊客戶端套接字可讀事件。
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
回調函數為readQueryFromClient,顧名思義,此函數將從客戶端套接字中讀取數據。
4.3 向客戶端返回數據
Redis完成請求后,Redis並非處理完一個請求后就注冊一個寫文件事件,然后事件回調函數中往客戶端寫回結果。根據圖1,檢測到文件事件發生后,Redis對這些文件事件進行處理,即:調用rReadProc或writeProc回調函數。處理完成后,對於需要向客戶端寫回的數據,先緩存到內存中:
typedef struct client {
// ...其他字段
list *reply; /* List of reply objects to send to the client. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
};
發送給客戶端的數據會存放到兩個地方:
-
reply指針存放待發送的對象;
-
buf中存放待返回的數據,bufpos指示數據中的最后一個字節所在位置。
這里遵循一個原則:只要能存放在buf中,就盡量存入buf字節數組中,如果buf存不下了,才存放在reply對象數組中。
寫回發生在進入下一次等待文件事件之前,見圖1中【等待前處理】,會調用以下函數來處理客戶端數據寫回邏輯:
int writeToClient(int fd, client *c, int handler_installed) {
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
}
}
}
上述函數只截取了數據發送部分,首先發送buf
中的數據,然后發送reply
中的數據。
有讀者可能會疑惑:write()系統調用是阻塞式的接口,上述做法會不會在write()調用的地方有等待,從而導致性能低下?這里就要介紹Redis是怎么處理這個問題的。
首先,我們發現創建客戶端的代碼:
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
}
}
可以看到設置fd是非阻塞(NonBlock),這就保證了在套接字fd上的read()和write()系統調用不是阻塞的。
其次,和文件事件的處理操作一樣,往客戶端寫數據的操作也是批量的,函數如下:
int handleClientsWithPendingWrites(void) {
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
}
可以看到,首先對每個客戶端調用剛才介紹的writeToClient()
函數進行寫數據,如果還有數據沒寫完,那么注冊寫事件,當套接字文件描述符寫就緒時,調用sendReplyToClient()
進行剩余數據的寫操作:
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
writeToClient(fd,privdata,1);
}
仔細想一下就明白了:處理完得到結果后,這時套接字的寫緩沖區一般是空的,因此write()函數調用成功,所以就不需要注冊寫文件事件了。如果寫緩沖區滿了,還有數據沒寫完,此時再注冊寫文件事件。並且在數據寫完后,將寫事件刪除:
int writeToClient(int fd, client *c, int handler_installed) {
if (!clientHasPendingReplies(c)) {
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
注意到,在sendReplyToClient()函數實現中,第三個參數正好是1。