1 模型簡介
Redis
沒有使用第三方的libevent等網絡庫,而是自己開發了一個單線程的Reactor模型的事件處理模型。而Memcached
內部使用的libevent庫,多線程模型。
綜合對比可見:nginx,memcached,redis網絡模型總結
Redis在主循環中統一處理文件事件和時間事件,信號事件則由專門的handler來處理。
文件事件,我理解為IO事件,Redis將產生事件套接字放入一個就緒隊列中,即redisServer.aeEventLoop.fired數組,然后在aeProcessEvents
會依次分派給文件事件處理器;
Redis編寫了多個文件事件處理器。
Redis中文件事件包括:客戶端的連接、命令請求、數據回復、連接斷開,當上述事件發生時,會造成相應的描述符可讀可寫,再調用相應類型的文件事件處理器。
文件事件處理器有:
- 連接應答處理器
networking.c/acceptTcpHandler
; - 命令請求處理器
networking.c/readQueryFromClinet
; - 命令回復處理器
networking.c/sendReplyToClient
;
時間事件包含定時事件
和周期性事件
,Redis將其放入一個無序鏈表中,每當時間事件執行器運行時,就遍歷鏈表,查找已經到達的時間事件,調用相應的處理器。
(1) 主循環
def ae_Main(): #一直循環處理事件 while(not_stop){ aeProcessEvents() }
(2)aeProcessEvents調度文件事件和時間事件的過程:
def aeProcessEvents(): time_event = aeSearchNearestTimer() #獲取當前時間最近的時間事件 remaind_ms = time_event.when - unix_ts_now() #獲取最近的時間事件達到的毫秒時間 if remaind_ms < 0 : #時間為負數,賦值0 remaind_ms = 0 timeval = create_timeval_with_ms(remainds_ms) #創建等待的時間結構 aeApiPoll(timeval) #等待文件事件產生,時間取決於remainds_ms processFileEvent() #處理文件事件 processTimeEvent() #處理時間事件
2 Reactor事件模型在Redis中的應用
下面主要結合文件事件的處理過程講解Reactor事件模型在Redis中的應用。其中,Reactor事件模型框圖如下所示:
2.1 Initiation Dispatcher在Redis中的實現
(1) handle_events()
在Redis中,對於文件事件,相應的處理函數為Ae.c/aeProcessEvents,其關鍵處理流程如下:
(1)底層調用接口返回,將就緒事件拷貝到eventLoop->fired數組;
(2)遍歷就緒數組,獲取相關fd,進而獲取fd對應的aeFileEvent : eventLoop->events[fd],從而得到相關回調函數;
int aeProcessEvents(aeEventLoop *eventLoop, int flags){ ....省略 // 獲取就緒文件事件,阻塞時間由最近的時間事件決定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 從已就緒數組中獲取包裝后的文件事件aeFileEvent aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; // 獲取文件事件的詳細參數:fd, mask int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 處理讀事件,調用相關回調函數 if (fe->mask & mask & AE_READABLE) { // rfired 確保讀/寫事件只能執行其中一個 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 處理寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } // 處理時間事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); }
(2)register_handler/remove_handler 事件處理器的注冊與刪除等
在Redis中,相關的處理函數也在Ae.c文件中:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, eFileProc *proc, void *clientData); //創建文件事件(fd:mask),相關的回掉函數為eFileProc void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //將 fd 從 mask 指定的監聽隊列中刪除 int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //獲取fd被監控的事件mask
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
2.2 Synchronous Event Demultiplexer在Redis中的實現
針對IO復用方法,比如select
,poll
,epoll
,kqueue
等,每種方法的效率和使用方法都不相同,Redis通過統一包裝各方法,來屏蔽它們的不同之處。
(1) IO復用跨平台
首先,Redis會根據平台,自動選擇性能最好的IO復用函數庫。該過程提現在Ae.c
頭文件包含中,如下:
#ifdef HAVE_EVPORT #include "ae_evport.c" //evport優先級最高 #else #ifdef HAVE_EPOLL #include "ae_epoll.c" //epoll優先級較次 #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" //kqueue優先級還次 #else #include "ae_select.c" //select優先級最低 #endif #endif #endif
(2) 統一事件接口
ae_select.c
、ae_epoll.c
、ae_kqueue.c
、ae_evport.c
都提供一套統一的事件注冊、刪除接口,使得在ae.c
中可以直接使用以下接口,其中針對epoll的包裝實現如下:
/* 事件狀態*/ typedef struct aeApiState { int epfd; //epoll_event 實例描述符 struct epoll_event *events; // 事件槽,存儲返回的就緒事件,大小為eventLoop->setsize } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) //創建aeApiState實例,並賦值於eventLoop->apidata
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) //增加關注的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) //刪除關注的事件 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) //等待事件就緒返回,並存儲於eventLoop->fired數組 static char *aeApiName(void) //獲取底層調用的IO復用接口,如epoll
2.3 Concrete Event Handler
文件事件相關的一些具體的事件處理器如下:
連接請求acceptTcpHandler:在 redis.c/initServer中,程序會為redisServer.eventLoop關聯一個客戶連接的事件處理器。
命令請求readQueryFromClinet : 當新連接來的時候,需要調用networking.c/createClient創建客戶端,在其中為客戶端套接字注冊讀事件,關聯處理器readQueryFromClinet。
命令回復sendReplyToClient : 當Redis調用networking.c/addReply時,會調用prepareClientToWrite來注冊寫事件,當套接字可寫時,觸發sendReplyToClient發送命令回復。
2.4 相關數據結構
從上面的相關接口可以發現,大多用到了結構體:aeEventLoop, aeFileEvent, aeFiredEvent。 它們之間的關系圖如下:
(1) aeFileEvent
/* File event structure * * 文件事件結構 */ typedef struct aeFileEvent { // 監聽事件類型掩碼, // 值可以是 AE_READABLE 或 AE_WRITABLE , // 或者 AE_READABLE | AE_WRITABLE int mask; /* one of AE_(READABLE|WRITABLE) */ // 讀事件處理器 aeFileProc *rfileProc; // 寫事件處理器 aeFileProc *wfileProc; // 多路復用庫的私有數據 void *clientData; } aeFileEvent;
可以發現aeFileEvent中沒有fd信息,獲取fd對應的aeFileEvent,需要到eventLoop->events[fd]處提取,因為在調用aeCreateFileEvent事件處理器注冊函數時,將fd對應的aeFileEvent函數存儲於eventLoop->events[fd]處。
(2)aeFiredEvent
/* A fired event * * 已就緒事件 */ typedef struct aeFiredEvent { // 已就緒文件描述符 int fd; // 事件類型掩碼, // 值可以是 AE_READABLE 或 AE_WRITABLE // 或者是兩者的或 int mask; } aeFiredEvent;
aeFiredEvent剛好包含一個就緒事件的所有有用信息,在aeApiPoll調用底層IO復用函數(如epoll)返回時,會將就緒事件從底層的就緒數組aeApiState.events拷貝到eventLoop->fired就緒數組中;通過aeFiredEvent中的fd可以找到對應的aeFileEvent,進而獲取相關的回調函數。
(3) aeEventLoop
// 事件處理器的狀態
typedef struct aeEventLoop { // 目前已注冊的最大描述符 int maxfd; /* highest file descriptor currently registered */ // 目前已追蹤的最大描述符 int setsize; /* max number of file descriptors tracked */ // 用於生成時間事件 id long long timeEventNextId; // 最后一次執行時間事件的時間 time_t lastTime; /* Used to detect system clock skew */ // 已注冊的文件事件 aeFileEvent *events; /* Registered events,events數組下標與fd對應 */ // 已就緒的文件事件 aeFiredEvent *fired; /* Fired events */ // 時間事件 aeTimeEvent *timeEventHead; // 事件處理器的開關 int stop; // 多路復用庫的私有數據 void *apidata; /* This is used for polling API specific data */ // 在處理事件前要執行的函數 aeBeforeSleepProc *beforesleep;
該結構的初始化創建過程如下:
/* * 初始化事件處理器狀態 */ aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; ... // 初始化文件事件結構和已就緒文件事件結構數組 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); //aeFileEvent中沒有fd,如何獲取fd信息,將fd對應的aeFileEvent存儲於eventLoop->events[fd]處 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); ... // 設置數組大小 eventLoop->setsize = setsize; // 初始化執行最近一次執行時間 eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ // 初始化監聽事件 for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; // 返回事件循環 return eventLoop; }
2.5 register_handler/remove_handler 事件處理器注冊與刪除等的具體實現
(1)aeCreateFileEvent
該事件處理器注冊函數主要涉及到變量eventLoop->events,eventLoop->apidata
其中,eventLoop->events數組主要用於存儲aeFileEvent,包括回調函數,感興趣的事件掩碼mask,clientData等,fd對應的aeFileEvent存儲於eventLoop->events[fd]處。(通過aeFileEvent和events數組,便將fd:mask和相關回調函數proc對應起來)
在調用aeApiAddEvent時,會將fd的指定事件加入底層的IO復用函數中;
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } if (fd >= eventLoop->setsize) return AE_ERR; // 取出文件事件結構 aeFileEvent *fe = &eventLoop->events[fd]; // 監聽指定 fd 的指定事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 設置文件事件類型,以及事件的處理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有數據 fe->clientData = clientData; // 如果有需要,更新事件處理器的最大 fd if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
(2)void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //刪除文件事件
與aeCreateFileEvent相反,將在fd對應的aeFileEvent中,取消對事件mask的關注;並通過aeApiDelEvent在底層取消對fd相關事件mask的監聽。具體代碼如下:
/* * 將 fd 從 mask 指定的監聽隊列中刪除 */ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { if (fd >= eventLoop->setsize) return; // 取出文件事件結構 aeFileEvent *fe = &eventLoop->events[fd]; // 未設置監聽的事件類型,直接返回 if (fe->mask == AE_NONE) return; // 計算新掩碼 fe->mask = fe->mask & (~mask); if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { /* Update the max fd */ int j; for (j = eventLoop->maxfd-1; j >= 0; j--) if (eventLoop->events[j].mask != AE_NONE) break; eventLoop->maxfd = j; } // 取消對給定 fd 的給定事件的監視 aeApiDelEvent(eventLoop, fd, mask); }
3 Redis中事件監聽和處理的流程圖
Redis中事件監聽和處理的流程如下:
(1) 通過aeApiPoll監聽用戶感興趣的事件;
(2) 當有文件事件發生時返回(此處不考慮時間事件),就緒事件將存儲於底層的就緒數組aeApiState.events;
(3) 將就緒數組拷貝到aeEventLoop的就緒數組aeEventLoop.fired中;
(4)通過fd,在aeEventLoop的注冊文件事件數組中找到aeFileEvent -- eventLoop->events[fd],最后調用相關回調函數,完成事件處理。
參考: