◆ 概要
筆者在 《簡單的線程池(一)》 中采用了非阻塞的(nonblocking)線程同步方式,在此文中筆者將采用阻塞的(blocking)線程同步方式實現相同特性的線程池。
本文中不再贅述與 《簡單的線程池(一)》 相同的內容。如有不明之處,請參考該博客。
◆ 實現
以下代碼給出了此線程池的實現,(blocking_shared_pool.h)
class Thread_Pool {
private:
struct Task_Wrapper { ...
};
atomic<bool> _done_;
Blocking_Queue<Task_Wrapper> _queue_; // #1
unsigned _workersize_;
thread* _workers_;
void work() {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_queue_.pop(task); // #3
task();
}
}
void stop() {
size_t remaining = _queue_.size();
while (!_queue_.empty()) // #2
std::this_thread::yield();
std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
for (unsigned i = 0; i < _workersize_; ++i) // #5
_queue_.push([] {});
for (unsigned i = 0; i < _workersize_; ++i) {
if (_workers_[i].joinable())
_workers_[i].join(); // #4
}
delete[] _workers_;
}
public:
Thread_Pool() : _done_(false) {
try {
_workersize_ = thread::hardware_concurrency();
_workers_ = new thread[_workersize_];
for (unsigned i = 0; i < _workersize_; ++i) {
_workers_[i] = thread(&Thread_Pool::work, this);
}
} catch (...) {
stop();
throw;
}
}
~Thread_Pool() {
stop();
}
template<class Callable>
future<typename std::result_of<Callable()>::type> submit(Callable c) {
typedef typename std::result_of<Callable()>::type R;
packaged_task<R()> task(c);
future<R> r = task.get_future();
_queue_.push(std::move(task));
return r;
}
};
Task_Wrapper 具體化了線程安全的任務隊列 Blocking_Queue<>(#1)。稍后對 Blocking_Queue<> 做說明。
終止線程池時,任務隊列中的剩余任務被執行(#2)完后,任務隊列處於被清空的狀態。在 _done_ 還未被置為 true 之前,工作線程可能會因為 _queue_.pop(task) 而進入循環等待的阻塞狀態(#3)。如果此時主線程先調用工作線程的 join() 函數(#4),將導致死鎖(deadlock)狀態。即,工作線程正在等待有任務入隊,而主線程又要等待工作線程的結束。為了打破循環等待條件,在主線程調用工作線程的 join() 函數之前,向隊列中放入 _workersize_ 個假任務(#5)。其目的,是確保在任務隊列上等待的所有工作線程退出循環等待(#3)。
阻塞式的隊列 Blocking_Queue<> 如下,(blocking_queue.h)
template<class T>
class Blocking_Queue {
private:
mutex mutable _m_; // #1
condition_variable _cv_;
queue<T> _q_;
public:
void push(T&& element) {
lock_guard<mutex> lk(_m_);
_q_.push(std::move(element));
_cv_.notify_one(); // #3
}
void pop(T& element) {
unique_lock<mutex> lk(_m_);
_cv_.wait(lk, [this]{ return !_q_.empty(); }); // #2
element = std::move(_q_.front());
_q_.pop();
}
bool empty() const {
lock_guard<mutex> lk(_m_);
return _q_.empty();
}
size_t size() const {
lock_guard<mutex> lk(_m_);
return _q_.size();
}
};
隊列采用了 std::mutex 和 std::condition_variable 控制工作線程對任務隊列的並發訪問(#1)。如果沒有可出隊的任務,當前線程就會在 _cv_ 上循環等待(#2);任務入隊后,由 _cv_ 通知在其上等待的線程(#3)。
◆ 邏輯
以下類圖展現了此線程池的代碼主要邏輯結構。
[注] 圖中用構造型(stereotype)標識出工作線程的初始函數,並在注解中加以說明調用關系。
以下順序圖展現了線程池用戶提交任務與工作線程執行任務的並發過程。
◆ 驗證
驗證所使用的的測試用例及結果,與 《簡單的線程池(一)》 的保持一致。
◆ 最后
完整的代碼請參考 [github] cnblogs/15607633/ 。
寫作過程中,筆者參考了 C++並發編程實戰 / (美)威廉姆斯 (Williams, A.) 著; 周全等譯. - 北京: 人民郵電出版社, 2015.6 (2016.4重印) 一書中的部分設計思路。致 Anthony Williams、周全等譯者。