muduo筆記 網絡庫(四)TimerQueue定時器隊列


網絡編程中,有一類非常重要的事件,跟IO事件沒有直接聯系,而是內部產生的事件,即定時事件。

muduo網絡庫中的定時功能是如何實現的呢?
傳統的Reactor通過控制select(2)和poll(2)的等待時間,來實現定時,而Linux中,可以用timerfd來實現。前面講過,timerfd是Linux特有的定時器,能有效融入select/poll/epoll框架,來做超時事件處理。

參見:Linux定時器timerfd用法


timerfd簡要介紹

timerfd的特點是有一個與之關聯fd,可綁定Channel,交由Poller監聽感興趣的事件(讀、寫等)。
timerfd 3個接口: timerfd_create,timerfd_settime,timerfd_gettime。

#include <sys/timerfd.h>

/* 創建一個定時器對象, 返回與之關聯的fd
* clockid 可指定為CLOCK_REALTIME(系統范圍時鍾)或CLOCK_MONOTONIC(不可設置的時鍾,不能手動修改)
* flags 可指定為TFD_NONBLOCK(為fd設置O_NONBLOCK),TFD_CLOEXEC(為fd設置close-on-exec)
*/
int timerfd_create(int clockid, int flags);

/* 啟動或停止綁定到fd的定時器
 * flags 指定0:啟動一個相對定時器,由new_value->it_value指定相對定時值;TFD_TIMER_ABSTIME啟動一個絕對定時器,由new_value->it_value指定定時值
 * old_value 保存舊定時值
 */
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);

/* 獲取fd對應定時器的當前時間值  */
int timerfd_gettime(int fd, struct itimerspec *curr_value);

定時功能相關類

muduo定時功能如何將timerfd融入select/poll/select框架?
由3個class實現:TimerID、Timer、TimerQueue。用戶可見的只有TimerId。

Timestamp類是時間戳類,用來保存超時時刻(精確到1us),保存的是UTC時間,即從 Unix Epoch(1970-01-01 00:00:00)到指定時間的微秒數。

Timer類對應一個超時任務,保存了超時時刻Timestamp,超時回調函數,以及超時任務類型(一次 or 周期)。

TimerId類用於保存Timer對象,以及獨一無二的id。

TimerQueue類用於設置所有超時任務(Timer),需要高效組織尚未到期的Timer,快速查找已到期Timer,以及高效添加和刪除Timer。TimerQueue用std::set存儲 ,set會對Timer按到期時間先后順序進行二叉搜索樹排序,時間復雜度O(logN)。

TimerQueue的定時接口並不是直接暴露給庫的使用者的,而是通過EventLoop的runAfter和runEvery來運行用戶任務的。其中,runAfter延遲固定秒數后運行一次指定用戶任務;runEvery延遲固定秒數后運行用戶任務,后續以指定周期運行用戶任務。

TimerQueue回調用戶代碼onTimer()的時序:

時序圖里的TimerQueue獲取超時Timer(getExpired())后,User及onTimer()是指用戶自定義的超時處理函數,並非庫本身的。

與普通Channel事件一樣,超時任務TimerQueue也會使用一個Channel,專門用於綁定timerfd,交由Poller監聽,發生可讀事件(代表超時)后加入激活通道列表,然后EventLoop::loop()逐個Channel調用對應的回調,從而處理超時事件。
注意:一個EventLoop只持有一個TimerQueue對象,而TimerQueue通過std::set持有多個Timer對象,但只會設置一個Channel。


Timer類

Timer類代表一個超時任務,但並不直接綁定Channel。Timer主要包含超時時刻(expiration_),超時回調(callback_),周期時間值(interval_),全局唯一id(sequence_)。

其聲明如下:

/**
* 用於定時事件的內部類
*/
class Timer : noncopyable
{
public:
    Timer(TimerCallback cb, Timestamp when, double interval)
    : callback_(std::move(cb)),
    expiration_(when),
    interval_(interval),
    repeat_(interval > 0.0),
    sequence_(s_numCreated_.incrementAndGet())
    { }
    /* 運行超時回調函數 */
    void run() const
    {
        callback_();
    }
    /* 返回超時時刻 */
    Timestamp expiration() const { return expiration_; }
    /* 周期重復標志 */
    bool repeat() const { return repeat_; }
    /* 全局唯一序列號, 用來表示當前Timer對象 */
    int64_t sequence() const { return sequence_; }
    /* 重啟定時器, 只對周期Timer有效(repeat_為true) */
    void restart(Timestamp now);
    /* 當前創建的Timer對象個數, 每新建一個Timer對象就會自增1 */
    static int64_t numCreated() { return s_numCreated_.get(); }

private:
    const TimerCallback callback_; /* 超時回調 */
    Timestamp expiration_;         /* 超時時刻 */
    const double interval_;        /* 周期時間, 單位秒, 可用來結合基礎時刻expiration_, 計算新的時刻 */
    const bool repeat_;            /* 重復標記. true: 周期Timer; false: 一次Timer */
    const int64_t sequence_;       /* 全局唯一序列號 */

    // global increasing number, atomic. help to identify different Timer
    static AtomicInt64 s_numCreated_; /* 類變量, 創建Timer對象的個數, 用來實現全局唯一序列號 */
};

每當創建一個新Timer對象時,原子變量s_numCreated_就會自增1,作為全劇唯一序列號sequence_,用來標識該Timer對象。

  • 周期Timer

創建Timer時,超時時刻when決定了回調超時事件時間點,而interval決定了Timer是一次性的,還是周期性的。如果是周期性的,會在TimerQueue::reset中,調用Timer::restart,在當前時間點基礎上,重啟定時器。

  • restart函數

restart重啟Timer,根據Timer是否為周期類型,分為兩種情況:
1)周期Timer,restart將重置超時時刻expiration_為當前時間 + 周期間隔時間;
2)非周期Timer,即一次性Timer,將restart將expiration_置為無效時間(默認自UTC Epoch以來的微妙數為0);

void Timer::restart(Timestamp now)
{
    if (repeat_)
    {
        expiration_ = addTime(now, interval_);
    }
    else
    {
        expiration_ = Timestamp::invalid();
    }
}

TimerId類

TimerId來主要用來作為Timer的唯一標識,用於取消(canceling)Timer。

其實現代碼很簡單:

/**
* An opaque identifier, for canceling Timer.
*/
class TimerId : public muduo::copyable
{
public:
    TimerId()
    : timer_(NULL),
    sequence_(0)
    { }

    TimerId(Timer* timer, int64_t seq)
    : timer_(timer),
    sequence_(seq)
    { }

    // default copy-ctor, dtor and assignment are okay

    friend class TimerQueue;

private:
    Timer* timer_;
    int64_t sequence_;
};

注意:TimerId並不直接生成Timer序列號sequence_,這是由Timer來生成的,通過構造函數傳遞給TimerId。而生成Timer標識的方式,在Timer類介紹中也提到過,只需要創建一個Timer對象即可,然后通過Timer::sequence()方法就可以取得該序列號。


TimerQueue類

定時器隊列TimerQueue是定時功能的核心,由所在EventLoop持有,綁定一個Channel,同時維護多個定時任務(Timer)。為用戶(EventLoop)提供添加定時器(addTimer)、取消定時器(cancel)接口。

同樣是定時,TimerQueue與Timer有什么區別?

TimerQueue包含2個Timer集合:
1)timers_定時器集合:包含用戶添加的所有Timer對象,std::set會用AVL搜索樹,對集合元素按時間戳(Timestamp)從小到大順序;
2)activeTimers_激活定時器集合:包含激活的Timer對象,與timers_包含的Timer對象相同,個數也相同,std::set會根據Timer*指針大小,對元素進行排序;3)cancelingTimers_取消定時器集合:包含所有取消的Timer對象,與activeTimers_相對。

注意:timers_和activeTimers_的類型並不相同,只是包含的Timer*相同。cancelingTimers_和activeTimers_的類型相同。

這也是TimerQueue並非Timer的原因,是一個Timer集合,根據其時間戳大小進行排序,更像是一個隊列,先到期的先觸發超時事件。因此,可稱為Timer隊列,即TimerQueue。

調用TimerQueue::addTimer的,只有EventLoop中這3個函數:

/**
* 定時功能,由用戶指定絕對時間
* @details 每為定時器隊列timerQueue添加一個Timer,
* timerQueue內部就會新建一個Timer對象, TimerId就保含了這個對象的唯一標識(序列號)
* @param time 時間戳對象, 單位1us
* @param cb 超時回調函數. 當前時間超過time代表時間時, EventLoop就會調用cb
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
    return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

/**
* 定時功能, 由用戶相對時間, 通過runAt實現
* @param delay 相對時間, 單位s, 精度1us(小數)
* @param cb 超時回調
*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, std::move(cb));
}

/**
* 定時功能, 由用戶指定周期, 重復運行
* @param interval 運行周期, 單位s, 精度1us(小數)
* @param cb 超時回調
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(std::move(cb), time, interval);
}

下面是TimerQueue中,3個集合相關的類型及成員定義:

    typedef std::pair<Timestamp, Timer*> Entry;
    typedef std::set<Entry> TimerList;
    typedef std::pair<Timer*, int64_t> ActiveTimer;
    typedef std::set<ActiveTimer> ActiveTimerSet;

    // Timer list sorted by expiration
    /* 用戶添加的所有Timer對象集合
     * 需要為set元素比較實現operator< */
    TimerList timers_;

    // for cancel()
    ActiveTimerSet activeTimers_;
    bool callingExpiredTimers_; /* atomic */
    ActiveTimerSet cancelingTimers_;

TimerQueue聲明

除了前面提到的3個集合相關類型及成員,其他成員函數和變量聲明如下:

/**
* 定時器隊列.
* 不能保證回調能及時調用.
*
* 只能在所在loop線程中運行, 因此線程安全是非必須的
*/
class TimerQueue : noncopyable
{
public:
    explicit TimerQueue(EventLoop* loop);
    ~TimerQueue();
    /*
     * 添加一個定時器.
     * 運行到指定時間, 調度相應的回調函數.
     * 如果interval參數 > 0.0, 就周期重復運行.
     * 必須線程安全: 可能會由其他線程調用
     */
    TimerId addTimer(TimerCallback cb, Timestamp when, double interval);
    /* 取消指定TimerId的定時器 */
    void cancel(TimerId);

private:
    ...
    void addTimerInLoop(Timer* timer);
    void cancelInLoop(TimerId timerId);
    // called when timerfd alarms
    void handleRead();
    // move out all expired timers
    std::vector<Entry> getExpired(Timestamp now);
    void reset(const std::vector<Entry>& expired, Timestamp now);

    bool insert(Timer* timer);

    EventLoop* loop_;
    const int timerfd_;
    Channel timerfdChannel_; // watch readable event of timerfd
    ...
}

TimerQueue所屬EventLoop對象,通過一個EventLoop*來傳遞,注意這是一個raw pointer,而非smart pointer。EventLoop對象與TimerQueue對象生命周期相同,而且只會通過EventLoop對象來調用TimerQueue對象方法,因此不存在與之相關的內存泄漏或非法訪問的問題。

TimerQueue構造函數

TimerQueue::TimerQueue(EventLoop *loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
    timerfdChannel_.setReadCallback(std::bind(&TimerQueue::handleRead, this));
    // we are always reading the timerfd, we disrm it with timerfd_settime.
    timerfdChannel_.enableReading();
}

構造TimerQueue對象時,就會綁定TimerQueue所屬EventLoop,即創建TimerQueue的EventLoop對象。
另外,調用Channel::enableReading(),會將通道事件加入Poller的監聽通道列表中。

交給Poller監聽的timerfd,是由createTimerfd創建的:

int createTimerfd()
{
    // create timers that notify via fd
    int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timerfd < 0)
    {
        LOG_SYSFATAL << "Failed in timerfd_create";
    }
    return timerfd;
}

TimerQueue析構

析構有2點需要注意:
1)在remove綁定的通道前,要先disableAll停止監聽所有通道事件;
2)timers_中Timer對象是在TimerQueue::addTimer中new出來的,需要手動delete;
另外,對注釋“do not remove channel, since we're in EventLoop::dtor();”並不明白是何用意。

TimerQueue::~TimerQueue()
{
    // 關閉所有(通道)事件, Poller不再監聽該通道
    timerfdChannel_.disableAll();
    // 如果正在處理該通道, 會從激活的通道列表中移除, 同時Poller不再監聽該通道
    timerfdChannel_.remove();
    // 關閉通道對應timerfd
    ::close(timerfd_);

    // FIXME: I dont understand why "do not remove channel". What does it mean?
    // do not remove channel, since we're in EventLoop::dtor();

    // TimerQueue::addTimer中new出來的Timer對象, 需要手動delete
    for (const Entry& timer : timers_)
    {
        delete timer.second;
    }
}

TimerQueue重要接口

addTimer 添加定時器

注意到addTimer 會在構造一個Timer對象后,將其添加到timers_的工作轉交給addTimerInLoop完成了。這是為什么?

因為調用EventLoop::runAt/runEvery的線程,可能並非TimerQueue的loop線程,而修改TimerQueue數據成員時,必須在所屬loop線程中進行,因此需要通過loop_->runInLoop將工作轉交給所屬loop線程。
runInLoop:如果當前線程是所屬loop線程,則直接運行函數;如果不是,就排隊到所屬loop線程末尾,等待運行。

/**
* 添加一個定時器.
* @details 運行到指定時間點when, 調度相應的回調函數cb.
* 如果interval參數 > 0.0, 就周期重復運行.
* 可能會由其他線程調用, 需要讓對TimerQueue數據成員有修改的部分, 在所屬loop所在線程中運行.
* @param cb 超時回調函數
* @param when 觸發超時的時間點
* @param interval 循環周期. > 0.0 代表周期定時器; 否則, 代表一次性定時器
* @return 返回添加的Timer對應TimerId, 用來標識該Timer對象
*/
TimerId TimerQueue::addTimer(TimerCallback cb, Timestamp when, double interval)
{
    Timer* timer = new Timer(std::move(cb), when, interval);
    loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); // 轉交所屬loop線程運行
    return TimerId(timer, timer->sequence());
}

/**
* 在loop線程中添加一個定時器.
* @details addTimerInLoop 必須在所屬loop線程中運行
*/
void TimerQueue::addTimerInLoop(Timer *timer)
{
    loop_->assertInLoopThread();
    bool earliestChanged = insert(timer);

    if (earliestChanged)
    {
        resetTimerfd(timerfd_, timer->expiration());
    }
}

addTimerInLoop的主要工作由2個函數來完成:insert,resetTimerfd。

/**
* 插入一個timer指向的定時器
* @details timers_是std::set<std::pair<Timestamp, Timer*>>類型, 容器會自動對元素進行排序,
* 默認先按pair.first即Timestamp進行排序, 其次是pair.second(.first相同情況下才比較second),
* 這樣第一個元素就是時間戳最小的元素.
* @return 定時器timer當前是否已經超時
* - true timers_為空或已經超時
* - false timers_非空, 且最近的一個定時器尚未超時
*/
bool TimerQueue::insert(Timer *timer)
{
    loop_->assertInLoopThread();
    assert(timers_.size() == activeTimers_.size());
    bool earliestChanged = false;
    Timestamp when = timer->expiration(); // 超時時刻
    TimerList::iterator it = timers_.begin();
    if (it == timers_.end() || when < it->first)
    { // 定時器集合為空 或者 新添加的timer已經超時(因為it指向的Timer超時時刻是距離當前最近的)
        earliestChanged = true; // timer已經超時
    }

    // 同時往timers_和activeTimers_集合中, 添加timer
    // 注意: timers_和activeTimers_元素類型不同, 但所包含的Timer是相同的, 個數也相同

    { // ensure insert new timer to timers_ successfully
        std::pair<TimerList::iterator, bool> result
        = timers_.insert(Entry(when, timer));
        assert(result.second); (void)result;
    }

    { // ensure insert new timer to activeTimers_ successfully
        std::pair<ActiveTimerSet::iterator, bool> result
        = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
        assert(result.second); (void)result;
    }

    assert(timers_.size() == activeTimers_.size());
    return earliestChanged;
}

cancel 取消定時器

一個已超時的定時器,會通過TimerQueue::getExpired自動清除,但一個尚未到期的定時器如何取消?
可以通過調用TimerQueue::cancel。類似於addTimer,cancel也可能在別的線程被調用,因此需要將其轉交給cancelInLoop執行。

/**
* 取消一個定時器, 函數可能在別的線程調用
* @param timerId 每個定時器都有一個唯一的TimerId作為標識
*/
void TimerQueue::cancel(TimerId timerId)
{
    loop_->runInLoop(
            std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

/**
* 在所屬loop線程中, 取消一個定時器
* @details 同時擦出timers_, activeTimers_中包含的Timer對象, timerId用來查找該Timer對象.
* @param timerId 待取消Timer的唯一Id標識
*/
void TimerQueue::cancelInLoop(TimerId timerId)
{
    loop_->assertInLoopThread(); // 確保當前線程是所屬loop線程
    assert(timers_.size() == activeTimers_.size());
    ActiveTimer timer(timerId.timer_, timerId.sequence_);
    ActiveTimerSet::const_iterator it = activeTimers_.find(timer);
    if (it != activeTimers_.end())
    {
        // 注意timers_和activeTimers_的Timer指針指向相同對象, 只能delete一次
        size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
        assert(n == 1); (void)n;
        delete it->first; // FIXME: no delete please
        activeTimers_.erase(it);
    }
    else if (callingExpiredTimers_)
    { // 如果正在處理超時定時器
        cancelingTimers_.insert(timer);
    }
    assert(timers_.size() == activeTimers_.size());
}

handleRead處理TimerQueue上所有超時任務

handleRead有幾個要點:
1)必須在所在loop線程運行;
2)可能不止一個定時任務超時,可用getExpired()獲取;
3)所有超時任務執行完后,重置周期定時任務,釋放一次性定時任務;

/**
* 處理讀事件, 只能是所屬loop線程調用
* @details 當PollPoller監聽到超時發生時, 將channel加入激活通道列表, loop中回調
* 事件處理函數, TimerQueue::handleRead.
* 發生超時事件時, 可能會有多個超時任務超時, 需要通過getExpired一次性全部獲取, 然后逐個執行回調.
* @note timerfd只會發生讀事件.
*/
void TimerQueue::handleRead()
{
    loop_->assertInLoopThread();
    Timestamp now(Timestamp::now());
    readTimerfd(timerfd_, now);

    std::vector<Entry> expired = getExpired(now); // 獲取所有超時任務

    // 正在調用超時任務回調時, 先清除取消的超時任務cancelingTimers_, 再逐個執行超時回調.
    // 可由getExpired()獲取的所有超時任務.
    callingExpiredTimers_ = true;
    cancelingTimers_.clear();
    // safe to callback outside critical section
    for (const Entry& it : expired)
    {
        it.second->run(); // 通過Timer::run()回調超時處理函數
    }
    callingExpiredTimers_ = false;
    // 重置所有已超時任務
    reset(expired, now);
}

getExpired以參數時間點now為界限,查找set timers_中所有超時定時任務(Timer)。set會對timers_元素進行排序,std::set::lower_bound()會找到第一個時間點 < now時間點的定時任務。

getExpired調用reset重置所有超時的周期定時任務,釋放超時的一次性任務。

/**
* 定時任務超時時, 從set timers_中取出所有的超時任務, 以vector形式返回給調用者
* @note 注意從set timers_要和從set activeTimers_同步取出超時任務, 兩者保留的定時任務是相同的
* @param now 當前時間點, 用來判斷從set中的定時器是否超時
* @return set timers_中超時的定時器
*/
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
    assert(timers_.size() == activeTimers_.size());
    std::vector<Entry> expired;
    Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
    // end.key >= sentry.key, Entry.key is pair<Timestamp, Timer*>
    // in that end.key.second < sentry.key.second(MAX PTR)
    // => end.key == sentry.key is impossible
    // => end.key > sentry.key
    TimerList::iterator end = timers_.lower_bound(sentry);
    assert(end == timers_.end() || now < end->first);
    std::copy(timers_.begin(), end, back_inserter(expired));
    timers_.erase(timers_.begin(), end);

    for (const Entry& it : expired)
    {
        ActiveTimer timer(it.second, it.second->sequence());
        size_t n = activeTimers_.erase(timer);
        assert(n == 1); (void)n;
    }

    assert(timers_.size() == activeTimers_.size());
    return expired;
}

/**
* 根據指定時間now重置所有超時任務, 只對周期定時任務有效
* @param expired 所有超時任務
* @param now 指定的reset基准時間點, 新的超時時間點以此為基准
*/
void TimerQueue::reset(const std::vector<Entry> &expired, Timestamp now)
{
    Timestamp nextExpire;

    for (const Entry& it : expired)
    {
        ActiveTimer timer(it.second, it.second->sequence());
        // 只重置周期定時任務和沒有取消的定時任務, 釋放一次性超時的定時任務
        if (it.second->repeat()
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
        {
            it.second->restart(now);
            insert(it.second);
        }
        else
        {
            // FIXME move to a free list
            delete it.second; // FIXME: no delete please
        }
    }

    // 根據最近的尚未達到的超時任務, 重置timerfd下一次超時時間
    if (!timers_.empty())
    {
        nextExpire = timers_.begin()->second->expiration();
    }

    if (nextExpire.valid())
    {
        resetTimerfd(timerfd_, nextExpire);
    }
}

參考

https://blog.csdn.net/sinat_35261315/article/details/78324227


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM