為什么需要多線程?
最簡單的多線程長啥樣?
為什么需要線程池,有什么問題?
實現的主要原理是什么?
帶着這幾個問題,我們依次展開。
1.為什么需要多線程?
大部分程序畢竟都不是計算密集型的,簡單的說,正常情況下,以單線程的模式來寫對程序員而言是最舒心的。因為所有的代碼都是順序執行,非常容易理解!函數一級一級往下調用,代碼一行一行執行。但是,代碼的世界里,雖然cpu還好,但是卻經常需要用到io資源,或者是其他服務器的網絡資源,比如像數據庫,如果這個時候因此把進程卡住,不管是客戶端還是客戶端都對用戶體驗相當糟糕。當然了,計算密集型的運算就更需要多線程,防止主線程被卡住。
2.最簡單的多線程長啥樣?
舉個最簡單的例子,服務器采用阻塞式socket,有一個網絡線程負責收發包(IO),然后有一個邏輯主線程負責相應的業務操作,主線程和網絡線程之間通過最簡單的消息隊列進行交換,而這個消息隊例明顯是兩個線程都要訪問(輪詢消息隊列是否為空)到的,所以,我們需要給這個消息隊列上鎖(std::mutex),即可以解決問題。由於比較簡單我們就不需要看這個怎么碼了。這種模式雖然簡單,但是在合適的崗位上,也是極好的!
3.那為什么需要線程池呢,有什么問題?
還以剛才的服務器舉例,如果業務線程邏輯比較復雜,又或者他需要訪問數據庫或者是其他服務器的資源,讀取文件等等呢?當然他可以采用異步的數據庫接口,但是采用異步意味着業務代碼被碎片化。異步是典型的討厭他,但是又干不掉他的樣子。離題了。回歸。這個時候我們需要多個業務線程處理了。多個線程就意味着多一份處理能力!回到上個問題,我們的多線程采用輪詢消息隊列的方式來交換信息,那么這么多個線程,不斷的上鎖解鎖,光這個成本就夠了。這個時候,條件變量就上線了(std::condition_variable)就登場了
4.實現的主要原理是什么?
業務線程不要輪詢消息隊列了,而所有的業務線程處於等待狀態,當有消息再來的時候,再由產生消息的人,在我們示例場景就是網絡線程了,隨便喚醒一個工人線程即可。看看最關鍵的代碼
//消費者
void consumer()
{
//第一次上鎖
std::unique_lock < std::mutex > lck(mutex_);
while (active_)
{
//如果是活動的,並且任務為空則一直等待
while (active_ && task_.empty())
cv_.wait(lck);
//如果已經停止則退出
if(!active_)
break;
T *quest = task_.front();
task_.pop();
//從任務隊列取出后該解鎖(任務隊列鎖)了
lck.unlock();
//執行任務后釋放
proc_(quest);
//delete quest; //在proc_已經釋放該指針了
//重新上鎖
lck.lock();
}
}
算了,還是直接貼完整代碼,看注釋吧
#ifndef _WORKER_POOL_H_
#define _WORKER_POOL_H_
//file: worker_pool.h
//#define _CRT_SECURE_NO_WARNINGS
// g++ -g -std=c++11 1.cc -D_GLIBCXX_USE_NANOSLEEP -lpthread */
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
//#include <chrono>
template<typename T>
class WorkerPool
{
public:
typedef WorkerPool<T> THIS_TYPE;
typedef std::function<void(T*)> WorkerProc;
typedef std::vector< std::thread* > ThreadVec;
WorkerPool()
{
active_ = false;
}
virtual ~WorkerPool()
{
for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it)
delete *it;
all_thread_.clear();
}
void Start(WorkerProc f,int worker_num=1)
{
active_ = true;
all_thread_.resize(worker_num);
for (int i = 0; i < worker_num;i++ )
{
all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this));
}
proc_ = f;
}
//生產者
void Push(T *t)
{
std::unique_lock < std::mutex > lck(mutex_);
task_.push(t);
cv_.notify_one();
}
void Stop()
{
//等待所有的任務執行完畢
mutex_.lock();
while (!task_.empty())
{
mutex_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
cv_.notify_one();
mutex_.lock();
}
mutex_.unlock();
//關閉連接后,等待線程自動退出
active_ = false;
cv_.notify_all();
for(ThreadVec::iterator it = all_thread_.begin();
it != all_thread_.end();++it)
(*it)->join();
}
private:
//消費者
void consumer()
{
//第一次上鎖
std::unique_lock < std::mutex > lck(mutex_);
while (active_)
{
//如果是活動的,並且任務為空則一直等待
while (active_ && task_.empty())
cv_.wait(lck);
//如果已經停止則退出
if(!active_)
break;
T *quest = task_.front();
task_.pop();
//從任務隊列取出后該解鎖(任務隊列鎖)了
lck.unlock();
//執行任務后釋放
proc_(quest);
//delete quest; //在proc_已經釋放該指針了
//重新上鎖
lck.lock();
}
}
std::mutex mutex_;
std::queue<T*> task_;
std::condition_variable cv_;
bool active_;
std::vector< std::thread* > all_thread_;
WorkerProc proc_;
};
#endif
寫一個類繼承一下,並寫一個工作函數和回調函數處理
#include "worker_pool.h"
#include <iostream>
//為了多耗點cpu,計算斐波那契數列吧
static int fibonacci(int a)
{
//ASSERT(a > 0);
if (a == 1 || a == 2)
return 1;
return fibonacci(a-1) + fibonacci(a-2);
}
//異步計算任務
struct AsyncCalcQuest
{
AsyncCalcQuest():num(0),result(0)
{}
//計算需要用到的變量
int num;
int result;
};
//為了測試方便,引入全局變量用於標識線程池已將所有計算完成
const int TOTAL_COUNT = 1000000;
int now_count = 0;
//繼承一下線程池類,在子類處理計算完成的業務,在我們這里,只是打印一下計算結果
class CalcWorkerPool:public WorkerPool<AsyncCalcQuest>
{
public:
CalcWorkerPool(){}
virtual ~CalcWorkerPool()
{
}
//在工人線程中執行
void DoWork(AsyncCalcQuest *quest)
{
//算了,不算這個了,根本算不出來
quest->result = fibonacci(quest->num);
//quest->result = quest->num*0.618;
//並將已完成任務返回到准備回調的列表
std::unique_lock<std::mutex > lck(mutex_callbacks_);
callbacks_.push_back(quest);
}
//在主線程執行
void DoCallback()
{
//組回調任務上鎖
std::unique_lock<std::mutex > lck(mutex_callbacks_);
while (!callbacks_.empty())
{
auto *quest = callbacks_.back();
{//此處為業務代碼打印一下吧
std::cout << quest->num << " " << quest->result << std::endl;
now_count ++;
}
delete quest; //TODO:這里如果采用內存池就更好了
callbacks_.pop_back();
}
}
private:
//這里是准備給回調的任務列表
std::vector<AsyncCalcQuest*> callbacks_;
std::mutex mutex_callbacks_;
};
int main()
{
CalcWorkerPool workers;
//工廠開工了 8個工人喔
workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8);
//開始產生任務了
for (int i=0; i<TOTAL_COUNT; i++)
{
AsyncCalcQuest *quest = new AsyncCalcQuest;
quest->num = i%40+1;
workers.Push(quest);
}
while (now_count != TOTAL_COUNT)
{
workers.DoCallback();
}
workers.Stop();
return 0;
}
linux完整項目 https://github.com/linbc/worker_pool.git
