EventLoop的多線程應用
前面講的EventLoop為一個IO線程提供運行循環(loop),那muduo庫如何支持多線程呢?
- EventLoopThread IO線程類
- EventLoopThreadPool IO線程池類
IO線程池的功能是開啟若干個IO線程,並讓這些IO線程處於線程循環的狀態。
也就是說,EventLoop實現one loop per thread模型中的loop,EventLoopThread 實現的是per thread,EventLoopThreadPool 實現的是multi-thread環境下:one loop per thread。
多個Reactor模型
圖中每個Reactor都是一個線程,mainReactor通常是main線程,關注監聽套接字;subReactor關注的是連接套接字。如果沒有subReactor,所有跟監聽套接字、連接套接字有關的事件,都交由mainReactor處理。
為了簡便,本文下面所有EventLoopThreadPool (事件循環線程池) 統一簡稱IO線程池或線程池,EventLoopThread 統一簡稱IO線程。
EventLoopThreadPool 事件循環線程池類
EventLoopThreadPool 事件循環線程池類對象通常由main線程創建,綁定main線程創建的EventLoop(即baseLoop_),對應mainReactor。該線程池根據用戶指定線程數,創建EventLoopThread對應subReactor。
注意:EventLoopThreadPool 屬於Reactor的一部分,但不等於某個Reactor。
EventLoopThreadPool類聲明
class EventLoopThreadPool : noncopyable // 作為線程池對象,綁定了背后的系統資源,如線程,因此是引用語義(不可拷貝)
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
~EventLoopThreadPool();
/* 設置線程數量, 需要在start()之前調用 */
void setThreadNum(int numThreads)
{ numThreads_ = numThreads; }
/* 啟動線程池, 設置線程函數初始回調 */
void start(const ThreadInitCallback& cb = ThreadInitCallback());
/*
* valid after calling start()
* round-robin(輪詢)
*/
EventLoop* getNextLoop();
/*
* With the same hash code, it will always return the same EventLoop.
*/
EventLoop* getLoopForHash(size_t hashCode);
/* 獲取所有loops(EventLoop數組) */
std::vector<EventLoop*> getAllLoops();
/* 獲取線程池啟動狀態 */
bool started() const
{ return started_; }
/* 獲取線程池名稱 */
const std::string& name() const
{ return name_; }
private:
EventLoop* baseLoop_; // 與Acceptor所屬EventLoop相同
std::string name_; // 線程池名稱, 通常由用戶指定. 線程池中EventLoopThread名稱依賴於線程池名稱
bool started_; // 線程池是否啟動標志
int numThreads_; // 線程數
int next_; // 新連接到來,所選擇的EventLoopThread下標
std::vector<std::unique_ptr<EventLoopThread>> threads_; // IO線程列表
std::vector<EventLoop*> loops_; // EventLoop列表, 指向的是EventLoopThread線程函數創建的EventLoop對象
};
EventLoopThreadPool的構造與析構
構造函數很簡單,對需baseLoop_、name_、started等進行了初始化。
有個問題一直困擾自己:為什么EventLoop指針 baseLoop,沒有通過智能指針管理內存?
這里可以得到解決,因為baseLoop通常是main線程創建的棧變量,loops_數組(std::vector<EventLoop*>)中的EventLoop對象是線程函數創建的棧變量。當離開線程作用域時,棧變量會自動釋放。因此,在這之前,不要delete loop對象。
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: baseLoop_(baseLoop), // 指向基礎的EventLoop對象
name_(nameArg), // 線程池名稱
started_(false), // 啟動狀態
numThreads_(0), // 線程數量
next_(0) // 下一個EventLoopThread位於threads_數組中的下標
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}
start() 啟動IO線程池
IO線程池在創建后,通過調用start()啟動線程池。主要工作:
1)確保baseLoop所屬線程調用start;
2)創建用戶指定線程組,啟動線程組線程,並記錄子線程對應EventLoop;
3)如果沒有指定線程數量(或為指定0),調用用戶指定的線程函數初始回調。
/**
* 啟動IO線程池.
* 只能啟動一次, 而且必須是baseLoop_的創建線程調用start().
* @param cb 線程函數初始回調
*/
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
assert(!started_); // 防止重復啟動線程池
baseLoop_->assertInLoopThread(); // 斷言baseLoop_對象創建者是線程池的start()調用者
started_ = true; // 標記線程池已啟動
// 根據用戶指定線程數, 創建IO線程組
/* create numThreads_ EventLoopThread, added to threads_ */
for (int i = 0; i < numThreads_; ++i) { // 線程編號范圍取決於用戶指定的線程數
char buf[name_.size() + 32];
snprintf(buf, sizeof(buf), "%s%d", name_.c_str(), i); // IO線程名稱: 線程池名稱 + 線程編號
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t)); // 將EventLoopThread對象指針 插入threads_數組
loops_.push_back(t->startLoop()); // 啟動IO線程, 並將線程函數創建的EventLoop對象地址 插入loops_數組
}
if (numThreads_ == 0 && cb)
{ // 如果沒有創建任何線程, 也會調用回調cb; 否則, 會在新建的線程函數初始化完成后(進入loop循環前)調用
cb(baseLoop_);
}
}
用戶端TcpServer調用start()。可以看到threadInitCallback_是由TcpServer傳入的,而TcpServer::threadInitCallback_是由更上一層級的用戶傳入。
/**
* 啟動TcpServer, 初始化線程池, 連接接受器Accept開始監聽(Tcp連接請求)
*/
void TcpServer::start()
{
if (started_.getAndSet(1) == 0) // 防止多次重復啟動
{
threadPool_->start(threadInitCallback_); // 啟動線程池, 並設置線程初始化完成的回調函數
assert(!acceptor_->listening());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
分派任務給IO線程的利器:getNextLoop()
每當有一個新Tcp連接建立時,TcpServer調用newConnection新建一個TcpConnection對象負責該連接。然而,如何將TcpConnection對象分派給一個IO線程對應的EventLoop對象呢?
這就可以利用getNextLoop(),從IO線程池維護的EventLoop數組loops_中輪詢取得一個EventLoop對象,每次調用數組下標+1,這樣得到負載均衡的目的。
/**
* 從線程池獲取下一個event loop
* @note 默認event loop是baseLoop_ (創建baseLoop_線程, 通常也是創建線程池的線程).
* 沒有調用setThreadNum()設置numThreads_(number of threads)時, numThreads_默認為0,
* 所有IO操作都默認交由baseLoop_的event loop來完成, 因為沒有其他IO線程.
*/
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
assert(started_);
EventLoop* loop = baseLoop_;
// 如果loops_為空, 則loop指向baseLoop
// 如果非空, 則按round-robin(RR, 輪叫)的調度方式(從loops_列表中)選擇一個EventLoop
if (!loops_.empty())
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
當然,TcpServer所謂分派TcpConnection給IO線程,實際上就是將TcpConnection對象在構造時,綁定到指定EventLoop對象。
部分代碼見:
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
...
/* 從EventLoop線程池中,取出一個EventLoop對象構造TcpConnection對象,便於均衡各EventLoop負責的連接數 */
EventLoop* ioLoop = threadPool_->getNextLoop(); // next event loop from the event loop thread pool
...
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
/* 新建TcpConnection對象, 並加入ConnectionMap */
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
...
}
思考:getNextLoop() 為什么可以返回EventLoop 原生指針?*
答案同構造函數EventLoop沒有通過智能指針管理,而是棧變量,這些棧變量只有在程序退出時,才會釋放。因此,可以認為其生命周期是整個應用程序。
不常用的getLoopForHash,getAllLoops
getLoopForHash,getAllLoops這是兩個備用接口,分別用於通過hashCode獲取loops_數組中的EventLoop對象,獲取整個EventLoop對象數組loops_。由於muduo庫沒有任何地方用到,這里不詳述。
測試EventLoopThreadPool
思路:
3個測試用例:為IO線程池創建0個線程,1個線程,3個線程。然后,start線程池、並回調init,3個測試用例分別用getNextLoop調用多次,判斷獲得的EventLoop對象地址是否為期望值。
// EventLoopThreadPool_unittest.cpp
void print(EventLoop* p = NULL)
{
printf("main(): pid=%d, tid=%d, loop=%p\n",
getpid(), CurrentThread::tid(), p);
}
void init(EventLoop* p)
{
printf("init(): pid=%d, tid=%d, loop=%p\n",
getpid(), CurrentThread::tid(), p);
}
int main()
{
print();
EventLoop loop;
loop.runAfter(11, std::bind(&EventLoop::quit, &loop));
{ // 0線程的IO線程池, 默認用main線程作為baseLoop所屬線程, 處理IO事件
printf("Single thread %p:\n", &loop);
EventLoopThreadPool model(&loop, "single"); // 0線程IO線程池
model.setThreadNum(0);
model.start(init); // 啟動線程池並回調init
// 從線程池連續取3次 EventLoop
assert(model.getNextLoop() == &loop);
assert(model.getNextLoop() == &loop);
assert(model.getNextLoop() == &loop);
}
{ // 單1線程的IO線程池
printf("Another thread:\n");
EventLoopThreadPool model(&loop, "another"); // 1個線程的IO線程池
model.setThreadNum(1);
model.start(init);
EventLoop* nextLoop = model.getNextLoop();
nextLoop->runAfter(2, std::bind(print, nextLoop));
assert(nextLoop != &loop);
assert(nextLoop == model.getNextLoop());
assert(nextLoop == model.getNextLoop());
::sleep(3);
}
{ // 3線程的IO線程池
printf("Three thread:\n");
EventLoopThreadPool model(&loop, "three");
model.setThreadNum(3);
model.start(init);
EventLoop* nextLoop = model.getNextLoop();
nextLoop->runInLoop(std::bind(print, nextLoop));
assert(nextLoop != &loop);
assert(nextLoop != model.getNextLoop());
assert(nextLoop != model.getNextLoop());
assert(nextLoop == model.getNextLoop()); // 3次以后循環回來
}
loop.loop();
return 0;
}
EventLoopThread 事件循環線程類
一個EventLoopThread對象對應一個IO線程,而IO線程函數負責創建局部EventLoop對象,並啟動EventLoop的loop循環。
class EventLoopThread : noncopyable
{
public:
typedef std::function<void (EventLoop*)> ThreadInitCallback;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const string& name = string());
~EventLoopThread();
/* 啟動IO線程函數中的loop循環, 返回IO線程中創建的EventLoop對象地址(棧空間) */
EventLoop* startLoop();
private:
void threadFunc(); // IO線程函數
// 思考:這里為什么需要mutex_保護, 而EventLoopThreadPool::baseLoop_卻不需要?
EventLoop* loop_ GUARDED_BY(mutex_); // 綁定的EventLoop對象指針
bool exiting_; // 暫無特殊用途
Thread thread_; // 線程, 用於實現IO線程中的線程功能
MutexLock mutex_; // 互斥鎖
Condition cond_ GUARDED_BY(mutex_); // 條件變量
ThreadInitCallback callback_; // 線程函數初始回調
};
EventLoopThread的構造
EventLoopThread的結構很簡單,對外只提供啟動IO線程的接口startLoop()。
思考:為什么EventLoopThread的EventLoop對象指針loop_,需要互斥鎖保護,而其他類如EventLoopThreadPool的EventLoop對象指針baseLoop_ 卻不需要互斥鎖保護?
因為EventLoopThread中的loop_指針在創建時(startLoop()中),會存在調用線程和子線程函數threadFunc同時讀、寫loop_的情況,也就是並發訪問,因而需要互斥鎖保護。
反觀EventLoopThreadPool::baseLoop_,在構造后,就只是讀操作,沒有寫baseLoop_指針本身。而且baseLoop_在構造時,也是調用者傳入,不存在並發讀寫訪問的問題。
EventLoopThread::EventLoopThread(const EventLoopThread::ThreadInitCallback &cb, const string &name)
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name), // 注意這里只是注冊線程函數, 名稱, 並未啟動線程函數
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
// 不是100%沒有沖突, 比如threadFunc中正運行callback_回調, 然后立即析構當前對象.
// 此時, IO線程函數已經啟動, 創建EventLoop了對象, 但還沒有修改loop_, 此時loop_一直為NULL
// 也就是說, 無法通過析構讓IO線程退出loop循環, 也無法連接線程.
if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_
{
// sitll a tiny change to call destructed object, if threadFunc exists just now.
// but when EventLoopThread destructs, usually programming is exiting anyway.
loop_->quit(); // 退出IO線程loop循環
thread_.join(); // 連接線程, 回收資源
}
}
startLoop 啟動IO線程
啟動IO線程的過程很簡單,就是啟動一個IO線程,然后(調用線程)等待IO線程函數初始化完成。
/**
* 啟動IO線程(函數), 運行EventLoop循環
* @return 返回EventLoop*, 實際上是線程函數threadFunc創建的EventLoop類型局部變量
*/
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started()); // avoid repeated start loop
thread_.start(); // 啟動線程
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(); // 同步等待線程函數完成初始化工作, 喚醒等待在此處的調用線程
}
loop = loop_;
}
return loop;
}
IO線程函數threadFunc
IO線程函數主要工作:創建EventLoop局部對象, 運行loop循環。
思考:最后為什么要清除loop_?
因為loop_可能在其他地方訪問,而IO線程函數退出時,線程已經不能繼續運行,代表IO線程EventLoop的loop_也就沒有了存在意義。
如果不清空,析構函數可能會導致重復調用loop_->quit(),讓IO線程loop循環重復退出。
這里帶來一個新問題,可能是muduo庫的一個bug:
如果在其他地方通過loop_->quit()讓IO線程退出loop循環,而loop_置為NULL,那么線程資源在哪通過join/detach,以回收線程資源?
顯然不是析構函數,因為析構函數中IO線程join的前提是loop非NULL。
/**
* IO線程函數, 創建EventLoop局部對象, 運行loop循環
*/
void EventLoopThread::threadFunc()
{
EventLoop loop; // 創建線程函數局部EventLoop對象, 只有線程函數退出, EventLoop::loop()退出時, 才會釋放該對象
if (callback_) // 運行線程函數初始回調
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify(); // 喚醒等待在cond_條件上的線程(i.e. startLoop的調用線程)
}
loop.loop(); // 運行IO線程循環, 即事件循環, 通常不會退出, 除非調用EventLoop::quit
// assert(exiting_);
MutexLockGuard lock(mutex_);
loop_ = NULL; // 思考: 最后為什么要清除loop_?
}
測試EventLoopThread
思路:
3個測試用例:
1)不啟動的EventLoopThread對象;
2)利用析構調用quit(),退出IO線程的loop循環;
3)在析構前,調用quit()退出IO線程的loop循環;
// EventLoopThread_unittes.cc
void print(EventLoop* p = NULL)
{
printf("print: pid=%d, tid=%d, loop=%p\n",
getpid(), CurrentThread::tid(), p);
}
void quit(EventLoop* p)
{
print(p);
p->quit();
}
int main()
{
print();
{
EventLoopThread thr1; // never start
}
{
// dtor calls quit()
EventLoopThread thr2;
EventLoop* loop = thr2.startLoop();
loop->runInLoop(std::bind(print, loop));
CurrentThread::sleepUsec(500 * 1000);
}
{
// quit() before dtor
EventLoopThread thr3;
EventLoop* loop = thr3.startLoop();
loop->runInLoop(std::bind(quit, loop));
CurrentThread::sleepUsec(500 * 1000);
}
return 0;
}
知識點
互斥鎖 + 條件變量等待指定條件
一個線程通過while語句 + cond_.wait() 的形式,等待指定條件(loop_ != NULL),防止虛假喚醒。另外一個線程在條件滿足后,通過cond_喚醒等待線程。
這樣,可以有效達到一個線程等待另一個線程的目的。當然,這里也可以用門栓CountDownLatch,這里用互斥鎖+條件變量也可以實現同樣目的。
// 線程1
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(); // 等待線程函數完成初始化工作, 喚醒等待在此處的調用線程
}
// 線程2
EventLoop loop;
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify(); // 喚醒等待在cond_條件上的線程(i.e. startLoop的調用線程)