nginx 事件機制


     對於一個服務器模型來說,事件模型是至關重要的,nginx本身的高性能也歸功於它的事件模型。一般來說,nginx的事件模型是基於epoll。而epoll中會調用3函數,epoll_create,epoll_ctl,epoll_wait.

   (1) 首先介紹一些相關的數據結構:

typedef struct {
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);  //將某描述符的某個事件(可讀/可寫)添加到多路復用監控里
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);  //將某描述符的某個事件(可讀/可寫)從多路復用監控里刪除 

    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);  //啟動對某個事件的監控
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);  //禁止對某個事件的監控

    ngx_int_t  (*add_conn)(ngx_connection_t *c);    //將指定的連接關聯的描述符添加到多路復用的監控里
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);//將指定的連接關聯的描述符從多路復用的監控里刪除
    ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);//只有kqueue用到
ngx_int_t (
*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); //阻塞等待事件發生,對發生的事件進行逐個處理

ngx_int_t (
*init)(ngx_cycle_t *cycle, ngx_msec_t timer); //初始化
void (*done)(ngx_cycle_t *cycle);//回收資源
} ngx_event_actions_t; 
extern ngx_event_actions_t   ngx_event_actions;   //定義全局的 ngx_event_actions
#define ngx_process_changes  ngx_event_actions.process_changes
#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done

#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn
typedef struct {
    ngx_uint_t    connections;
    ngx_uint_t    use;

    ngx_flag_t    multi_accept;
    ngx_flag_t    accept_mutex;

    ngx_msec_t    accept_mutex_delay;

    u_char       *name;

#if (NGX_DEBUG)
    ngx_array_t   debug_connection;
#endif
} ngx_event_conf_t;


typedef struct {
    ngx_str_t              *name;

    void                 *(*create_conf)(ngx_cycle_t *cycle);
    char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);

    ngx_event_actions_t     actions;
} ngx_event_module_t;

    使用epoll事件模型時會調用的函數:

ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */   
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

 

(2)事件模型

      ngx_events_module是一個核心模塊,  由它來完成event  module  的初始化.在obj/ngx_modules.c文件中我們只發現了2個event module。分別是ngx_event_core_module和ngx_epoll_module。ngx_event_core_module這個模塊在事件模型初始化過程中起着至關重要的作用,而ngx_epoll_module實際上就是底層io模型的實現。事件模型的初始化與http模塊類似,由ngx_events_module驅動整個事件模塊的解析和初始化,ngx_event_core_module對events塊大部分指令的解析保存重要的配置信息。

static ngx_command_t  ngx_events_commands[] = {

    { ngx_string("events"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_events_block,  //events模塊的入口函數
      0,
      0,
      NULL },

      ngx_null_command
};

事件模型的初始化的入口函數:

  1.  ngx_events_block. 

static char *
ngx_events_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                 *rv;
    void               ***ctx;
    ngx_uint_t            i;
    ngx_conf_t            pcf;
    ngx_event_module_t   *m;
//為ctx分配空間,所有的event module的全局配置是一個數組,這里也為它分配空間,同時將存放在ngx_cycle->conf_ctx數組的ngx_events_module的配置結構賦值為ctx ctx = ngx_pcalloc(cf->pool, sizeof(void *)); if (ctx == NULL) { return NGX_CONF_ERROR; } *ctx = ngx_pcalloc(cf->pool, ngx_event_max_module * sizeof(void *)); //將在ngx_cycle->conf_ctx數組中的存放的ngx_events_moudle的config信息賦值給ctx,也就是event module配置信息的數組 *(void **) conf = ctx; for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; //調用create_conf回調函數創建配置信息結構 if (m->create_conf) { (*ctx)[ngx_modules[i]->ctx_index] = m->create_conf(cf->cycle); if ((*ctx)[ngx_modules[i]->ctx_index] == NULL) { return NGX_CONF_ERROR; } } } //為解析events塊准備,設置解析模塊的類型, 備份cf,解析完events塊后恢復 pcf = *cf; cf->ctx = ctx; cf->module_type = NGX_EVENT_MODULE; cf->cmd_type = NGX_EVENT_CONF; //解析events塊 rv = ngx_conf_parse(cf, NULL); //遞歸調用,復雜塊 //恢復cf *cf = pcf; if (rv != NGX_CONF_OK) return rv; for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; // 解析完events塊,接着調用所有event module的init_conf回調函數初始化模塊的配置結構。這里ngx_event_core_module和ngx_epoll_module會對配置結構中尚未初始化的一些屬性賦默認值,比如默認使用io模型,也就是use指令的默認值。 if (m->init_conf) { rv = m->init_conf(cf->cycle, (*ctx)[ngx_modules[i]->ctx_index]); if (rv != NGX_CONF_OK) { return rv; } } } return NGX_CONF_OK; }

     看到這里,events塊已經解析完畢,為什么還沒看到epoll.在我們解析完配置文件之后會依次調用init_module和init_process,所以事件模型一定是在這兩個時間點初始化的。下面看一下ngx_event_core_moduel的結構:

ngx_module_t  ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    ngx_event_module_init,                 /* init module */ //ngx_init_cycle中被調用,初始化共享內存中存放的數據結構,accept鎖,鏈接計數器
    ngx_event_process_init,                /* init process *///在創建worker進程后調用
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};
init_module:
static
ngx_int_t ngx_event_module_init(ngx_cycle_t *cycle) { ....... cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ shm.size = size; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } shared = shm.addr; ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; //  將accept鎖放入共享內存,並將其初始化。 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);//接着放入連接計數器 }

init_module的回調函數執行完了,接下來執行init_process回調函數.

3.   ngx_event_process_init函數是在創建完worker進程后調用的,實際的初始化工作的大部分由它完成.

static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
//而對於ecf->accept_mutex字段的判斷主要是提供用戶便利,可以關閉該功能,因為既然均衡策略也有相應的代碼邏輯,難保在某些情況下其本身的消耗也許會得不償失;當然,該字段默認為1,在配置初始化函數ngx_event_core_init_conf()內,有這么一句:ngx_conf_init_value(ecf->accept_mutex, 1);
if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { ngx_use_accept_mutex = 1; ngx_accept_mutex_held = 0; ngx_accept_mutex_delay = ecf->accept_mutex_delay; } else { ngx_use_accept_mutex = 0; } }

根據配置信息初始化ngx_use_accept_mutex,這個變量用於指示是否使用accept鎖.

//初始化timer
if
(ngx_event_timer_init(cycle->log) == NGX_ERROR) { return NGX_ERROR; }
  for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (ngx_modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = ngx_modules[m]->ctx;

        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { //初始化epoll,在lunix下  ngx_epoll_init
            /* fatal */
            exit(2);
        }

        break;
    }

 這端代碼完成了事件模型的初始化。遍歷所有的事件模塊找到通過use設定的事件模塊,然后調用該模塊init函數來初始化事件模型.默認nginx使用epoll,對應 的模塊是ngx_epoll_module,它指定的init函數為ngx_epoll_init函數.該函數會調用epoll_create.

   4.ngx_epoll_init    

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
       ep = epoll_create(cycle->connection_n / 2);//  ep就是epoll的句柄,初值為-1,所以一啟動nginx就是調用epoll_create創建句柄,

         if (nevents < epcf->events) {
                  if (event_list) {
                          ngx_free(event_list);
         }

         //初始化nevents和event_list,epcf->events是由ngx_epoll_module的epoll_events指令設置的。nevents和event_list是要傳給epoll_wait的參數,nevents是要監聽的事件的最大個數,event_list用於存放epoll返回的事件。

         event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,ycle->log);

         nevents = epcf->events;

         ngx_event_actions = ngx_epoll_module_ctx.actions; //為抽象事件模型賦值

}

 為ngx_event_actions賦值,之后ngx_event_actions指向epoll的事件結構。在此之后,所有的事件操作都可以由一組宏完成,定義在ngx_event.h:

#define ngx_process_changes  ngx_event_actions.process_changes  
#define ngx_process_events   ngx_event_actions.process_events  
#define ngx_done_events      ngx_event_actions.done  
  
#define ngx_add_event        ngx_event_actions.add  
#define ngx_del_event        ngx_event_actions.del  
#define ngx_add_conn         ngx_event_actions.add_conn  
#define ngx_del_conn         ngx_event_actions.del_conn  

在回到ngx_event_process_init函數 :

為nginx中的鏈接池分配存儲空間,cycle->connection_n是由worker_connection指令設置的。
    cycle->connections =  ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }
    c = cycle->connections;
    //為讀事件表分配空間並初始化
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events == NULL) { return NGX_ERROR; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; // 初始化是關閉的連接 rev[i].instance = 1; #if (NGX_THREADS) rev[i].lock = &c[i].lock; rev[i].own_lock = &c[i].lock; #endif }
   //為寫事件表分配空間並初始化
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events == NULL) { return NGX_ERROR; } wev = cycle->write_events;

for (i = 0; i < cycle->connection_n; i++) {wev[i].closed = 1;}
i = cycle->connection_n;
    next = NULL;

    do {
        i--;

        c[i].data = next;
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];
        c[i].fd = (ngx_socket_t) -1;

        next = &c[i];

#if (NGX_THREADS)
        c[i].lock = 0;
#endif
    } while (i);

初始化鏈接池,每個鏈接的讀寫事件在read_events和write_events數組中的下標和這個連接在connections數組中的下標是一致的。nginx將連接池組織成一種類似鏈表的結構,獲取和 釋放連接很方便.

//初始化連接池   一開始時鏈接池都是空閑的
cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n;
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->log = &ls[i].log;

        c->listening = &ls[i];
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;// 將連接的accept設置成1,表示可以接受連接請求

#if (NGX_HAVE_DEFERRED_ACCEPT)
        rev->deferred_accept = ls[i].deferred_accept;
#endif

        if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
            if (ls[i].previous) {

                /*
                 * delete the old accept events that were bound to
                 * the old cycle read events array
                 */

                old = ls[i].previous->connection;

                if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
                    == NGX_ERROR)
                {
                    return NGX_ERROR;
                }

                old->fd = (ngx_socket_t) -1;
            }
        }
//在監聽到連接的時候,調用ngx_event_accept函數 rev
->handler = ngx_event_accept;//設置讀事件的處理函數,上面已經設置了rev->accept = 1,也就是說這是監聽套接字, //使用accept鎖時,等worker進程搶到accept鎖,再加入epoll事件循環中 if (ngx_use_accept_mutex) { continue; } //在不使用accept鎖時,直接將事件加入epoll事件循環 if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { if (ngx_add_conn(c) == NGX_ERROR) { return NGX_ERROR; } } else { if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } }

       最后這部分代碼完成的是最重要的功能,為所有監聽socket分配連接並初始化。在不使用accept鎖的情況會直接將所有的監聽socket放入epoll事件循環,而在使用accept鎖時worker進程必須獲得鎖后才能將監聽socket加入事件循環,這部分工作在事件主循環中完成。上面就是nginx事件模型初始化的全部過程,我們看到了epoll_create和epoll_ctl,那么epoll_wait在那調用的呢?大家還記得在worker進程的主循環中調用的ngx_process_events_and_timers函數吧,之前介紹過這個函數是用於處理網絡io事件的,實際就是epoll_wait被調用的地方。

     5. ngx_process_events_and_timers

 (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {  //當前是否擁有鎖
                flags |= NGX_POST_EVENTS;  //所有發生的事件都將延后處理,先釋放鎖

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    在使用accept鎖的情況下,需要先獲得這個鎖才能accept到新的連接,通過調用ngx_trylock_accept_mutex可以搶奪鎖搶奪鎖並accept連接。ngx_accept_disabled變量用於實現簡單的負載均衡,防止連接分配的不均勻。這個變量在ngx_event_accept函數被設置為:ngx_cycle->connection_n/8 - ngx_cycle->free_connection_n,當ngx_accept_disabled大於0時,worker進程將放棄搶奪鎖,專心處理已有的連接,並把ngx_accept_disabled變量減1。

     ngx_accept_mutex_held標記指示進程是否獲得鎖,得到鎖的進程會添加NGX_POST_EVENTS標記,這個標記意味着將所有發生的事件放到一個隊列中在,等到進程釋放鎖之后再慢慢處理,因為請求的處理可能非常耗時,如果不體現釋放鎖的話,會導致其他進程一直獲取不到鎖,這樣accept的效率很低。在ngx_trylock_accept_mutex函數內部會先獲取鎖,然后再調用ngx_enable_accept_events把所有的監聽socket添加到epoll事件循環中,下面看一下這個函數。

static ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c->read->active) {
            continue;
        }

        if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {

            if (ngx_add_conn(c) == NGX_ERROR) {
                return NGX_ERROR;
            }

        } else {
            if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }
        }
    }

    return NGX_OK;
}

在ngx_event_process_init中對於使用accept鎖的情況,沒有將監聽socket加入epoll事件循環,而是這個函數中完成的。下面繼續看ngx_process_events_and_timers函數。

delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;

上面代碼會調用ngx_process_events,根據上面的宏定義,實際會調用ngx_epoll_preocess_events,這個函數會調用epoll_wait監聽事件 。

6. ngx_epoll_process_events

    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

調用epoll_wait函數,根據flag標簽更新時間.

for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;//事件對應的連接信息ngx_connection_t

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        revents = event_list[i].events; //發生的事件
        //處理讀事件
        if ((revents & EPOLLIN) && rev->active) {

            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                rev->ready = 1;//epoll設置
            }

            if (flags & NGX_POST_EVENTS)  
// * 使用accept鎖,此處會將事件添加到隊列中。
 這里根據是不是監聽socket放到不同的隊列
                queue = (ngx_event_t **) (rev->accept ?  &ngx_posted_accept_events : &ngx_posted_events);
                 // 添加到隊列 
                ngx_locked_post_event(rev, queue);

            } else {  //如果沒有使用accept鎖的話,則直接使用回調函數.
                rev->handler(rev);
            }
        }

        wev = c->write;
if ((revents & EPOLLOUT) && wev->active) {

            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
}

    接下來遍歷所有返回的事件對這些事件進行處理,和正常使用epoll差不多,關於讀寫事件的處理直接看注釋。這就是ngx_epoll_process_events的全過程,下面繼續介紹ngx_process_events_and_timers的后面流程。

  //ngx_posted_accept_events是accept事件的隊列,這里會依次調用每個事件的handler並將其刪除。accept事件的handler就是ngx_event_accept,它是在ngx_event_process_init中為每個監聽socket的讀事件添加的,用來處理新建連接的初始化
if (ngx_posted_accept_events) { ngx_event_process_posted(cycle, &ngx_posted_accept_events); } //釋放accept鎖 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); } if (ngx_posted_events) { if (ngx_threaded) { ngx_wakeup_worker_thread(cycle); } else { ngx_event_process_posted(cycle, &ngx_posted_events); } }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM