在開發高性能服務器中,定時器總是不可或缺的。 常見的定時器實現三種,分別是:排序鏈表,最小堆,時間輪。 之前用的定時器是基於最小堆的,在定時器數量不多時可以使用, 目前公司用的框架中的定時器是基於簡單時間輪的,但是為了支持大范圍的時間,每個齒輪的所維護的鏈表為有序鏈表,每次插入時先mod出spoke,再從頭遍歷鏈表以便將定時器插入到合適位置, 所以本質上還是基於有序鏈表的。時間復雜度並未減少。
應用場景分析: 下面就一個實際例子來說定時器的使用。
場景: 客戶端發起的網絡請求,需要對每個請求做超時檢查。
方案1:一個定時器,一個mulimap<endtime, request>保存請求超時列表, 每次超時時檢查mulimap。這樣,請求的插入時間復雜度為O(lgn), 遍歷和刪除為O(1)。且需要額外的編碼。
方案2:一個請求一個定時器,如此便無需額外的開銷來保存請求。已無需額外的編碼,等待超時處理即可(請求的信息作為參數給定時器node保存)。時間復雜度為0。
如果程序中的定時器數量比較少,基於最小堆的定時器一般可以滿足需求,且實現簡單。而對於方案2中的應用場景,對定時器的要求邊比較高了。
三種定時器算法復雜度分析:
| 實現方式 |
StartTimer |
StopTimer |
PerTickBookkeeping |
| 基於排序鏈表 |
O(n) |
O(1) |
O(1) |
| 基於最小堆 |
O(lgn) |
O(1) |
O(1) |
| 基於時間輪 |
O(1) |
O(1) |
O(1) |
實現分析:
排序鏈表:實現比較簡單,不多講。
最小堆: 用c++現成的multimap保存便可以。實現亦比較簡單。
時間輪: 時間輪的實現 有 簡單時間輪(一個時間輪),分級時間輪。
簡單時間輪: 一個齒輪,每個齒輪保存一個超時的node鏈表。一個齒輪表示一個時間刻度,比如鍾表里面一小格代表一秒,鍾表的秒針每次跳一格。假設一個刻度代表10ms,則2^32 個格子可表示1.36年,2^16個格子可表示10.9分鍾。當要表示的時間范圍較大時,空間復雜度會大幅增加。

分級時間輪: 類似於水表,當小輪子里的指針轉動滿一圈后,上一級輪子的指針進一格。 采用五個輪子每個輪子為一個簡單時間輪,大小分別為 2^8, 2^6, 2^6, 2^6, 2^6,所需空間:2^8 + 2^6 + 2^6 + 2^6 + 2^6 = 512, 可表示的范圍為 0 -- 2^8 * 2^6 * 2^6* 2^6* 2^6 = 2^32 。
Linux底層的定時器實現便是基於此。下圖為引用的linux內核定時器的實現。

基於分級時間輪的C++實現: 已經過測試
所需數據結構:
每個spoke維護的node鏈表為一個環,如此可以簡化插入刪除的操作。spoke->next為node鏈表中第一個節點,prev為node連接的最后一個節點。
#define GRANULARITY 10 //10ms #define WHEEL_BITS1 8 #define WHEEL_BITS2 6 #define WHEEL_SIZE1 (1 << WHEEL_BITS1) //256 #define WHEEL_SIZE2 (1 << WHEEL_BITS2) //64 #define WHEEL_MASK1 (WHEEL_SIZE1 - 1) #define WHEEL_MASK2 (WHEEL_SIZE2 - 1) #define WHEEL_NUM 5 typedef struct stNodeLink { stNodeLink *prev; stNodeLink *next; stNodeLink() {prev = next = this;} //circle }SNodeLink; typedef struct stTimerNode { SNodeLink link; uint64_t dead_time; CThreadTimer *timer; stTimerNode(CThreadTimer *t, uint64_t dt) : dead_time(dt), timer(t) {} }STimerNode; typedef struct stWheel { SNodeLink *spokes; uint32_t size; uint32_t spokeindex; stWheel(uint32_t n) : size(n), spokeindex(0){ spokes = new SNodeLink[n]; } ~stWheel() { /** clean **/ } }SWheel; SWheel *wheels_[WHEEL_NUM];
插入定時器:根據超時范圍選擇輪子,再通過mod/n求出要插入的spoke位置。
void CTimerManager::AddTimerNode(uint32_t milseconds, STimerNode *node) { SNodeLink *spoke = NULL; uint32_t interval = milseconds / GRANULARITY; uint32_t threshold1 = WHEEL_SIZE1; uint32_t threshold2 = 1 << (WHEEL_BITS1 + WHEEL_BITS2); uint32_t threshold3 = 1 << (WHEEL_BITS1 + 2 * WHEEL_BITS2); uint32_t threshold4 = 1 << (WHEEL_BITS1 + 3 * WHEEL_BITS2); if (interval < threshold1) { uint32_t index = (interval + wheels_[0]->spokeindex) & WHEEL_MASK1; spoke = wheels_[0]->spokes + index; } else if (interval < threshold2) { uint32_t index = ((interval - threshold1 + wheels_[1]->spokeindex * threshold1) >> WHEEL_BITS1) & WHEEL_MASK2; spoke = wheels_[1]->spokes + index; } else if (interval < threshold3) { uint32_t index = ((interval - threshold2 + wheels_[2]->spokeindex * threshold2) >> (WHEEL_BITS1 + WHEEL_BITS2)) & WHEEL_MASK2; spoke = wheels_[2]->spokes + index; } else if (interval < threshold4) { uint32_t index = ((interval - threshold3 + wheels_[3]->spokeindex * threshold3) >> (WHEEL_BITS1 + 2 * WHEEL_BITS2)) & WHEEL_MASK2; spoke = wheels_[3]->spokes + index; } else { uint32_t index = ((interval - threshold4 + wheels_[4]->spokeindex * threshold4) >> (WHEEL_BITS1 + 3 * WHEEL_BITS2)) & WHEEL_MASK2; spoke = wheels_[4]->spokes + index; } SNodeLink *nodelink = &(node->link); nodelink->prev = spoke->prev; spoke->prev->next = nodelink; nodelink->next = spoke; spoke->prev = nodelink; }
刪除定時器:實際上是刪除一個雙向鏈表的元素。只需修改其前后節點的prev next指針指向而已。
void CTimerManager::RemoveTimer(STimerNode* node) { SNodeLink *nodelink = &(node->link); if (nodelink->prev) { nodelink->prev->next = nodelink->next; } if (nodelink->next) { nodelink->next->prev = nodelink->prev; } nodelink->prev = nodelink->next = NULL; delete node; }
定時間超時檢查:
void CTimerManager::DetectTimerList() { uint64_t now = GetCurrentMillisec(); uint32_t loopnum = now > checktime_ ? (now - checktime_) / GRANULARITY : 0; SWheel *wheel = wheels_[0]; for (uint32_t i = 0; i < loopnum; ++i) { SNodeLink *spoke = wheel->spokes + wheel->spokeindex; SNodeLink *link = spoke->next; while (link != spoke) { STimerNode *node = (STimerNode *)link; link->prev->next = link->next; link->next->prev = link->prev; link = node->link.next; AddToReadyNode(node); } if (++(wheel->spokeindex) >= wheel->size) { wheel->spokeindex = 0; Cascade(1); } checktime_ += GRANULARITY; } DoTimeOutCallBack(); }
降級:
uint32_t CTimerManager::Cascade(uint32_t wheelindex) { if (wheelindex < 1 || wheelindex >= WHEEL_NUM) { return 0; } SWheel *wheel = wheels_[wheelindex]; int casnum = 0; uint64_t now = GetCurrentMillisec(); SNodeLink *spoke = wheel->spokes + (wheel->spokeindex++); SNodeLink *link = spoke->next; spoke->next = spoke->prev = spoke; while (link != spoke) { STimerNode *node = (STimerNode *)link; link = node->link.next; if (node->dead_time <= now) { AddToReadyNode(node); } else { uint32_t milseconds = node->dead_time - now; AddTimerNode(milseconds, node); ++casnum; } } if (wheel->spokeindex >= wheel->size) { wheel->spokeindex = 0; casnum += Cascade(++wheelindex); } return casnum; }
項目開源地址: https://github.com/ape2010/ape_cpp_server/tree/master/frame/common/
