Reactor事件模型在Redis中的應用


1 模型簡介  

Redis沒有使用第三方的libevent等網絡庫,而是自己開發了一個單線程的Reactor模型的事件處理模型。而Memcached內部使用的libevent庫,多線程模型。

綜合對比可見:nginx,memcached,redis網絡模型總結

Redis在主循環中統一處理文件事件和時間事件,信號事件則由專門的handler來處理。

文件事件,我理解為IO事件,Redis將產生事件套接字放入一個就緒隊列中,即redisServer.aeEventLoop.fired數組,然后在aeProcessEvents會依次分派給文件事件處理器;

Redis編寫了多個文件事件處理器。

Redis中文件事件包括:客戶端的連接、命令請求、數據回復、連接斷開,當上述事件發生時,會造成相應的描述符可讀可寫,再調用相應類型的文件事件處理器。

文件事件處理器有:

  • 連接應答處理器 networking.c/acceptTcpHandler
  • 命令請求處理器 networking.c/readQueryFromClinet
  • 命令回復處理器 networking.c/sendReplyToClient

時間事件包含定時事件周期性事件,Redis將其放入一個無序鏈表中,每當時間事件執行器運行時,就遍歷鏈表,查找已經到達的時間事件,調用相應的處理器。

(1) 主循環

def ae_Main():
    #一直循環處理事件
    while(not_stop){
        aeProcessEvents()
    }

(2)aeProcessEvents調度文件事件和時間事件的過程:

def aeProcessEvents():
    time_event = aeSearchNearestTimer() #獲取當前時間最近的時間事件
    remaind_ms = time_event.when - unix_ts_now() #獲取最近的時間事件達到的毫秒時間
    if remaind_ms < 0 : #時間為負數,賦值0
        remaind_ms = 0
    timeval = create_timeval_with_ms(remainds_ms) #創建等待的時間結構
    aeApiPoll(timeval) #等待文件事件產生,時間取決於remainds_ms
    processFileEvent() #處理文件事件
    processTimeEvent() #處理時間事件

2 Reactor事件模型在Redis中的應用

  下面主要結合文件事件的處理過程講解Reactor事件模型在Redis中的應用。其中,Reactor事件模型框圖如下所示:

   

2.1 Initiation Dispatcher在Redis中的實現

(1) handle_events()

在Redis中,對於文件事件,相應的處理函數為Ae.c/aeProcessEvents,其關鍵處理流程如下:

(1)底層調用接口返回,將就緒事件拷貝到eventLoop->fired數組;

(2)遍歷就緒數組,獲取相關fd,進而獲取fd對應的aeFileEvent : eventLoop->events[fd],從而得到相關回調函數;

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
     ....省略
        // 獲取就緒文件事件,阻塞時間由最近的時間事件決定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 從已就緒數組中獲取包裝后的文件事件aeFileEvent
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // 獲取文件事件的詳細參數:fd, mask
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            // 處理讀事件,調用相關回調函數
            if (fe->mask & mask & AE_READABLE) {
                // rfired 確保讀/寫事件只能執行其中一個
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 處理寫事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    // 處理時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
}

 

(2)register_handler/remove_handler 事件處理器的注冊與刪除等

在Redis中,相關的處理函數也在Ae.c文件中:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, eFileProc *proc, void *clientData); //創建文件事件(fd:mask),相關的回掉函數為eFileProc
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //將 fd 從 mask 指定的監聽隊列中刪除
int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //獲取fd被監控的事件mask
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

 

2.2 Synchronous Event Demultiplexer在Redis中的實現

針對IO復用方法,比如select,poll,epoll,kqueue等,每種方法的效率和使用方法都不相同,Redis通過統一包裝各方法,來屏蔽它們的不同之處。

(1) IO復用跨平台

首先,Redis會根據平台,自動選擇性能最好的IO復用函數庫。該過程提現在Ae.c頭文件包含中,如下:

#ifdef HAVE_EVPORT
#include "ae_evport.c" //evport優先級最高
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c" //epoll優先級較次
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c" //kqueue優先級還次
        #else
        #include "ae_select.c" //select優先級最低
        #endif
    #endif
#endif

(2) 統一事件接口

ae_select.cae_epoll.cae_kqueue.cae_evport.c都提供一套統一的事件注冊、刪除接口,使得在ae.c中可以直接使用以下接口,其中針對epoll的包裝實現如下:

/* 事件狀態*/
typedef struct aeApiState {
    int epfd;  //epoll_event 實例描述符
    struct epoll_event *events; // 事件槽,存儲返回的就緒事件,大小為eventLoop->setsize
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop)  //創建aeApiState實例,並賦值於eventLoop->apidata
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) //增加關注的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) //刪除關注的事件 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) //等待事件就緒返回,並存儲於eventLoop->fired數組 static char *aeApiName(void) //獲取底層調用的IO復用接口,如epoll

 

2.3 Concrete Event Handler

文件事件相關的一些具體的事件處理器如下:

連接請求acceptTcpHandler:在 redis.c/initServer中,程序會為redisServer.eventLoop關聯一個客戶連接的事件處理器。
命令請求readQueryFromClinet : 當新連接來的時候,需要調用networking.c/createClient創建客戶端,在其中為客戶端套接字注冊讀事件,關聯處理器readQueryFromClinet。
命令回復sendReplyToClient : 當Redis調用networking.c/addReply時,會調用prepareClientToWrite來注冊寫事件,當套接字可寫時,觸發sendReplyToClient發送命令回復。

 

2.4 相關數據結構

從上面的相關接口可以發現,大多用到了結構體:aeEventLoop, aeFileEvent, aeFiredEvent。 它們之間的關系圖如下:

 

(1) aeFileEvent

/* File event structure
 *
 * 文件事件結構
 */
typedef struct aeFileEvent {

    // 監聽事件類型掩碼,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */

    // 讀事件處理器
    aeFileProc *rfileProc;

    // 寫事件處理器
    aeFileProc *wfileProc;

    // 多路復用庫的私有數據
    void *clientData;

} aeFileEvent;

可以發現aeFileEvent中沒有fd信息,獲取fd對應的aeFileEvent,需要到eventLoop->events[fd]處提取,因為在調用aeCreateFileEvent事件處理器注冊函數時,將fd對應的aeFileEvent函數存儲於eventLoop->events[fd]處。

(2)aeFiredEvent

/* A fired event
 *
 * 已就緒事件
 */
typedef struct aeFiredEvent {

    // 已就緒文件描述符
    int fd;

    // 事件類型掩碼,
    // 值可以是 AE_READABLE 或 AE_WRITABLE
    // 或者是兩者的或
    int mask;

} aeFiredEvent;

aeFiredEvent剛好包含一個就緒事件的所有有用信息,在aeApiPoll調用底層IO復用函數(如epoll)返回時,會將就緒事件從底層的就緒數組aeApiState.events拷貝到eventLoop->fired就緒數組中;通過aeFiredEvent中的fd可以找到對應的aeFileEvent,進而獲取相關的回調函數。

(3) aeEventLoop

// 事件處理器的狀態
typedef struct aeEventLoop { // 目前已注冊的最大描述符 int maxfd; /* highest file descriptor currently registered */ // 目前已追蹤的最大描述符 int setsize; /* max number of file descriptors tracked */ // 用於生成時間事件 id long long timeEventNextId; // 最后一次執行時間事件的時間 time_t lastTime; /* Used to detect system clock skew */ // 已注冊的文件事件 aeFileEvent *events; /* Registered events,events數組下標與fd對應 */ // 已就緒的文件事件 aeFiredEvent *fired; /* Fired events */ // 時間事件 aeTimeEvent *timeEventHead; // 事件處理器的開關 int stop; // 多路復用庫的私有數據 void *apidata; /* This is used for polling API specific data */ // 在處理事件前要執行的函數 aeBeforeSleepProc *beforesleep;

該結構的初始化創建過程如下:

/*
 * 初始化事件處理器狀態
 */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    ...

    // 初始化文件事件結構和已就緒文件事件結構數組
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); //aeFileEvent中沒有fd,如何獲取fd信息,將fd對應的aeFileEvent存儲於eventLoop->events[fd]
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); 
    ...
    // 設置數組大小
    eventLoop->setsize = setsize;
    // 初始化執行最近一次執行時間

    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;

    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    // 初始化監聽事件
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;

    // 返回事件循環
    return eventLoop;
}

 

2.5 register_handler/remove_handler 事件處理器注冊與刪除等的具體實現

 (1)aeCreateFileEvent

該事件處理器注冊函數主要涉及到變量eventLoop->events,eventLoop->apidata

其中,eventLoop->events數組主要用於存儲aeFileEvent,包括回調函數,感興趣的事件掩碼mask,clientData等,fd對應的aeFileEvent存儲於eventLoop->events[fd]處。(通過aeFileEvent和events數組,便將fd:mask和相關回調函數proc對應起來)

在調用aeApiAddEvent時,會將fd的指定事件加入底層的IO復用函數中;

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    // 取出文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];

    // 監聽指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    // 設置文件事件類型,以及事件的處理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有數據
    fe->clientData = clientData;

    // 如果有需要,更新事件處理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}

(2)void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //刪除文件事件

 與aeCreateFileEvent相反,將在fd對應的aeFileEvent中,取消對事件mask的關注;並通過aeApiDelEvent在底層取消對fd相關事件mask的監聽。具體代碼如下:

/*
 * 將 fd 從 mask 指定的監聽隊列中刪除
 */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;

    // 取出文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];

    // 未設置監聽的事件類型,直接返回
    if (fe->mask == AE_NONE) return;

    // 計算新掩碼
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }

    // 取消對給定 fd 的給定事件的監視
    aeApiDelEvent(eventLoop, fd, mask);
}

 

3 Redis中事件監聽和處理的流程圖

Redis中事件監聽和處理的流程如下

(1) 通過aeApiPoll監聽用戶感興趣的事件;

(2) 當有文件事件發生時返回(此處不考慮時間事件),就緒事件將存儲於底層的就緒數組aeApiState.events;

(3) 將就緒數組拷貝到aeEventLoop的就緒數組aeEventLoop.fired中;

(4)通過fd,在aeEventLoop的注冊文件事件數組中找到aeFileEvent -- eventLoop->events[fd],最后調用相關回調函數,完成事件處理。

 

參考:

redis中事件模型實現分析

事件庫之Redis自己的事件模型-ae

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM