使用c++11寫個最簡跨平台線程池


為什么需要多線程?

最簡單的多線程長啥樣?

為什么需要線程池,有什么問題?

實現的主要原理是什么?

 

帶着這幾個問題,我們依次展開。

1.為什么需要多線程?

    大部分程序畢竟都不是計算密集型的,簡單的說,正常情況下,以單線程的模式來寫對程序員而言是最舒心的。因為所有的代碼都是順序執行,非常容易理解!函數一級一級往下調用,代碼一行一行執行。但是,代碼的世界里,雖然cpu還好,但是卻經常需要用到io資源,或者是其他服務器的網絡資源,比如像數據庫,如果這個時候因此把進程卡住,不管是客戶端還是客戶端都對用戶體驗相當糟糕。當然了,計算密集型的運算就更需要多線程,防止主線程被卡住。

2.最簡單的多線程長啥樣?

    舉個最簡單的例子,服務器采用阻塞式socket,有一個網絡線程負責收發包(IO),然后有一個邏輯主線程負責相應的業務操作,主線程和網絡線程之間通過最簡單的消息隊列進行交換,而這個消息隊例明顯是兩個線程都要訪問(輪詢消息隊列是否為空)到的,所以,我們需要給這個消息隊列上鎖(std::mutex),即可以解決問題。由於比較簡單我們就不需要看這個怎么碼了。這種模式雖然簡單,但是在合適的崗位上,也是極好的!

3.那為什么需要線程池呢,有什么問題?

   還以剛才的服務器舉例,如果業務線程邏輯比較復雜,又或者他需要訪問數據庫或者是其他服務器的資源,讀取文件等等呢?當然他可以采用異步的數據庫接口,但是采用異步意味着業務代碼被碎片化。異步是典型的討厭他,但是又干不掉他的樣子。離題了。回歸。這個時候我們需要多個業務線程處理了。多個線程就意味着多一份處理能力!回到上個問題,我們的多線程采用輪詢消息隊列的方式來交換信息,那么這么多個線程,不斷的上鎖解鎖,光這個成本就夠了。這個時候,條件變量就上線了(std::condition_variable)就登場了

4.實現的主要原理是什么?

    業務線程不要輪詢消息隊列了,而所有的業務線程處於等待狀態,當有消息再來的時候,再由產生消息的人,在我們示例場景就是網絡線程了,隨便喚醒一個工人線程即可。看看最關鍵的代碼

      //消費者
	void consumer()
	{
		//第一次上鎖
		std::unique_lock < std::mutex > lck(mutex_);
		while (active_)
		{
			//如果是活動的,並且任務為空則一直等待
			while (active_ && task_.empty())
				cv_.wait(lck);

			//如果已經停止則退出
			if(!active_)
				break;

			T *quest = task_.front();
			task_.pop();

			//從任務隊列取出后該解鎖(任務隊列鎖)了
			lck.unlock();

			//執行任務后釋放
			proc_(quest);

			//delete quest;   //在proc_已經釋放該指針了

			//重新上鎖
			lck.lock();
		}
	} 

  

算了,還是直接貼完整代碼,看注釋吧

#ifndef _WORKER_POOL_H_
#define _WORKER_POOL_H_

//file: worker_pool.h

//#define  _CRT_SECURE_NO_WARNINGS
// g++ -g -std=c++11 1.cc -D_GLIBCXX_USE_NANOSLEEP -lpthread */

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
//#include <chrono>

template<typename T>
class WorkerPool
{
public:
	typedef WorkerPool<T> THIS_TYPE;
	typedef std::function<void(T*)> WorkerProc;
	typedef std::vector< std::thread* > ThreadVec;

	WorkerPool()
	{		
		active_ = false;
	}
	virtual ~WorkerPool()
	{
		for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it)
			delete *it;
		all_thread_.clear();
	}
	void Start(WorkerProc f,int worker_num=1)
	{
		active_ = true;		
		all_thread_.resize(worker_num);
		for (int i = 0; i < worker_num;i++ )
		{
			all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this));
		}
		proc_ = f;
	}
	//生產者
	void Push(T *t)
	{
		std::unique_lock < std::mutex > lck(mutex_);
		task_.push(t);
		cv_.notify_one();
	}

	void Stop()
	{
		//等待所有的任務執行完畢
		mutex_.lock();
		while (!task_.empty())
		{	
			mutex_.unlock();
			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
			cv_.notify_one();
			mutex_.lock();
		}
		mutex_.unlock();

		//關閉連接后,等待線程自動退出
		active_ = false;
		cv_.notify_all();
		for(ThreadVec::iterator it = all_thread_.begin();
			it != all_thread_.end();++it)
			(*it)->join();
	}
private:
	//消費者
	void consumer()
	{
		//第一次上鎖
		std::unique_lock < std::mutex > lck(mutex_);
		while (active_)
		{
			//如果是活動的,並且任務為空則一直等待
			while (active_ && task_.empty())
				cv_.wait(lck);

			//如果已經停止則退出
			if(!active_)
				break;

			T *quest = task_.front();
			task_.pop();

			//從任務隊列取出后該解鎖(任務隊列鎖)了
			lck.unlock();

			//執行任務后釋放
			proc_(quest);

			//delete quest;   //在proc_已經釋放該指針了

			//重新上鎖
			lck.lock();
		}
	}

	std::mutex mutex_;
	std::queue<T*> task_;
	std::condition_variable cv_;
	bool active_;
	std::vector< std::thread* > all_thread_;
	WorkerProc proc_;
};

#endif

  寫一個類繼承一下,並寫一個工作函數和回調函數處理

#include "worker_pool.h"
#include <iostream>

//為了多耗點cpu,計算斐波那契數列吧
static int fibonacci(int a)
{
	//ASSERT(a > 0);
	if (a == 1 || a == 2)
		return 1;
	return fibonacci(a-1) + fibonacci(a-2);
}

//異步計算任務
struct AsyncCalcQuest
{
	AsyncCalcQuest():num(0),result(0)
	{}
	//計算需要用到的變量
	int num;
	int result; 
};

//為了測試方便,引入全局變量用於標識線程池已將所有計算完成
const int TOTAL_COUNT = 1000000;
int now_count = 0;

//繼承一下線程池類,在子類處理計算完成的業務,在我們這里,只是打印一下計算結果
class CalcWorkerPool:public WorkerPool<AsyncCalcQuest>
{
public:
	CalcWorkerPool(){}

	virtual ~CalcWorkerPool()
	{
	}

	//在工人線程中執行
	void DoWork(AsyncCalcQuest *quest)
	{
		//算了,不算這個了,根本算不出來
		quest->result = fibonacci(quest->num);		
		//quest->result = quest->num*0.618;

		//並將已完成任務返回到准備回調的列表
		std::unique_lock<std::mutex > lck(mutex_callbacks_);
		callbacks_.push_back(quest);
	}

	//在主線程執行
	void DoCallback()
	{
		//組回調任務上鎖
		std::unique_lock<std::mutex > lck(mutex_callbacks_);
		while (!callbacks_.empty())
		{
			auto *quest = callbacks_.back();			
			{//此處為業務代碼打印一下吧
				std::cout << quest->num << " " << quest->result << std::endl;
				now_count ++;
			}
			delete quest;		//TODO:這里如果采用內存池就更好了
			callbacks_.pop_back();
		}
	}

private:
	//這里是准備給回調的任務列表
	std::vector<AsyncCalcQuest*> callbacks_;
	std::mutex mutex_callbacks_;
};

int main()
{
	CalcWorkerPool workers;

	//工廠開工了 8個工人喔
	workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8);	
	
	//開始產生任務了
	for (int i=0; i<TOTAL_COUNT; i++)
	{
		AsyncCalcQuest *quest = new AsyncCalcQuest;
		quest->num = i%40+1;
		workers.Push(quest);
	}

	while (now_count != TOTAL_COUNT)
	{
		workers.DoCallback();
	}

	workers.Stop();

    return 0;
}

  linux完整項目 https://github.com/linbc/worker_pool.git


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM