上兩篇介紹了redis的啟動流程接受客戶端請求到調用請求處理函數,在這篇里,我將介紹redis事件觸發細節,即epoll介紹。從redis源碼可以看出,redis的io模型主要是基於epoll實現的,不過它也提供了 select和kqueue的實現,默認采用epoll。
ae.c
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
通過這么一個條件包含,就可以決定redis使用哪種i/o多路復用函數。同時redis通過ae.h的一系列聲明為上層提供了一個統一的接口,以此隱藏底層io多路函數的具體實現。
有關unix io模型的類型,可參考(Unix 五種基本I/O模型的區別) .
那么epoll到底是個什么東西呢? 其實只是眾多i/o多路復用技術當中的一種而已,但是相比其他io多路復用技術(select, poll等等),epoll有諸多優點:
1. epoll 沒有最大並發連接的限制,上限是最大可以打開文件的數目,這個數字一般遠大於 2048, 一般來說這個數目和系統內存關系很大 ,具體數目可以 cat /proc/sys/fs/file-max 察看。
2. 效率提升, Epoll 最大的優點就在於它只管你“活躍”的連接 ,而跟連接總數無關,因此在實際的網絡環境中, Epoll 的效率就會遠遠高於 select 和 poll 。
3. 內存拷貝, Epoll 在這點上使用了“共享內存 ”,這個內存拷貝也省略了。
那么在我們的系統中,到底應該如何使用epoll呢? 這里,epoll給我們提供了3個api: epoll_create, epoll_ctl, epoll_wait。
1: int epoll_create(int size);
生成一個 epoll 專用的文件描述符,其實是申請一個內核空間,用來存放你想關注的 socket fd 上是否發生以及發生了什么事件。 size 就是你在這個 epoll fd 上能關注的最大 socket fd 數,大小自定,只要內存足夠。
2: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event );
控制某個 epoll 文件描述符上的事件:注冊、修改、刪除。參數說明:
epfd 是 epoll_create() 創建 epoll 專用的文件描述符。相對於 select 模型中的 FD_SET 和 FD_CLR 宏;
op就是你要把當前這個套接口fd如何設置到epfd上邊去,一般由epoll提供的三個宏指定:EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD。
fd: 當事件發生時操作的目標套接口。
event指針就是你要給這個套接口fd綁定什么事件。
3: int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
等待 I/O 事件的發生;參數說明:
epfd: 由 epoll_create() 生成的 Epoll 專用的文件描述符;
epoll_event: 用於回傳代處理事件的數組;
maxevents: 返回的最大事件數;
timeout: 等待 I/O 事件發生的超時值(毫秒);
epoll_wait返回觸發的事件數。
下面看一個例子:
kdpfd = epoll_create(1024); epoll_event lev; lev.events = EPOLLIN; epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &lev); struct epoll_event ev, *events; for(;;) { nfds = epoll_wait(kdpfd, events, maxevents, -1); for(n = 0; n < nfds; ++n) { if(events[n].data.fd == listener) { client = accept(listener, (struct sockaddr *) &local, &addrlen); if(client < 0){ perror("accept"); continue; } setnonblocking(client); ev.events = EPOLLIN | EPOLLET; ev.data.fd = client; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) { fprintf(stderr, "epoll set insertion error: fd=%d0, client); return -1; } } else do_use_fd(events[n].data.fd); } }
首先,通過epoll_create創建一個epoll實例, 然后聲明一個epoll_event lev(這是一個struct,epoll用它來代表事件), 並將該lev的events賦值為EPOLLIN(這樣當listener上有數據可讀時,那么epoll_wait便會返回該fd), 最后再調用epoll_wait 等待 kdpfd這個epoll實例上事件的發生。當有事件發生(io讀寫事件)或者到達設定的超時值,那么epoll_wait就會返回,然后我們就可以通過 events拿到相應的socketfd並進行相應的處理。
例子中是當給listener綁定的可讀事件發生時(客戶端連接到達),那么就調用accept函數,獲取客戶端與服務器段的套接字client , 然后給這個套接字綁定 ev.events = EPOLLIN | EPOLLET; 並調用 epoll_ctl函數將該套接字client 加入到epoll實例kdpfd,再次循環進行epoll_wait, 這樣,當client有數據可讀時(客戶端請求數據到達),那么就可以進行下一步處理了,如調用recv/read接受客戶端數據,等等。
epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev)
從上邊的介紹中,我們知道了如何調用epoll提供的api, 生成epoll實例,如何給套接口設置相應事件,如何將套接口添加到epoll實例以及進行事件輪詢(epoll_wait)等待相應事件的發生並處理, 再來看redis代碼, 就可以對redis接受客戶端請求並處理的過程一目了然了。
如圖所示,如果監聽的端口有連接到來,那么epoll_wait返回,那么redis會把觸發的套接口放到eventLoop.fired這個數組里:
1 retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
2 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
3 if (retval > 0) {
4 int j;
5
6 numevents = retval;
7 for (j = 0; j < numevents; j++) {
8 int mask = 0;
9 struct epoll_event *e = state->events+j;
10
11 if (e->events & EPOLLIN) mask |= AE_READABLE;
12 if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
13 if (e->events & EPOLLERR) mask |= AE_WRITABLE;
14 if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
15 eventLoop->fired[j].fd = e->data.fd;
16 eventLoop->fired[j].mask = mask;
17 }
18 }
然后在aeProcessEvents這個函數里,會取出eventLoop.fired中的fd,並取出對應的事件:aeFileEvent *fe, 然后判斷事件的類型,調用相應的處理函數:
if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); }
至此,redis的啟動流程,接受客戶端請求到調用請求處理函數,以及事件如何觸發,如何調用處理函數,在三篇博客里都做了詳細的分析。相信結合這三篇博客可以對redis內部的實現有一個較為深刻的理解。