前幾天nodejs發布了新版本4.0,其中涉及到一個更新比較多的模塊,那就是下面要介紹的timer模塊。
timers: Improved timer performance from porting the 0.12 implementation, plus minor fixes (Jeremiah Senkpiel)#2540, (Julien Gilli)nodejs/node-v0.x-archive#8751nodejs/node-v0.x-archive#8905
之前也對timer模塊有過比較多的研究,斷斷續續的看過這個模塊在github上的一些改動,於是借着這次機會整理一下自己對timer模塊的理解,和小伙伴們一起分享timer模塊的優化過程。
使用場景
也許你在使用nodejs開發項目時並沒有使用到timer模塊,諸如setTimeout以及setInterval和setImediate方法等等。但如果你開發的是web項目,那么你的項目中一定涉及到了timer模塊。
細心的同學在平時的http接口開發調試中可能會注意到,每個http的request header里都有一個Connection:keep-alive
標識,這是http/1.1開始引入的,表示客戶端需要和服務端一直保持着tcp連接。當然了,這個連接不能就這么一直保持着,所以一般都會有一個超時時間,超過這個時間客戶端還沒有發送新的http請求,那么服務器就需要自動斷開從而繼續為其他客戶端提供服務。
nodejs提供的http服務器便是采用timer模塊來滿足這種請求,每一個新的連接到來構造出一個socket對象,便會調用socket.setTimeout設置一個定時器用於超時后自動斷開連接。
設計
在nodejs開發的web項目中,timer模塊的使用頻率是非常高的,每一個新的連接到來都會設置它的超時時間,而且每個連接的超時時間都一樣,在http server中默認是2 * 60 * 1000ms。nodejs使用c++包裹的Timer對象來實現定時器功能,下面的代碼示例了使用Timer對象來實現一個非常簡單的定時器。
const Timer = process.binding('timer_wrap').Timer;
const kOnTimeout = Timer.kOnTimeout | 0;
var mySetTimeout = function (fn, ms) {
var timer = new Timer();
timer.start(ms, 0);
timer[kOnTimeout] = fn;
return timer;
}
var myClearTimeout = function(timer){
if(timer && timer.close) {
timer.close();
}
}
mySetTimeout(function() {
console.log('timeout!');
},1000);
那我們是否就可以用上面實現的mySetTimeout來對每個socket進行超時操作呢
mySetTimeout(function(){socket.close();},2 * 60 * 1000);
可以是可以,但是這樣真的好嗎?設想我們做的是一個非常棒的產品,每天好幾百萬上千萬的用戶,高峰期在2 * 60 * 1000ms這段時間內會產生非常多的新連接,必然會創建非常多的Timer對象,這個開銷還真不小!
nodejs在設計之初就非常非常注重性能,所以像上面這種這么簡單的方案必然是不能接受的。
實際上在這2分鍾之內,nodejs中的timer模塊只會創建一個Timer對象,一個Timer對象如何來滿足這么多連接的超時處理呢?
timer模塊會使用一個鏈表來保存所有超時時間相同的對象,每個對象中都會存儲開始時間_idleStart以及超時時間_idleTimeout。鏈表中第一個加入的對象一定會比后面加入的對象先超時,當第一個對象超時完成處理后,重新計算下一個對象是否已經到時或者還有多久到時,之前創建的Timer對象便會再次啟動並設置新的超時時間,直到當鏈表上所有的對象都已經完成超時處理,此時便會關閉這個Timer對象。
通過這種巧妙的設計,使得一個Timer對象得到了最大的重用,從而極大的提升了timer模塊的性能。這一場景其實在libev中已早有研究 http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
實現
上面說到timer模塊通過c++提供的Timer對象,最終生成setTimeout以及setInterval等函數暴露給用戶使用。那Timer對象是如何實現的呢,下面我們就來一探究竟。
一個最底層的timer
熟悉linux網絡編程的同學一定聽說過epoll吧,
epoll是什么?按照man手冊的說法:是為處理大批量句柄而作了改進的poll。當然,這不是2.6內核才有的,它是在2.5.44內核中被引進的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
其中有這么一個函數
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll監控的事件中已經發送的事件。參數events是分配好的epoll_event結構體數組,epoll將會把發生的事件賦值到events數組中(events不可以是空指針,內核只負責把數據復制到這個events數組中,不會去幫助我們在用戶態中分配內存)。maxevents告之內核這個events有多大,這個 maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。如果函數調用成功,返回對應I/O上已准備好的文件描述符數目,如返回0表示已超時。
當我們監聽一個fd上的事件時,可以設置等待事件發生的超時時間。利用這個特性便可以非常簡單的實現一個定時器功能。
由於我使用的是mac系統,所以就用kqueue來代替epoll(它們之間非常相似,具體的詳細介紹以及使用方法感興趣的可以自行查閱相關資料)
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
const int MAX_EVENT_COUNT = 5000;
int main() {
struct timeval t_start,t_end;
int fd = -1;//構造一個不會有任何事件發生的fd
int kq = kqueue();
struct kevent changes[1];
EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
int timeout = 3500;
struct kevent events[MAX_EVENT_COUNT];
struct timespec spec;
spec.tv_sec = timeout / 1000;
spec.tv_nsec = (timeout % 1000) * 1000000;
gettimeofday(&t_start, NULL);
kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec);
gettimeofday(&t_end, NULL);
printf("timeout = %d, run time is %ld\n", timeout, t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000));
return 0;
}
至此我們便利用kqueue實現了一個非常簡單非常底層的定時器。
libuv中的timer
前面講到,在http server中每一個新的連接並不會真的就去創建一個Timer對象。同樣,在nodejs底層的定時器中,並不會每次創建一個Timer對象就在kqueue上注冊一個事件等待超時。優化的思路和nodejs中的timer模塊很相似,只不過現在不能保證每個定時器的超時時間都一樣。
定時器有一個非常顯著的特征,超時時間最短的定時器一定最先觸發,假設我們有很多的定時任務,每個任務的執行時間都不同。當第一個定時器超時后,便從這些任務中查找出已經到點的任務並執行對應的超時處理,然后再重新計算余下任務中最先執行的時間,並根據這個時間再次開啟一個定時器。
對應的算法需求就是每次都需要查找集合中最小的元素,顯然二叉堆中的最小堆(父結點的鍵值總是小於或等於任何一個子節點的鍵值)是最適合不過的一種數據結構了。由於最小的元素總是處於根節點,我們可以以O(1)時間找到最小值。對於插入操作,在最壞的情況下,新插入的節點需要不斷的和它的父節點進行交換,直到它為根節點為止。假設堆的高度為h, 二叉樹最多有2^(h+1) - 1 個 節點. 因此新插入一個節點最多需要log(n+1) -1 次比較,其算法復雜度為O(logn)。

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
//https://github.com/joyent/libuv/blob/master/src/heap-inl.h
#include "heap-inl.h"
#define container_of(ptr, type, member) \
((type *) ((char *) (ptr) - offsetof(type, member)))
const int MAX_EVENT_COUNT = 5000;
typedef struct {
struct heap_node* heap_node[3];
int value;
}node_t;
static int less_than(const struct heap_node* ha, const struct heap_node* hb) {
const node_t* a;
const node_t* b;
a = container_of(ha, const node_t, heap_node);
b = container_of(hb, const node_t, heap_node);
if (a->value < b->value)
return 1;
return 0;
}
int main() {
struct heap *heap_p = malloc(sizeof(node_t));
heap_init(heap_p);
int a[] = {10,9,8,6,7,3,5,4,2};
int len = sizeof(a)/sizeof(int);
for(int i=0;i<len;i++){
node_t *node_p = malloc(sizeof(node_t));
node_p->value = a[i]*1000;
heap_insert(heap_p, (struct heap_node*)node_p->heap_node, less_than);
}
int fd = -1;
int kq = kqueue();
struct kevent changes[1];
EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
struct kevent events[MAX_EVENT_COUNT];
struct timeval t_start,t_end;
while(heap_p->nelts) {
node_t *node_p = container_of(heap_p->min, node_t, heap_node);
struct timespec spec;
spec.tv_sec = node_p->value / 1000;
spec.tv_nsec = (node_p->value % 1000) * 1000000;
gettimeofday(&t_start, NULL);
kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec);
gettimeofday(&t_end, NULL);
printf("timeout = %d, run time is %ld\n", node_p->value, t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000));
heap_dequeue(heap_p, less_than);
}
printf("timer is over!\n");
return 0;
}
執行gcc timer.c -o timer && ./timer后輸出
timeout = 2000, run time is 2004
timeout = 3000, run time is 3000
timeout = 4000, run time is 4003
timeout = 5000, run time is 5005
timeout = 6000, run time is 6005
timeout = 7000, run time is 7005
timeout = 8000, run time is 8004
timeout = 9000, run time is 9000
timeout = 10000, run time is 10005
timer is over!
可以看到我們設置的9個定時器都預期執行了,除了有5ms以內的偏差。這就是nodejs中最底層的定時器實現了。
nodejs中的timer
我們再回到nodejs中的timer模塊,為了不影響到nodejs中的event loop,timer模塊專門提供了一些內部的api(timers._unrefActive)給像socket這樣的對象使用。
timer內部會維護一個unrefList鏈表以及一個unrefTimer Timer對象,當有新的超時任務到來時便會添加到unrefList中,超時后便從unrefList中取出任務執行。
在最初的設計中,每次執行_unrefActive添加任務時都會維持着unrefList的順序,保證超時時間最小的處於前面。這樣在定時器超時后便可以以最快的速度處理超時任務並設置下一個定時器,但是在添加任務時最壞的情況下需要遍歷unrefList鏈表中的所有節點。具體實現可參考https://github.com/nodejs/node/blob/5abd4ac079b390467360d671a186a061b5aba736/lib/timers.js
很顯然,在web開發中建立連接是最頻繁的操作,那么向unrefList鏈表中添加節點也就非常頻繁了,而且最開始設置的定時器其實最后真正會超時的非常少,因為中間涉及到io的正常操作時便會取消定時器。所以問題就變成最耗性能的操作非常頻繁,而幾乎不花時間的操作卻很少被執行到。
針對這種情況,如何解決呢?目前在node社區主要有2種方案。
使用不排序的鏈表
主要思路就是將對unrefList鏈表的遍歷操作,移到unrefTimeout定時器超時處理中。這樣每次查找出已經超時的任務就需要花比較多的時間了O(n),但是插入操作卻變得非常簡單O(1),而插入節點正是最頻繁的操作。
使用二叉堆
原理和libuv中的timer實現一樣,添加和查找一個節點都能達到O(log(n))的復雜度(找出最小節點本身很快,但是刪除它需要O(log(n))的復雜度),能夠在二者之間保持一個很好的平衡。
benchamark
這2種方案都有比較詳細benchamark數據, 具體可參考https://github.com/nodejs/node-v0.x-archive/wiki/Optimizing-_unrefActive
小結
在高並發連接到來並且很少有實際的超時事件發生時unrefList使用沒有排序的鏈表來存儲超時任務時性能是非常棒的。但是一旦出現很多超時事件都發生的情況下,對超時事件的處理會再次變成一個瓶頸。
而使用二叉堆來存儲超時任務時,當有大量超時事件發生時性能會比鏈表好很多,沒有超時事件觸發時性能比鏈表稍差。
可見nodejs在不同的場景中使用的定時器實現也不都一樣。當我們自己在實際的開發時,如果需要使用到定時器功能,不妨好好思考下哪種方案更適合業務場景,能夠最大的提升timer模塊的性能。
參考文檔
- HTTP Keep Alive分析與優化總結
- 使用 kqueue 在 FreeBSD 上開發高性能應用服務器
- https://github.com/nodejs/node/blob/master/lib/timers.js
- https://github.com/nodejs/node/commit/e5bb66886bfa40818de7a96e982c5964eef9eb78
- https://github.com/nodejs/node-v0.x-archive/wiki/Optimizing-_unrefActive
- http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
- binary heap