◆ Overview
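This thread pool has a single task queue shared by all worker threads. Tasks submitted by the pool's user are stored in this queue, and the worker threads take tasks from it and execute them.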
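A task is a parameterless callable object, which may return a value, or a call wrapper produced by std::bind from a callable object and its arguments. Concretely, a task can be
- a free function (also called a global function)
- a lambda
- a function object (also called a functor)
- a class member function
- a std::function wrapping any of the above
- a bind call wrapper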
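The pool executes tasks asynchronously: once a task has been submitted, the user does not have to wait for it to run or to return its result.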
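The list above can be made concrete with a small sketch that is not part of the pool: each kind of callable, once any arguments are bound, becomes a nullary call that returns a value. The names free_fn, add, Functor and Member are hypothetical, and std::function<int()> is used only as a convenient uniform holder; the pool itself stores tasks as std::packaged_task<> objects.

```cpp
#include <cassert>
#include <functional>

// Hypothetical callables, one per kind listed above.
int free_fn() { return 1; }
int add(int a, int b) { return a + b; }

struct Functor {
    int operator()() const { return 3; }
};

struct Member {
    int value = 4;
    int get() const { return value; }
};

// Every entry is callable with no arguments; bind supplies the object or arguments.
int sum_of_tasks() {
    Member m;
    std::function<int()> tasks[] = {
        free_fn,                          // free function
        [] { return 2; },                 // lambda
        Functor{},                        // function object
        std::bind(&Member::get, &m),      // member function bound to an object
        std::function<int()>(free_fn),    // std::function wrapping a free function
        std::bind(add, 2, 3)              // bind wrapper with bound arguments
    };
    int sum = 0;
    for (auto& t : tasks)
        sum += t();                       // 1 + 2 + 3 + 4 + 1 + 5
    return sum;
}
```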
◆ Implementation
The code below implements the thread pool (lockwise_shared_pool.h):
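#include <atomic>
#include <cstdio>
#include <future>
#include <thread>
#include <type_traits>
#include <utility>
#include "lockwise_queue.h"   // Lockwise_Queue<>, defined later in this article

class Thread_Pool {
private:
    struct Task_Wrapper { ...   // full definition given below
    };
    std::atomic<bool> _done_;                 // #2
    Lockwise_Queue<Task_Wrapper> _queue_;     // #3
    unsigned _workersize_;
    std::thread* _workers_;                   // #4
    // Initial function of every worker: poll the shared queue until the pool stops.
    void work() {
        Task_Wrapper task;
        while (!_done_.load(std::memory_order_acquire)) {
            if (_queue_.pop(task))
                task();
            else
                std::this_thread::yield();    // queue empty: give up the time slice
        }
    }
    // Drain the queue, tell the workers to exit, join them and free the array.
    void stop() {
        std::size_t remaining = _queue_.size();
        while (!_queue_.empty())
            std::this_thread::yield();
        std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
        _done_.store(true, std::memory_order_release);
        for (unsigned i = 0; i < _workersize_; ++i) {
            if (_workers_[i].joinable())
                _workers_[i].join();
        }
        delete[] _workers_;                   // deleting nullptr is a no-op
        _workers_ = nullptr;
    }
public:
    // Members start empty so that stop() is safe even if new[] throws.
    Thread_Pool() : _done_(false), _workersize_(0), _workers_(nullptr) {   // #1
        try {
            unsigned n = std::thread::hardware_concurrency();              // #5
            if (n == 0) n = 1;                // 0 means "not computable"; use one worker
            _workers_ = new std::thread[n];
            _workersize_ = n;                 // set only after new[] has succeeded
            for (unsigned i = 0; i < _workersize_; ++i) {
                _workers_[i] = std::thread(&Thread_Pool::work, this);      // #6
            }
        } catch (...) {
            stop();                                                        // #7
            throw;
        }
    }
    ~Thread_Pool() {
        stop();
    }
    // Wrap a nullary callable in a packaged_task; the future delivers its result.
    template<class Callable>
    std::future<typename std::result_of<Callable()>::type> submit(Callable c) {   // #8
        typedef typename std::result_of<Callable()>::type R;
        std::packaged_task<R()> task(std::move(c));
        std::future<R> r = task.get_future();
        _queue_.push(std::move(task));                                     // #9
        return r;                                                          // #10
    }
};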
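We begin examining the thread pool with the construction of a Thread_Pool object (#1). The atomic<bool> data member flags whether the pool has finished, and through its release store and acquire loads it also orders memory between the thread that stops the pool and the workers (#2). Task_Wrapper is the element type with which the thread-safe task queue Lockwise_Queue<> is instantiated (#3), and the thread* refers to the array of worker thread objects (#4). Task_Wrapper and Lockwise_Queue<> are explained later.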
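The role of the atomic<bool> flag (#2) can be shown in isolation. The sketch below, with the hypothetical function stop_flag_demo, uses the same release-store/acquire-load pairing as stop() and work(): once the worker's acquire load observes true, every write the stopping thread made before its release store is visible to that worker.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// A worker spins on the flag; the release store publishes `payload`,
// so after the acquire load sees `true` the worker must read 42.
int stop_flag_demo() {
    std::atomic<bool> done(false);
    int payload = 0;   // ordinary, non-atomic data
    int seen = -1;
    std::thread worker([&] {
        while (!done.load(std::memory_order_acquire))
            std::this_thread::yield();
        seen = payload;   // ordered after the acquire load
    });
    payload = 42;                                  // written before the release store
    done.store(true, std::memory_order_release);
    worker.join();                                 // join makes `seen` safe to read
    return seen;
}
```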
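The pool obtains the number of threads the hardware can run concurrently from thread::hardware_concurrency() (#5) and creates that many worker threads. Note that this value is only a hint and may be 0 when it cannot be determined. The member function work() is the initial function of every worker thread (#6); because all workers run it on the same Thread_Pool object (this), they all share the pool's task queue. Creating a thread object (std::system_error) and the new operation (std::bad_alloc) can fail with an exception, so the construction is wrapped in try-catch. While handling the exception, the pool must be flagged as finished so that every thread already created stops cleanly, and the memory must be released (#7); the exception is then rethrown. The destructor performs the same work.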
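A reduced sketch of steps #5 to #7, assuming a hypothetical class Mini_Pool that keeps its workers in a std::vector<std::thread> instead of a raw array. It shows three points: every worker runs the same member function on the same object, a 0 from hardware_concurrency() is replaced by 1, and threads that did start are joined before the exception is rethrown (destroying a joinable std::thread calls std::terminate).

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical Mini_Pool: each worker increments one counter shared through `this`.
class Mini_Pool {
    std::atomic<unsigned> _visits_{0};     // state shared by all workers
    std::vector<std::thread> _workers_;
    void work() { _visits_.fetch_add(1, std::memory_order_relaxed); }
public:
    Mini_Pool() {
        unsigned n = std::thread::hardware_concurrency();
        if (n == 0) n = 1;                 // the hint may be 0
        try {
            for (unsigned i = 0; i < n; ++i)
                _workers_.emplace_back(&Mini_Pool::work, this);   // same object for all
        } catch (...) {
            join_all();   // members are destroyed next; joinable threads would terminate
            throw;
        }
    }
    ~Mini_Pool() { join_all(); }
    void join_all() {
        for (auto& t : _workers_)
            if (t.joinable()) t.join();
    }
    unsigned workers() const { return static_cast<unsigned>(_workers_.size()); }
    unsigned visits() const { return _visits_.load(); }
};
```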
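Once the Thread_Pool object has been constructed, tasks are submitted to the pool through Thread_Pool::submit<>() (#8). To support asynchronous execution, a task is first wrapped in a std::packaged_task<> and then pushed onto the thread-safe task queue (#9). The task's result is delivered through the returned std::future<> object (#10), so the user can wait for the task to finish and retrieve the result whenever it is needed.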
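The submit() mechanism (#8 to #10) relies only on std::packaged_task<> and std::future<>, so it can be tried without the pool. In this sketch (function names are hypothetical) a separate std::thread plays the part of a worker. The second function shows a property the pool inherits from std::packaged_task<>: an exception thrown by the task is stored in the shared state and rethrown by future::get(), instead of escaping into the thread that ran the task.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

// Run a packaged_task on another thread and collect the result via its future.
int packaged_task_demo() {
    std::packaged_task<int()> task([] { return 6 * 7; });
    std::future<int> result = task.get_future();   // take the future before moving the task
    std::thread worker(std::move(task));           // the "worker" invokes the task
    int value = result.get();                      // blocks until the task has run
    worker.join();
    return value;
}

// The exception is captured when the task runs and rethrown only by get().
bool exception_is_deferred_to_get() {
    std::packaged_task<int()> task([]() -> int { throw std::runtime_error("misfire"); });
    std::future<int> result = task.get_future();
    task();                                        // does not throw here
    try {
        result.get();                              // rethrows the stored exception
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```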
Because each task is a std::packaged_task<> of one specific type, making the task queue generic requires a common data structure, Task_Wrapper, that can hold a std::packaged_task<> of any specific type (lockwise_shared_pool.h):
struct Task_Wrapper {
    struct Task_Base {
        virtual ~Task_Base() {}
        virtual void call() = 0;
    };
    template<class T>
    struct Task : Task_Base {                   // #5
        T _t_;
        Task(T&& t) : _t_(std::move(t)) {}      // #6
        void call() override { _t_(); }         // #9
    };
    Task_Base* _ptr_;                           // #7
    Task_Wrapper() : _ptr_(nullptr) {}
    template<class T>
    Task_Wrapper(T&& t) : _ptr_(new Task<T>(std::move(t))) {}   // #1
    // support move
    Task_Wrapper(Task_Wrapper&& other) noexcept : _ptr_(other._ptr_) {   // #2
        other._ptr_ = nullptr;
    }
    Task_Wrapper& operator=(Task_Wrapper&& other) noexcept {             // #3
        if (this != &other) {
            delete _ptr_;        // release the task held so far, otherwise it leaks
            _ptr_ = other._ptr_;
            other._ptr_ = nullptr;
        }
        return *this;
    }
    // no copy; the non-const reference parameter makes copying from a non-const
    // lvalue pick these deleted overloads instead of the template constructor
    Task_Wrapper(Task_Wrapper&) = delete;
    Task_Wrapper& operator=(Task_Wrapper&) = delete;
    ~Task_Wrapper() {
        delete _ptr_;            // deleting nullptr is a no-op
    }
    void operator()() const {    // #4
        _ptr_->call();           // #8
    }
};
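Instances of std::packaged_task<> are movable but not copyable, so Task_Wrapper must take ownership of a std::packaged_task<R()> object by moving it (#1). Consistently with this, Task_Wrapper is itself move-only: it implements a move constructor (#2) and move assignment (#3), deletes the copy operations, and provides operator() (#4). An abstract base class (ABC) inheritance structure (#5) makes it possible to hold and invoke std::packaged_task<> objects generically, a form of type erasure: the std::packaged_task<> is stored in the derived class template Task<> (#6), and the derived object is referenced through a pointer to the non-template abstract base class Task_Base (#7). Calling a Task_Wrapper object is delegated through a virtual call (#8) to the derived class, which executes the actual task (#9).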
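The same type-erasure idea can also be written with std::unique_ptr owning the base-class object, so the compiler generates the move operations and the cleanup. Any_Task is a hypothetical name; to keep the sketch small, its constructor accepts only an rvalue std::packaged_task<R()>. The usage function stores two tasks with different result types in one std::vector and invokes them uniformly.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical Any_Task: abstract base + templated derived class, like Task_Wrapper,
// but ownership is held by std::unique_ptr.
class Any_Task {
    struct Base {
        virtual ~Base() {}
        virtual void call() = 0;
    };
    template<class T>
    struct Impl : Base {
        T _t_;
        explicit Impl(T&& t) : _t_(std::move(t)) {}
        void call() override { _t_(); }   // the virtual call lands here
    };
    std::unique_ptr<Base> _ptr_;
public:
    // Only an rvalue packaged_task is accepted, so the task is always moved in.
    template<class R>
    explicit Any_Task(std::packaged_task<R()>&& t)
        : _ptr_(new Impl<std::packaged_task<R()>>(std::move(t))) {}
    // Move constructor/assignment and destructor are implicit; copying is deleted.
    void operator()() { _ptr_->call(); }
};

// Two packaged_tasks with different result types in one homogeneous container.
bool type_erasure_demo() {
    std::packaged_task<int()> t1([] { return 7; });
    std::packaged_task<std::string()> t2([] { return std::string("arrow"); });
    std::future<int> f1 = t1.get_future();
    std::future<std::string> f2 = t2.get_future();
    std::vector<Any_Task> queue;
    queue.push_back(Any_Task(std::move(t1)));
    queue.push_back(Any_Task(std::move(t2)));
    for (auto& task : queue)
        task();
    return f1.get() == 7 && f2.get() == "arrow";
}
```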
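The other key data structure is the queue Lockwise_Queue<> (lockwise_queue.h). It is lock-based rather than lock-free, but its lock is a spinlock, so a thread waiting for the lock busy-waits instead of being blocked (put to sleep) by the operating system: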
#include <atomic>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

template<class T>
class Lockwise_Queue {
private:
    struct Spinlock_Mutex {                     // #3
        std::atomic_flag _af_;
        // Start unlocked; C++11 std::atomic_flag has no standard bool constructor.
        Spinlock_Mutex() { _af_.clear(); }
        void lock() {
            while (_af_.test_and_set(std::memory_order_acquire))
                ;                               // busy-wait until the flag was clear
        }
        void unlock() {
            _af_.clear(std::memory_order_release);
        }
    } mutable _m_;                              // #2
    std::queue<T> _q_;                          // #1
public:
    void push(T&& element) {                    // #4
        std::lock_guard<Spinlock_Mutex> lk(_m_);
        _q_.push(std::move(element));
    }
    // Returns false immediately when the queue is empty; never waits for an element.
    bool pop(T& element) {                      // #5
        std::lock_guard<Spinlock_Mutex> lk(_m_);
        if (_q_.empty())
            return false;
        element = std::move(_q_.front());
        _q_.pop();
        return true;
    }
    bool empty() const {
        std::lock_guard<Spinlock_Mutex> lk(_m_);
        return _q_.empty();
    }
    std::size_t size() const {
        std::lock_guard<Spinlock_Mutex> lk(_m_);
        return _q_.size();
    }
};
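All Task_Wrapper objects are stored in a std::queue<> (#1). A mutex controls the worker threads' concurrent access to the task queue (#2); it is declared mutable so that the const members empty() and size() can lock it. To keep locking cheap for critical sections that consist of only a few queue operations, a spinlock that busy-waits instead of blocking is used as the mutex (#3). Enqueuing and dequeuing are performed by push (#4) and pop (#5) respectively, both of which use move semantics; pop returns false at once on an empty queue, which is why work() yields in that case.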
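Because Spinlock_Mutex provides lock() and unlock(), it meets the BasicLockable requirements, which is all std::lock_guard<> needs. The sketch below reproduces the same spinlock as a standalone class (the names Spinlock and spinlock_demo are hypothetical) and checks mutual exclusion: four threads each increment a plain, non-atomic counter 10000 times, so the total is 40000 only if no two increments overlap.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Same shape as Spinlock_Mutex: test_and_set to acquire, clear to release.
class Spinlock {
    std::atomic_flag _af_;
public:
    Spinlock() { _af_.clear(); }   // start unlocked
    void lock() {
        while (_af_.test_and_set(std::memory_order_acquire))
            ;                      // busy-wait
    }
    void unlock() { _af_.clear(std::memory_order_release); }
};

long spinlock_demo() {
    Spinlock m;
    long counter = 0;              // protected only by the spinlock
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back([&] {
            for (int j = 0; j < 10000; ++j) {
                std::lock_guard<Spinlock> lk(m);
                ++counter;
            }
        });
    for (auto& t : threads)
        t.join();
    return counter;
}
```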
◆ Design
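The following class diagram shows the main logical structure of the thread pool's code.
[Note] The diagram marks the initial function of the worker threads with a stereotype, and a note explains this relationship.
The following sequence diagram shows the concurrent interplay between the pool's user submitting tasks and the worker threads executing them.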
◆ Verification
To verify that the thread pool has the capabilities described in the overview, the following callables were designed (archery.h):
#include <cstddef>
#include <cstdio>

void shoot() {
std::fprintf(stdout, "\n\t[Free Function] Let an arrow fly...\n");
}
bool shoot(size_t n) {
std::fprintf(stdout, "\n\t[Free Function] Let %zu arrows fly...\n", n);
return false;
}
auto shootAnarrow = [] {
std::fprintf(stdout, "\n\t[Lambda] Let an arrow fly...\n");
};
auto shootNarrows = [](size_t n) -> bool {
std::fprintf(stdout, "\n\t[Lambda] Let %zu arrows fly...\n", n);
return true;
};
class Archer {
public:
void operator()() {
std::fprintf(stdout, "\n\t[Functor] Let an arrow fly...\n");
}
bool operator()(size_t n) {
std::fprintf(stdout, "\n\t[Functor] Let %zu arrows fly...\n", n);
return false;
}
void shoot() {
std::fprintf(stdout, "\n\t[Member Function] Let an arrow fly...\n");
}
bool shoot(size_t n) {
std::fprintf(stdout, "\n\t[Member Function] Let %zu arrows fly...\n", n);
return true;
}
};
These functions are wrapped with the necessary arguments and submitted to the thread pool (test.cpp):
minutes PERIOD(1);
size_t counter[11];
time_point<steady_clock> start;
atomic<bool> go(false);
{
Thread_Pool pool;
thread t1([PERIOD, &counter, &start, &go, &pool] { // test free function of void()
while (!go.load(memory_order_acquire))
std::this_thread::yield();
void (*task)() = shoot;
for (counter[1] = 0; steady_clock::now() - start <= PERIOD; ++counter[1]) {
pool.submit(task);
//pool.submit(std::bind<void(*)()>(shoot));
std::this_thread::yield();
}
});
thread t2([PERIOD, &counter, &start, &go, &pool] { // test free function of bool(size_t)
while (!go.load(memory_order_acquire))
std::this_thread::yield();
bool (*task)(size_t) = shoot;
for (counter[2] = 0; steady_clock::now() - start <= PERIOD; ++counter[2]) {
future<bool> r = pool.submit(std::bind(task, counter[2]));
//future<bool> r = pool.submit(std::bind<bool(*)(size_t)>(shoot, counter[2]));
std::this_thread::yield();
}
});
thread t3([PERIOD, &counter, &start, &go, &pool] { // test lambda of void()
while (!go.load(memory_order_acquire))
std::this_thread::yield();
for (counter[3] = 0; steady_clock::now() - start <= PERIOD; ++counter[3]) {
pool.submit(shootAnarrow);
std::this_thread::yield();
}
});
thread t4([PERIOD, &counter, &start, &go, &pool] { // test lambda of bool(size_t)
while (!go.load(memory_order_acquire))
std::this_thread::yield();
for (counter[4] = 0; steady_clock::now() - start <= PERIOD; ++counter[4]) {
future<bool> r = pool.submit(std::bind(shootNarrows, counter[4]));
std::this_thread::yield();
}
});
thread t5([PERIOD, &counter, &start, &go, &pool] { // test functor of void()
while (!go.load(memory_order_acquire))
std::this_thread::yield();
Archer hoyt;
for (counter[5] = 0; steady_clock::now() - start <= PERIOD; ++counter[5]) {
pool.submit(hoyt);
std::this_thread::yield();
}
});
thread t6([PERIOD, &counter, &start, &go, &pool] { // test functor of bool(size_t)
while (!go.load(memory_order_acquire))
std::this_thread::yield();
Archer hoyt;
for (counter[6] = 0; steady_clock::now() - start <= PERIOD; ++counter[6]) {
future<bool> r = pool.submit(std::bind(hoyt, counter[6]));
std::this_thread::yield();
}
});
thread t7([PERIOD, &counter, &start, &go, &pool] { // test member function of void()
while (!go.load(memory_order_acquire))
std::this_thread::yield();
Archer hoyt;
for (counter[7] = 0; steady_clock::now() - start <= PERIOD; ++counter[7]) {
pool.submit(std::bind<void(Archer::*)()>(&Archer::shoot, &hoyt));
//pool.submit(std::bind(static_cast<void(Archer::*)()>(&Archer::shoot), &hoyt));
std::this_thread::yield();
}
});
thread t8([PERIOD, &counter, &start, &go, &pool] { // test member function of bool(size_t)
while (!go.load(memory_order_acquire))
std::this_thread::yield();
Archer hoyt;
for (counter[8] = 0; steady_clock::now() - start <= PERIOD; ++counter[8]) {
future<bool> r = pool.submit(std::bind<bool(Archer::*)(size_t)>(&Archer::shoot, &hoyt, counter[8]));
//future<bool> r = pool.submit(std::bind(static_cast<bool(Archer::*)(size_t)>(&Archer::shoot), &hoyt, counter[8]));
std::this_thread::yield();
}
});
thread t9([PERIOD, &counter, &start, &go, &pool] { // test std::function<> of void()
while (!go.load(memory_order_acquire))
std::this_thread::yield();
std::function<void()> task = static_cast<void(*)()>(shoot);
for (counter[9] = 0; steady_clock::now() - start <= PERIOD; ++counter[9]) {
pool.submit(task);
std::this_thread::yield();
}
});
thread t10([PERIOD, &counter, &start, &go, &pool] { // test std::function<> of bool(size_t)
while (!go.load(memory_order_acquire))
std::this_thread::yield();
std::function<bool(size_t)> task = static_cast<bool(*)(size_t)>(shoot);
for (counter[10] = 0; steady_clock::now() - start <= PERIOD; ++counter[10]) {
future<bool> r = pool.submit(std::bind(task, counter[10]));
std::this_thread::yield();
}
});
......
}
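Several tasks in the test code convert shoot or Archer::shoot to a specific function type before binding. The reason is that shoot names an overload set, so std::bind cannot deduce the callable's type from the name alone. The sketch below reproduces the situation with a hypothetical overload set volley(); the two accepted forms mirror the typed function pointer used in t1 and t2 and the static_cast used in t9, t10 and the commented-out lines of t7 and t8.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Hypothetical overload set mirroring shoot() in archery.h.
int volley() { return 1; }
int volley(std::size_t n) { return static_cast<int>(n); }

int bind_overloads_demo() {
    // std::bind(volley, 3) does not compile: `volley` is an overload set,
    // so bind's callable template parameter cannot be deduced.
    auto a = std::bind(static_cast<int (*)(std::size_t)>(volley), 3);  // cast selects one overload
    int (*p)() = volley;                                               // typed pointer selects the other
    auto b = std::bind(p);
    return a() + b();   // 3 + 1
}
```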
After the code compiles successfully (-std=c++11), run the executable. Part of the output produced during execution is shown below:
...
[Functor] Let an arrow fly...
[Free Function] Let 9224 arrows fly...
[Free Function] Let 9445 arrows fly...
[Member Function] Let 9375 arrows fly...
[Lambda] Let 9449 arrows fly...
[Free Function] Let an arrow fly...
[Lambda] Let an arrow fly...
[Member Function] Let an arrow fly...
[Functor] Let 9469 arrows fly...
...
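◆ Final Notes
For the complete code, see [github] cnblogs/15592234/.
While writing this article, the author drew on some of the design ideas in C++並發編程實戰 (the Chinese edition of C++ Concurrency in Action) / Anthony Williams; translated by Zhou Quan et al. Beijing: Posts & Telecom Press, 2015.6 (reprinted 2016.4). With thanks to Anthony Williams and to Zhou Quan and the other translators.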