Nginx事件管理之epoll模塊


1. epoll 原理

假設有 100 萬用戶同時與一個進程保持着 TCP 連接,而每一時刻只有幾十個或幾百個 TCP 連接時活躍的(接收到 TCP
包),也就是說,在每一時刻,進程只需要處理這 100 萬連接中的一小部分連接。

select 和 poll 的做法是:進程每次收集事件的連接(其實這 100 萬連接中的大部分都是沒有事件發生的)都把這 100 萬連
接的套接字傳給操作系統(這首先就是用戶態內存到內核態內存的大量復制),而由操作系統內核尋找這些連接上有沒有未
處理的事件,將會是巨大的資源浪費,因此 select 和 poll 最多只能處理幾千個並發連接。

而 epoll 則是在 Linux 內核中申請了一個簡易的文件系統,把原先的一個 select 或者 poll 調用分成了 3 個部分:

  • 調用 epoll_create 建立 1 個 epoll 對象(在 epoll 文件系統中給這個句柄分配資源);
  • 調用 epoll_ctl 向 epoll 對象中添加這 100 萬個連接的套接字;
  • 調用 epoll_wait 收集發生事件的連接。
Linux 2.6.35內核對 epoll 的實現

當某一個進程調用 epoll_create 方法時,Linux 內核會創建一個 eventpoll 結構體:

struct eventpoll {
    ...
    /* 紅黑樹的根節點,這棵樹中存儲着所有添加到 epoll 中的事件,也就是這個 epoll 監控的事件 */
    struct rb_root rbr;
    
    /* 雙向鏈表 rdllist 保存着將要通過 epoll_wait 返回給用戶的、滿足條件的事件 */
    struct list_head rdllist;
};

每一個 epoll 對象都有一個獨立的 eventpoll 結構體,這個結構體會在內核空間中創造獨立的內存,用於存儲使用
epoll_ctl 方法向 epoll 對象中添加進來的事件。這些事件都會掛到 rbr 紅黑樹中,這樣,重復添加的事件就可以通過紅黑
樹而高效地識別出來。

所有添加到 epoll 中的事件都會與設備(如網卡)驅動程序建立回調關系,即相應的事件發生時會調用 ep_poll_callback 回調
方法,它會把這樣的事件放到上面的 rdllist 雙向鏈表中。

在 epoll 中,對於每一個事件都會建立一個 epitem 結構體:

struct epitem {
    ...
    /* 紅黑樹節點 */
    struct rb_node rbn;
    
    /* 雙向鏈表節點 */
    struct list_head rdllink;
    
    /* 事件句柄等信息 */
    struct epoll_filefd ffd;
    
    /* 指向所屬的 eventpoll 對象 */
    struct eventpoll *ep;
    
    /* 期待的事件類型 */
    struct epoll_event event;
    ...
};

這里包含每一個事件對應着的信息。
當調用 epoll_wait 檢查是否有事件的連接時,只是檢查 eventpoll 對象中的 rdllist 雙向鏈表是否有 epitem 元素,如果
rdllist 鏈表不為空,則把這里的事件復制到用戶態內存中,同時將事件數量返回給用戶。

2. epoll 的使用

2.1 epoll 的接口

2.1.1 epoll_create()

int epoll_create(int size);

系統調用 epoll_create() 創建一個 epoll 的句柄,之后 epoll 的使用都將依靠這個句柄來標識。參數 size 是告知 epoll 所
要處理的大致事件數目。不再使用 epoll 時,必須調用 close 關閉這個句柄。

2.1.2 epoll_ctl()

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

epoll_ctl 向 epoll 對象中添加、修改或者刪除感興趣的事件,返回 0 表示成功,否則返回 -1,此時需要根據 errno 錯誤碼
判斷錯誤類型。

  • epfd:是 epoll_create() 返回的句柄;
  • op:表示動作,可取的值有:
    • EPOLL_CTL_ADD: 添加新的事件到 epoll 中
    • EPOLL_CTL_MOD: 修改 epoll 中的事件
    • EPOLL_CTL_DEL:刪除 epoll 中的事件
  • fd: 需要監聽的描述符;
  • event: 是 epoll_event 結構體類型,用於告訴內核需要監聽什么事件。epoll_event 的定義:
struct epoll_event {
    __uint32_t events;
    epoll_data_t data;
};
  • events:的取值有
    • EPOLLIN: 表示對應的連接上有數據可讀(TCP 連接的遠端主動關閉連接,也相當於可讀事件,因為需要處理發送來的 FIN)
    • EPOLLOUT: 表示對應的連接上可以寫入數據發送(主動向上游服務器發起非阻塞的 TCP 連接,連接建立成功的事件相當於可寫事件)
    • EPOLLRDHUP:表示 TCP 連接的遠端關閉或半關閉連接
    • EPOLLLPRI:表示對應的連接上有緊急數據需要讀
    • EPOLLERR: 表示對應的連接發生錯誤
    • EPOLLHUP: 表示對應的連接被掛起
    • EPOLLLET: 表示將觸發方式設置為邊緣觸發(ET),系統默認為水平觸發(LT)
    • EPOLLONESHOT: 表示對這個事件只處理一次,下次需要處理時需重新加入 epoll
  • data:是一個 epoll_data 聯合,定義如下:
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
}epoll_data_t;

2.1.3 epoll_wait()

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

收集在 epoll 監控的事件中已經發生的事件,如果 epoll 中沒有任何一個事件發生,則最多等待 timeout 毫秒后返回。
epoll_wait 的返回值表示當前發生的事件個數,如果返回 0,則表示本次調用中沒有事件發生,如果返回 -1,則表示發生錯
誤,需要檢查 errno 判斷錯誤類型。

  • epfd:是 epoll_create() 返回的句柄;
  • events:是分配好的 epoll_event 結構體數組,epoll 將會把發生的事件復制到 events 數組中(events 不可以是空指針,
    內核只負責把數據復制到這個 events 數組中,不會去幫助我們在用戶態中分配內存)
  • maxevents:表示本次可以返回的最大事件數目,通常 maxevents 參數與預分配的 events 數組的大小是相等的;
  • timeout:表示在沒有檢測到事件發生時最多等待的時間(單位為毫秒),如果 timeout 為 0,則表示 epoll_wait 在
    rdllist 鏈表為空時,立刻返回,不會等待。

2.2 epoll 的工作模式

epoll 有兩種工作模式:LT(水平觸發)模式和 ET(邊緣觸發)模式。

默認情況下,epoll 采用 LT 模式工作,這時可以處理阻塞和非阻塞套接字。ET 模式的效率要比 LT 模式高,它只支持非阻塞
字。ET 和 LT 模式的區別在於,當一個新的事件到來時,ET 模式下當然可以從 epoll_wait調用中獲取這個事件,可是如果這
次沒有把這個事件對應的套接字緩沖區處理完,在這個套接字沒有新的事件再次到來時,在 ET 模式下是無法再次從
epoll_wait 調用中獲取這個事件的;而 LT 模式則相反,只要一個事件對應的套接字緩沖區還有數據,就總能從 epoll_wait
中獲取這個事件。因此,LT 模式相對簡單,而在 ET 模式下事件發生時,如果沒有徹底地將緩沖區數據處理完,則會導致緩沖
區中的用戶請求得不到響應。默認情況下,Nginx 是通過 ET 模式使用 epoll 的。

3. ngx_epoll_module 模塊

3.1 配置項

