Redis自己的事件模型 ae


http://my.oschina.net/u/917596/blog/161077

 

1.Redis的事件模型庫

大家到網上Google“Redis libevent”就可以搜到Redis為什么沒有選擇libevent以及libev為其事件模型庫,而是自己寫了一個事件模型。 
從代碼中可以看到它主要支持了epoll、select、kqueue、以及基於Solaris的event ports。主要提供了對兩種類型的事件驅動:

  1. IO事件(文件事件),包括有IO的讀事件和寫事件。
  2. 定時器事件,包括有一次性定時器和循環定時器。

2.使用示例

這里寫了一個由標准輸入的讀事件驅動的echo服務例子,同時用一個5秒的循環定時器每個5秒打印一次服務器狀態。這里用了epoll為底層 
事件接口。具體的代碼抽取可以從Redis的源碼中抽取"ae.c”、“ae.h”、“ae_select.c”、“ae_epoll.c”、“ae_evport.c"這幾個文件,通過 
ae.c中的宏::

#define HAVE_EPOLL 1 // illustrate to use epoll #ifdef HAVE_EVPORT # include "ae_evport.c" #else #ifdef HAVE_EPOLL # include "ae_epoll.c" #else #ifdef HAVE_KQUEUE # include "ae_kqueue.c" #else # include "ae_select.c" #endif #endif #endif 

這里主要是分析Redis的事件模型的封裝,因此對於其對socket的包裝以及內存管理都不做分析。故采用標准輸入,同時需要將這些文件中 
的內存管理接口"zmalloc()“以及"zfree()“替換成C庫中的“malloc()”還有"free()“。可以使用sed或者vim的%s做替換操作。

將主程序貼在這里::

#include "ae.h" #include <stdio.h> #include <assert.h> #include <unistd.h> #include <sys/time.h> #define MAXFD 5 void loop_init(struct aeEventLoop *l) { puts("I'm loop_init!!! \n"); } void file_cb(struct aeEventLoop *l,int fd,void *data,int mask) { char buf[51] ={0}; read(fd,buf,51); printf("I'm file_cb ,here [EventLoop: %p],[fd : %d],[data: %p],[mask: %d] \n",l,fd,data,mask); printf("get %s",buf); } int time_cb(struct aeEventLoop *l,long long id,void *data) { printf("now is %ld\n",time(NULL)); printf("I'm time_cb,here [EventLoop: %p],[id : %lld],[data: %p] \n",l,id,data); return 5*1000; } void fin_cb(struct aeEventLoop *l,void *data) { puts("call the unknow final function \n"); } int main(int argc,char *argv[]) { aeEventLoop *l; char *msg = "Here std say:"; char *user_data = malloc(50*sizeof(char)); if(! user_data) assert( ("user_data malloc error",user_data) ); memset(user_data,'\0',50); memcpy(user_data,msg,sizeof(msg)); l = aeCreateEventLoop(MAXFD); aeSetBeforeSleepProc(l,loop_init); int res; res = aeCreateFileEvent(l,STDIN_FILENO,AE_READABLE,file_cb,user_data); printf("create file event is ok? [%d]\n",res); res = aeCreateTimeEvent(l,5*1000,time_cb,NULL,fin_cb); printf("create time event is ok? [%d]\n",!res); aeMain(l); puts("Everything is ok !!!\n"); return 0; } 

沒有什么邏輯,就是注冊一個標准輸入的讀事件,和一個定時器事件。這里要說明的就是在ae.h中定義了讀、寫、定時器等回調函數的類型::

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); 

按這個類型定義回調函數就可以。其中asFileProc和aeTimeProc比較容易理解,就是IO讀寫事件和定時器事件的回調函數。這里要注意了,如果 
定義的定時器回調函數返回值為正數,那么表示該定時器是一個循環定時器,即在第一次執行完后添加定時器事件時給定的延遲后不刪除定時器, 
在延遲該返回值時間(單位是毫秒)后再次執行該定時器。所以就要注意,比如要每5秒執行一個操作,那么在添加定時器時要給定其定時時間為 
5000毫秒,同時在該定時器的回調函數中也要返回5000.

然后aeBeforeSleepProc回調函數比較的撲朔迷離,從Sleep上不容易理解,其實想到select和epoll這些機制的作用就可以想到了,這個函數是在 
poll之前執行,從源碼中看到就是在每個處理事件的循環開始出執行的。而aeEventFinalizerProc單從名字就更難理解,從源碼中看到它是在刪除 
定時器事件時候執行的。

clientData比較好理解,就和在epoll中的ptr指針的作用一樣。主要可以存放用戶對每個事件上附加的數據。

