深入Linux C/C++ Timer定時器的實現核心原理


我曾以為像定時器這樣基礎的功能,操作系統會有一個完備的實現。當需要開啟一個定時任務的時候,會有一個優雅的、如下形式的接口:

1
2
typedef void (*callback)(void*);
void setTimeout(unsigned int second,callback cb,void* arg);

 

可是事與願違,Linux下不存在這樣的接口。

定時器的實現原理

定時器的實現依賴的是CPU時鍾中斷,時鍾中斷的精度就決定定時器精度的極限。一個時鍾中斷源如何實現多個定時器呢?對於內核,簡單來說就是用特定的數據結構管理眾多的定時器,在時鍾中斷處理中判斷哪些定時器超時,然后執行超時處理動作。而用戶空間程序不直接感知CPU時鍾中斷,通過感知內核的信號、IO事件、調度,間接依賴時鍾中斷。用軟件來實現動態定時器常用數據結構有:時間輪、最小堆和紅黑樹。下面就是一些知名的實現:

Hierarchy 時間輪算法:Linux內核

紅黑樹最小堆算法:Asio C++ Library或nginx 

 

Linux上的定時函數

要想使用上面那樣的定時器功能,我們必須利用Linux上現有的定時通知函數,封裝一個定時器。Linux上的定時通知函數五花八門,要封裝我們自己的定時器,首先需要選用一個定時通知的函數。查閱資料整理出了Linux上所有的定時函數,如下表:

Function Type Precision Remark
sleep(3) unsigned int second  
usleep(3) useconds_t microsecond  
nanosleep(2) struct timespec nanosecond  
clock_nanosleep(2) struct timespec nanosecond It differs in allowing the caller to select the clock against which the sleep interval is to be measured, and in allowingthe sleep interval to be specified as either an absolute or a relative value.
alarm(2) unsigned int second SIGALRM
setitimer(2) struct itimerval microsecond SIGALRM
timer_settime(2) struct itimerspec nanosecond notify method : struct sigevent
Timerfd API File descriptor nanosecond From linux kernel 2.6.25

前四個函數比較雞肋,會讓調用線程掛起,原地等待定時器超時,否定。

alarm()和setitimer(),它們的通知機制采用了信號SIGALRM,由於SIGALRM信號不可靠,會造成超時通知不可靠,而且多線程中處理信號也是一個麻煩事,也不考慮。

timer_create()/timer_settime()系列函數是POSIX規定,精度達到納秒級,提供了一個數據結構struct sigevent可以指定一個實時信號作為通知信號,同時也可以設置線程ID,將信號傳遞到指定的線程。相比前兩個函數,有了不小的改進,可以作為一個備選的實現,但是可以預見到封裝起來不會很輕松。此外使用此系列的函數,需要鏈接librt庫。

事實上,我們遺漏掉了幾個同樣具有定時的功能的API——多路復用。在Linux上的多路復用機制有select/poll/epoll幾種,它們輪詢時都允許指定一個超時時間,如果在指定時間內,監控的事件沒有到達,輪詢函數會超時返回。精度也足夠用,poll/epoll是毫秒級的(millisecond),select超時參數是struct timeval,是微秒級的(microsecond)。

選擇epoll的優勢很明顯,能將定時功能完美的融入已有的event loop里,同時epoll有着天然的高並發的能力,millisecond級的精度也足夠用。

獲取當前時間

要實現一個定時器,有了定時函數,我們還需要選用一個獲取時間的函數。同樣地,這些函數我也整理了一下:

Function Type Precision Remark
time(2) time_t second  
ftime(3) struct timeb millisecond obsolete
gettimeofday(2) struct timeval microsecond  
clock_gettime(2) struct timespec nanosecond  
Time Stamp Counter 64-bit register CPU related on all x86 processors since the Pentium(TSC)

time()精度太低,不合適。

ftime() 毫秒級精度,但是被廢棄了,也不合適。

gettimeofday() 精度達到微秒級,並且在x86-64平台上該函數的調用不是系統調用(vdso),似乎很合適,不幸的是POSIX.1-2008中也將這個函數廢棄了。

Time Stamp Counter 使用匯編指定獲取時間戳的計數器,精度應該是最高的,效率可能也應該是最高的,一條匯編指令rdtscp(相比rdtsc,rdtscp可以避免,因為cpu亂序執行帶來的誤差問題)即可。是可以作為一個選擇的,騰訊的libco就是優先使用這個方法獲取時間的。

clock_gettime() 。默認是nanosecond 級精度,是系統調用(_sys_clock_gettime()),會有開銷。調用頻繁的話,可能造成損失性能。但是Linux 2.6.32后可以指定參數CLOCK_REALTIME_COARSECLOCK_MONOTONIC_COARSE,粗粒度地獲取時間,而不需要發生上下文切換(和gettimeofday()一樣也是vdso技術,https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux_for_real_time/7/html/reference_guide/sect-posix_clocks#CLOCK_MONOTONIC_COARSE_and_CLOCK_REALTIME_COARSE)。使用_COARSE后綴獲取的時間,精度是millisecond級。對我們來說也夠用了。

要對clock_gettime系統調用的開銷有一個直觀的感受的話可以,借助於strace工具,利用-T參數可以追蹤每個系統調用的時間開銷,比我我的環境上CLOCK_MONOTONIC獲取時間的開銷大概是50微秒

clock

換成CLOCK_MONOTONIC_COARSE方式再去獲取時間,用strace就追蹤不到了。

定時器的設計

有了獲取時間函數clock_gettime和定時函數epoll之后,我們就可以開始設計定時器了。首先明確一點,epoll和其他的定時通知函數一樣,一次也只能設置一個超時時間,依然不能滿足我開篇提出的需求。

主流的做法是利用一個容器保存所有設置的超時時間,將容器里最快的超時的時間設置為epoll_wait的超時時間。比如,我先后設置了1400ms,800ms,300ms,2900ms,那么下一次事件循環就將epoll_wait的第四個參數設置為300ms。

如果用鏈表保存的話,每次設置定時器都要遍歷一遍鏈表才能選到最快超時的那個時間,復雜度太高,如果設置了定時器特別多的話,這樣的開銷不能接受。

要像O(1)的時間獲取到最小的哪個值,用最小堆保存超時時間正合適,效率大大提高。事實上libevent就是這么實現的(C語言實現的min_heap_t)。

最小堆實現

先實現一個類Timer表示每一個被添加的定時,構造時需要一個millisecond為單位的超時時間,一個回調函數,一個回調函數的參數。為了簡化實現,我測試用的超時的回調函數,並未使用回調函數的參數,但也沒有去掉,僅僅是占個坑的作用。本來是想打算把args抽象,將Timer寫成模板類,防止本末倒置,本文僅為演示定時器的實現,越簡單越好。

expire時間用的是相對系統啟動的時間,是一個不可以設置的恆定的時間(nonsettable monotonic clock),而不是用的真實的時間(Wall time ,牆上時間),因為這個時間可能會隨着設置系統的日期時間而發生跳躍性的改變。

1
2
3
4
5
6
7
8
9
10
11
12
class Timer
{
public:
Timer(unsigned long long expire, std::function<void(void)> fun, void *args)
: expire_(expire), fun(fun){ }
inline void active() { fun(); }
inline unsigned long long getExpire() const{ return expire_; }
private:
std::function<void(void)> fun;
void *args;
unsigned long long expire_;
};

TimerManager是用戶操作的接口,提供增加,刪除定時器的功能。STL中提供能優先隊列,直接可以拿來用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TimerManager
{
public:
TimerManager() {}

Timer *addTimer(int timeout, std::function<void(void)> fun, void *args = NULL);
void delTimer(Timer* timer);
unsigned long long getRecentTimeout();
void takeAllTimeout();
unsigned long long getCurrentMillisecs();

private:
struct cmp
{
bool operator()(Timer*& lhs, Timer*& rhs) const { return lhs->getExpire() > rhs->getExpire(); }
};
std::priority_queue<Timer*,std::vector<Timer*>,cmp> queue_;
};

addTimer()參數和Timer構造函數一直,實現就是構造一個Timer然后加入到std::priority_queue后,返回Timer指針。

delTimer() 刪除一個指定的Timer,由於priority_queue沒有提供erease()接口,因此刪除Timer的操作,我這里采用了新建一個priority_queue的做法,復雜度O(n)。

getRecentTimeout()獲取一個最近的超時時間(超時時間 = 優先隊列里的時間 - 當前獲取的系統啟動時間)。如果這個值小於0,那么說明這個定時器已經超時了,將其置為0,稍后的epoll_wait將會立馬返回。

takeAllTimeout() 函數,處理超時的定時,並回調其綁定的回調函數。由於超時的可能不止一個定時,需要用一個循環遍歷所有超時的Timer,一一處理。

getCurrentMillisecs()對clock_gettime()的封裝,獲取到的struct timespec轉換為millisecond。

這兩個類的完整實現,我放到了Github上了:https://gist.github.com/baixiangcpp/b2199f1f1c7108f22f47d2ca617f6960。使用的時候,只需要在你的主循環里,把epoll_wait的超時參數設置為TimerManager::getRecentTimeout(),每次epoll_wait()返回后,處理一下超時事件TimerManager::takeAllTimeout()。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
int dispatch()
{
...
TimerManager tm;
tm.addTimer(1000, []() { std::cout << "hello world" << std::endl; }, NULL);
tm.addTimer(5000, []() { std::cout << "hello baixiancpp" << std::endl; }, NULL);
for(;;)
{
int ret = epoll_wait(epollfd,events,events_num,tm.getRecentTimeout());
tm.takeAllTimeout();
}
...
}

時間輪實現

另外一種常見的定時器設計使用的存放超時時間的容器叫做”時間輪”。微信的開源項目libco中使用的就是這種數據結構。

Hierarchy 時間輪的原理大致如下,下面是一個時分秒的Hierarchy時間輪,不同於Linux內核的實現,但原理類似。對於時分秒三級時間輪,每個時間輪都維護一個cursor,新建一個timer時,要掛在合適的格子,剩余輪數以及時間都要記錄,到期判斷超時並調整位置。原理圖大致如下:

timer wheel

 

 

 

 

對於時間輪的實現,Timer依然是存放在鏈表上,但是借助了hash的思想,將相同間隔(或者相同周期的整數倍)的超時Timer放在同一個時間輪子上的槽(slot)上。時間輪上有一個指針,按照一個基准的頻率(比如1ms,5ms,10ms等,libco中設置的是1ms)向前移動。這個基准的頻率就是傳遞給epoll_wait()超時的值,也是定時器精度的基本單位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Timer
{
public:
Timer(int rotations,int slot,std::function<void(void)> fun,void* args)
: rotations_(rotations),slot_(slot),fun(fun) { }
inline int getRotations() { return rotations_; }
inline void decreaseRotations() { --rotations_; }
inline void active() { fun(); }
inline int getSlot() { return slot_; }
private:
int rotations_;
int slot_;
std::function<void(void)> fun;
void* args;
};

時間輪中的Timer類和最小堆中的實現,多了兩個參數,rotations表示時間輪轉多少圈后當前的Timer會觸發,slot表示當前的Timer應該掛在對應的槽指向的鏈表上。成員函數比較簡單,不多贅述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TimeWheel
{
public:
TimeWheel(int nslots)
: nslosts_(nslots),curslot_(0),
slots_(nslosts_,std::vector<Timer*>()),starttime_(getCurrentMillisecs()) { }
~TimeWheel();
unsigned long long getCurrentMillisecs();
Timer *addTimer(int timeout,std::function<void(void)> fun,void* args);
void delTimer(Timer *timer);
void tick();
void takeAllTimeout();

private:
int nslosts_;
int curslot_;
unsigned long long starttime_;
std::vector<std::vector<Timer*>> slots_;
};

curslot_表示時間輪當前指向的那個slot。nslosts_表示這個時間輪擁有多少個slot,不要上圖迷惑了,實際上slot會遠遠超過這8個。要想效率足夠高,slot就要越大,當然占用的內存也會越大(現代計算機,這點內存可以忽略不計),libco默認使用了 60 * 1000 個slot 。

addTimer()是添加一個Timer到TimeWheel上,需要根據傳遞的timeout參數,計算出該Timer所對應的slot (slot = (curslot_ + (timeout % nslosts_)) % nslosts_;) ,還有到這個Timer超時時間輪的指針需要轉過的圈數(timeout / nslosts_)。

delTimer() 根據Timer*參數,刪除時間輪上對應的Timer。

tick() 時間輪的指針走動一下。同時遍歷當前slot上鏈表里的每一個Timer,如果Timer的圈數大於0,將Timer里的圈數-1,否則激活這個Timer。

takeAllTimeout() 是必要的,由於誤差的存在,每次epoll_wait超時后(一個基准頻率),時間輪可能需要走動好幾步。如果每次epoll_wait后直接tick()而不是takeAllTimeout(),會導致誤差一直被累積,時間輪上剩余的定時器被滯后觸發。

時間輪的實現我也放到了Github上:https://gist.github.com/baixiangcpp/63278c0087201a655f940ab8de543abd。使用方式和之前的最小堆實現的基本是一樣的。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int dispatch()
{
...
TimeWheel tw(60 * 1000);
tw.addTimer(1000, []() { std::cout << "hello world" << std::endl; }, NULL);
tw.addTimer(5000, []() { std::cout << "hello baixiancpp" << std::endl; }, NULL);
for(;;)
{
int ret = epoll_wait(epollfd,events,events_num,1); // 基准頻率 1ms
// tw.tick(); 不要這么做,會導致誤差累積
tw.takeAllTimeout();
}
...
}

最小堆和時間輪的時間復雜度,如下:

Type add exec
list O(1) O(n)
min-heap O(lgn) O(1)
time wheel O(1) O(n)

這里我只列出添加定時器和觸發定時器的時間復雜度,因為這幾種實現中,刪除一個定時器可以優化到O(1)的時間復雜度————把其對應的回調函數置空。前面的例子我沒有這么做,僅為展示,有興趣的話可以自行修改。

乍看下來,時間輪的復雜度和鏈表是一樣的。其實不然,時間輪上觸發一個定時器,僅僅是理論上的O(n),只要slot的數量設置合理,時間復雜度會下降至接近O(1)。

可以根據實際需要,選擇合適的定時器容器。

要不要用Timerfd?

開篇的表格里有提到,從Linux2.6.25開始,timerfd系列API,帶來了一種全新的定時機制。把超時事件轉換為了文件描述符,當超時發生后該文件描述符會變成可讀。於是超時事件就變成了普通的IO事件。如果未對timerfd設置阻塞,對其read操作會一直阻塞到超時發生。此外timerfd的精度達到了納秒級。不考慮跨平台等因素,這是一個非常不錯的選擇。

libevent2.1的源碼里也支持timerfd了,在版本說明里也很明確了說明了使用多路復用的超時參數和使用timerfd之間的差異 ,它使用了兩個詞”efficient”和”precise”,分別表示這種實現之間的差異,我想着這還是非常有說服力的。

每個超時事件獨享一個timerfd

如果對於每一個超時事件都用timerfd_create()創建一個對應的fd,放到epoll中統一管理。這樣的做法是不合適的。每增加一個定時事件,都需要額外的3個系統調用:

timerfd

此外,文件描述符還是稀缺的資源,每個進程能夠使用的文件描述符是受系統限制的,如果定時器過多,會造成嚴重的浪費。

這種方式的定時器,比較容易實現,這里我就不再浪費篇幅了。

所有超時事件共享一個timerfd

libevent就是使用的這種方式。定時時間仍然使用最小堆來保存,每個event loop共享同一個timerfd。每次事件循環之前,取出最近的一個超時的時間,將這個timerfd設置為這個超時時間。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int epoll_dispatch( ...)
{
...
if (epollop->timerfd >= 0)
{
struct itimerspec is;
is.it_value.tv_sec = tv->tv_sec;
is.it_value.tv_nsec = tv->tv_usec * 1000;
timerfd_settime(epollop->timerfd, 0, &is, NULL);
}

res = epoll_wait(epollop->epfd, events, epollop->nevents, -1);

for (i = 0; i < res; i++)
{
if (events[i].data.fd == epollop->timerfd)
;//
}
}

這樣的改進規避了前一種方式提到的造成文件描述符資源浪費的問題,僅僅需要1個額外的文件描述符。

額外的系統調用從額外的3個,降到了1個。而且還有改進的空間,只有當棧頂的timeout變化時,才調用timerfd_settime()改變。

這種方式實現的定時器,精度提高了但是多了1個額外的系統調用。libevent把選擇權給了用戶,用戶可以根據實際情況在創建event base的時候是否配置EVENT_BASE_FLAG_PRECISE_TIMER宏而選擇使用哪個定時器實現。

總結

std::priority_queue是一個容器適配器,底層的容器默認使用的std::vector(make_heap())。但是這不意味着往std::priority_queue插入一個元素的開銷是O(n),C++標准對此實現有要求,可以放心大膽的去用。但是std::priority_queue沒有提供高效刪除元素的接口,我們可以通過將回調函數置空的方式,以O(1)的時間復雜度實現刪除。

以C++實現的muduo網絡庫使用的是std::set集合存放Timer:

1
2
3
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
TimerList timers_;

實際上std::set實現應該是二叉搜索樹,因此效率可能會比用std::priority_queue略差一點(《linux多線程網絡編程》 8.2 )。

此外,libev 允許使用一個宏EV_USE_4HEAP指定以一個4-heap的數據結構保存定時器,據說效率更高,我也沒有測試。

以上就是目前一些c/c++語言實現的網絡庫里邊定時器常用的設計手法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM