Redis系列(三)---事件處理細節分析及epoll介紹


  上兩篇介紹了redis的啟動流程接受客戶端請求到調用請求處理函數,在這篇里,我將介紹redis事件觸發細節,即epoll介紹。從redis源碼可以看出,redis的io模型主要是基於epoll實現的,不過它也提供了 select和kqueue的實現,默認采用epoll。

ae.c

#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
    #ifdef HAVE_KQUEUE
    #include "ae_kqueue.c"
    #else
    #include "ae_select.c"
    #endif
#endif

通過這么一個條件包含,就可以決定redis使用哪種i/o多路復用函數。同時redis通過ae.h的一系列聲明為上層提供了一個統一的接口,以此隱藏底層io多路函數的具體實現。
有關unix io模型的類型,可參考(Unix 五種基本I/O模型的區別) .

那么epoll到底是個什么東西呢? 其實只是眾多i/o多路復用技術當中的一種而已,但是相比其他io多路復用技術(select, poll等等),epoll有諸多優點:

  1. epoll 沒有最大並發連接的限制,上限是最大可以打開文件的數目,這個數字一般遠大於 2048, 一般來說這個數目和系統內存關系很大  ,具體數目可以 cat /proc/sys/fs/file-max 察看。

  2. 效率提升, Epoll 最大的優點就在於它只管你“活躍”的連接 ,而跟連接總數無關,因此在實際的網絡環境中, Epoll 的效率就會遠遠高於 select 和 poll 。

  3. 內存拷貝, Epoll 在這點上使用了“共享內存 ”,這個內存拷貝也省略了。

那么在我們的系統中,到底應該如何使用epoll呢? 這里,epoll給我們提供了3個api: epoll_create, epoll_ctl, epoll_wait。

1: int epoll_create(int size);

生成一個 epoll 專用的文件描述符,其實是申請一個內核空間,用來存放你想關注的 socket fd 上是否發生以及發生了什么事件。 size 就是你在這個 epoll fd 上能關注的最大 socket fd 數,大小自定,只要內存足夠。

2: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event );

控制某個 epoll 文件描述符上的事件:注冊、修改、刪除。參數說明:

   epfd 是 epoll_create() 創建 epoll 專用的文件描述符。相對於 select 模型中的 FD_SET 和 FD_CLR 宏;

  op就是你要把當前這個套接口fd如何設置到epfd上邊去,一般由epoll提供的三個宏指定:EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD。

  fd: 當事件發生時操作的目標套接口。

  event指針就是你要給這個套接口fd綁定什么事件。

3: int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

等待 I/O 事件的發生;參數說明:

epfd: 由 epoll_create() 生成的 Epoll 專用的文件描述符;

epoll_event: 用於回傳代處理事件的數組;

maxevents: 返回的最大事件數;

timeout: 等待 I/O 事件發生的超時值(毫秒);

epoll_wait返回觸發的事件數。

 

下面看一個例子:

 

  

kdpfd = epoll_create(1024);
epoll_event lev;
lev.events = EPOLLIN;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener,  &lev);
struct epoll_event ev, *events;
for(;;) {
    nfds = epoll_wait(kdpfd, events, maxevents, -1);
    for(n = 0; n < nfds; ++n) {
        if(events[n].data.fd == listener) {
            client = accept(listener, (struct sockaddr *) &local,
                            &addrlen);
            if(client < 0){
                perror("accept");
                continue;
            }
            setnonblocking(client);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = client;
            if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                fprintf(stderr, "epoll set insertion error: fd=%d0,
                        client);
                return -1;
            }
        }
        else
            do_use_fd(events[n].data.fd);
    }
}

 

  首先,通過epoll_create創建一個epoll實例, 然后聲明一個epoll_event lev(這是一個struct,epoll用它來代表事件), 並將該lev的events賦值為EPOLLIN(這樣當listener上有數據可讀時,那么epoll_wait便會返回該fd), 最后再調用epoll_wait 等待 kdpfd這個epoll實例上事件的發生。當有事件發生(io讀寫事件)或者到達設定的超時值,那么epoll_wait就會返回,然后我們就可以通過 events拿到相應的socketfd並進行相應的處理。

  例子中是當給listener綁定的可讀事件發生時(客戶端連接到達),那么就調用accept函數,獲取客戶端與服務器段的套接字client , 然后給這個套接字綁定 ev.events = EPOLLIN | EPOLLET; 並調用 epoll_ctl函數將該套接字client 加入到epoll實例kdpfd,再次循環進行epoll_wait, 這樣,當client有數據可讀時(客戶端請求數據到達),那么就可以進行下一步處理了,如調用recv/read接受客戶端數據,等等。

  epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev)

  從上邊的介紹中,我們知道了如何調用epoll提供的api, 生成epoll實例,如何給套接口設置相應事件,如何將套接口添加到epoll實例以及進行事件輪詢(epoll_wait)等待相應事件的發生並處理, 再來看redis代碼, 就可以對redis接受客戶端請求並處理的過程一目了然了。

  

 

如圖所示,如果監聽的端口有連接到來,那么epoll_wait返回,那么redis會把觸發的套接口放到eventLoop.fired這個數組里:

 

 1  retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
 2             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
 3     if (retval > 0) {
 4         int j;
 5 
 6         numevents = retval;
 7         for (j = 0; j < numevents; j++) {
 8             int mask = 0;
 9             struct epoll_event *e = state->events+j;
10 
11             if (e->events & EPOLLIN) mask |= AE_READABLE;
12             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
13             if (e->events & EPOLLERR) mask |= AE_WRITABLE;
14             if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
15             eventLoop->fired[j].fd = e->data.fd;
16             eventLoop->fired[j].mask = mask;
17         }
18     }

 

 

 

  然后在aeProcessEvents這個函數里,會取出eventLoop.fired中的fd,並取出對應的事件:aeFileEvent *fe, 然后判斷事件的類型,調用相應的處理函數:

  

 if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

 

至此,redis的啟動流程,接受客戶端請求到調用請求處理函數,以及事件如何觸發,如何調用處理函數,在三篇博客里都做了詳細的分析。相信結合這三篇博客可以對redis內部的實現有一個較為深刻的理解。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM