為什么需要多線程?
最簡單的多線程長啥樣?
為什么需要線程池,有什么問題?
實現的主要原理是什么?
帶着這幾個問題,我們依次展開。
1.為什么需要多線程?
大部分程序畢竟都不是計算密集型的,簡單的說,正常情況下,以單線程的模式來寫對程序員而言是最舒心的。因為所有的代碼都是順序執行,非常容易理解!函數一級一級往下調用,代碼一行一行執行。但是,代碼的世界里,雖然cpu還好,但是卻經常需要用到io資源,或者是其他服務器的網絡資源,比如像數據庫,如果這個時候因此把進程卡住,不管是客戶端還是客戶端都對用戶體驗相當糟糕。當然了,計算密集型的運算就更需要多線程,防止主線程被卡住。
2.最簡單的多線程長啥樣?
舉個最簡單的例子,服務器采用阻塞式socket,有一個網絡線程負責收發包(IO),然后有一個邏輯主線程負責相應的業務操作,主線程和網絡線程之間通過最簡單的消息隊列進行交換,而這個消息隊例明顯是兩個線程都要訪問(輪詢消息隊列是否為空)到的,所以,我們需要給這個消息隊列上鎖(std::mutex),即可以解決問題。由於比較簡單我們就不需要看這個怎么碼了。這種模式雖然簡單,但是在合適的崗位上,也是極好的!
3.那為什么需要線程池呢,有什么問題?
還以剛才的服務器舉例,如果業務線程邏輯比較復雜,又或者他需要訪問數據庫或者是其他服務器的資源,讀取文件等等呢?當然他可以采用異步的數據庫接口,但是采用異步意味着業務代碼被碎片化。異步是典型的討厭他,但是又干不掉他的樣子。離題了。回歸。這個時候我們需要多個業務線程處理了。多個線程就意味着多一份處理能力!回到上個問題,我們的多線程采用輪詢消息隊列的方式來交換信息,那么這么多個線程,不斷的上鎖解鎖,光這個成本就夠了。這個時候,條件變量就上線了(std::condition_variable)就登場了
4.實現的主要原理是什么?
業務線程不要輪詢消息隊列了,而所有的業務線程處於等待狀態,當有消息再來的時候,再由產生消息的人,在我們示例場景就是網絡線程了,隨便喚醒一個工人線程即可。看看最關鍵的代碼
//消費者 void consumer() { //第一次上鎖 std::unique_lock < std::mutex > lck(mutex_); while (active_) { //如果是活動的,並且任務為空則一直等待 while (active_ && task_.empty()) cv_.wait(lck); //如果已經停止則退出 if(!active_) break; T *quest = task_.front(); task_.pop(); //從任務隊列取出后該解鎖(任務隊列鎖)了 lck.unlock(); //執行任務后釋放 proc_(quest); //delete quest; //在proc_已經釋放該指針了 //重新上鎖 lck.lock(); } }
算了,還是直接貼完整代碼,看注釋吧
#ifndef _WORKER_POOL_H_ #define _WORKER_POOL_H_ //file: worker_pool.h //#define _CRT_SECURE_NO_WARNINGS // g++ -g -std=c++11 1.cc -D_GLIBCXX_USE_NANOSLEEP -lpthread */ #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> //#include <chrono> template<typename T> class WorkerPool { public: typedef WorkerPool<T> THIS_TYPE; typedef std::function<void(T*)> WorkerProc; typedef std::vector< std::thread* > ThreadVec; WorkerPool() { active_ = false; } virtual ~WorkerPool() { for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it) delete *it; all_thread_.clear(); } void Start(WorkerProc f,int worker_num=1) { active_ = true; all_thread_.resize(worker_num); for (int i = 0; i < worker_num;i++ ) { all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this)); } proc_ = f; } //生產者 void Push(T *t) { std::unique_lock < std::mutex > lck(mutex_); task_.push(t); cv_.notify_one(); } void Stop() { //等待所有的任務執行完畢 mutex_.lock(); while (!task_.empty()) { mutex_.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); cv_.notify_one(); mutex_.lock(); } mutex_.unlock(); //關閉連接后,等待線程自動退出 active_ = false; cv_.notify_all(); for(ThreadVec::iterator it = all_thread_.begin(); it != all_thread_.end();++it) (*it)->join(); } private: //消費者 void consumer() { //第一次上鎖 std::unique_lock < std::mutex > lck(mutex_); while (active_) { //如果是活動的,並且任務為空則一直等待 while (active_ && task_.empty()) cv_.wait(lck); //如果已經停止則退出 if(!active_) break; T *quest = task_.front(); task_.pop(); //從任務隊列取出后該解鎖(任務隊列鎖)了 lck.unlock(); //執行任務后釋放 proc_(quest); //delete quest; //在proc_已經釋放該指針了 //重新上鎖 lck.lock(); } } std::mutex mutex_; std::queue<T*> task_; std::condition_variable cv_; bool active_; std::vector< std::thread* > all_thread_; WorkerProc proc_; }; #endif
寫一個類繼承一下,並寫一個工作函數和回調函數處理
#include "worker_pool.h" #include <iostream> //為了多耗點cpu,計算斐波那契數列吧 static int fibonacci(int a) { //ASSERT(a > 0); if (a == 1 || a == 2) return 1; return fibonacci(a-1) + fibonacci(a-2); } //異步計算任務 struct AsyncCalcQuest { AsyncCalcQuest():num(0),result(0) {} //計算需要用到的變量 int num; int result; }; //為了測試方便,引入全局變量用於標識線程池已將所有計算完成 const int TOTAL_COUNT = 1000000; int now_count = 0; //繼承一下線程池類,在子類處理計算完成的業務,在我們這里,只是打印一下計算結果 class CalcWorkerPool:public WorkerPool<AsyncCalcQuest> { public: CalcWorkerPool(){} virtual ~CalcWorkerPool() { } //在工人線程中執行 void DoWork(AsyncCalcQuest *quest) { //算了,不算這個了,根本算不出來 quest->result = fibonacci(quest->num); //quest->result = quest->num*0.618; //並將已完成任務返回到准備回調的列表 std::unique_lock<std::mutex > lck(mutex_callbacks_); callbacks_.push_back(quest); } //在主線程執行 void DoCallback() { //組回調任務上鎖 std::unique_lock<std::mutex > lck(mutex_callbacks_); while (!callbacks_.empty()) { auto *quest = callbacks_.back(); {//此處為業務代碼打印一下吧 std::cout << quest->num << " " << quest->result << std::endl; now_count ++; } delete quest; //TODO:這里如果采用內存池就更好了 callbacks_.pop_back(); } } private: //這里是准備給回調的任務列表 std::vector<AsyncCalcQuest*> callbacks_; std::mutex mutex_callbacks_; }; int main() { CalcWorkerPool workers; //工廠開工了 8個工人喔 workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8); //開始產生任務了 for (int i=0; i<TOTAL_COUNT; i++) { AsyncCalcQuest *quest = new AsyncCalcQuest; quest->num = i%40+1; workers.Push(quest); } while (now_count != TOTAL_COUNT) { workers.DoCallback(); } workers.Stop(); return 0; }
linux完整項目 https://github.com/linbc/worker_pool.git