Nginx的定時事件的實現(timer)
在前面的文章里面就說到了在事件循環中除了要處理所有的從epoll中獲取的事件之外,還要處理一些timer事件,這篇文章就講講Nginx的timer是如何實現的。
在講Nginx的實現之前,我們可以先回顧一下linux的定時器的實現。在linux中通過每次系統定時器時鍾的中斷的中斷處理程序來設置相應的軟中斷位,該軟中斷的中斷處理程序目的就是為了掃描系統中所有掛起的定時器,如果他們已經超時的話,那么就調用他們相應的函數,這樣說白了就是利用中斷來實現定時器。
而在Nginx中,timer是自己實現的,而且實現的方法完全不同,而是通過紅黑樹來維護所有的timer節點,在worker進程的每一次循環中都會調用ngx_process_events_and_timers函數,在該函數中就會調用處理定時器的函數ngx_event_expire_timers,每次該函數都不斷的從紅黑樹中取出時間值最小的,查看他們是否已經超時,然后執行他們的函數,直到取出的節點的時間沒有超時為止。其實在Nginx的運行環境中,這種實現方式可能比linux本身的實現方式更為高效。
首先我們來看Nginx的定時器的初始化,在ngx_event_process_init函數中( Ngx_event.c):
/*初始化計時器,此處將會創建起一顆紅黑色,來維護計時器。*/ if (ngx_event_timer_init(cycle->log) == NGX_ERROR) { return NGX_ERROR; }
嗯,就是這個函數,那么接下來我們來看這個函數吧,
//timer的初始化 ngx_int_t ngx_event_timer_init(ngx_log_t *log) { //初始化紅黑樹 ngx_rbtree_init(&ngx_event_timer_rbtree, &ngx_event_timer_sentinel, ngx_rbtree_insert_timer_value); #if (NGX_THREADS) if (ngx_event_timer_mutex) { ngx_event_timer_mutex->log = log; return NGX_OK; } ngx_event_timer_mutex = ngx_mutex_init(log, 0); if (ngx_event_timer_mutex == NULL) { return NGX_ERROR; } #endif return NGX_OK; }
其實該函數非常的簡單,說白了就是直接調用ngx_rbtree_init函數來初始化一顆紅黑樹而已(紅黑樹是Nginx自己實現的)。
接下來來看如何定義一個timer事件:
//將一個事件加入到紅黑樹當中,它的超時未timer時間 static ngx_inline void ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer) //timer說白了就是一個int的值,表示超時的事件,用於表示紅黑樹節點的key { ngx_msec_t key; ngx_msec_int_t diff; key = ngx_current_msec + timer; //表示該event的超時時間,為當前時間的值加上超時變量 if (ev->timer_set) { /* * Use a previous timer value if difference between it and a new * value is less than NGX_TIMER_LAZY_DELAY milliseconds: this allows * to minimize the rbtree operations for fast connections. */ diff = (ngx_msec_int_t) (key - ev->timer.key); if (ngx_abs(diff) < NGX_TIMER_LAZY_DELAY) { ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "event timer: %d, old: %M, new: %M", ngx_event_ident(ev->data), ev->timer.key, key); return; } ngx_del_timer(ev); } ev->timer.key = key; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0, "event timer add: %d: %M:%M", ngx_event_ident(ev->data), timer, ev->timer.key); ngx_mutex_lock(ngx_event_timer_mutex); ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer); //事件的timer域插入到紅黑樹當中 ngx_mutex_unlock(ngx_event_timer_mutex); ev->timer_set = 1; }
該函數用於將事件加入到紅黑樹中,首先設置超時時間,也就是當前的時間加上傳進來的超時時間。然后再將timer域加入到紅黑樹中就可以了,這里timer域的定義說白了是一棵紅黑樹節點。然后還有一個函數ngx_event_del_timer,它用於將某個事件從紅黑樹當中移除。
接下來我們來看看Nginx到底是如何處理定時事件的。在ngx_process_events_and_timers函數中首先會有如下的代碼:
if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0; } else { timer = ngx_event_find_timer(); //找到當前紅黑樹當中的最小的事件,傳遞給epoll的wait,保證epoll可以該時間內可以超時,可以使得超時的事件得到處理 flags = NGX_UPDATE_TIME; #if (NGX_THREADS) if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; } #endif }
該段代碼主要是調用ngx_event_find_timer函數獲取當前紅黑樹中超時時間最小的一個節點,該函數的定義如下:
//用於返回當前紅黑樹當中的超時時間,說白了就是返回紅黑樹中最左邊的元素的超時時間 ngx_msec_t ngx_event_find_timer(void) { ngx_msec_int_t timer; ngx_rbtree_node_t *node, *root, *sentinel; if (ngx_event_timer_rbtree.root == &ngx_event_timer_sentinel) { return NGX_TIMER_INFINITE; } ngx_mutex_lock(ngx_event_timer_mutex); root = ngx_event_timer_rbtree.root; sentinel = ngx_event_timer_rbtree.sentinel; node = ngx_rbtree_min(root, sentinel); //找到紅黑樹中key最小的節點 ngx_mutex_unlock(ngx_event_timer_mutex); timer = (ngx_msec_int_t) (node->key - ngx_current_msec); return (ngx_msec_t) (timer > 0 ? timer : 0); }
該函數其實還是很簡單的,說白了就是找到紅黑樹中key的值最小的節點,然后返回它還剩下的超時時間就可以了。在ngx_process_events_and_timers函數中之所以要獲取這個值,是為了為epoll的wait提供一個超時時間,以防止epoll的wait阻塞時間太長,影響了timer的處理。接着或有如下的代碼:
/*delta是上文對epoll wait事件的耗時統計,存在毫秒級的耗時 就對所有事件的timer進行檢查,如果time out就從timer rbtree中, 刪除到期的timer,同時調用相應事件的handler函數完成處理。 */ if (delta) { ngx_event_expire_timers(); }
在ngx_process_events_and_timers函數中調用ngx_event_expire_timers函數來處理所有的定時事件。嗯,這里可以看到一個比較奇怪的現象,干嘛要判斷delta的值呢,嗯,其實這個值是統計處理其余事件的用時,如果用時超過了毫秒,那么才會真正的調用ngx_event_expire_timers函數來處理所有的定時,否則不會,因為距離上次循環間隔太小,完全沒有必要。嗯,性能的提升也就是從這一點一滴中獲取的吧。接下來來看ngx_event_expire_timers函數吧:
//處理紅黑樹中所有超時的事件 void ngx_event_expire_timers(void) { ngx_event_t *ev; ngx_rbtree_node_t *node, *root, *sentinel; sentinel = ngx_event_timer_rbtree.sentinel; //死循環,找到所有的超時的timer,然后處理他們 for ( ;; ) { ngx_mutex_lock(ngx_event_timer_mutex); root = ngx_event_timer_rbtree.root; if (root == sentinel) { return; } node = ngx_rbtree_min(root, sentinel); //獲取key最小的節點 /* node->key <= ngx_current_time */ if ((ngx_msec_int_t) (node->key - ngx_current_msec) <= 0) { //判斷該節點是否超時,如果超時的話,就執行處理函數,否則就可以跳出循環了 //通過偏移來獲取當前timer所在的event ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer)); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "event timer del: %d: %M", ngx_event_ident(ev->data), ev->timer.key); //將當前timer移除 ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer); ngx_mutex_unlock(ngx_event_timer_mutex); ev->timer_set = 0; ev->timedout = 1; ev->handler(ev); //調用event的handler來處理這個事件 continue; } break; } ngx_mutex_unlock(ngx_event_timer_mutex); }
該函數其實很簡單的,一看就看明白了,說白了就是一個死循環,不斷的從紅黑樹中獲取key最小的元素,如果超時的話,就通過偏移量來獲取其所在的event,然后執行handler,直到找到一個沒有超時的timer為止,跳出循環。
嗯,到這里timer就講完了。
轉自:http://www.xuebuyuan.com/2041520.html