簡述
之前閱讀過一份 C++11 寫的線程池源碼,寫了一篇隨筆 C++11的簡單線程池代碼閱讀 https://www.cnblogs.com/oloroso/p/5881863.html。
這是一個固定線程數量的線程池,絕大部分情況下已經適用了。有一些特殊場景,我們需要一個按需創建線程的線程池,於是我這里改寫了一個動態創建線程的簡單線程池代碼。
代碼
線程池的線程根據線程池內未完成的任務數去動態創建,如果剩余任務超過 10
個,且沒有達到線程池的線程數上線,則創建新線程去處理。
當運行的工作線程,從工作隊列中無法取出新任務(即任務隊列已空),則進行等待,最多等待 180
秒,就會去重新判斷任務池是否能取出新任務,如果連續 3
次等待都沒能有新任務,則當前工作線程退出(也可以不用等待3次,只等待一次就直接退出,這里是為了防止剛好180秒等待結束的時候,入隊了新任務,然后這個線程退出了,又需要創建新的線程,雖然這個是極低的概率)。
#ifndef DYNAMICTHREADPOOL_HPP
#define DYNAMICTHREADPOOL_HPP
#include <condition_variable>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
// 線程池類
class DynamicThreadPool
{
public:
// 構造函數,傳入線程數
DynamicThreadPool(size_t threads);
// 析構
~DynamicThreadPool();
// 入隊任務(傳入函數和函數的參數)
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
// 一個最簡單的函數包裝模板可以這樣寫(C++11)適用於任何函數(變參、成員都可以)
// template<class F, class... Args>
// auto enqueue(F&& f, Args&&... args) -> decltype(declval<F>()(declval<Args>()...))
// { return f(args...); }
// C++14更簡單
// template<class F, class... Args>
// auto enqueue(F&& f, Args&&... args)
// { return f(args...); }
// 停止線程池
void stopAll();
private:
void newThread();
private:
std::atomic_int run_workers; // 當前運行的工作線程數
int max_workers; // 最大工作線程數
// 任務隊列
std::queue<std::function<void()>> tasks;
// synchronization 異步
std::mutex queue_mutex; // 隊列互斥鎖
std::condition_variable condition; // 條件變量
bool stop; // 停止標志
};
// 構造函數僅啟動一些工作線程
inline DynamicThreadPool::DynamicThreadPool(size_t threads)
: run_workers(0)
, max_workers(threads)
, stop(false)
{
if (max_workers == 0 || max_workers > (int)std::thread::hardware_concurrency()) {
max_workers = std::thread::hardware_concurrency();
}
}
// 添加一個新的工作任務到線程池
template<class F, class... Args>
auto DynamicThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
// 將任務函數和其參數綁定,構建一個packaged_task
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// 獲取任務的future
std::future<return_type> res = task->get_future();
size_t tasks_size = 0;
{
std::lock_guard<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
// 不允許入隊到已經停止的線程池
if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); }
// 將任務添加到任務隊列
tasks.emplace([task]() { (*task)(); });
tasks_size = tasks.size();
}
// 判斷當前的任務積累數,如果任務積累太多,就再創建一個線程
if (run_workers == 0 || (tasks_size > 10 && run_workers < max_workers)) {
newThread(); // 創建新線程
}
// 發送通知,喚醒某一個工作線程取執行任務
condition.notify_one();
return res;
}
inline DynamicThreadPool::~DynamicThreadPool()
{
stopAll();
}
inline void DynamicThreadPool::stopAll()
{
{
// 拿鎖
std::unique_lock<std::mutex> lock(queue_mutex);
// 停止標志置true
stop = true;
}
// 通知所有工作線程,喚醒后因為stop為true了,所以都會結束
condition.notify_all();
// 等待所有線程結束
while (run_workers > 0) {
// 如果工作線程是意外結束的,沒有將 run_workers 減一,那么這里會陷入死循環
// 所以這里也可以修改為循環一定次數后就退出,不等了
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
inline void DynamicThreadPool::newThread()
{
++run_workers; // 運行線程數加一
std::thread thr([this]() {
int no_task = 0; // 用於標記任務循環
while (true) {
if (stop) {
break; // 退出線程循環
}
do {
std::function<void()> task; // 任務對象
{
std::lock_guard<std::mutex> lock(this->queue_mutex);
// 從任務隊列取出一個任務
if (this->tasks.empty()) {
++no_task;
break; // 沒有任務,退出任務執行循環
}
// 取得任務隊首任務(注意此處的std::move)
task = std::move(this->tasks.front());
// 從隊列移除
this->tasks.pop();
}
// 執行任務
task();
} while (true);
// 如果超過3次都是空循環,那么就退出線程
if (no_task > 3) { break; }
// 沒有任務的時候,等待條件觸發
// 拿鎖(獨占所有權式),隊列鎖也充當一下條件鎖
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待條件成立(只等待180秒,條件不成立就退出)
this->condition.wait_for(lock, std::chrono::seconds(180), [this] {
return this->stop || !this->tasks.empty();
});
} // end while
// 線程退出的時候,從工作線程組中移除
--run_workers; // 運行線程數減一
});
thr.detach(); // 分離執行
}
#endif // DYNAMICTHREADPOOL_HPP
簡單的測試代碼
#include "DynamicThreadPool.hpp"
#include <iostream>
int main()
{
cout << "開始測試!" << endl;
DynamicThreadPool tp(4);
auto func = [](int i) {
auto id = std::this_thread::get_id();
std::cout << id << " " << i << endl;
//模擬耗時較長任務
std::this_thread::sleep_for(std::chrono::seconds(1));
};
// 先提交9個
int i = 1;
for (; i < 10; ++i) {
tp.enqueue(func, i);
}
// 休眠60秒
std::this_thread::sleep_for(std::chrono::seconds(20));
// 提交50個
for (; i < 60; ++i) {
tp.enqueue(func, i);
}
// 休眠等待結束
std::this_thread::sleep_for(std::chrono::seconds(40));
// 提交50個
for (; i < 120; ++i) {
tp.enqueue(func, i);
}
std::cout<<"測試結束"<<std::endl;
return 0;
}