簡單C++線程池


簡單C++線程池

Java 中有一個很方便的 ThreadPoolExecutor,可以用做線程池。想找一下 C++ 的類似設施,尤其是能方便理解底層原理可上手的。網上找到的 demo,基本都是介紹的 projschj 的C++11線程池。這份源碼最后的commit日期是2014年,現在是2021年了,本文將在閱讀源碼的基礎上,對這份代碼進行一些改造。關於線程池,目前網上講解最好的一篇文章是這篇 Java線程池實現原理及其在美團業務中的實踐,值得一讀。

改造后的源碼在 https://gitee.com/zhcpku/ThreadPool 進行提供。


更新,微軟工程院大佬指出了參考代碼的一些問題,並給出了自己的固定個數線程池代碼,貼在文末了。感覺這部分的討論更有價值。

projschj 的代碼

1. 數據結構

主要包含兩個部分,一組執行線程、一個任務隊列。執行線程空閑時,總是從任務隊列中取出任務執行。具體執行邏輯后面會進行解釋。

class ThreadPool {
    // ...
private:
    using task_type = std::function<void()>;
    // need to keep track of threads so we can join them
    std::vector<std::thread> workers;
    // the task queue
    std::queue<task_type> tasks;
};

2. 同步機制

這里包括一把鎖、一個條件變量,還有一個bool變量:

  • 鎖用於保護任務隊列、條件變量、bool變量的訪問;
  • 條件變量用於喚醒線程,通知任務到來、或者線程池停用;
  • bool變量用於停用線程池;
class ThreadPool {
    // ...
private:
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

3. 線程池啟動

啟動線程池,首先要做的是構造指定數量的線程出來,然后讓每個線程開始運行。
對於每個線程,運行邏輯是一樣的:嘗試從任務隊列中獲取任務並執行,如果拿不到任務、並且線程池沒有被停用,則睡眠等待。
這里線程等待任務使用的是條件變量,而不是信號量或者自旋鎖等其他設施,是為了讓線程睡眠,避免CPU空轉浪費。

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t thread_num)
    : stop(false)
{
    for (size_t i = 0; i < thread_num; ++i) {
        workers.emplace_back([this] {
            for (;;) {
                task_type task;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    this->condition.wait(
                        lock, [this] { return this->stop || !this->tasks.empty(); });
                    if (this->stop && this->tasks.empty()) {
                        return;
                    }
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }
                task();
            }
        });
    }
}

4.停用線程池

線程的停用,需要讓每一個線程停下來,並且等到每個線程都停止再退出主線程才是比較安全的操作。
停止分三步:設置停止標識、通知到每一個線程(睡眠的線程需要喚醒)、等到每一個線程停止。

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
}

5. 提交新任務

這是整個線程池的核心,也是寫的最復雜,用C++新特性最多的地方,包括但不限於:
自動類型推導、變長模板函數、右值引用、完美轉發、原地構造、智能指針、future、bind ……
順帶提一句,要是早有變長模板參數,std::min / std::max 也不至於只能比較兩個數大小,再多就得用大括號包起來作為 initialize_list 傳進去了。

這里提交任務時,由於我們的任務類型定義為一個無參無返回值的函數對象,所以需要先通過 std::bind 把函數及其參數打包成一個 對應類型的可調用對象,返回值將通過 future 異步獲取。然后是要把這個任務插入任務隊列末尾,因為任務隊列被多線程並發訪問,所以需要加鎖。
另外需要處理的兩個情況,一個是線程睡眠時,新入隊任務需要主要喚醒線程;另一個是線程池要停用時,入隊操作是非法的。

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

改造

以上代碼已經足以闡釋線程池基本原理了,以下改進主要從可靠性、易用性、使用場景等方面進行改進。

1. non-copyable

線程池本身應該是不可復制的,這里我們通過刪除拷貝構造函數和賦值操作符,以及其對用的右值引用版本來實現:

class ThreadPool {
  // ...
private:
    // non-copyable
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
};

2. default-thread-number

除了手動指定線程個數,更合適的做法是主動探測CPU支持的物理線程數,並以此作為執行線程個數:

class ThreadPool {
public:
    explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency());
    size_t ThreadCount() { return workers.size(); }
    // ...
};

3. 延遲創建線程

線程不必一次就創建出來,可以等到任務到來的時候再創建,降低資源占用。
// TBD

4. 臨時線程數量擴充

線程池的應用場景主要針對的是CPU密集型應用,但是遇到IO密集型場景,也要保證可用性。如果我們的線程個數固定的話,會出現一些問題,比如:

  • 幾個IO任務占據了線程,並且進入了睡眠,這個時候CPU空閑,但是后面的任務卻得不到處理,任務隊列越來越長;
  • 幾個線程在睡眠等待某個信號或者資源,但是這個信號或資源的提供者是任務隊列中的某個任務,沒有空閑線程,提供者永遠提供此信號或資源。
    因此我們需要一種機制,臨時擴充線程數量,從線程池中的睡眠線程手中“搶回”CPU。
    其實,更好的解決辦法是改造線程池,使用固定個數的線程,然后把任務打包到協程中執行,當遇到IO的時候協程主動讓出CPU,這樣其他任務就能上CPU運行了。畢竟,多線程擅長處理的是CPU密集型任務,多協程才是處理IO密集型任務的。…… 這不就是協程庫了嘛!比如 libco、libgo 就是這種解決方案。
    // TBD

5. 線程池停用啟動

上面的線程池,其啟動停止時機分別是構造和析構的時候,還是太粗糙了。我們為其提供手動啟動、停止的函數,並支持停止之后重新啟動:
// TBD


總結

不干了,2021年了,研究協程庫去了!

更新:微軟工程師的簡單線程池

微軟工程師在 用 C++ 寫線程池是怎樣一種體驗? 這個問題下,指出之前的參考代碼存在以下幾個問題:

1. 沒有必要把停止標志設計為 atomic,更沒有必要用 acquire-release 同步。
2. 線程池銷毀時等待所有任務執行完成,通常是沒有必要的。
3. 工作線程內部存在冗余邏輯;在尚有任務未完成時沒有必要檢查線程池是否停止。
4. 添加任務時沒有必要將任務封裝為 std::packaged_task,因為線程池的基本職能是管理 Execution Agents; 如果一定要設計這樣一個方法,那也應該保留一個直接提交任務不返回 Future 的方法;事實上,在 C++ 提案 P0443 (http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0443r5.html)中,也有類似的設計,不返回 Future 的叫做 "one-way",返回 Future 的叫做 "two-way",很多時候需要從外部控制同步時,"one-way" 比 "two-way" 更實用。
5. 一個 bug:"add" 方法實現中使用了 std::bind,而 std::bind 看起來並不適用於這里。我猜你設計這個方法的語義是執行 add(std::forward<F>(fcn), std::forward<Args>(args)...),從你的返回值類型上也可以看出這一點;但不幸的是,std::bind 的返回值被調用時會講所有參數轉為左值引用! 也就是說,在你的實現中,所有參數在 fnc 執行時都會被拷貝構造一份,對於不能拷貝構造的參數會直接編譯出錯!
6. 另一個 bug:在線程池的析構函數中,沒有對 stop_ 的賦值加鎖。為什么需要對 stop_ 的賦值加鎖呢?因為這個操作 必須 與工作線程對於 std::condition_variable 的 wait 檢查操作互斥!具體原因如下:對於 std::condition_variable 的 wait(帶 Predicate 的版本)展開的代碼與下面的代碼等價:
while (!pred()) {
  cond_var.wait();
}
如果 pred() 不與對於狀態的修改互斥的話,工作線程可能會陷入無線等待,也就導致了線程泄漏。

對於為什么等待工作線程結束不必要,他的解釋如下:

跌宕的月光​2018-11-29
線程池銷毀時等待所有任務執行完成,通常是沒有必要的。可否麻煩解釋一下這個呢?因為比方說當main exit的時候,除非main 去等,否則detached thread就直接停掉了,出現一些任務做了一半的情況

「已注銷」 (作者) 回復跌宕的月光​2018-11-29
確實有這樣的問題,但我認為這屬於更底層的“線程模型”的問題范疇,而非“線程池”本身的問題;即使不使用線程池,這個問題依然存在,在某些情況下我們也希望子線程運行更久些。
解決這個問題的方法就是引入“守護線程”和“非守護線程”的概念,這個概念也廣泛存在於其他編程語言(如 Java)和操作系統(參考“守護進程”)中。作為我並發庫的一部分,我也自己實現過 C++ 的守護線程模型,可以參考:https://github.com/wmx16835/wang/blob/b8ecd554c4bf18cd181500e9594223e96dfede30/src/main/experimental/concurrent.h#L508-L525
這樣,創建線程的時候直接使用 thread_executor<false>::operator(F&&) 即可讓新創建的線程(看上去)擁有與主線程同等的地位。

至於等待任務結束,討論的結論是,應該由任務提交方選擇主動等待之后再結束線程池,而不是線程池自己來等待:

章佳傑​2018-06-25
你好,看了上面的代碼受益良多。我有一個小問題,如果在上面這個代碼的基礎上要實現這樣的功能:提交了一批 task,然后等這批 task 都完成,再繼續提交其他 task。這個中間的「等這批 task 都完成」的操作怎么來實現比較好呢?

「已注銷」 (作者) 回復章佳傑​2018-06-25
首先,如果有這樣的需求,線程池可以開一個批量提交任務的接口,不是為了批量等待,而是減少多次進入臨界區的開銷。
然后回到你的問題。現在普遍的做法有兩種,一種是使用 Future,即將每一個任務包裝為 std::packaged_task 再提交(需要考慮我回答中提及的“拷貝構造”問題),然后將這些 Future 保存起來逐一等待;另一種是使用 Latch,這樣代碼量會稍微增加一點,即在每個任務結束后對 Latch 執行“減一”操作,外部對所述 Latch 執行阻塞等待。
如果使用我回答中提及的 ISO C++ 並發提案(http://www.http://open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0642r1.pdf )中的解決方案,則不需要使用 Future 或 Latch,所需代碼量較少,並且對於阻塞敏感的程序可以使用異步的方式避免等待的阻塞。

關於第5點的bug討論:

鄭威狄2018-06-13
您好,針對您第五點提出的bug有沒有更好的實現方法來避免這個問題呢?

「已注銷」 (作者) 回復鄭威狄2018-06-14
需要將可調用對象的可變參數列表中的參數全部轉為右值。如果不使用第三方庫的話,我見過最簡單的方法就是寫一個轉發參數的中間層仿函數。但在我見過的大多數情形中都沒有必要在這一層添加調用所需參數,所以在我提供的實現中沒有 `Args&&...` 參數列表,也就沒有這個 bug。

參考代碼

他給出的參考代碼更簡單一些,對於有返回值的函數,並沒有直接在 execute 時返回返回值的 future,而是統一返回void,需要返回值的話手動把 future 打包一個 packged_task 傳入 execute 即可:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class fixed_thread_pool {
public:
    explicit fixed_thread_pool(size_t thread_count)
        : data_(std::make_shared<data>())
    {
        for (size_t i = 0; i < thread_count; ++i) {
            std::thread([data = data_] {
                std::unique_lock<std::mutex> lk(data->mtx_);
                for (;;) {
                    if (!data->tasks_.empty()) {
                        auto current = std::move(data->tasks_.front());
                        data->tasks_.pop();
                        lk.unlock();
                        current();
                        lk.lock();
                    } else if (data->is_shutdown_) {
                        break;
                    } else {
                        data->cond_.wait(lk);
                    }
                }
            }).detach();
        }
    }

    fixed_thread_pool() = default;
    fixed_thread_pool(fixed_thread_pool&&) = default;

    ~fixed_thread_pool()
    {
        if ((bool)data_) {
            {
                std::lock_guard<std::mutex> lk(data_->mtx_);
                data_->is_shutdown_ = true;
            }
            data_->cond_.notify_all();
        }
    }

    template <class F>
    void execute(F&& task)
    {
        {
            std::lock_guard<std::mutex> lk(data_->mtx_);
            data_->tasks_.emplace(std::forward<F>(task));
        }
        data_->cond_.notify_one();
    }

private:
    struct data {
        std::mutex mtx_;
        std::condition_variable cond_;
        bool is_shutdown_ = false;
        std::queue<std::function<void()>> tasks_;
    };
    std::shared_ptr<data> data_;
};

補充測試用例

這里我補充了一個例子來說明,如何等待任務結束,以及返回值的獲取:

class count_down_latch {
public:
    explicit count_down_latch(size_t n = 1) : cnt(n) {};
    void done() {
        std::unique_lock<std::mutex> lck(mu);
        if (--cnt <= 0) {
            cv.notify_one();
        }
    }
    void wait() {
        std::unique_lock<std::mutex> lck(mu);
        for(;;) {
            cv.wait(lck);
            mu.unlock();
            if (cnt <= 0) break;
            mu.lock();
        }
    }
private:
    size_t cnt;
    std::mutex mu;
    std::condition_variable cv;
};

void test_lambda()
{
    fixed_thread_pool pool(4);
    std::vector<int> results(8);
    count_down_latch latch(8);

    for (int i = 0; i < 8; ++i) {
        pool.execute([i, &latch, &results] {
            printf("hello %d\n", i);
            std::this_thread::sleep_for(std::chrono::seconds(1));
            printf("world %d\n", i);
            latch.done();
            results[i] = i * i;
        });
    }
    latch.wait();
    for (auto&& result : results) {
        printf("%d ", result);
    }
    printf("\n");
    printf("--------------------\n");
}

參考文獻

  1. projschj 的C++11 線程池
  2. Java線程池實現原理及其在美團業務中的實踐
  3. 用 C++ 寫線程池是怎樣一種體驗? - 「已注銷」的回答 - 知乎


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM