讀本文之前,請務必閱讀:
使用C++11的function/bind組件封裝Thread以及回調函數的使用
線程池本質上是一個生產者消費者模型,所以請熟悉這篇文章:Linux組件封裝(五)一個生產者消費者問題示例。
在ThreadPool中,物品為計算任務,消費者為pool內的線程,而生產者則是調用線程池的每個函數。
搞清了這一點,我們很容易就需要得出,ThreadPool需要一把互斥鎖和兩個同步變量,實現同步與互斥。
存儲任務,當然需要一個任務隊列。
除此之外,我們還需要一系列的Thread,因為Thread無法復制,所以我們使用unique_ptr作為一個中間層。
所以Thread的數據變量如下:
class ThreadPool : boost::noncopyable { public: typedef std::function<void ()> Task; ThreadPool(size_t queueSize, size_t threadsNum); ~ThreadPool(); void start(); void stop(); void addTask(Task task); //C++11 Task getTask(); bool isStarted() const { return isStarted_; } void runInThread(); private: mutable MutexLock mutex_; Condition empty_; Condition full_; size_t queueSize_; std::queue<Task> queue_; const size_t threadsNum_; std::vector<std::unique_ptr<Thread> > threads_; bool isStarted_; };
顯然,我們使用了function,作為任務隊列的任務元素。
構造函數的實現較簡單,不過,之前務必注意元素的聲明順序與初始化列表的順序相一致。
ThreadPool::ThreadPool(size_t queueSize, size_t threadsNum) : empty_(mutex_), full_(mutex_), queueSize_(queueSize), threadsNum_(threadsNum), isStarted_(false) { }
添加和取走任務是生產者消費者模型最核心的部分,但是套路較為固定,如下:
void ThreadPool::addTask(Task task) { MutexLockGuard lock(mutex_); while(queue_.size() >= queueSize_) empty_.wait(); queue_.push(std::move(task)); full_.notify(); } ThreadPool::Task ThreadPool::getTask() { MutexLockGuard lock(mutex_); while(queue_.empty()) full_.wait(); Task task = queue_.front(); queue_.pop(); empty_.notify(); return task; }
注意我們的addTask使用了C++11的move語義,在傳入右值時,可以提高性能。
還有一些老生常談的問題,例如:
wait前加鎖
使用while循環判斷wait條件(為什么?)
要想啟動線程,需要給Thread提供一個回調函數,編寫如下:
void ThreadPool::runInThread() { while(1) { Task task(getTask()); if(task) task(); } }
就是不停的取走任務,然后執行。
OK,有了線程的回調函數,那么我們可以編寫start函數。
void ThreadPool::start() { isStarted_ = true; //std::vector<std::unique<Thread> > for(size_t ix = 0; ix != threadsNum_; ++ix) { threads_.push_back( std::unique_ptr<Thread>( new Thread( std::bind(&ThreadPool::runInThread, this)))); } for(size_t ix = 0; ix != threadsNum_; ++ix) { threads_[ix]->start(); } }
這里較難理解的是線程的創建,Thread內存放的是std::unique_ptr<Thread>,而ptr的創建需要使用new動態創建Thread,Thread則需要在創建時,傳入回調函數,我們采用bind適配runInThread的參數值。
這里我們采用C++11的unique_ptr,成功實現vector無法存儲Thread(為什么?)的問題。
我們的第一個版本已經編寫完畢了。
添加stop功能
剛才的ThreadPool只能啟動,無法stop,我們從幾個方面着手,利用bool變量isStarted_,實現正確退出。
改動的有以下幾點:
首先是Thread的回調函數不再是一個死循環,而是:
void ThreadPool::runInThread() { while(isStarted_) { Task task(getTask()); if(task) task(); } }
然后addTask和getTask,在while循環判斷時,加入了bool變量:
void ThreadPool::addTask(Task task) { MutexLockGuard lock(mutex_); while(queue_.size() >= queueSize_ && isStarted_) empty_.wait(); if(!isStarted_) return; queue_.push(std::move(task)); full_.notify(); } ThreadPool::Task ThreadPool::getTask() { MutexLockGuard lock(mutex_); while(queue_.empty() && isStarted_) full_.wait(); if(!isStarted_) //線程池關閉 return Task(); //空任務 assert(!queue_.empty()); Task task = queue_.front(); queue_.pop(); empty_.notify(); return task; }
這里注意,退出while循環后,需要再判斷一次bool變量,因為未必是條件滿足了,可能是線程池需要退出,調整了isStarted變量。
最后一個關鍵是我們的stop函數:
void ThreadPool::stop() { if(isStarted_ == false) return; { MutexLockGuard lock(mutex_); isStarted_ = false; //清空任務 while(!queue_.empty()) queue_.pop(); } full_.notifyAll(); //激活所有的線程 empty_.notifyAll(); for(size_t ix = 0; ix != threadsNum_; ++ix) { threads_[ix]->join(); } threads_.clear(); }
這里有幾個關鍵:
先將bool設置為false,然后調用notifyAll,激活所有等待的線程(為什么)。
最后我們總結下ThreadPool關閉的流程:
1.isStarted設置為false
2.加鎖,清空隊列
3.發信號激活所有線程
4.正在運行的Thread,執行到下一次循環,退出
5.正在等待的線程被激活,然后while判斷為false,執行到下一句,檢查bool值,然后退出。
6.主線程依次join每個線程。
7.退出。
最后補充下析構函數的實現:
ThreadPool::~ThreadPool() { if(isStarted_) stop(); }
完畢。