事件驅動與EventLoop
前面(muduo筆記 網絡庫(一)總覽)講過,muduo網絡庫處理事件是Reactor模式,one loop per thread,一個線程一個事件循環。這個循環稱為EventLoop,這種以事件為驅動的編程模式,稱為事件驅動模式。
這種事件驅動模型要求所有任務是非阻塞的,其典型特點是:
如果一個任務需要很長時間才能完成,或者中間可能導致阻塞,就需要對任務進行分段,將其設置為非阻塞的,每次監聽到前次任務完成,觸發事件回調,從而接着完成后續任務。例如,要發送一個大文件,可以先發送一段,完成后,在寫完成事件回調中又發送下一段,這樣每次都發生一段,從而完成整個文件發送。
EventLoop是實現事件驅動模型的關鍵之一。核心是為線程提供運行循環,不斷監聽事件、處理事件,為用戶提供在loop循環中運行的接口。
還是那張圖,EventLoop實現事件驅動相關類圖關系:
聚合關系:has-a,表示擁有的關系,兩種生命周期沒有必然關聯,可以獨立存在。
組合關系:contain-a,表包含的關系,是一種強聚合關系,強調整體與部分,生命周期一致。
EventLoop
EventLoop是一個接口類,不宜暴露太多內部細節給客戶,接口及其使用應盡量簡潔。EventLoop的主要職責是:
1)提供定時執行用戶指定任務的方法,支持一次性、周期執行用戶任務;
2)提供一個運行循環,每當Poller監聽到有通道對應事件發生時,會將通道加入激活通道列表,運行循環要不斷從取出激活通道,然后調用事件回調處理事件;
3)每個EventLoop對應一個線程,不允許一對多或者多對一,提供判斷當前線程是否為創建EventLoop對象的線程的方法;
4)允許在其他線程中調用EventLoop的public接口,但同時要確保線程安全;
下面來看看EventLoop類聲明:
/**
* Reactor模式, 每個線程最多一個EventLoop (One loop per thread).
* 接口類, 不要暴露太多細節給客戶
*/
class EventLoop : public noncopyable
{
public:
typedef std::function<void()> Functor;
EventLoop();
~EventLoop(); // force out-line dtor, for std::unique_ptr members.
/* loop循環, 運行一個死循環.
* 必須在當前對象的創建線程中運行.
*/
void loop();
/*
* 退出loop循環.
* 如果通過原始指針(raw pointer)調用, 不是100%線程安全;
* 為了100%安全, 最好通過shared_ptr<EventLoop>調用
*/
void quit();
/*
* Poller::poll()返回的時間, 通常意味着有數據達到.
* 對於PollPoller, 是調用完poll(); 對於EPollPoller, 是調用完epoll_wait()
*/
Timestamp pollReturnTime() const { return pollReturnTime_; }
/* 獲取loop循環次數 */
int64_t iterator() const { return iteration_; }
/*
* 在loop線程中, 立即運行回調cb.
* 如果沒在loop線程, 就會喚醒loop, (排隊)運行回調cb.
* 如果用戶在同一個loop線程, cb會在該函數內運行; 否則, 會在loop線程中排隊運行.
* 因此, 在其他線程中調用該函數是安全的.
*/
void runInLoop(Functor cb);
/* 排隊回調cb進loop線程.
* 回調cb在loop中完成polling后運行.
* 從其他線程調用是安全的.
*/
void queueInLoop(Functor cb);
/* 排隊的回調cb個數 */
size_t queueSize() const;
// timers
/*
* 在指定時間點運行回調cb.
* 從其他線程調用安全.
*/
TimerId runAt(Timestamp time, TimerCallback cb);
/*
* 在當前時間點+delay延時后運行回調cb.
* 從其他線程調用安全.
*/
TimerId runAfter(double delay, TimerCallback cb);
/*
* 每隔interval sec周期運行回調cb.
* 從其他線程調用安全.
*/
TimerId runEvery(double interval, TimerCallback cb);
/*
* 取消定時器, timerId唯一標識定時器Timer
* 從其他線程調用安全.
*/
void cancel(TimerId timerId);
// internal usage
/* 喚醒loop線程, 沒有事件就緒時, loop線程可能阻塞在poll()/epoll_wait() */
void wakeup();
/* 更新Poller監聽的channel, 只能在channel所屬loop線程中調用 */
void updateChannel(Channel* channel);
/* 移除Poller監聽的channel, 只能在channel所屬loop線程中調用 */
void removeChannel(Channel* channel);
/* 判斷Poller是否正在監聽channel, 只能在channel所屬loop線程中調用 */
bool hasChannel(Channel* channel);
// pid_t threadId() const { return threadId_; }
/* 斷言當前線程是創建當前對象的線程, 如果不是就終止程序(LOG_FATAL) */
void assertInLoopThread();
/* 判斷前線程是否創建當前對象的線程.
* threadId_是創建當前EventLoop對象時, 記錄的線程tid
*/
bool isInLoopThread() const;
/*
* 判斷是否有待調用的回調函數(pending functor).
* 由其他線程調用runAt/runAfter/runEvery, 會導致回調入隊列待調用.
*/
bool callingPendingFunctors() const
{ return callingPendingFunctors_; }
/*
* 判斷loop線程是否正在處理事件, 執行事件回調.
* loop線程正在遍歷,執行激活channels時, eventHandling_會置位; 其余時候, 會清除.
*/
bool eventHandling() const
{ return eventHandling_; }
/* context_ 用於應用程序傳參, 由網絡庫用戶定義數據 */
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
/* 獲取當前線程的EventLoop對象指針 */
static EventLoop* getEventLoopOfCurrentThread();
private:
/* 終止程序(LOG_FATAL), 當前線程不是創建當前EventLoop對象的線程時,
* 由assertInLoopThread()調用 */
void abortNotInLoopThread();
/* 喚醒所屬loop線程, 也是wakeupFd_的事件回調 */
void handleRead(); // waked up
/* 處理pending函數 */
void doPendingFunctors();
/* 打印激活通道的事件信息, 用於debug */
void printActiveChannels() const; // DEBUG
typedef std::vector<Channel*> ChannelList;
bool looping_; /* atomic, true表示loop循環執行中 */
std::atomic<bool> quit_; /* loop循環退出條件 */
bool eventHandling_; /* atomic, true表示loop循環正在處理事件回調 */
bool callingPendingFunctors_; /* atomic, true表示loop循環正在調用pending函數 */
int64_t iteration_; /* loop迭代次數 */
const pid_t threadId_; /* 線程id, 對象構造時初始化 */
Timestamp pollReturnTime_; /* poll()返回時間點 */
std::unique_ptr<Poller> poller_; /* 輪詢器, 用於監聽事件 */
std::unique_ptr<TimerQueue> timerQueue_; /* 定時器隊列 */
int wakeupFd_; /* 喚醒loop線程的eventfd */
/* 用於喚醒loop線程的channel.
* 不像TimerQueue是內部類, 不應該暴露Channel給客戶. */
std::unique_ptr<Channel> wakeupChannel_;
boost::any context_; /* 用於應用程序通過當前對象傳參的變量, 由用戶定義數據 */
/* 臨時輔助變量 */
ChannelList activeChannels_; /* 激活事件的通道列表 */
Channel* currentActiveChannel_; /* 當前激活的通道, 即正在調用事件回調的通道 */
mutable MutexLock mutex_;
/* 待調用函數列表, 存放不在loop線程的其他線程調用 runAt/runAfter/runEvery, 而要運行的函數 */
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
EventLoop不可拷貝,因為與之關聯的不僅對象本身,還有線程以及thread local數據等資源。
在這里,可以把EventLoop功能簡要分為這幾大部分:
1)提供運行循環;
2)運行定時任務,一次性 or 周期;
3)處理激活通道事件;
4)線程安全;
對於1),loop()提供運行循環,quit()退出循環,iterator()查詢循環次數,wakeup()用於喚醒loop線程,handleRead()讀取喚醒消息。
對於2),runInLoop()在loop線程中“立即”運行一次用戶任務,runAt()/runAfter()添加一次性定時任務,runEvery()添加周期定時任務,doPendingFunctors()回調所有的pending函數,vector pendingFunctors_用於排隊待處理函數到loop線程執行,queueSize()獲取該vector大小;cancel()取消定時任務。
對於3),updateChannel()/removeChannel()/hasChannel()用於通道更新/移除/判斷,vector activeChannels_存儲當前所有激活的通道,currentActiveChannel_存儲當前正在處理的激活通道;
對於4),isInLoopThread()/assertInLoopThread()判斷/斷言 當前線程是創建當前EventLoop對象的線程,互斥鎖mutex_用來做互斥訪問需要保護數據。
值得一提的是,boost::any類型的成員context_用來給用戶提供利用EventLoop傳數據的方式,相當於C里面的void*,用戶可利用boost::any_cast進行轉型。
EventLoop的構造與析構
EventLoop代表一個事件循環,是one loop per thread的體現,每個線程只能有一個EventLoop對象。
EventLoop構造函數要點:
1)檢查當前線程是否已經創建了EventLoop對象,遇到錯誤就終止程序(LOG_FATAL);
2)記住本對象所屬線程id(threadId_);
析構函數要點:
1)清除當前線程EventLoop指針,便於下次再創建EventLoop對象。
__thread EventLoop* t_loopInThisThread = 0; // thread local變量, 指向當前線程創建的EventLoop對象
EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid()),
{
LOG_DEBUG << "EventLoop create " << this << " in thread " << threadId_;
if (t_loopInThisThread) // 當前線程已經包含了EventLoop對象
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else // 當前線程尚未包含EventLoop對象
{
t_loopInThisThread = this;
}
}
EventLoop::~EventLoop()
{
LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
<< " destructs in thread " << CurrentThread::tid();
t_loopInThisThread = NULL;
}
可以通過thread local變量t_loopInThisThread指向創建的EventLoop對象,來確保每個線程只有一個EventLoop對象。同一個線程內,可通過static函數getEventLoopOfCurrentThread,返回該EventLoop對象指針。
EventLoop *EventLoop::getEventLoopOfCurrentThread() // static
{
return t_loopInThisThread;
}
特定線程檢查,確保線程安全
有些成員函數只能在EventLoop對象所在線程調用,如何檢查該前提條件(pre-condition)?
EventLoop提供了isInLoopThread()、assertInLoopThread(),分別用於判斷、斷言 當前線程為創建EventLoop對象線程。
從下面實現可看到,assertInLoopThread()斷言失敗時,調用abortNotInLoopThread()終止程序(LOG_FATAL)。
void EventLoop::assertInLoopThread() // 斷言當前線程(tid())是調用當前EventLoop對象的持有者線程(threadId_)
{
if (!isInLoopThread())
{
abortNotInLoopThread(); // 斷言失敗則終止程序
}
}
bool EventLoop::isInLoopThread() const // 判斷當前線程是否為當前EventLoop對象的持有者線程
{ return threadId_ == CurrentThread::tid(); }
void EventLoop::abortNotInLoopThread() // LOG_FATAL 終止程序
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " << CurrentThread::tid();
}
loop循環
提供運行循環,不斷監聽事件、處理事件。
/**
* 真正的工作循環.
* 獲得所有當前激活事件的通道,用Poller->poll()填到activeChannels_,
* 然后調用Channel::handleEvent()處理每個激活通道.
*
* 最后排隊運行所有pending函數, 通常是其他線程通過loop來調用運行用戶任務
*/
void EventLoop::loop()
{
assert(!looping_); // to avoid reduplicate loop
assertInLoopThread(); // to avoid new EventLoop() and loop() are not one thread
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear(); // 清除激活事件的通道列表
// 監聽所有通道, 可能阻塞線程, 所有激活事件對應通道會填入activeChannels_
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_; // 循環次數+1
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
// 處理所有激活事件
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
// 通過Channel::handleEvent回調事件處理函數
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
// 運行pending函數, 由其他線程請求調用的用戶任務
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
loop線程運行事件回調的關鍵是,用Poller::poll()將激活事件的通道填入通道列表activeChannels_,然后逐一調用每個通道的handleEvent,從而調用為Channel注冊的事件回調來處理事件。
添加、更新、刪除通道
loop循環用來處理激活事件,那用戶如何更新添加、更新、刪除通道事件呢?
前面已經提到,可以用updateChannel/removeChannel 更新/移除 Poller 監聽的通道。
關於 Poller這部分,可參見 muduo筆記 網絡庫(二)I/O復用封裝Poller
/**
* 根據具體poller對象, 來更新通道.
* 會修改poller對象監聽的通道數組.
* @note 必須在channel所屬loop線程運行
*/
void EventLoop::updateChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
/**
* 根據具體poller對象, 來刪除通道.
* 會刪除poller對象監聽的通道數組.
* @note 如果待移除通道正在激活事件隊列, 應該先從激活事件隊列中移除
*/
void EventLoop::removeChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}
另外,可用hasChannel來判斷Poller是否正在監聽channel。
/**
* 判斷poller是否正在監聽通道channel
* @note 必須在channel所屬loop線程運行
*/
bool EventLoop::hasChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
return poller_->hasChannel(channel);
}
定時任務
用戶調用runAt/runAfter/runEvery 運行定時任務,調用cancel取消定時任務。
/**
* 定時功能,由用戶指定絕對時間
* @details 每為定時器隊列timerQueue添加一個Timer,
* timerQueue內部就會新建一個Timer對象, TimerId就保含了這個對象的唯一標識(序列號)
* @param time 時間戳對象, 單位1us
* @param cb 超時回調函數. 當前時間超過time代表時間時, EventLoop就會調用cb
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
return timerQueue_->addTimer(std::move(cb), time, 0.0);
}
/**
* 定時功能, 由用戶相對時間, 通過runAt實現
* @param delay 相對時間, 單位s, 精度1us(小數)
* @param cb 超時回調
*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, std::move(cb));
}
/**
* 定時功能, 由用戶指定周期, 重復運行
* @param interval 運行周期, 單位s, 精度1us(小數)
* @param cb 超時回調
* @return 一個綁定timerQueue內部新增的Timer對象的TimerId對象, 用來唯一標識該Timer對象
*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(std::move(cb), time, interval);
}
/**
* 取消指定定時器
* @param timerId Timer id, 唯一對應一個Timer對象
*/
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}
用戶運行一個loop線程,並添加定時任務示例:
void threadFunc()
{
assert(EventLoop::getEventLoopOfCurrentThread() == NULL); // 斷言當前線程沒有創建EventLoop對象
EventLoop loop; // 創建EventLoop對象
assert(EventLoop::getEventLoopOfCurrentThread() == &loop); // 斷言當前線程創建了EventLoop對象
loop.runAfter(1.0, callback); // 1sec后運行callback
loop.loop(); // 啟動loop循環
}
runInLoop與queueInLoop執行用戶任務
同樣是運行用戶任務函數,runInLoop和queueInLoop都可以被多個線程執行,分為2種情況:1)如果當前線程是創建當前EventLoop對象的線程,那么立即執行用戶任務;2)如果不是,那么在loop循環中排隊執行(本次循環末尾),實際上這點也是由queueInLoop完成的。
queueInLoop只做了runInLoop的第2)種情況的工作,也就是只會在loop循環中排隊執行用戶任務。
為什么要對pendingFunctors_加鎖?
因為queueInLoop可以被多個線程訪問,意味着pendingFunctors_也能被多個線程訪問,加鎖確保線程安全。
/**
* 執行用戶任務
* @param cb 用戶任務函數
* @note 可以被多個線程執行:
* 如果當前線程是創建當前EventLoop對象的線程,直接執行;
* 否則,用戶任務函數入隊列pendingFunctors_成為一個pending functor,在loop循環中排隊執行
*/
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
/**
* 排隊進入pendingFunctors_,等待執行
* @param cb 用戶任務函數
* @note 如果當前線程不是創建當前EventLoop對象的線程 或者正在調用pending functor,
* 就喚醒loop線程,避免loop線程阻塞.
*/
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
eventfd與wakeup()喚醒
有2處可能導致loop線程阻塞:
1)Poller::poll()中調用poll(2)/epoll_wait(7) 監聽fd,沒有事件就緒時;
2)用戶任務函數調用了可能導致阻塞的函數;
而當EventLoop加入用戶任務時,loop循環是沒辦法直接知道的,要避免無謂的等待,就需要及時喚醒loop線程。
muduo用eventfd技術,來喚醒線程。
eventfd原理
eventfd是Linux特有的(Linux 2.6以后),專用於事件通知的機制,類似於管道(pipe)、域套接字(UNIX Domain Socket)。
創建eventfd 函數原型:
#include <sys/eventfd.h>
/* 創建一個文件描述符(event fd), 用於事件通知
* initval 計數初值
* flags 標志位, 如果沒用到可設為0, 也可以用以下選項 按位或 取值:
* EFD_CLOEXEC 為新建的fd設置close-on-exec(FD_CLOEXEC), 等效於以O_CLOEXEC方式open(2)
* EFD_NONBLOCK 等效於fcntl(2)設置O_NONBLOCK
* EFD_SEMAPHORE 將eventfd當信號量一樣調用, read 將導致計數-1, write 將導致計數+1; 如果沒指定該標志, read將返回8byte計數值, 且計數值歸0, write將計數值+指定值.
* 返回 新建的fd, 用於事件通知, 綁定到一個eventfd對象; 失敗, 返回-1
*/
int eventfd(unsigned int initval, int flags);
創建完event fd后,可用read(2)讀取event fd,如果fd是阻塞的,read可能阻塞線程;如果event fd設置了EFD_NONBLOCK,read返回EAGIAN錯誤。直到另外一個線程對event fd進行write。
// 為wakeupChannel_設置讀回調
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_讀事件
wakeupChannel_->enableReading();
eventfd使用示例:
線程1阻塞等待,線程2喚醒線程1。
#include <stdio.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <pthread.h>
#define __STDC_FORMAT_MACROS // for 跨平台打印
#include <inttypes.h>
void* thread_func1(void* arg) /* 等待線程 */
{
int wakeupfd = *(int*)arg;
printf("thread_func1 start\n");
uint64_t rdata;
int ret = read(wakeupfd, &rdata, sizeof(rdata));
if (ret < 0) {
perror("thread_func1 read error");
pthread_exit(NULL);
}
printf("thread_func1 success to be waked up, rdata = %" PRId64 "\n", rdata);
}
void* thread_func2(void* arg) /* 喚醒線程 */
{
int wakeupfd = *(int*)arg;
printf("thread_func2 ready to sleep 1 sec\n");
sleep(1);
uint64_t wdata = 10;
int ret = write(wakeupfd, &wdata, sizeof(wdata));
if (ret < 0) {
perror("thread_func2 write error");
pthread_exit(NULL);
}
printf("thread_func2 success to wake up another thread, wdata = %" PRId64 "\n", wdata);
}
/* 創建2個線程,thread_func1阻塞等待eventfd,thread_func2喚醒等等eventfd的線程 */
int main()
{
int evfd = eventfd(0, 0);
if (evfd < 0) {
perror("eventfd error");
exit(1);
}
pthread_t th1, th2;
pthread_create(&th1, NULL, thread_func1, (void*)&evfd);
pthread_create(&th2, NULL, thread_func2, (void*)&evfd);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
return 0;
}
EventLoop使用eventfd喚醒loop線程
1)創建event fd
構造函數中,wakeupFd_ 初值為createEventfd()
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
abort();
}
return evtfd;
}
2)綁定event fd與喚醒通道wakeupChannel_利用event fd構造一個Channel對象后,傳遞給wakeupChannel_,便於Poller監聽、事件回調
// 為wakeupChannel_設置讀回調
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_讀事件
wakeupChannel_->enableReading();
3)啟動loop循環,可能阻塞在poll(2)/epoll_wait(7)
4)其他線程通過queueInLoop()調用wakeup(),喚醒阻塞的loop線程
/**
* 其他線程喚醒等待在wakeupFd_上的線程, 產生讀就緒事件.
* @note write將添加8byte數據到內部計數器. 被喚醒線程必須調用read讀取8byte數據.
*/
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
5)loop線程被喚醒后,讀取event fd
/**
* 處理wakeupChannel_讀事件
* @note read wakeupfd_
*/
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
參考
https://blog.csdn.net/sinat_35261315/article/details/78329657
https://www.cnblogs.com/ailumiyana/p/10087539.html
eventfd https://blog.csdn.net/qq_28114615/article/details/97929524