static ngx_command_t  ngx_epoll_commands[] = {

    /* 在調用 epoll_wait 時,將由第 2 和第 3 個參數告訴 Linux 內核一次最多可返回多少個事件。這個配置項
     * 表示調用一次 epoll_wait 時最多可以返回的事件數,當然,它也會預分配那么多 epoll_event 結構體用於
     * 存儲事件 */
    { ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    /* 指明在開啟異步 I/O 且使用 io_setup 系統調用初始化異步 I/O 上下文環境時,初始分配的異步 I/O 
     * 事件個數 */
    { ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};

存儲配置項的配置結構體:

typedef struct {
    ngx_uint_t  events;
    ngx_uint_t  aio_requests;
} ngx_epoll_conf_t;

3.2 上下文結構體:ngx_epoll_module_ctx

static ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify,                /* trigger a notify */
#else
        NULL,                            /* trigger a notify */
#endif
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

3.2.1 ngx_epoll_init

static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    /* 獲取配置項結構體 */
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) 
    {
        /* 創建一個 epoll 句柄 */
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) 
        {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_EVENTFD)
        if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
            ngx_epoll_module_ctx.actions.notify = NULL;
        }
#endif

#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, epcf);
#endif

#if (NGX_HAVE_EPOLLRDHUP)
        ngx_epoll_test_rdhup(cycle);
#endif
    }

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        /* 創建 event_list 數組,用於進行 epoll_wait 調用時傳遞內核態的事件 */
        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    /* 將配置項epoll_events的參數賦給nevents */
    nevents = epcf->events;

    /* 指明讀寫I/O的方法 */
    ngx_io = ngx_os_io;

    /* 一旦設定Nginx使用某個事件處理模塊,經過事件處理模塊的初始化函數后,就把全局變量
     * ngx_event_actions指向了該模塊的封裝 */
    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    /* 默認是采用ET模式來使用epoll的,NGX_USE_CLEAR_EVENT宏實際上就是在告訴Nginx使用ET模式 */
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}

3.2.2 ngx_epoll_add_event

static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    /* 每個事件的 data 成員都存放着其對應的 ngx_connection_t 連接 */
    c = ev->data;

    /* 下面會根據event參數確定當前事件時讀事件還是寫事件,
     * 這會決定events是加上EPOLLIN標志還是EPOLLOUT標志位 */
    events = (uint32_t) event;

    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }
    
    /* 根據active標志位確定是否為活躍事件,以決定到底是修改還是添加事件 */
    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
    if (flags & NGX_EXCLUSIVE_EVENT) {
        events &= ~EPOLLRDHUP;
    }
#endif

    /* 加入flags參數到events標志位中 */
    ee.events   = events | (uint32_t) flags;
    /* ptr成員存儲的是ngx_connection_t連接 */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    /* 調用 epoll_ctl 方法向 epoll 中添加事件或者在 epoll 中修改事件 */
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    /* 將事件的 active 標志位置為 1,表示當前事件是活躍的 */
    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}

3.2.3 ngx_epoll_process_events

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    /* 調用 epoll_wait 獲取事件。注意,timer 參數是在 process_events 調用時傳入的 */
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    /* 判斷flags標志位是否指示要更新時間或ngx_event_timer_alarm是否為1 */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    /* 遍歷本次 epoll_wait 返回的所有事件 */
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;
        
        /* uintptr_t 在 64 位平台上為:typedef unsigned long int upintptr_t;
         *           在 32 位平台上為:typedef unsigned int uintptr_t; 
         * 該類型主要是為了跨平台,其長度總是所在平台的位數,常用於存放地址. */
         
        /* 將地址的最后一位取出來,用 instance 變量標識 */
        instance = (uintptr_t) c & 1;
        /* 無論是 32 位還是 64 位機器,其地址的最后 1 位肯定是 0,可以用下面這行語句把
         * ngx_connection_t 的地址還原到真正的地址值 */
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        /* 取出讀事件 */
        rev = c->read;

        /* 判斷這個讀事件是否是過期事件 */
        if (c->fd == -1 || rev->instance != instance) {

            /* 當 fd 套接字描述符為 -1 或者 instance 標志位不相等時,
             * 表示這個事件已經過期了,不用處理 */

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        /* 取出事件類型 */
        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        /* 若事件發生錯誤或被掛起 */
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);

            /*
             * if the error events were returned, add EPOLLIN and EPOLLOUT
             * to handle the events at least in one active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        /* 如果是讀事件且該事件是活躍的 */
        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }

            rev->available = 1;
#endif

            rev->ready = 1;

            /* flags 參數中含有 NGX_POST_EVENTS 表示這批事件要延后處理 */
            if (flags & NGX_POST_EVENTS) {
                /* 如果要在 post 隊列中延后處理該事件,首先要判斷它是新連接事件還是普通事件,
                 * 以決定把它加入到 ngx_posted_accept_events 隊列或者 ngx_posted_events 隊列
                 * 中 */
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                /* 將這個事件添加到相應的延后執行隊列中 */
                ngx_post_event(rev, queue);

            } else {
                /* 立即調用讀事件的回調方法來處理這個事件 */
                rev->handler(rev);
            }
        }

        /* 取出寫事件 */
        wev = c->write;

        /* 如果寫事件且事件是活躍的 */
        if ((revents & EPOLLOUT) && wev->active) {

            /* 檢測該事件是否是過期的 */
            if (c->fd == -1 || wev->instance != instance) {

                /* 如果 fd 描述符為 -1 或者 instance 標志位不相等時,表示這個
                 * 事件是過期的,不用處理 */
                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
#if (NGX_THREADS)
            wev->complete = 1;
#endif

            /* 若該事件要延遲處理 */
            if (flags & NGX_POST_EVENTS) {
                /* 將該寫事件添加到延遲執行隊列中 */
                ngx_post_event(wev, &ngx_posted_events);

            } else {
                /* 立即調用這個寫事件的回調方法來處理這個事件 */
                wev->handler(wev);
            }
        }
    }

    return NGX_OK;
}

ngx_epoll_process_events 方法會收集當前觸發的所有事件,對於不需要加入到 post 隊列延后處理的事件,該方法會立即執行
它們的回調方法,這其實是在做分發事件的工作,只是它會在自己的進程中調用這些回調方法而已,因此,每一個回調方法都不能
導致進程休眠或者消耗太多時間,以免 epoll 不能即時地處理其他事件.

什么是過期事件?

假設 epoll_wait 一次返回 3 個事件,在第 1 個事件的處理過程中,由於業務的需要,所以關閉了一個連接,而這個連接恰好對
應第 3 個事件。這樣的話,在處理到第 3 個事件時,這個事件就已經是過期事件了,一旦處理必然出錯。但是單純把這個連接
的 fd 套接字置為 -1 是不能解決問題的。

如下場景:
假設第 3 個事件對應的 ngx_connection_t 連接中的 fd 套接字原先是 50,處理第 1 個事件時把這個連接的套接字關閉了,同
時置為 -1,並且調用 ngx_free_connection 將該連接歸還給連接池。在 ngx_epoll_process_events 方法的循環中開始處理第 2
個事件,恰好第 2 個事件是建立新連接事件,調用 ngx_get_connection 從連接池中取出的連接非常可能就是剛剛釋放的第 3 個
事件對應的連接。由於套接字 50 剛剛被釋放,Linux 內核非常有可能把剛剛釋放的套接字 50 又分配給新建立的連接。因此,
在循環中處理第 3 個事件時,這個事件就是過期的了。它對應的事件是關閉的連接,而不是新建立的連接。

因此,解決這個問題,依靠於 instance 標志位。當調用 ngx_get_connection 從連接池中獲取一個新連接時,instance 標志位
就會置反。這樣,當這個 ngx_connection_t 連接重復使用時,它的 instance 標志位一定是不同的。因此,在
ngx_epoll_process_events 方法中一旦判斷 instance 發生了變化,就認為是過期事件而不予處理。

3.3 模塊的定義

ngx_module_t  ngx_epoll_module = {
    NGX_MODULE_V1,
    &ngx_epoll_module_ctx,               /* module context */
    ngx_epoll_commands,                  /* module directives */
    NGX_EVENT_MODULE,                    /* module type */
    NULL,                                /* init master */
    NULL,                                /* init module */
    NULL,                                /* init process */
    NULL,                                /* init thread */
    NULL,                                /* exit thread */
    NULL,                                /* exit process */
    NULL,                                /* exit master */
    NGX_MODULE_V1_PADDING
};


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM