1. epoll 原理
假設有 100 萬用戶同時與一個進程保持着 TCP 連接,而每一時刻只有幾十個或幾百個 TCP 連接時活躍的(接收到 TCP
包),也就是說,在每一時刻,進程只需要處理這 100 萬連接中的一小部分連接。
select 和 poll 的做法是:進程每次收集事件的連接(其實這 100 萬連接中的大部分都是沒有事件發生的)都把這 100 萬連
接的套接字傳給操作系統(這首先就是用戶態內存到內核態內存的大量復制),而由操作系統內核尋找這些連接上有沒有未
處理的事件,將會是巨大的資源浪費,因此 select 和 poll 最多只能處理幾千個並發連接。
而 epoll 則是在 Linux 內核中申請了一個簡易的文件系統,把原先的一個 select 或者 poll 調用分成了 3 個部分:
- 調用 epoll_create 建立 1 個 epoll 對象(在 epoll 文件系統中給這個句柄分配資源);
- 調用 epoll_ctl 向 epoll 對象中添加這 100 萬個連接的套接字;
- 調用 epoll_wait 收集發生事件的連接。
Linux 2.6.35內核對 epoll 的實現
當某一個進程調用 epoll_create 方法時,Linux 內核會創建一個 eventpoll 結構體:
struct eventpoll {
...
/* 紅黑樹的根節點,這棵樹中存儲着所有添加到 epoll 中的事件,也就是這個 epoll 監控的事件 */
struct rb_root rbr;
/* 雙向鏈表 rdllist 保存着將要通過 epoll_wait 返回給用戶的、滿足條件的事件 */
struct list_head rdllist;
};
每一個 epoll 對象都有一個獨立的 eventpoll 結構體,這個結構體會在內核空間中創造獨立的內存,用於存儲使用
epoll_ctl 方法向 epoll 對象中添加進來的事件。這些事件都會掛到 rbr 紅黑樹中,這樣,重復添加的事件就可以通過紅黑
樹而高效地識別出來。
所有添加到 epoll 中的事件都會與設備(如網卡)驅動程序建立回調關系,即相應的事件發生時會調用 ep_poll_callback 回調
方法,它會把這樣的事件放到上面的 rdllist 雙向鏈表中。
在 epoll 中,對於每一個事件都會建立一個 epitem 結構體:
struct epitem {
...
/* 紅黑樹節點 */
struct rb_node rbn;
/* 雙向鏈表節點 */
struct list_head rdllink;
/* 事件句柄等信息 */
struct epoll_filefd ffd;
/* 指向所屬的 eventpoll 對象 */
struct eventpoll *ep;
/* 期待的事件類型 */
struct epoll_event event;
...
};
這里包含每一個事件對應着的信息。
當調用 epoll_wait 檢查是否有事件的連接時,只是檢查 eventpoll 對象中的 rdllist 雙向鏈表是否有 epitem 元素,如果
rdllist 鏈表不為空,則把這里的事件復制到用戶態內存中,同時將事件數量返回給用戶。
2. epoll 的使用
2.1 epoll 的接口
2.1.1 epoll_create()
int epoll_create(int size);
系統調用 epoll_create() 創建一個 epoll 的句柄,之后 epoll 的使用都將依靠這個句柄來標識。參數 size 是告知 epoll 所
要處理的大致事件數目。不再使用 epoll 時,必須調用 close 關閉這個句柄。
2.1.2 epoll_ctl()
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
epoll_ctl 向 epoll 對象中添加、修改或者刪除感興趣的事件,返回 0 表示成功,否則返回 -1,此時需要根據 errno 錯誤碼
判斷錯誤類型。
- epfd:是 epoll_create() 返回的句柄;
- op:表示動作,可取的值有:
- EPOLL_CTL_ADD: 添加新的事件到 epoll 中
- EPOLL_CTL_MOD: 修改 epoll 中的事件
- EPOLL_CTL_DEL:刪除 epoll 中的事件
- fd: 需要監聽的描述符;
- event: 是 epoll_event 結構體類型,用於告訴內核需要監聽什么事件。epoll_event 的定義:
struct epoll_event {
__uint32_t events;
epoll_data_t data;
};
- events:的取值有
- EPOLLIN: 表示對應的連接上有數據可讀(TCP 連接的遠端主動關閉連接,也相當於可讀事件,因為需要處理發送來的 FIN)
- EPOLLOUT: 表示對應的連接上可以寫入數據發送(主動向上游服務器發起非阻塞的 TCP 連接,連接建立成功的事件相當於可寫事件)
- EPOLLRDHUP:表示 TCP 連接的遠端關閉或半關閉連接
- EPOLLLPRI:表示對應的連接上有緊急數據需要讀
- EPOLLERR: 表示對應的連接發生錯誤
- EPOLLHUP: 表示對應的連接被掛起
- EPOLLLET: 表示將觸發方式設置為邊緣觸發(ET),系統默認為水平觸發(LT)
- EPOLLONESHOT: 表示對這個事件只處理一次,下次需要處理時需重新加入 epoll
- data:是一個 epoll_data 聯合,定義如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
2.1.3 epoll_wait()
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
收集在 epoll 監控的事件中已經發生的事件,如果 epoll 中沒有任何一個事件發生,則最多等待 timeout 毫秒后返回。
epoll_wait 的返回值表示當前發生的事件個數,如果返回 0,則表示本次調用中沒有事件發生,如果返回 -1,則表示發生錯
誤,需要檢查 errno 判斷錯誤類型。
- epfd:是 epoll_create() 返回的句柄;
- events:是分配好的 epoll_event 結構體數組,epoll 將會把發生的事件復制到 events 數組中(events 不可以是空指針,
內核只負責把數據復制到這個 events 數組中,不會去幫助我們在用戶態中分配內存) - maxevents:表示本次可以返回的最大事件數目,通常 maxevents 參數與預分配的 events 數組的大小是相等的;
- timeout:表示在沒有檢測到事件發生時最多等待的時間(單位為毫秒),如果 timeout 為 0,則表示 epoll_wait 在
rdllist 鏈表為空時,立刻返回,不會等待。
2.2 epoll 的工作模式
epoll 有兩種工作模式:LT(水平觸發)模式和 ET(邊緣觸發)模式。
默認情況下,epoll 采用 LT 模式工作,這時可以處理阻塞和非阻塞套接字。ET 模式的效率要比 LT 模式高,它只支持非阻塞
字。ET 和 LT 模式的區別在於,當一個新的事件到來時,ET 模式下當然可以從 epoll_wait調用中獲取這個事件,可是如果這
次沒有把這個事件對應的套接字緩沖區處理完,在這個套接字沒有新的事件再次到來時,在 ET 模式下是無法再次從
epoll_wait 調用中獲取這個事件的;而 LT 模式則相反,只要一個事件對應的套接字緩沖區還有數據,就總能從 epoll_wait
中獲取這個事件。因此,LT 模式相對簡單,而在 ET 模式下事件發生時,如果沒有徹底地將緩沖區數據處理完,則會導致緩沖
區中的用戶請求得不到響應。默認情況下,Nginx 是通過 ET 模式使用 epoll 的。
3. ngx_epoll_module 模塊
3.1 配置項
static ngx_command_t ngx_epoll_commands[] = {
/* 在調用 epoll_wait 時,將由第 2 和第 3 個參數告訴 Linux 內核一次最多可返回多少個事件。這個配置項
* 表示調用一次 epoll_wait 時最多可以返回的事件數,當然,它也會預分配那么多 epoll_event 結構體用於
* 存儲事件 */
{ ngx_string("epoll_events"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, events),
NULL },
/* 指明在開啟異步 I/O 且使用 io_setup 系統調用初始化異步 I/O 上下文環境時,初始分配的異步 I/O
* 事件個數 */
{ ngx_string("worker_aio_requests"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, aio_requests),
NULL },
ngx_null_command
};
存儲配置項的配置結構體:
typedef struct {
ngx_uint_t events;
ngx_uint_t aio_requests;
} ngx_epoll_conf_t;
3.2 上下文結構體:ngx_epoll_module_ctx
static ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
3.2.1 ngx_epoll_init
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t *epcf;
/* 獲取配置項結構體 */
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
if (ep == -1)
{
/* 創建一個 epoll 句柄 */
ep = epoll_create(cycle->connection_n / 2);
if (ep == -1)
{
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
#if (NGX_HAVE_EVENTFD)
if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
ngx_epoll_module_ctx.actions.notify = NULL;
}
#endif
#if (NGX_HAVE_FILE_AIO)
ngx_epoll_aio_init(cycle, epcf);
#endif
#if (NGX_HAVE_EPOLLRDHUP)
ngx_epoll_test_rdhup(cycle);
#endif
}
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}
/* 創建 event_list 數組,用於進行 epoll_wait 調用時傳遞內核態的事件 */
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}
/* 將配置項epoll_events的參數賦給nevents */
nevents = epcf->events;
/* 指明讀寫I/O的方法 */
ngx_io = ngx_os_io;
/* 一旦設定Nginx使用某個事件處理模塊,經過事件處理模塊的初始化函數后,就把全局變量
* ngx_event_actions指向了該模塊的封裝 */
ngx_event_actions = ngx_epoll_module_ctx.actions;
#if (NGX_HAVE_CLEAR_EVENT)
/* 默認是采用ET模式來使用epoll的,NGX_USE_CLEAR_EVENT宏實際上就是在告訴Nginx使用ET模式 */
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
|NGX_USE_GREEDY_EVENT
|NGX_USE_EPOLL_EVENT;
return NGX_OK;
}
3.2.2 ngx_epoll_add_event
static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
/* 每個事件的 data 成員都存放着其對應的 ngx_connection_t 連接 */
c = ev->data;
/* 下面會根據event參數確定當前事件時讀事件還是寫事件,
* 這會決定events是加上EPOLLIN標志還是EPOLLOUT標志位 */
events = (uint32_t) event;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}
/* 根據active標志位確定是否為活躍事件,以決定到底是修改還是添加事件 */
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
if (flags & NGX_EXCLUSIVE_EVENT) {
events &= ~EPOLLRDHUP;
}
#endif
/* 加入flags參數到events標志位中 */
ee.events = events | (uint32_t) flags;
/* ptr成員存儲的是ngx_connection_t連接 */
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
/* 調用 epoll_ctl 方法向 epoll 中添加事件或者在 epoll 中修改事件 */
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
/* 將事件的 active 標志位置為 1,表示當前事件是活躍的 */
ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif
return NGX_OK;
}
3.2.3 ngx_epoll_process_events
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
/* NGX_TIMER_INFINITE == INFTIM */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
/* 調用 epoll_wait 獲取事件。注意,timer 參數是在 process_events 調用時傳入的 */
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
/* 判斷flags標志位是否指示要更新時間或ngx_event_timer_alarm是否為1 */
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
/* 遍歷本次 epoll_wait 返回的所有事件 */
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
/* uintptr_t 在 64 位平台上為:typedef unsigned long int upintptr_t;
* 在 32 位平台上為:typedef unsigned int uintptr_t;
* 該類型主要是為了跨平台,其長度總是所在平台的位數,常用於存放地址. */
/* 將地址的最后一位取出來,用 instance 變量標識 */
instance = (uintptr_t) c & 1;
/* 無論是 32 位還是 64 位機器,其地址的最后 1 位肯定是 0,可以用下面這行語句把
* ngx_connection_t 的地址還原到真正的地址值 */
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
/* 取出讀事件 */
rev = c->read;
/* 判斷這個讀事件是否是過期事件 */
if (c->fd == -1 || rev->instance != instance) {
/* 當 fd 套接字描述符為 -1 或者 instance 標志位不相等時,
* 表示這個事件已經過期了,不用處理 */
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
/* 取出事件類型 */
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
/* 若事件發生錯誤或被掛起 */
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/
revents |= EPOLLIN|EPOLLOUT;
}
#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif
/* 如果是讀事件且該事件是活躍的 */
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
rev->available = 1;
#endif
rev->ready = 1;
/* flags 參數中含有 NGX_POST_EVENTS 表示這批事件要延后處理 */
if (flags & NGX_POST_EVENTS) {
/* 如果要在 post 隊列中延后處理該事件,首先要判斷它是新連接事件還是普通事件,
* 以決定把它加入到 ngx_posted_accept_events 隊列或者 ngx_posted_events 隊列
* 中 */
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
/* 將這個事件添加到相應的延后執行隊列中 */
ngx_post_event(rev, queue);
} else {
/* 立即調用讀事件的回調方法來處理這個事件 */
rev->handler(rev);
}
}
/* 取出寫事件 */
wev = c->write;
/* 如果寫事件且事件是活躍的 */
if ((revents & EPOLLOUT) && wev->active) {
/* 檢測該事件是否是過期的 */
if (c->fd == -1 || wev->instance != instance) {
/* 如果 fd 描述符為 -1 或者 instance 標志位不相等時,表示這個
* 事件是過期的,不用處理 */
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
/* 若該事件要延遲處理 */
if (flags & NGX_POST_EVENTS) {
/* 將該寫事件添加到延遲執行隊列中 */
ngx_post_event(wev, &ngx_posted_events);
} else {
/* 立即調用這個寫事件的回調方法來處理這個事件 */
wev->handler(wev);
}
}
}
return NGX_OK;
}
ngx_epoll_process_events 方法會收集當前觸發的所有事件,對於不需要加入到 post 隊列延后處理的事件,該方法會立即執行
它們的回調方法,這其實是在做分發事件的工作,只是它會在自己的進程中調用這些回調方法而已,因此,每一個回調方法都不能
導致進程休眠或者消耗太多時間,以免 epoll 不能即時地處理其他事件.
什么是過期事件?
假設 epoll_wait 一次返回 3 個事件,在第 1 個事件的處理過程中,由於業務的需要,所以關閉了一個連接,而這個連接恰好對
應第 3 個事件。這樣的話,在處理到第 3 個事件時,這個事件就已經是過期事件了,一旦處理必然出錯。但是單純把這個連接
的 fd 套接字置為 -1 是不能解決問題的。
如下場景:
假設第 3 個事件對應的 ngx_connection_t 連接中的 fd 套接字原先是 50,處理第 1 個事件時把這個連接的套接字關閉了,同
時置為 -1,並且調用 ngx_free_connection 將該連接歸還給連接池。在 ngx_epoll_process_events 方法的循環中開始處理第 2
個事件,恰好第 2 個事件是建立新連接事件,調用 ngx_get_connection 從連接池中取出的連接非常可能就是剛剛釋放的第 3 個
事件對應的連接。由於套接字 50 剛剛被釋放,Linux 內核非常有可能把剛剛釋放的套接字 50 又分配給新建立的連接。因此,
在循環中處理第 3 個事件時,這個事件就是過期的了。它對應的事件是關閉的連接,而不是新建立的連接。
因此,解決這個問題,依靠於 instance 標志位。當調用 ngx_get_connection 從連接池中獲取一個新連接時,instance 標志位
就會置反。這樣,當這個 ngx_connection_t 連接重復使用時,它的 instance 標志位一定是不同的。因此,在
ngx_epoll_process_events 方法中一旦判斷 instance 發生了變化,就認為是過期事件而不予處理。
3.3 模塊的定義
ngx_module_t ngx_epoll_module = {
NGX_MODULE_V1,
&ngx_epoll_module_ctx, /* module context */
ngx_epoll_commands, /* module directives */
NGX_EVENT_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};