網絡編程中,有一類非常重要的事件,跟IO事件沒有直接聯系,而是內部產生的事件,即定時事件。
muduo網絡庫中的定時功能是如何實現的呢?
傳統的Reactor通過控制select(2)和poll(2)的等待時間,來實現定時,而Linux中,可以用timerfd來實現。前面講過,timerfd是Linux特有的定時器,能有效融入select/poll/epoll框架,來做超時事件處理。
timerfd簡要介紹
timerfd的特點是有一個與之關聯fd,可綁定Channel,交由Poller監聽感興趣的事件(讀、寫等)。
timerfd 3個接口: timerfd_create,timerfd_settime,timerfd_gettime。
#include <sys/timerfd.h>
/* 創建一個定時器對象, 返回與之關聯的fd
* clockid 可指定為CLOCK_REALTIME(系統范圍時鍾)或CLOCK_MONOTONIC(不可設置的時鍾,不能手動修改)
* flags 可指定為TFD_NONBLOCK(為fd設置O_NONBLOCK),TFD_CLOEXEC(為fd設置close-on-exec)
*/
int timerfd_create(int clockid, int flags);
/* 啟動或停止綁定到fd的定時器
* flags 指定0:啟動一個相對定時器,由new_value->it_value指定相對定時值;TFD_TIMER_ABSTIME啟動一個絕對定時器,由new_value->it_value指定定時值
* old_value 保存舊定時值
*/
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
/* 獲取fd對應定時器的當前時間值 */
int timerfd_gettime(int fd, struct itimerspec *curr_value);
定時功能相關類
muduo定時功能如何將timerfd融入select/poll/select框架?
由3個class實現:TimerID、Timer、TimerQueue。用戶可見的只有TimerId。
Timestamp類是時間戳類,用來保存超時時刻(精確到1us),保存的是UTC時間,即從 Unix Epoch(1970-01-01 00:00:00)到指定時間的微秒數。
Timer類對應一個超時任務,保存了超時時刻Timestamp,超時回調函數,以及超時任務類型(一次 or 周期)。
TimerId類用於保存Timer對象,以及獨一無二的id。
TimerQueue類用於設置所有超時任務(Timer),需要高效組織尚未到期的Timer,快速查找已到期Timer,以及高效添加和刪除Timer。TimerQueue用std::set存儲 ,set會對Timer按到期時間先后順序進行二叉搜索樹排序,時間復雜度O(logN)。
TimerQueue的定時接口並不是直接暴露給庫的使用者的,而是通過EventLoop的runAfter和runEvery來運行用戶任務的。其中,runAfter延遲固定秒數后運行一次指定用戶任務;runEvery延遲固定秒數后運行用戶任務,后續以指定周期運行用戶任務。
TimerQueue回調用戶代碼onTimer()的時序:
時序圖里的TimerQueue獲取超時Timer(getExpired())后,User及onTimer()是指用戶自定義的超時處理函數,並非庫本身的。
與普通Channel事件一樣,超時任務TimerQueue也會使用一個Channel,專門用於綁定timerfd,交由Poller監聽,發生可讀事件(代表超時)后加入激活通道列表,然后EventLoop::loop()逐個Channel調用對應的回調,從而處理超時事件。
注意:一個EventLoop只持有一個TimerQueue對象,而TimerQueue通過std::set持有多個Timer對象,但只會設置一個Channel。
Timer類
Timer類代表一個超時任務,但並不直接綁定Channel。Timer主要包含超時時刻(expiration_),超時回調(callback_),周期時間值(interval_),全局唯一id(sequence_)。
其聲明如下:
/**
* 用於定時事件的內部類
*/
class Timer : noncopyable
{
public:
Timer(TimerCallback cb, Timestamp when, double interval)
: callback_(std::move(cb)),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }
/* 運行超時回調函數 */
void run() const
{
callback_();
}
/* 返回超時時刻 */
Timestamp expiration() const { return expiration_; }
/* 周期重復標志 */
bool repeat() const { return repeat_; }
/* 全局唯一序列號, 用來表示當前Timer對象 */
int64_t sequence() const { return sequence_; }
/* 重啟定時器, 只對周期Timer有效(repeat_為true) */
void restart(Timestamp now);
/* 當前創建的Timer對象個數, 每新建一個Timer對象就會自增1 */
static int64_t numCreated() { return s_numCreated_.get(); }
private:
const TimerCallback callback_; /* 超時回調 */
Timestamp expiration_; /* 超時時刻 */
const double interval_; /* 周期時間, 單位秒, 可用來結合基礎時刻expiration_, 計算新的時刻 */
const bool repeat_; /* 重復標記. true: 周期Timer; false: 一次Timer */
const int64_t sequence_; /* 全局唯一序列號 */
// global increasing number, atomic. help to identify different Timer
static AtomicInt64 s_numCreated_; /* 類變量, 創建Timer對象的個數, 用來實現全局唯一序列號 */
};
每當創建一個新Timer對象時,原子變量s_numCreated_就會自增1,作為全劇唯一序列號sequence_,用來標識該Timer對象。
- 周期Timer
創建Timer時,超時時刻when決定了回調超時事件時間點,而interval決定了Timer是一次性的,還是周期性的。如果是周期性的,會在TimerQueue::reset中,調用Timer::restart,在當前時間點基礎上,重啟定時器。
- restart函數
restart重啟Timer,根據Timer是否為周期類型,分為兩種情況:
1)周期Timer,restart將重置超時時刻expiration_為當前時間 + 周期間隔時間;
2)非周期Timer,即一次性Timer,將restart將expiration_置為無效時間(默認自UTC Epoch以來的微妙數為0);
void Timer::restart(Timestamp now)
{
if (repeat_)
{
expiration_ = addTime(now, interval_);
}
else
{
expiration_ = Timestamp::invalid();
}
}
TimerId類
TimerId來主要用來作為Timer的唯一標識,用於取消(canceling)Timer。
其實現代碼很簡單:
/**
* An opaque identifier, for canceling Timer.
*/
class TimerId : public muduo::copyable
{
public:
TimerId()
: timer_(NULL),
sequence_(0)
{ }
TimerId(Timer* timer, int64_t seq)
: timer_(timer),
sequence_(seq)
{ }
// default copy-ctor, dtor and assignment are okay
friend class TimerQueue;
private:
Timer* timer_;
int64_t sequence_;
};
注意:TimerId並不直接生成Timer序列號sequence_,這是由Timer來生成的,通過構造函數傳遞給TimerId。而生成Timer標識的方式,在Timer類介紹中也提到過,只需要創建一個Timer對象即可,然后通過Timer::sequence()方法就可以取得該序列號。
TimerQueue類
定時器隊列TimerQueue是定時功能的核心,由所在EventLoop持有,綁定一個Channel,同時維護多個定時任務(Timer)。為用戶(EventLoop)提供添加定時器(addTimer)、取消定時器(cancel)接口。
同樣是定時,TimerQueue與Timer有什么區別?
TimerQueue包含2個Timer集合:
1)timers_定時器集合:包含用戶添加的所有Timer對象,std::set會用AVL搜索樹,對集合元素按時間戳(Timestamp)從小到大順序;
2)activeTimers_激活定時器集合:包含激活的Timer對象,與timers_包含的Timer對象相同,個數也相同,std::set會根據Timer*指針大小,對元素進行排序;3)cancelingTimers_取消定時器集合:包含所有取消的Timer對象,與activeTimers_相對。
注意:timers_和activeTimers_的類型並不相同,只是包含的Timer*相同。cancelingTimers_和activeTimers_的類型相同。
這也是TimerQueue並非Timer的原因,是一個Timer集合,根據其時間戳大小進行排序,更像是一個隊列,先到期的先觸發超時事件。因此,可稱為Timer隊列,即TimerQueue。
調用TimerQueue::addTimer的,只有EventLoop中這3個函數:
/**
* 定時功能,由用戶指定絕對時間
* @details 每為定時器隊列timerQueue添加一個Timer,
* timerQueue內部就會新建一個Timer對象, TimerId就保含了這個對象的唯一標識(序列號)
* @param time 時間戳對象, 單位1us
* @param cb 超時回調函數. 當前時間超過time代表時間時, EventLoop就會調用cb
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
return timerQueue_->addTimer(std::move(cb), time, 0.0);
}
/**
* 定時功能, 由用戶相對時間, 通過runAt實現
* @param delay 相對時間, 單位s, 精度1us(小數)
* @param cb 超時回調
*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, std::move(cb));
}
/**
* 定時功能, 由用戶指定周期, 重復運行
* @param interval 運行周期, 單位s, 精度1us(小數)
* @param cb 超時回調
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(std::move(cb), time, interval);
}
下面是TimerQueue中,3個集合相關的類型及成員定義:
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;
// Timer list sorted by expiration
/* 用戶添加的所有Timer對象集合
* 需要為set元素比較實現operator< */
TimerList timers_;
// for cancel()
ActiveTimerSet activeTimers_;
bool callingExpiredTimers_; /* atomic */
ActiveTimerSet cancelingTimers_;
TimerQueue聲明
除了前面提到的3個集合相關類型及成員,其他成員函數和變量聲明如下:
/**
* 定時器隊列.
* 不能保證回調能及時調用.
*
* 只能在所在loop線程中運行, 因此線程安全是非必須的
*/
class TimerQueue : noncopyable
{
public:
explicit TimerQueue(EventLoop* loop);
~TimerQueue();
/*
* 添加一個定時器.
* 運行到指定時間, 調度相應的回調函數.
* 如果interval參數 > 0.0, 就周期重復運行.
* 必須線程安全: 可能會由其他線程調用
*/
TimerId addTimer(TimerCallback cb, Timestamp when, double interval);
/* 取消指定TimerId的定時器 */
void cancel(TimerId);
private:
...
void addTimerInLoop(Timer* timer);
void cancelInLoop(TimerId timerId);
// called when timerfd alarms
void handleRead();
// move out all expired timers
std::vector<Entry> getExpired(Timestamp now);
void reset(const std::vector<Entry>& expired, Timestamp now);
bool insert(Timer* timer);
EventLoop* loop_;
const int timerfd_;
Channel timerfdChannel_; // watch readable event of timerfd
...
}
TimerQueue所屬EventLoop對象,通過一個EventLoop*來傳遞,注意這是一個raw pointer,而非smart pointer。EventLoop對象與TimerQueue對象生命周期相同,而且只會通過EventLoop對象來調用TimerQueue對象方法,因此不存在與之相關的內存泄漏或非法訪問的問題。
TimerQueue構造函數
TimerQueue::TimerQueue(EventLoop *loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(std::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disrm it with timerfd_settime.
timerfdChannel_.enableReading();
}
構造TimerQueue對象時,就會綁定TimerQueue所屬EventLoop,即創建TimerQueue的EventLoop對象。
另外,調用Channel::enableReading(),會將通道事件加入Poller的監聽通道列表中。
交給Poller監聽的timerfd,是由createTimerfd創建的:
int createTimerfd()
{
// create timers that notify via fd
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0)
{
LOG_SYSFATAL << "Failed in timerfd_create";
}
return timerfd;
}
TimerQueue析構
析構有2點需要注意:
1)在remove綁定的通道前,要先disableAll停止監聽所有通道事件;
2)timers_中Timer對象是在TimerQueue::addTimer中new出來的,需要手動delete;
另外,對注釋“do not remove channel, since we're in EventLoop::dtor();”並不明白是何用意。
TimerQueue::~TimerQueue()
{
// 關閉所有(通道)事件, Poller不再監聽該通道
timerfdChannel_.disableAll();
// 如果正在處理該通道, 會從激活的通道列表中移除, 同時Poller不再監聽該通道
timerfdChannel_.remove();
// 關閉通道對應timerfd
::close(timerfd_);
// FIXME: I dont understand why "do not remove channel". What does it mean?
// do not remove channel, since we're in EventLoop::dtor();
// TimerQueue::addTimer中new出來的Timer對象, 需要手動delete
for (const Entry& timer : timers_)
{
delete timer.second;
}
}
TimerQueue重要接口
addTimer 添加定時器
注意到addTimer 會在構造一個Timer對象后,將其添加到timers_的工作轉交給addTimerInLoop完成了。這是為什么?
因為調用EventLoop::runAt/runEvery的線程,可能並非TimerQueue的loop線程,而修改TimerQueue數據成員時,必須在所屬loop線程中進行,因此需要通過loop_->runInLoop將工作轉交給所屬loop線程。
runInLoop:如果當前線程是所屬loop線程,則直接運行函數;如果不是,就排隊到所屬loop線程末尾,等待運行。
/**
* 添加一個定時器.
* @details 運行到指定時間點when, 調度相應的回調函數cb.
* 如果interval參數 > 0.0, 就周期重復運行.
* 可能會由其他線程調用, 需要讓對TimerQueue數據成員有修改的部分, 在所屬loop所在線程中運行.
* @param cb 超時回調函數
* @param when 觸發超時的時間點
* @param interval 循環周期. > 0.0 代表周期定時器; 否則, 代表一次性定時器
* @return 返回添加的Timer對應TimerId, 用來標識該Timer對象
*/
TimerId TimerQueue::addTimer(TimerCallback cb, Timestamp when, double interval)
{
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); // 轉交所屬loop線程運行
return TimerId(timer, timer->sequence());
}
/**
* 在loop線程中添加一個定時器.
* @details addTimerInLoop 必須在所屬loop線程中運行
*/
void TimerQueue::addTimerInLoop(Timer *timer)
{
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
if (earliestChanged)
{
resetTimerfd(timerfd_, timer->expiration());
}
}
addTimerInLoop的主要工作由2個函數來完成:insert,resetTimerfd。
/**
* 插入一個timer指向的定時器
* @details timers_是std::set<std::pair<Timestamp, Timer*>>類型, 容器會自動對元素進行排序,
* 默認先按pair.first即Timestamp進行排序, 其次是pair.second(.first相同情況下才比較second),
* 這樣第一個元素就是時間戳最小的元素.
* @return 定時器timer當前是否已經超時
* - true timers_為空或已經超時
* - false timers_非空, 且最近的一個定時器尚未超時
*/
bool TimerQueue::insert(Timer *timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false;
Timestamp when = timer->expiration(); // 超時時刻
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{ // 定時器集合為空 或者 新添加的timer已經超時(因為it指向的Timer超時時刻是距離當前最近的)
earliestChanged = true; // timer已經超時
}
// 同時往timers_和activeTimers_集合中, 添加timer
// 注意: timers_和activeTimers_元素類型不同, 但所包含的Timer是相同的, 個數也相同
{ // ensure insert new timer to timers_ successfully
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
assert(result.second); (void)result;
}
{ // ensure insert new timer to activeTimers_ successfully
std::pair<ActiveTimerSet::iterator, bool> result
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(result.second); (void)result;
}
assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}
cancel 取消定時器
一個已超時的定時器,會通過TimerQueue::getExpired自動清除,但一個尚未到期的定時器如何取消?
可以通過調用TimerQueue::cancel。類似於addTimer,cancel也可能在別的線程被調用,因此需要將其轉交給cancelInLoop執行。
/**
* 取消一個定時器, 函數可能在別的線程調用
* @param timerId 每個定時器都有一個唯一的TimerId作為標識
*/
void TimerQueue::cancel(TimerId timerId)
{
loop_->runInLoop(
std::bind(&TimerQueue::cancelInLoop, this, timerId));
}
/**
* 在所屬loop線程中, 取消一個定時器
* @details 同時擦出timers_, activeTimers_中包含的Timer對象, timerId用來查找該Timer對象.
* @param timerId 待取消Timer的唯一Id標識
*/
void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread(); // 確保當前線程是所屬loop線程
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
ActiveTimerSet::const_iterator it = activeTimers_.find(timer);
if (it != activeTimers_.end())
{
// 注意timers_和activeTimers_的Timer指針指向相同對象, 只能delete一次
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{ // 如果正在處理超時定時器
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}
handleRead處理TimerQueue上所有超時任務
handleRead有幾個要點:
1)必須在所在loop線程運行;
2)可能不止一個定時任務超時,可用getExpired()獲取;
3)所有超時任務執行完后,重置周期定時任務,釋放一次性定時任務;
/**
* 處理讀事件, 只能是所屬loop線程調用
* @details 當PollPoller監聽到超時發生時, 將channel加入激活通道列表, loop中回調
* 事件處理函數, TimerQueue::handleRead.
* 發生超時事件時, 可能會有多個超時任務超時, 需要通過getExpired一次性全部獲取, 然后逐個執行回調.
* @note timerfd只會發生讀事件.
*/
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);
std::vector<Entry> expired = getExpired(now); // 獲取所有超時任務
// 正在調用超時任務回調時, 先清除取消的超時任務cancelingTimers_, 再逐個執行超時回調.
// 可由getExpired()獲取的所有超時任務.
callingExpiredTimers_ = true;
cancelingTimers_.clear();
// safe to callback outside critical section
for (const Entry& it : expired)
{
it.second->run(); // 通過Timer::run()回調超時處理函數
}
callingExpiredTimers_ = false;
// 重置所有已超時任務
reset(expired, now);
}
getExpired以參數時間點now為界限,查找set timers_中所有超時定時任務(Timer)。set會對timers_元素進行排序,std::set::lower_bound()會找到第一個時間點 < now時間點的定時任務。
getExpired調用reset重置所有超時的周期定時任務,釋放超時的一次性任務。
/**
* 定時任務超時時, 從set timers_中取出所有的超時任務, 以vector形式返回給調用者
* @note 注意從set timers_要和從set activeTimers_同步取出超時任務, 兩者保留的定時任務是相同的
* @param now 當前時間點, 用來判斷從set中的定時器是否超時
* @return set timers_中超時的定時器
*/
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired;
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// end.key >= sentry.key, Entry.key is pair<Timestamp, Timer*>
// in that end.key.second < sentry.key.second(MAX PTR)
// => end.key == sentry.key is impossible
// => end.key > sentry.key
TimerList::iterator end = timers_.lower_bound(sentry);
assert(end == timers_.end() || now < end->first);
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}
assert(timers_.size() == activeTimers_.size());
return expired;
}
/**
* 根據指定時間now重置所有超時任務, 只對周期定時任務有效
* @param expired 所有超時任務
* @param now 指定的reset基准時間點, 新的超時時間點以此為基准
*/
void TimerQueue::reset(const std::vector<Entry> &expired, Timestamp now)
{
Timestamp nextExpire;
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
// 只重置周期定時任務和沒有取消的定時任務, 釋放一次性超時的定時任務
if (it.second->repeat()
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it.second->restart(now);
insert(it.second);
}
else
{
// FIXME move to a free list
delete it.second; // FIXME: no delete please
}
}
// 根據最近的尚未達到的超時任務, 重置timerfd下一次超時時間
if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}
if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}
參考
https://blog.csdn.net/sinat_35261315/article/details/78324227