對於一個服務器模型來說,事件模型是至關重要的,nginx本身的高性能也歸功於它的事件模型。一般來說,nginx的事件模型是基於epoll。而epoll中會調用3函數,epoll_create,epoll_ctl,epoll_wait.
(1) 首先介紹一些相關的數據結構:
typedef struct { ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); //將某描述符的某個事件(可讀/可寫)添加到多路復用監控里 ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); //將某描述符的某個事件(可讀/可寫)從多路復用監控里刪除 ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); //啟動對某個事件的監控 ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); //禁止對某個事件的監控 ngx_int_t (*add_conn)(ngx_connection_t *c); //將指定的連接關聯的描述符添加到多路復用的監控里 ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);//將指定的連接關聯的描述符從多路復用的監控里刪除
ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);//只有kqueue用到
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); //阻塞等待事件發生,對發生的事件進行逐個處理
ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); //初始化
void (*done)(ngx_cycle_t *cycle);//回收資源
} ngx_event_actions_t;
extern ngx_event_actions_t ngx_event_actions; //定義全局的 ngx_event_actions
#define ngx_process_changes ngx_event_actions.process_changes #define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn
typedef struct { ngx_uint_t connections; ngx_uint_t use; ngx_flag_t multi_accept; ngx_flag_t accept_mutex; ngx_msec_t accept_mutex_delay; u_char *name; #if (NGX_DEBUG) ngx_array_t debug_connection; #endif } ngx_event_conf_t; typedef struct { ngx_str_t *name; void *(*create_conf)(ngx_cycle_t *cycle); char *(*init_conf)(ngx_cycle_t *cycle, void *conf); ngx_event_actions_t actions; } ngx_event_module_t;
使用epoll事件模型時會調用的函數:
ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ { ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ NULL, /* process the changes */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } };
(2)事件模型
ngx_events_module是一個核心模塊, 由它來完成event module 的初始化.在obj/ngx_modules.c文件中我們只發現了2個event module。分別是ngx_event_core_module和ngx_epoll_module。ngx_event_core_module這個模塊在事件模型初始化過程中起着至關重要的作用,而ngx_epoll_module實際上就是底層io模型的實現。事件模型的初始化與http模塊類似,由ngx_events_module驅動整個事件模塊的解析和初始化,ngx_event_core_module對events塊大部分指令的解析保存重要的配置信息。
static ngx_command_t ngx_events_commands[] = { { ngx_string("events"), NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, ngx_events_block, //events模塊的入口函數 0, 0, NULL }, ngx_null_command };
事件模型的初始化的入口函數:
1. ngx_events_block.
static char * ngx_events_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { char *rv; void ***ctx; ngx_uint_t i; ngx_conf_t pcf; ngx_event_module_t *m;
//為ctx分配空間,所有的event module的全局配置是一個數組,這里也為它分配空間,同時將存放在ngx_cycle->conf_ctx數組的ngx_events_module的配置結構賦值為ctx ctx = ngx_pcalloc(cf->pool, sizeof(void *)); if (ctx == NULL) { return NGX_CONF_ERROR; } *ctx = ngx_pcalloc(cf->pool, ngx_event_max_module * sizeof(void *)); //將在ngx_cycle->conf_ctx數組中的存放的ngx_events_moudle的config信息賦值給ctx,也就是event module配置信息的數組 *(void **) conf = ctx; for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; //調用create_conf回調函數創建配置信息結構 if (m->create_conf) { (*ctx)[ngx_modules[i]->ctx_index] = m->create_conf(cf->cycle); if ((*ctx)[ngx_modules[i]->ctx_index] == NULL) { return NGX_CONF_ERROR; } } } //為解析events塊准備,設置解析模塊的類型, 備份cf,解析完events塊后恢復 pcf = *cf; cf->ctx = ctx; cf->module_type = NGX_EVENT_MODULE; cf->cmd_type = NGX_EVENT_CONF; //解析events塊 rv = ngx_conf_parse(cf, NULL); //遞歸調用,復雜塊 //恢復cf *cf = pcf; if (rv != NGX_CONF_OK) return rv; for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; // 解析完events塊,接着調用所有event module的init_conf回調函數初始化模塊的配置結構。這里ngx_event_core_module和ngx_epoll_module會對配置結構中尚未初始化的一些屬性賦默認值,比如默認使用io模型,也就是use指令的默認值。 if (m->init_conf) { rv = m->init_conf(cf->cycle, (*ctx)[ngx_modules[i]->ctx_index]); if (rv != NGX_CONF_OK) { return rv; } } } return NGX_CONF_OK; }
看到這里,events塊已經解析完畢,為什么還沒看到epoll.在我們解析完配置文件之后會依次調用init_module和init_process,所以事件模型一定是在這兩個時間點初始化的。下面看一下ngx_event_core_moduel的結構:
ngx_module_t ngx_event_core_module = { NGX_MODULE_V1, &ngx_event_core_module_ctx, /* module context */ ngx_event_core_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ ngx_event_module_init, /* init module */ //ngx_init_cycle中被調用,初始化共享內存中存放的數據結構,accept鎖,鏈接計數器 ngx_event_process_init, /* init process *///在創建worker進程后調用 NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };
init_module:
static ngx_int_t ngx_event_module_init(ngx_cycle_t *cycle) { ....... cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ shm.size = size; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } shared = shm.addr; ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; // 將accept鎖放入共享內存,並將其初始化。 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);//接着放入連接計數器 }
init_module的回調函數執行完了,接下來執行init_process回調函數.
3. ngx_event_process_init函數是在創建完worker進程后調用的,實際的初始化工作的大部分由它完成.
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) {
//而對於ecf->accept_mutex字段的判斷主要是提供用戶便利,可以關閉該功能,因為既然均衡策略也有相應的代碼邏輯,難保在某些情況下其本身的消耗也許會得不償失;當然,該字段默認為1,在配置初始化函數ngx_event_core_init_conf()內,有這么一句:ngx_conf_init_value(ecf->accept_mutex, 1); if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { ngx_use_accept_mutex = 1; ngx_accept_mutex_held = 0; ngx_accept_mutex_delay = ecf->accept_mutex_delay; } else { ngx_use_accept_mutex = 0; } }
根據配置信息初始化ngx_use_accept_mutex,這個變量用於指示是否使用accept鎖.
//初始化timer
if (ngx_event_timer_init(cycle->log) == NGX_ERROR) { return NGX_ERROR; }
for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_EVENT_MODULE) { continue; } if (ngx_modules[m]->ctx_index != ecf->use) { continue; } module = ngx_modules[m]->ctx; if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { //初始化epoll,在lunix下 ngx_epoll_init /* fatal */ exit(2); } break; }
這端代碼完成了事件模型的初始化。遍歷所有的事件模塊找到通過use設定的事件模塊,然后調用該模塊init函數來初始化事件模型.默認nginx使用epoll,對應 的模塊是ngx_epoll_module,它指定的init函數為ngx_epoll_init函數.該函數會調用epoll_create.
4.ngx_epoll_init
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ep = epoll_create(cycle->connection_n / 2);// ep就是epoll的句柄,初值為-1,所以一啟動nginx就是調用epoll_create創建句柄,
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}
//初始化nevents和event_list,epcf->events是由ngx_epoll_module的epoll_events指令設置的。nevents和event_list是要傳給epoll_wait的參數,nevents是要監聽的事件的最大個數,event_list用於存放epoll返回的事件。
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,ycle->log);
nevents = epcf->events;
ngx_event_actions = ngx_epoll_module_ctx.actions; //為抽象事件模型賦值
}
為ngx_event_actions賦值,之后ngx_event_actions指向epoll的事件結構。在此之后,所有的事件操作都可以由一組宏完成,定義在ngx_event.h:
#define ngx_process_changes ngx_event_actions.process_changes #define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn
在回到ngx_event_process_init函數 :
為nginx中的鏈接池分配存儲空間,cycle->connection_n是由worker_connection指令設置的。
cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); if (cycle->connections == NULL) { return NGX_ERROR; } c = cycle->connections;
//為讀事件表分配空間並初始化
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events == NULL) { return NGX_ERROR; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; // 初始化是關閉的連接 rev[i].instance = 1; #if (NGX_THREADS) rev[i].lock = &c[i].lock; rev[i].own_lock = &c[i].lock; #endif }
//為寫事件表分配空間並初始化
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events == NULL) { return NGX_ERROR; } wev = cycle->write_events;
for (i = 0; i < cycle->connection_n; i++) {wev[i].closed = 1;}
i = cycle->connection_n; next = NULL; do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; #if (NGX_THREADS) c[i].lock = 0; #endif } while (i);
初始化鏈接池,每個鏈接的讀寫事件在read_events和write_events數組中的下標和這個連接在connections數組中的下標是一致的。nginx將連接池組織成一種類似鏈表的結構,獲取和 釋放連接很方便.
//初始化連接池 一開始時鏈接池都是空閑的
cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n;
ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ngx_get_connection(ls[i].fd, cycle->log); if (c == NULL) { return NGX_ERROR; } c->log = &ls[i].log; c->listening = &ls[i]; ls[i].connection = c; rev = c->read; rev->log = c->log; rev->accept = 1;// 將連接的accept設置成1,表示可以接受連接請求 #if (NGX_HAVE_DEFERRED_ACCEPT) rev->deferred_accept = ls[i].deferred_accept; #endif if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ls[i].previous) { /* * delete the old accept events that were bound to * the old cycle read events array */ old = ls[i].previous->connection; if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT) == NGX_ERROR) { return NGX_ERROR; } old->fd = (ngx_socket_t) -1; } }
//在監聽到連接的時候,調用ngx_event_accept函數 rev->handler = ngx_event_accept;//設置讀事件的處理函數,上面已經設置了rev->accept = 1,也就是說這是監聽套接字, //使用accept鎖時,等worker進程搶到accept鎖,再加入epoll事件循環中 if (ngx_use_accept_mutex) { continue; } //在不使用accept鎖時,直接將事件加入epoll事件循環 if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { if (ngx_add_conn(c) == NGX_ERROR) { return NGX_ERROR; } } else { if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } }
最后這部分代碼完成的是最重要的功能,為所有監聽socket分配連接並初始化。在不使用accept鎖的情況會直接將所有的監聽socket放入epoll事件循環,而在使用accept鎖時worker進程必須獲得鎖后才能將監聽socket加入事件循環,這部分工作在事件主循環中完成。上面就是nginx事件模型初始化的全部過程,我們看到了epoll_create和epoll_ctl,那么epoll_wait在那調用的呢?大家還記得在worker進程的主循環中調用的ngx_process_events_and_timers函數吧,之前介紹過這個函數是用於處理網絡io事件的,實際就是epoll_wait被調用的地方。
5. ngx_process_events_and_timers
(ngx_use_accept_mutex) { if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } if (ngx_accept_mutex_held) { //當前是否擁有鎖 flags |= NGX_POST_EVENTS; //所有發生的事件都將延后處理,先釋放鎖 } else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } }
在使用accept鎖的情況下,需要先獲得這個鎖才能accept到新的連接,通過調用ngx_trylock_accept_mutex可以搶奪鎖搶奪鎖並accept連接。ngx_accept_disabled變量用於實現簡單的負載均衡,防止連接分配的不均勻。這個變量在ngx_event_accept函數被設置為:ngx_cycle->connection_n/8 - ngx_cycle->free_connection_n,當ngx_accept_disabled大於0時,worker進程將放棄搶奪鎖,專心處理已有的連接,並把ngx_accept_disabled變量減1。
ngx_accept_mutex_held標記指示進程是否獲得鎖,得到鎖的進程會添加NGX_POST_EVENTS標記,這個標記意味着將所有發生的事件放到一個隊列中在,等到進程釋放鎖之后再慢慢處理,因為請求的處理可能非常耗時,如果不體現釋放鎖的話,會導致其他進程一直獲取不到鎖,這樣accept的效率很低。在ngx_trylock_accept_mutex函數內部會先獲取鎖,然后再調用ngx_enable_accept_events把所有的監聽socket添加到epoll事件循環中,下面看一下這個函數。
static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_listening_t *ls; ngx_connection_t *c; ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ls[i].connection; if (c->read->active) { continue; } if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { if (ngx_add_conn(c) == NGX_ERROR) { return NGX_ERROR; } } else { if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } } return NGX_OK; }
在ngx_event_process_init中對於使用accept鎖的情況,沒有將監聽socket加入epoll事件循環,而是這個函數中完成的。下面繼續看ngx_process_events_and_timers函數。
delta = ngx_current_msec; (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta;
上面代碼會調用ngx_process_events,根據上面的宏定義,實際會調用ngx_epoll_preocess_events,這個函數會調用epoll_wait監聽事件 。
6. ngx_epoll_process_events
events = epoll_wait(ep, event_list, (int) nevents, timer); err = (events == -1) ? ngx_errno : 0; if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); }
調用epoll_wait函數,根據flag標簽更新時間.
for (i = 0; i < events; i++) { c = event_list[i].data.ptr;//事件對應的連接信息ngx_connection_t instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; revents = event_list[i].events; //發生的事件 //處理讀事件 if ((revents & EPOLLIN) && rev->active) { if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; } else { rev->ready = 1;//epoll設置 } if (flags & NGX_POST_EVENTS)
// * 使用accept鎖,此處會將事件添加到隊列中。 這里根據是不是監聽socket放到不同的隊列
queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); // 添加到隊列 ngx_locked_post_event(rev, queue); } else { //如果沒有使用accept鎖的話,則直接使用回調函數. rev->handler(rev); } } wev = c->write; if ((revents & EPOLLOUT) && wev->active) { if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } if (flags & NGX_POST_THREAD_EVENTS) { wev->posted_ready = 1; } else { wev->ready = 1; } if (flags & NGX_POST_EVENTS) { ngx_locked_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } }
接下來遍歷所有返回的事件對這些事件進行處理,和正常使用epoll差不多,關於讀寫事件的處理直接看注釋。這就是ngx_epoll_process_events的全過程,下面繼續介紹ngx_process_events_and_timers的后面流程。
//ngx_posted_accept_events是accept事件的隊列,這里會依次調用每個事件的handler並將其刪除。accept事件的handler就是ngx_event_accept,它是在ngx_event_process_init中為每個監聽socket的讀事件添加的,用來處理新建連接的初始化
if (ngx_posted_accept_events) { ngx_event_process_posted(cycle, &ngx_posted_accept_events); } //釋放accept鎖 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); } if (ngx_posted_events) { if (ngx_threaded) { ngx_wakeup_worker_thread(cycle); } else { ngx_event_process_posted(cycle, &ngx_posted_events); } }