事件循環的入口函數是aeMain(),將創建好的aeEventLoop傳入就可以了。

使用起來很簡單,對於不是很復雜或者對接入層要求不高的應用可以一試。

3.ae.c分析

Redis的ae(姑且這么稱呼Redis用的事件模型庫的名字)主要邏輯在文件“ae.c”中,其中根據使用的系統事件接口分別選擇包含"ae_epoll.c"或其他 
文件。用到的主要數據結構在文件“ae.h”中定義。下面用一個不規范的UML類圖表示了幾個主要數據結構之間的關系,其中連在一起的表示一個數組或者 
箭頭表示的鏈表。這么畫主要是幫助理解。

類圖

下面根據上面的示例程序一一做說明。

3.1 主要數據結構的創建

3.1.1 aeCreateEventLoop

首先要創建一個aeCreateEventLoop對象。該對象需要一個最大文件描述符作為參數setSize,這個參數的意義需要了解ae的數據存放結構。從上面的圖可以看到 
在aeEventLoop結構中有兩個數組(其實就是服務器程序慣用提前分配好內存然后用index映射到相應位置的做法),這兩個數組的大小就是這里的參數值。 
ae會創建一個 setSize*sizeof(aeFileEvent) 以及一個 setSize*siezeof(aeFiredEvent) 大小的內存,用文件描述符作為其索引。這一可以達到0(1)的速度找到事件數據所在位置。那么這個大小定位多少合適呢?在Linux個中,文件描述符是個有限的資源,當打開一個文件時就會消耗一個文件描述符,當關閉該文件描述符或者程序結束時會釋放該文件描述符資源,從而供其他文件打開操作使用。當文件描述符超過最大值后,打開文件就會出錯。那么這個最大值是多少呢?可以通過/proc/sys/fs/file-max看到系統支持的最大的文件描述符數。通過 ulimit -n 可以看到當前用戶能打開的最大的文件描述符。在我這里的一台8g內存的機器上,系統支持最大的文件描述是365146。而在這台64bit的機器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小為40byte。按系統最大支持的文件描述符來算,固定消耗內存為14.6M。這樣以文件描述符作為數組的下標來索引,雖然這樣的哈希在接入量不大的情況下會有大量的浪費。但是最多也就浪費14M 的內存,因此這樣的設計是可取的。

在做好這些准備后還要准備系統提供的事件模型接口。這里以epoll為例,其他的可以自行查閱源碼了解。ae首先提供了一個統一的結構名aeApiState,可以想象成c++中接口。在包裝epoll的aeApiState中有一個epfd表示epoll占用的fd,一個epoll_event *events,其實也是一個aeApiState數組::

其和aeFiredEvent相對應,當epoll_wait()返回時,會將pending的文件描述符的信息放在aeFiredEvent數組中,包括有fd,以及mask事件類型,此時的aeFiredEvent不是以fd作為下標的,而是把這個數組當成一個緩沖區,存放一次epoll_wait()返回的所有fd,同時用epoll_event數組存放了epoll_wait()返回中的epoll_data數據,用其數據可以填充aeFiredEvent數組的內容供ae使用找到pending d的aeFileEvent對象。並在下一次進入epoll_wait()前處理完。這樣完成了對epoll數據封裝。

typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState; 

3.1.2 aeCreateFileEvent

創建IO事件時需要指定要要注冊的文件的文件描述符fd,以及要監聽的事件類型mask。ae會先通過fd找到其對應的aeCreateFileEvent對象所在內存位置::

typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; 

然后添加其要監聽的事件類型mask fe->mask |= mask;,接着回根據要監聽的類型添加其讀事件或者寫事件的回調函數,即aeFileProc。同時更新maxfd以備后用,如在select中的最大fd的指定。

在創建文件事件的過程中還要通過宏判斷后include進來的底層事件模型接口來注冊IO事件。這里和上面一樣以epoll為例,其他的事件模型也類似。通過aeApiAddEvent將文件描述符fd和事件類型mask傳給epoll操作。首先通過fd為下標找到aeCreateFileEvent對應的位置,然后取得epoll的epfd.通過EPOLL_CTL_ADD和EPOLL_CTL_MOD來加入或者修改epoll在該fd上事件的類型。

3.1.3 aeCreateTimeEvent

ae的定時器是用一個單鏈表來管理的,將定時器依次從head插入到單鏈表中。插入的過程中會取得未來的牆上時間作為其超時的時刻。即將當前時間加上添加定時器時給定的延遲時間。定時器結構如下。並設置超時以及注銷定時器時的回調函數還用clientData::

typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent; 

3.2 事件循環

3.2.1 aeMain入口函數

ae事件循環的基本機構就是用一個無限循環,然后再循環中去檢測各個事件的發生。當然這里不是完全意義上的輪詢,因為循環里面封裝了epoll,select等事件驅動機制::

while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } 

這里的beforesleep就是上文中敘述過的,進入一次循環之前做的操作。后面會說到定時的過程其實也就是一個epoll或者select模擬的sleep過程,而等待事件到來也是“sleep”在epoll或者select上。所以這個叫名字感覺也算貼切。當然這里是YY一下。不過可以幫助理解。

3.2.2 aeProcessEvents

ae中最主要的邏輯應該也就是事件的處理了。從上面知道aeProcessEvents是處理事件的入口。在進入事件處理函數時,首先若沒有任何事件則立即返回::

/* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 

這里注釋中說的ASAP我不太理解表示的啥意思,望高人指點。

然后判斷是否有定時器事件,如果有那么就去取得最近的一個將超時定時器的時間減去當前時間作為epoll或者select等事件接口的超時時間。該尋找過程就是通過遍歷單鏈表得來的。這樣指定超時時間,在有IO事件pending時可以處理IO事件,同時若沒有則可以保證從epoll或者select中返回去處理定時器事件。不過這里也可以不注冊定時器事件然后將事件的flags與上AE_DONT_WAIT,那么就會在poll中一直等待IO時間的到來。

在獲得事件接口的超時時間后,用其調用封裝事件接口的函數aeApiPoll。這里依舊以epoll作為示例。其將首先獲得apidata,然后從中獲得epoll的文件描述符epfd,並用events指針指向的數組內存以及超時時間調用epoll的epoll_wait().在上面已經描述了,epoll()返回時會將結果放在epoll_event數組中同時返回新的文件描述符。通過對返回數據的事件類型做判斷可以填充到aeFiredEvent中fd和事件類型信息。

然后返回到ae的邏輯中,通過遍歷aeFiredEvent數組取得fd可以找到pending事件的aeFileEvent,然后根據事件的類型去調用用戶定義的IO回調函數。

當epoll或者select超時返回時並注冊了定時器事件時,通過processTimeEvents進入去處理超時事件::

/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } 

這里的注釋說明了這么做的意義,其實就是如果系統事件變更了,就將所有的定時器時間設為0,讓他在本次循環中超時並被執行。

當一個定時器被處理的時候,此時可能會加入新的定時,比如在定時器處理函數中加入新的定時器。而此時僅應該處理上一個時間段的狀態,不應該在本次循環中去處理新的定時器。因此ae在EventLoop中加入了一個timeEventNextId的成員表示此次循環中最大的定時器id+1,這樣在遍歷定時器列表時,先保存最大的定時器id,然后遍歷過程過濾掉定時器列表可能加入新的定時器即可::

    if (te->id > maxId) { te = te->next; continue; } 

這里定時器的邏輯是若單鏈表中的定時器時間比當前時間晚就執行定時器注冊的回調函數。如果該回調函數返回正值,那么就更新定時器時間為該值之后,從而可以循環執行定時器。如果該回調函數返回AE_NOMORE,那么在執行完回調函數后注銷該定時器。

3.3 清理工作

3.3.1 注銷IO事件

注銷IO事件不是以aeFileEvent為單位而是該IO事件加上其監聽的事件類型為對象,因此其接口為aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)。其首先通過fd找到去掉aeFileEvent對象,然后獲得已有的mask,對其進行減操作后,構成fd上新的mask事件類型。通過修改epoll或者select中注冊的IO事件來完成。這里以epoll為例,會根據該文件描述符上是否還有待等待的事件類型分別調用epoll_ctr的EPOLL_CTL_MOD或者EPOLL_CTL_DEL命令。

3.3.2注銷Timer時間

注銷定時器事件的操作比較暴力,直接遍歷鏈表,找到定時器id匹配的項,使用單鏈表刪除操作進行刪除。這里再刪除之前會調用定時器上的finalizerProc。

3.3.3注銷aeEventLooop

最后注銷aeEventLooop就是對相關內存的釋放。

4.總結

分析到這就結束了。感覺ae比較的直觀。主要提供了一個IO事件和定時器事件的事件驅動模型。定時器的單鏈表邏輯可以再改進,比如用最小堆或者Timing-Wheel等著名的定時器解決方法。這樣的一個模型用select可以跨到Windows上。因此用這套東西寫的server再客戶端測試的時候,也可以復用接入層。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM