Redis的事件機制


Redis程序的運行過程是一個處理事件的過程,也稱Redis是一個事件驅動的服務。Redis中的事件分兩類:文件事件(File Event)、時間事件(Time Event)。文件事件處理文件的讀寫操作,特別是與客戶端通信的Socket文件描述符的讀寫操作;時間事件主要用於處理一些定時處理的任務。

本文首先介紹Redis的運行過程,闡明Redis程序是一個事件驅動的程序;接着介紹事件機制實現中涉及的數據結構以及事件的注冊;最后介紹了處理客戶端中涉及到的套接字文件讀寫事件。

一、Redis的運行過程

Redis的運行過程是一個事件處理的過程,可以通過下圖反映出來:

​ 圖1 Redis的事件處理過程

從上圖可以看出:Redis服務器的運行過程就是循環等待並處理事件的過程。通過時間事件將運行事件分成一個個的時間分片,如圖1的右半部分所示。如果在指定的時間分片中,有文件事件發生,如:讀文件描述符可讀、寫文件描述符可寫,則調用相應的處理函數進行文件的讀寫處理。文件事件處理完成之后,處理期望發生時間在當前時間之前或正好是當前時刻的時間事件。然后再進入下一次循環迭代處理。

如果在指定的事件間隔中,沒有文件事件發生,則不需要處理,直接進行時間事件的處理,如下圖所示。

​ 圖2 Redis的事件處理過程(無文件事件發生)

二、事件數據結構

2.1 文件事件數據結構

Redis用如下結構體來記錄一個文件事件:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

通過mask來描述發生了什么事件:

  • AE_READABLE:文件描述符可讀;
  • AE_WRITABLE:文件描述符可寫;
  • AE_BARRIER:文件描述符阻塞

rfileProc和wfileProc分別為讀事件和寫事件發生時的回調函數,其函數簽名如下:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
2.2 事件事件數據結構

Redis用如下結構體來記錄一個時間事件:

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

when_sec和when_ms指定時間事件發生的時間,timeProc為時間事件發生時的處理函數,簽名如下:

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

prev和next表明時間事件構成了一個雙向鏈表。

3.3 事件循環

Redis用如下結構體來記錄系統中注冊的事件及其狀態:

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

這一結構體中,最主要的就是文件事件指針events和時間事件頭指針timeEventHead。文件事件指針event指向一個固定大小(可配置)數組,通過文件描述符作為下標,可以獲取文件對應的事件對象。

三、事件的注冊過程

事件驅動的程序實際上就是在事件發生時,調用相應的處理函數(即:回調函數)進行邏輯處理。因此關於事件,程序需要知道:①事件的發生;② 回調函數。事件的注冊過程就是告訴程序這兩。下面我們分別從文件事件、時間事件的注冊過程進行闡述。

3.1 文件事件的注冊過程

對於文件事件:

  • 事件的發生:應用程序需要知道哪些文件描述符發生了哪些事件。感知文件描述符上有事件發生是由操作系統的職責,應用程序需要告訴操作系統,它關心哪些文件描述符的哪些事件,這樣通過相應的系統API就會返回發生了事件的文件描述符。
  • 回調函數:應用程序知道了文件描述符發生了事件之后,需要調用相應回調函數進行處理,因而需要在事件發生之前將相應的回調函數准備好。

這就是文件事件的注冊過程,函數的實現如下:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

這段代碼邏輯非常清晰:首先根據文件描述符獲得文件事件對象,接着在操作系統中添加自己關心的文件描述符(addApiAddEvent),最后將回調函數記錄到文件事件對象中。因此,一個線程就可以同時監聽多個文件事件,這就是IO多路復用。操作系統提供多種IO多路復用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有這些模型,用戶可以根據需要進行選擇。不同的模型,向操作系統添加文件描述符方式也不同,Redis將這部分邏輯封裝在aeApiAddEvent中,下面代碼是EPOLL模型的實現:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

這段代碼就是對操作系統調用epoll_ctl()的封裝,EPOLLIN對應的是讀(輸入)事件,EPOLLOUT對應的是寫(輸出)事件。

3.2 時間事件的注冊過程

對於時間事件:

  • 事件的發生:當前時刻正好是事件期望發生的時刻,或者是晚於事件期望發生的時刻,所以需要讓程序知道事件期望發生的時刻;
  • 回調函數:此時調用回調函數進行處理,所以需要讓程序知道事件的回調函數。

對應的事件事件注冊函數如下:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

這段代碼邏輯也是非常簡單:首先創建時間事件對象,接着設置事件,設置回調函數,最后將事件事件對象插入到時間事件鏈表中。設置時間事件期望發生的時間比較簡單:

static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}

static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    *seconds = tv.tv_sec;
    *milliseconds = tv.tv_usec/1000;
}

當前時間加上期望的時間間隔,作為事件期望發生的時刻。

四、套接字文件事件

Redis為客戶端提供存儲數據和獲取數據的緩存服務,監聽並處理來自請求,將結果返回給客戶端,這一過程將會發生以下文件事件:

與上圖相對應,對於一個請求,Redis會注冊三個文件事件:

4.1 TCP連接建立事件

服務器初始化時,在服務器套接字上注冊TCP連接建立的事件。

void initServer(void) {
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
}

回調函數為acceptTcpHandler,該函數最重要的職責是創建客戶端結構。

4.2 客戶端套接字讀事件

創建客戶端:在客戶端套接字上注冊客戶端套接字可讀事件。

if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                      readQueryFromClient, c) == AE_ERR)
{
    close(fd);
    zfree(c);
    return NULL;
}

回調函數為readQueryFromClient,顧名思義,此函數將從客戶端套接字中讀取數據。

4.3 向客戶端返回數據

Redis完成請求后,Redis並非處理完一個請求后就注冊一個寫文件事件,然后事件回調函數中往客戶端寫回結果。根據圖1,檢測到文件事件發生后,Redis對這些文件事件進行處理,即:調用rReadProc或writeProc回調函數。處理完成后,對於需要向客戶端寫回的數據,先緩存到內存中:

typedef struct client {
    // ...其他字段
    
    list *reply;            /* List of reply objects to send to the client. */
    
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
};

發送給客戶端的數據會存放到兩個地方:

  • reply指針存放待發送的對象;

  • buf中存放待返回的數據,bufpos指示數據中的最后一個字節所在位置。

這里遵循一個原則:只要能存放在buf中,就盡量存入buf字節數組中,如果buf存不下了,才存放在reply對象數組中。

寫回發生在進入下一次等待文件事件之前,見圖1中【等待前處理】,會調用以下函數來處理客戶端數據寫回邏輯:

int writeToClient(int fd, client *c, int handler_installed) {
    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
        }
    }
}

上述函數只截取了數據發送部分,首先發送buf中的數據,然后發送reply中的數據。

有讀者可能會疑惑:write()系統調用是阻塞式的接口,上述做法會不會在write()調用的地方有等待,從而導致性能低下?這里就要介紹Redis是怎么處理這個問題的。

首先,我們發現創建客戶端的代碼:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        anetNonBlock(NULL,fd);
    }
}

可以看到設置fd是非阻塞(NonBlock),這就保證了在套接字fd上的read()和write()系統調用不是阻塞的。

其次,和文件事件的處理操作一樣,往客戶端寫數據的操作也是批量的,函數如下:

int handleClientsWithPendingWrites(void) {
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
}

可以看到,首先對每個客戶端調用剛才介紹的writeToClient()函數進行寫數據,如果還有數據沒寫完,那么注冊寫事件,當套接字文件描述符寫就緒時,調用sendReplyToClient()進行剩余數據的寫操作:

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    writeToClient(fd,privdata,1);
}

仔細想一下就明白了:處理完得到結果后,這時套接字的寫緩沖區一般是空的,因此write()函數調用成功,所以就不需要注冊寫文件事件了。如果寫緩沖區滿了,還有數據沒寫完,此時再注冊寫文件事件。並且在數據寫完后,將寫事件刪除:

int writeToClient(int fd, client *c, int handler_installed) {
    if (!clientHasPendingReplies(c)) {
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

注意到,在sendReplyToClient()函數實現中,第三個參數正好是1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM