C++ 動態伸縮線程池


簡述

之前閱讀過一份 C++11 寫的線程池源碼,寫了一篇隨筆 C++11的簡單線程池代碼閱讀 https://www.cnblogs.com/oloroso/p/5881863.html
這是一個固定線程數量的線程池,絕大部分情況下已經適用了。有一些特殊場景,我們需要一個按需創建線程的線程池,於是我這里改寫了一個動態創建線程的簡單線程池代碼。

代碼

線程池的線程根據線程池內未完成的任務數去動態創建,如果剩余任務超過 10 個,且沒有達到線程池的線程數上線,則創建新線程去處理。
當運行的工作線程,從工作隊列中無法取出新任務(即任務隊列已空),則進行等待,最多等待 180 秒,就會去重新判斷任務池是否能取出新任務,如果連續 3 次等待都沒能有新任務,則當前工作線程退出(也可以不用等待3次,只等待一次就直接退出,這里是為了防止剛好180秒等待結束的時候,入隊了新任務,然后這個線程退出了,又需要創建新的線程,雖然這個是極低的概率)。

#ifndef DYNAMICTHREADPOOL_HPP
#define DYNAMICTHREADPOOL_HPP


#include <condition_variable>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

// 線程池類
class DynamicThreadPool
{
public:
    // 構造函數,傳入線程數
    DynamicThreadPool(size_t threads);
    // 析構
    ~DynamicThreadPool();

    // 入隊任務(傳入函數和函數的參數)
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
    // 一個最簡單的函數包裝模板可以這樣寫(C++11)適用於任何函數(變參、成員都可以)
    // template<class F, class... Args>
    // auto enqueue(F&& f, Args&&... args) -> decltype(declval<F>()(declval<Args>()...))
    // {    return f(args...); }
    // C++14更簡單
    // template<class F, class... Args>
    // auto enqueue(F&& f, Args&&... args)
    // {    return f(args...); }

    // 停止線程池
    void stopAll();


private:
    void newThread();

private:
    std::atomic_int run_workers;   // 當前運行的工作線程數
    int             max_workers;   // 最大工作線程數
    // 任務隊列
    std::queue<std::function<void()>> tasks;

    // synchronization 異步
    std::mutex              queue_mutex;   // 隊列互斥鎖
    std::condition_variable condition;     // 條件變量
    bool                    stop;          // 停止標志
};

// 構造函數僅啟動一些工作線程
inline DynamicThreadPool::DynamicThreadPool(size_t threads)
    : run_workers(0)
    , max_workers(threads)
    , stop(false)
{
    if (max_workers == 0 || max_workers > (int)std::thread::hardware_concurrency()) {
        max_workers = std::thread::hardware_concurrency();
    }
}

// 添加一個新的工作任務到線程池
template<class F, class... Args>
auto DynamicThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    // 將任務函數和其參數綁定,構建一個packaged_task
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    // 獲取任務的future
    std::future<return_type> res = task->get_future();

    size_t tasks_size = 0;
    {
        std::lock_guard<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        // 不允許入隊到已經停止的線程池
        if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); }
        // 將任務添加到任務隊列
        tasks.emplace([task]() { (*task)(); });
        tasks_size = tasks.size();
    }
    // 判斷當前的任務積累數,如果任務積累太多,就再創建一個線程
    if (run_workers == 0 || (tasks_size > 10 && run_workers < max_workers)) {
        newThread();   // 創建新線程
    }
    // 發送通知,喚醒某一個工作線程取執行任務
    condition.notify_one();
    return res;
}

inline DynamicThreadPool::~DynamicThreadPool()
{
    stopAll();
}

inline void DynamicThreadPool::stopAll()
{
    {
        // 拿鎖
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 停止標志置true
        stop = true;
    }
    // 通知所有工作線程,喚醒后因為stop為true了,所以都會結束
    condition.notify_all();
    // 等待所有線程結束
    while (run_workers > 0) {
        // 如果工作線程是意外結束的,沒有將 run_workers 減一,那么這里會陷入死循環
        // 所以這里也可以修改為循環一定次數后就退出,不等了
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

inline void DynamicThreadPool::newThread()
{
    ++run_workers;   // 運行線程數加一

    std::thread thr([this]() {
        int no_task = 0;   // 用於標記任務循環
        while (true) {
            if (stop) {
                break;   // 退出線程循環
            }
            do {
                std::function<void()> task;   // 任務對象
                {
                    std::lock_guard<std::mutex> lock(this->queue_mutex);
                    // 從任務隊列取出一個任務
                    if (this->tasks.empty()) {
                        ++no_task;
                        break;   // 沒有任務,退出任務執行循環
                    }
                    // 取得任務隊首任務(注意此處的std::move)
                    task = std::move(this->tasks.front());
                    // 從隊列移除
                    this->tasks.pop();
                }
                // 執行任務
                task();

            } while (true);
            // 如果超過3次都是空循環,那么就退出線程
            if (no_task > 3) { break; }

            // 沒有任務的時候,等待條件觸發
            // 拿鎖(獨占所有權式),隊列鎖也充當一下條件鎖
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            // 等待條件成立(只等待180秒,條件不成立就退出)
            this->condition.wait_for(lock, std::chrono::seconds(180), [this] {
                return this->stop || !this->tasks.empty();
            });
        }   // end while
        // 線程退出的時候,從工作線程組中移除
        --run_workers;   // 運行線程數減一
    });
    thr.detach();   // 分離執行
}

#endif   // DYNAMICTHREADPOOL_HPP

簡單的測試代碼

#include "DynamicThreadPool.hpp"
#include <iostream>

int main()
{
    cout << "開始測試!" << endl;
    DynamicThreadPool tp(4);

    auto func = [](int i) {
        auto id = std::this_thread::get_id();
        std::cout << id << "  " << i << endl;
        //模擬耗時較長任務
        std::this_thread::sleep_for(std::chrono::seconds(1));
    };

    // 先提交9個
    int i = 1;
    for (; i < 10; ++i) {
        tp.enqueue(func, i);
    }
    // 休眠60秒
    std::this_thread::sleep_for(std::chrono::seconds(20));
    // 提交50個
    for (; i < 60; ++i) {
        tp.enqueue(func, i);
    }
    // 休眠等待結束
    std::this_thread::sleep_for(std::chrono::seconds(40));
    // 提交50個
    for (; i < 120; ++i) {
        tp.enqueue(func, i);
    }

    std::cout<<"測試結束"<<std::endl;
    return 0;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM