muduo筆記 線程池ThreadPool


muduo線程池ThreadPool,采用的是固定線程數目的線程池方案。

線程池模型

模型圖如下:

這個是通用線程池,雙端隊列存放的是多個可調用對象(即用戶任務),而非函數指針,因此可以通過std::bind配接器傳參。雙端隊列queue_,有時也稱為工作隊列。

其工作原理:首先創建並啟動一組線程,稱為線程池threads_,由用戶指定其大小maxQueueSize_,每個元素對對應一個線程。每個線程函數都是一樣的,在其中會運行一個loop循環:從雙端隊列取出一個任務對象task,如果非空,就執行之,如此往復。
當有一個用戶線程想要通過線程池運行一個用戶任務時,就可以將用戶任務函數及參數封裝成一個可調用對象Task f,然后通過線程池接口,將f加入雙端隊列末尾。當線程池有線程空閑時(未執行用戶任務),就會從雙端隊列頭部取出一個Task對象task,然后執行之。

[======]

線程池的組成

線程池主要由以下幾個部分組成:
1)工作隊列queue_,用雙端隊列實現,能從尾部加入用戶任務對應的可調用對象;
2)用戶任務Task f,封裝了用戶任務,包含任務函數和參數;
3)線程組threads_,用於管理工作的線程數組;
4)工作線程,執行回調函數。

[======]

ThreadPool接口

ThreadPool提供以下public接口,供用戶使用:

class ThreadPool : noncopyable
{
public:
    typedef std::function<void ()> Task;

    explicit ThreadPool(const string& nameArg = string("ThreadPool")); // 構造函數, 初始化各數據成員
    ~ThreadPool();

    // Must be called before start().
    void setMaxQueueSize(int maxSize); // 設置工作隊列最大大小
    void setThreadInitCallback(const Task& cb); // 設置線程初始化完成后回調函數
    void start(int numThreads); // 指定創建線程個數, 啟動線程池中的所有線程
    void stop();  // 停止線程池各線程

    const std::string& name() const; // 當前線程池名稱
    size_t queueSize() const; // 返回工作隊列大小

    void run(Task f);  // 運行用戶任務f
    ...
}

ThreadPool使用的線程Thread是Linux下NPTL線程庫(Pthreads)的封裝。ThreadPool將線程池的創建(構造)與啟動(start())分隔開來,並沒有直接在構造函數中啟動線程組;將停止(stop())與析構分隔開。

[======]

ThreadPool實現

數據成員

private:
    ...
    mutable MutexLock mutex_; // 用於線程安全保護數據成員的互斥鎖
    Condition notEmpty_ GUARDED_BY(mutex_); // 工作隊列非空條件(元素個數為0)
    Condition notFull_ GUARDED_BY(mutex_);  // 工作隊列非滿條件(元素個數未達到上限值)

    std::string name_;

    Task threadInitCallback_; // 線程初始化完成后的回調對象
    std::vector<std::unique_ptr<muduo::Thread>> threads_; // 線程組指針
    std::deque<Task> queue_ GUARDED_BY(mutex_); // 工作隊列
    size_t maxQueueSize_; // 工作隊列最大大小
    bool running_;        // 線程(循環)是否運行標志

主要成員:工作隊列queue_,以及限制工作隊列大小的maxQueueSize_,線程組指針threads_, 線程運行標志,以及用於保護它們的互斥鎖mutex_,用於喚醒阻塞線程的條件變量notEmpty_, notFull_。

每個線程中都有一個loop,用running表示是否運行的標志,running只有在線程池停止的時候,才會被其他線程調用,可以用鎖來保護,其他時候,只會被同一個線程訪問,因此無需使用原子類型。

線程池的構造

ThreadPool::ThreadPool(const string &nameArg)
: mutex_(),
  notEmpty_(mutex_),
  notFull_(mutex_),
  name_(nameArg),
  maxQueueSize_(0),
  running_(false)
{
}

用戶可以指定線程池名稱,默認為"ThreadPool",便於調試跟蹤,日志診斷問題;值得注意的是工作隊列最大大小maxQueueSize_初值0,用戶可通過setMaxQueueSize修改其大小;

啟動與停止

用戶可通過start()啟動線程池,需要指定線程組中子線程數量,一旦創建成功后,各子線程就會投入運行,直到調用stop() 停止線程池運行。
由於 Thread已內含一個門閥,會讓調用線程等待新線程函數啟動,因此,這里不必再設置門閥等待線程池中線程的啟動。相反,如果有子線程運行所需要的數據,就需要在創建之前就准備好,比如running_,要在線程循環前就設置為true,否則子線程loop不會運行,而是直接退出。

void ThreadPool::start(int numThreads)
{
    assert(threads_.empty());
    running_ = true;
    threads_.reserve(static_cast<size_t>(numThreads));
    for (int i = 0; i < numThreads; ++i)
    {
        char id[32];
        snprintf(id, sizeof(id), "%d", i + 1);
        threads_.emplace_back(new muduo::Thread(
                std::bind(&ThreadPool::runInThread, this), name_ + id));
        threads_[i]->start();
    }
    if (numThreads == 0 && threadInitCallback_)
    {
        threadInitCallback_();
    }
}

void ThreadPool::stop()
{
    {
        MutexLockGuard lock(mutex_);
        running_ = false;
        notEmpty_.notifyAll();
        notFull_.notifyAll();
    }
    for (auto& thr : threads_)
    {
        thr->join();
    }
}

為什么start()中不加鎖,而stop()卻要加鎖?
因為start() 中,在子線程啟動后,並沒有對共享數據進行訪問,也就不存在競態條件。而stop()中,有對共享數據,如running_、notEmpty、notFull,因此,需要加鎖對這些數據進行保護。

這里,子線程退出方式是連接(join)線程,而非分離(detach)線程。個人認為兩種方案都可以,不過,join更容易在開發階段,排查問題,因為如果線程無法正常退出,調用線程會阻塞在join調用上。

往工作隊列加入任務對象

調用線程通過run(),向線程池的請求運行用戶指定的任務對象,該對象會被加入到工作隊列末尾,空閑子線程會自動從工作隊列中取任務對象執行。

void ThreadPool::run(Task task)
{
    if (threads_.empty()) // 子線程數量為0
    {
        task();
    }
    else
    { // 子線程數量非0
        MutexLockGuard lock(mutex_);
        while (isFull() && running_)
        {
            notFull_.wait();
        }
        if (!running_) return;
        assert(!isFull());

        queue_.push_back(std::move(task));
        notEmpty_.notify();
    }
}

這里有2個特殊情況需要注意:
1)threads_為空,即沒有創建線程,可能是用戶指定線程數為0或非法數量(如負數),也有可能是進程創建的線程數達到系統限制,從而創建線程失敗。
不論什么原因,為避免進程崩潰,可以直接在當前線程中調用用戶任務。

2)采用的是isFull()成員來判斷工作隊列是否滿,而不是容器自帶的size()來判斷。
在isFull()內部,添加了一個互斥鎖斷言,確保isFull()的調用線程已經取得了mutex_鎖;否則,一旦有其他線程在未取得鎖的情況下,訪問應受鎖保護工作隊列成員,可能導致意外情況。

bool ThreadPool::isFull() const
{
    mutex_.assertLocked();
    return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

從工作隊列取任務對象

用take從工作隊列頭部取出一個任務對象。通常是子線程空閑時調用,取出后,用來執行用戶任務。

ThreadPool::Task ThreadPool::take()
{
    MutexLockGuard lock(mutex_);
    // always use a while-loop, due to spurious wakeup
    while (queue_.empty() && running_)
    {
        notEmpty_.wait();
    }
    Task task;
    if (!queue_.empty())
    {
        task = queue_.front();
        queue_.pop_front();
        if (maxQueueSize_ > 0)
        {
            notFull_.notify();
        }
    }
    return task;
}

子線程loop

主要工作:從工作隊列取用戶任務,然后執行之。循環往復,直到線程池停止工作。

實現該工作的runInThread()是在用戶調用start()時,自動啟動的,不需要用戶自行調研。

void ThreadPool::runInThread()
{
    try
    {
        if (threadInitCallback_)
        {
            threadInitCallback_();
        }
        while (running_)
        {
            Task task(take());
            if (task)
            {
                task();
            }
        }
    }
    catch (const Exception& ex)
    {
        fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
        fprintf(stderr, "reason: %s\n", ex.what());
        fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
        abort();
    }
    catch (const std::exception& ex)
    {
        fprintf(stderr, "exception caught in TheadPool %s\n", name_.c_str());
        fprintf(stderr, "reason: %s\n", ex.what());
        abort();
    }
    catch (...)
    {
        fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
        throw ; // rethrow
    }
}

這里,用了try-catch語句塊將代碼包裹起來,因為不知道用戶代碼會干些什么,很有可能會產生異常,因此需要捕獲異常。對於不確定的異常,可以rethrow(繼續上拋)。

另外,threadInitCallback_讓用戶有機會在線程初始化完成后,運行用戶任務之前,做一些事情。

[======]

ThreadPool的使用、測試

基本流程

// 創建線程池對象
ThreadPool pool("MyThreadPool");
// 設置工作隊列最大尺寸
pool.setMaxQueueSize(maxSize);
// 啟動線程池線程組, 指定線程數量
pool.start(threadNum);
// 運行用戶指定任務
pool.run(userTask); // userTask是用戶任務(可調用對象)
...
// 停止線程池(如有需要)
pool.stop();

截取自muduo的部分代碼,對ThreadPool進行測試:

// from muduo project
// muduo/base/tests/ThreadPool_test.cc

void print()
{
    printf("tid=%d\n", muduo::CurrentThread::tid());
}

void printString(const std::string& str)
{
    LOG_INFO << str;
    usleep(100*1000);
}

void test(int maxSize)
{
    LOG_WARN << "Test ThreadPool with max queue size = " << maxSize;
    muduo::ThreadPool pool("MainThreadPool");
    pool.setMaxQueueSize(maxSize);
    pool.start(5);

    LOG_WARN << "Adding";
    pool.run(print);
    pool.run(print);
    for (int i = 0; i < 100; ++i) {
        char buf[32];
        snprintf(buf, sizeof(buf), "task %d", i);
        pool.run(std::bind(printString, std::string(buf))); // 演示了如何向線程池加入含參的可調用對象
    }
    LOG_WARN << "Done";
    
    // 演示了如何等待線程池運行完用戶任務
    muduo::CountDownLatch latch(1);
    pool.run(std::bind(&muduo::CountDownLatch::countDown, &latch));
    latch.wait(); // wait for pool running latch.countDown()
    pool.stop();
}

int main()
{
    test(0);
    test(1);
    test(5);
    test(10);
    test(50);
    return 0;
}

有2點問題:
1)run只接受一個參數,那么調用線程如何向線程池傳參?
解決方案有很多,一種是使用模板函數,為向工作隊列加用戶任務的run函數添加不定參數的重載版本;另一種,是使用std::bind配機器,向run傳遞一個新的可調用對象。muduo采用的是后者。

2)調用線程端的用戶,如何獲取用戶任務執行結果?
run()沒有任何返回值,用戶只能自行設計用戶任務函數及參數,通過參數狀態取得結果。
當然,還有另外的辦法就是,讓run()返回一個std::future<return_type>,通過future異步獲取結果。參考:http://www.purecpp.org/detail?id=2260

[======]

小結

1)線程池為避免頻繁創建、銷毀線程,提供一組子線程,能從工作隊列取任務、執行任務,而用戶可以向工作隊列加入任務,從而完成用戶任務。

[======]

參考

https://docs.microsoft.com/zh-cn/windows/win32/procthread/thread-pools
http://www.purecpp.org/detail?id=2260


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM