muduo筆記 網絡庫(八)EventLoop的多線程應用:EventLoopThread、EventLoopThreadPool


EventLoop的多線程應用

前面講的EventLoop為一個IO線程提供運行循環(loop),那muduo庫如何支持多線程呢?

  • EventLoopThread IO線程類
  • EventLoopThreadPool IO線程池類

IO線程池的功能是開啟若干個IO線程,並讓這些IO線程處於線程循環的狀態。

也就是說,EventLoop實現one loop per thread模型中的loop,EventLoopThread 實現的是per thread,EventLoopThreadPool 實現的是multi-thread環境下:one loop per thread。

多個Reactor模型

圖中每個Reactor都是一個線程,mainReactor通常是main線程,關注監聽套接字;subReactor關注的是連接套接字。如果沒有subReactor,所有跟監聽套接字、連接套接字有關的事件,都交由mainReactor處理。

為了簡便,本文下面所有EventLoopThreadPool (事件循環線程池) 統一簡稱IO線程池或線程池,EventLoopThread 統一簡稱IO線程。


EventLoopThreadPool 事件循環線程池類

EventLoopThreadPool 事件循環線程池類對象通常由main線程創建,綁定main線程創建的EventLoop(即baseLoop_),對應mainReactor。該線程池根據用戶指定線程數,創建EventLoopThread對應subReactor。
注意:EventLoopThreadPool 屬於Reactor的一部分,但不等於某個Reactor。

EventLoopThreadPool類聲明

class EventLoopThreadPool : noncopyable // 作為線程池對象,綁定了背后的系統資源,如線程,因此是引用語義(不可拷貝)
{
public:
    typedef std::function<void(EventLoop*)> ThreadInitCallback;

    EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
    ~EventLoopThreadPool();

    /* 設置線程數量, 需要在start()之前調用 */
    void setThreadNum(int numThreads)
    { numThreads_ = numThreads; }

    /* 啟動線程池, 設置線程函數初始回調 */
    void start(const ThreadInitCallback& cb = ThreadInitCallback());

    /*
     * valid after calling start()
     * round-robin(輪詢)
     */
    EventLoop* getNextLoop();

    /*
     * With the same hash code, it will always return the same EventLoop.
     */
    EventLoop* getLoopForHash(size_t hashCode);

    /* 獲取所有loops(EventLoop數組) */
    std::vector<EventLoop*> getAllLoops();

    /* 獲取線程池啟動狀態 */
    bool started() const
    { return started_; }

    /* 獲取線程池名稱 */
    const std::string& name() const
    { return name_; }

private:

    EventLoop* baseLoop_; // 與Acceptor所屬EventLoop相同
    std::string name_;    // 線程池名稱, 通常由用戶指定. 線程池中EventLoopThread名稱依賴於線程池名稱
    bool started_;   // 線程池是否啟動標志
    int numThreads_; // 線程數
    int next_;       // 新連接到來,所選擇的EventLoopThread下標
    std::vector<std::unique_ptr<EventLoopThread>> threads_; // IO線程列表
    std::vector<EventLoop*> loops_;  // EventLoop列表, 指向的是EventLoopThread線程函數創建的EventLoop對象
};

EventLoopThreadPool的構造與析構

構造函數很簡單,對需baseLoop_、name_、started等進行了初始化。

有個問題一直困擾自己:為什么EventLoop指針 baseLoop,沒有通過智能指針管理內存?
這里可以得到解決,因為baseLoop通常是main線程創建的棧變量,loops_數組(std::vector<EventLoop*>)中的EventLoop對象是線程函數創建的棧變量。當離開線程作用域時,棧變量會自動釋放。因此,在這之前,不要delete loop對象。

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: baseLoop_(baseLoop), // 指向基礎的EventLoop對象
name_(nameArg),        // 線程池名稱
started_(false),       // 啟動狀態
numThreads_(0),        // 線程數量
next_(0)               // 下一個EventLoopThread位於threads_數組中的下標
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop, it's stack variable
}

start() 啟動IO線程池

IO線程池在創建后,通過調用start()啟動線程池。主要工作:
1)確保baseLoop所屬線程調用start;
2)創建用戶指定線程組,啟動線程組線程,並記錄子線程對應EventLoop;
3)如果沒有指定線程數量(或為指定0),調用用戶指定的線程函數初始回調。

/**
* 啟動IO線程池.
* 只能啟動一次, 而且必須是baseLoop_的創建線程調用start().
* @param cb 線程函數初始回調
*/
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    assert(!started_); // 防止重復啟動線程池
    baseLoop_->assertInLoopThread(); // 斷言baseLoop_對象創建者是線程池的start()調用者

    started_ = true; // 標記線程池已啟動

    // 根據用戶指定線程數, 創建IO線程組
    /* create numThreads_ EventLoopThread, added to threads_ */
    for (int i = 0; i < numThreads_; ++i) { // 線程編號范圍取決於用戶指定的線程數
        char buf[name_.size() + 32];
        snprintf(buf, sizeof(buf), "%s%d", name_.c_str(), i); // IO線程名稱: 線程池名稱 + 線程編號
        EventLoopThread* t = new EventLoopThread(cb, buf);
        threads_.push_back(std::unique_ptr<EventLoopThread>(t)); // 將EventLoopThread對象指針 插入threads_數組
        loops_.push_back(t->startLoop()); // 啟動IO線程, 並將線程函數創建的EventLoop對象地址 插入loops_數組
    }
    if (numThreads_ == 0 && cb)
    { // 如果沒有創建任何線程, 也會調用回調cb; 否則, 會在新建的線程函數初始化完成后(進入loop循環前)調用
        cb(baseLoop_);
    }
}

用戶端TcpServer調用start()。可以看到threadInitCallback_是由TcpServer傳入的,而TcpServer::threadInitCallback_是由更上一層級的用戶傳入。

/**
* 啟動TcpServer, 初始化線程池, 連接接受器Accept開始監聽(Tcp連接請求)
*/
void TcpServer::start()
{
    if (started_.getAndSet(1) == 0) // 防止多次重復啟動
    {
        threadPool_->start(threadInitCallback_); // 啟動線程池, 並設置線程初始化完成的回調函數

        assert(!acceptor_->listening());
        loop_->runInLoop(
                std::bind(&Acceptor::listen, get_pointer(acceptor_)));
    }
}

分派任務給IO線程的利器:getNextLoop()

每當有一個新Tcp連接建立時,TcpServer調用newConnection新建一個TcpConnection對象負責該連接。然而,如何將TcpConnection對象分派給一個IO線程對應的EventLoop對象呢?
這就可以利用getNextLoop(),從IO線程池維護的EventLoop數組loops_中輪詢取得一個EventLoop對象,每次調用數組下標+1,這樣得到負載均衡的目的。

/**
* 從線程池獲取下一個event loop
* @note 默認event loop是baseLoop_ (創建baseLoop_線程, 通常也是創建線程池的線程).
* 沒有調用setThreadNum()設置numThreads_(number of threads)時, numThreads_默認為0,
* 所有IO操作都默認交由baseLoop_的event loop來完成, 因為沒有其他IO線程.
*/
EventLoop* EventLoopThreadPool::getNextLoop()
{
    baseLoop_->assertInLoopThread();
    assert(started_);
    EventLoop* loop = baseLoop_;

    // 如果loops_為空, 則loop指向baseLoop
    // 如果非空, 則按round-robin(RR, 輪叫)的調度方式(從loops_列表中)選擇一個EventLoop
    if (!loops_.empty())
    {
        // round-robin
        loop = loops_[next_];
        ++next_;
        if (implicit_cast<size_t>(next_) >= loops_.size())
        {
            next_ = 0;
        }
    }
    return loop;
}

當然,TcpServer所謂分派TcpConnection給IO線程,實際上就是將TcpConnection對象在構造時,綁定到指定EventLoop對象。
部分代碼見:

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    ...
    /* 從EventLoop線程池中,取出一個EventLoop對象構造TcpConnection對象,便於均衡各EventLoop負責的連接數 */
    EventLoop* ioLoop = threadPool_->getNextLoop(); // next event loop from the event loop thread pool
    ...
    // FIXME poll with zero timeout to double confirm the new connection
    // FIXME use make_shared if necessary
    /* 新建TcpConnection對象, 並加入ConnectionMap */
    TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;
    ...
}

思考:getNextLoop() 為什么可以返回EventLoop 原生指針?*
答案同構造函數EventLoop沒有通過智能指針管理,而是棧變量,這些棧變量只有在程序退出時,才會釋放。因此,可以認為其生命周期是整個應用程序。

不常用的getLoopForHash,getAllLoops

getLoopForHash,getAllLoops這是兩個備用接口,分別用於通過hashCode獲取loops_數組中的EventLoop對象,獲取整個EventLoop對象數組loops_。由於muduo庫沒有任何地方用到,這里不詳述。

測試EventLoopThreadPool

思路:
3個測試用例:為IO線程池創建0個線程,1個線程,3個線程。然后,start線程池、並回調init,3個測試用例分別用getNextLoop調用多次,判斷獲得的EventLoop對象地址是否為期望值。

// EventLoopThreadPool_unittest.cpp
void print(EventLoop* p = NULL)
{
    printf("main(): pid=%d, tid=%d, loop=%p\n",
          getpid(), CurrentThread::tid(), p);
}

void init(EventLoop* p)
{
    printf("init(): pid=%d, tid=%d, loop=%p\n",
           getpid(), CurrentThread::tid(), p);
}

int main()
{
    print();

    EventLoop loop;
    loop.runAfter(11, std::bind(&EventLoop::quit, &loop));

    { // 0線程的IO線程池, 默認用main線程作為baseLoop所屬線程, 處理IO事件
        printf("Single thread %p:\n", &loop);
        EventLoopThreadPool model(&loop, "single"); // 0線程IO線程池
        model.setThreadNum(0);
        model.start(init); // 啟動線程池並回調init
        // 從線程池連續取3次 EventLoop
        assert(model.getNextLoop() == &loop);
        assert(model.getNextLoop() == &loop);
        assert(model.getNextLoop() == &loop);
    }

    { // 單1線程的IO線程池
        printf("Another thread:\n");
        EventLoopThreadPool model(&loop, "another"); // 1個線程的IO線程池
        model.setThreadNum(1);
        model.start(init);
        EventLoop* nextLoop = model.getNextLoop();
        nextLoop->runAfter(2, std::bind(print, nextLoop));
        assert(nextLoop != &loop);
        assert(nextLoop == model.getNextLoop());
        assert(nextLoop == model.getNextLoop());
        ::sleep(3);
    }

    { // 3線程的IO線程池
        printf("Three thread:\n");
        EventLoopThreadPool model(&loop, "three");
        model.setThreadNum(3);
        model.start(init);
        EventLoop* nextLoop = model.getNextLoop();
        nextLoop->runInLoop(std::bind(print, nextLoop));
        assert(nextLoop != &loop);
        assert(nextLoop != model.getNextLoop());
        assert(nextLoop != model.getNextLoop());
        assert(nextLoop == model.getNextLoop()); // 3次以后循環回來
    }
    loop.loop();
    return 0;
}

EventLoopThread 事件循環線程類

一個EventLoopThread對象對應一個IO線程,而IO線程函數負責創建局部EventLoop對象,並啟動EventLoop的loop循環。

class EventLoopThread : noncopyable
{
public:
    typedef std::function<void (EventLoop*)> ThreadInitCallback;

    EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                    const string& name = string());
    ~EventLoopThread();

    /* 啟動IO線程函數中的loop循環, 返回IO線程中創建的EventLoop對象地址(棧空間) */
    EventLoop* startLoop();

private:
    void threadFunc(); // IO線程函數
    
    // 思考:這里為什么需要mutex_保護, 而EventLoopThreadPool::baseLoop_卻不需要?
    EventLoop* loop_ GUARDED_BY(mutex_); // 綁定的EventLoop對象指針
    bool exiting_;  // 暫無特殊用途
    Thread thread_; // 線程, 用於實現IO線程中的線程功能
    MutexLock mutex_; // 互斥鎖
    Condition cond_ GUARDED_BY(mutex_); // 條件變量
    ThreadInitCallback callback_;       // 線程函數初始回調
};

EventLoopThread的構造

EventLoopThread的結構很簡單,對外只提供啟動IO線程的接口startLoop()。
思考:為什么EventLoopThread的EventLoop對象指針loop_,需要互斥鎖保護,而其他類如EventLoopThreadPool的EventLoop對象指針baseLoop_ 卻不需要互斥鎖保護?
因為EventLoopThread中的loop_指針在創建時(startLoop()中),會存在調用線程和子線程函數threadFunc同時讀、寫loop_的情況,也就是並發訪問,因而需要互斥鎖保護。
反觀EventLoopThreadPool::baseLoop_,在構造后,就只是讀操作,沒有寫baseLoop_指針本身。而且baseLoop_在構造時,也是調用者傳入,不存在並發讀寫訪問的問題。

EventLoopThread::EventLoopThread(const EventLoopThread::ThreadInitCallback &cb, const string &name)
: loop_(NULL),
  exiting_(false),
  thread_(std::bind(&EventLoopThread::threadFunc, this), name), // 注意這里只是注冊線程函數, 名稱, 並未啟動線程函數
  mutex_(),
  cond_(mutex_),
  callback_(cb)
{
}

EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    // 不是100%沒有沖突, 比如threadFunc中正運行callback_回調, 然后立即析構當前對象.
    // 此時, IO線程函數已經啟動, 創建EventLoop了對象, 但還沒有修改loop_, 此時loop_一直為NULL
    // 也就是說, 無法通過析構讓IO線程退出loop循環, 也無法連接線程.
    if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_
    {
        // sitll a tiny change to call destructed object, if threadFunc exists just now.
        // but when EventLoopThread destructs, usually programming is exiting anyway.
        loop_->quit(); // 退出IO線程loop循環
        thread_.join(); // 連接線程, 回收資源
    }
}

startLoop 啟動IO線程

啟動IO線程的過程很簡單,就是啟動一個IO線程,然后(調用線程)等待IO線程函數初始化完成。

/**
* 啟動IO線程(函數), 運行EventLoop循環
* @return 返回EventLoop*, 實際上是線程函數threadFunc創建的EventLoop類型局部變量
*/
EventLoop* EventLoopThread::startLoop()
{
    assert(!thread_.started()); // avoid repeated start loop
    thread_.start(); // 啟動線程

    EventLoop* loop = NULL;
    {
        MutexLockGuard lock(mutex_);
        while (loop_ == NULL)
        {
            cond_.wait();  // 同步等待線程函數完成初始化工作, 喚醒等待在此處的調用線程
        }
        loop = loop_;
    }

    return loop;
}

IO線程函數threadFunc

IO線程函數主要工作:創建EventLoop局部對象, 運行loop循環。

思考:最后為什么要清除loop_?
因為loop_可能在其他地方訪問,而IO線程函數退出時,線程已經不能繼續運行,代表IO線程EventLoop的loop_也就沒有了存在意義。
如果不清空,析構函數可能會導致重復調用loop_->quit(),讓IO線程loop循環重復退出。

這里帶來一個新問題,可能是muduo庫的一個bug:
如果在其他地方通過loop_->quit()讓IO線程退出loop循環,而loop_置為NULL,那么線程資源在哪通過join/detach,以回收線程資源?
顯然不是析構函數,因為析構函數中IO線程join的前提是loop非NULL。

/**
* IO線程函數, 創建EventLoop局部對象, 運行loop循環
*/
void EventLoopThread::threadFunc()
{
    EventLoop loop; // 創建線程函數局部EventLoop對象, 只有線程函數退出, EventLoop::loop()退出時, 才會釋放該對象

    if (callback_) // 運行線程函數初始回調
    {
        callback_(&loop);
    }

    {
        MutexLockGuard lock(mutex_);
        loop_ = &loop;
        cond_.notify(); // 喚醒等待在cond_條件上的線程(i.e. startLoop的調用線程)
    }

    loop.loop(); // 運行IO線程循環, 即事件循環, 通常不會退出, 除非調用EventLoop::quit
    // assert(exiting_);
    MutexLockGuard lock(mutex_);
    loop_ = NULL; // 思考: 最后為什么要清除loop_?
}

測試EventLoopThread

思路:
3個測試用例:
1)不啟動的EventLoopThread對象;
2)利用析構調用quit(),退出IO線程的loop循環;
3)在析構前,調用quit()退出IO線程的loop循環;

// EventLoopThread_unittes.cc
void print(EventLoop* p = NULL)
{
    printf("print: pid=%d, tid=%d, loop=%p\n",
           getpid(), CurrentThread::tid(), p);
}

void quit(EventLoop* p)
{
    print(p);
    p->quit();
}

int main()
{
    print();

    {
        EventLoopThread thr1; // never start
    }

    {
        // dtor calls quit()
        EventLoopThread thr2;
        EventLoop* loop = thr2.startLoop();
        loop->runInLoop(std::bind(print, loop));
        CurrentThread::sleepUsec(500 * 1000);
    }

    {
        // quit() before dtor
        EventLoopThread thr3;
        EventLoop* loop = thr3.startLoop();
        loop->runInLoop(std::bind(quit, loop));
        CurrentThread::sleepUsec(500 * 1000);
    }
    return 0;
}

知識點

互斥鎖 + 條件變量等待指定條件

一個線程通過while語句 + cond_.wait() 的形式,等待指定條件(loop_ != NULL),防止虛假喚醒。另外一個線程在條件滿足后,通過cond_喚醒等待線程。
這樣,可以有效達到一個線程等待另一個線程的目的。當然,這里也可以用門栓CountDownLatch,這里用互斥鎖+條件變量也可以實現同樣目的。

// 線程1
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
    cond_.wait();  // 等待線程函數完成初始化工作, 喚醒等待在此處的調用線程
}

// 線程2
EventLoop loop;
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify(); // 喚醒等待在cond_條件上的線程(i.e. startLoop的調用線程)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM