使用C++11實現線程池的兩種方法


概述:什么是線程池?

   因為程序邊運行邊創建線程是比較耗時的,所以我們通過池化的思想:在程序開始運行前創建多個線程,這樣,程序在運行時,只需要從線程池中拿來用就可以了.大大提高了程序運行效率.

如何實現:

   一般線程池都會有以下幾個部分構成:

1. 線程池管理器(ThreadPoolManager):用於創建並管理線程池
2. 工作線程(WorkThread): 線程池中線程
3. 任務隊列:用於存放沒有處理的任務。提供一種緩沖機制。
4. 用於添加任務的接口 

總的來講,就是先創建幾個線程,然后這些線程等待任務隊列,不為空拿出任務執行即可(任務可以是對象,也可以是某個函數).

第一種實現:

#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <condition_variable>
#include <memory> //unique_ptr

const int MAX_THREADS = 1000; //最大線程數目

template <typename T>
class threadPool
{
  public:
	/*默認開一個線程*/
	threadPool(int number = 1);
	~threadPool();
	/*往請求隊列<task_queue>中添加任務<T *>*/
	bool append(T *request);

  private:
	/*工作線程需要運行的函數,不斷的從任務隊列中取出並執行*/
	static void *worker(void *arg);
	void run();

  private:
	std::vector<std::thread> work_threads; /*工作線程*/
	std::queue<T *> tasks_queue;		   /*任務隊列*/
	std::mutex queue_mutex;
	std::condition_variable condition;  /*必須與unique_lock配合使用*/
	bool stop;
};

template <typename T>
threadPool<T>::threadPool(int number) : stop(false)
{
	if (number <= 0 || number > MAX_THREADS)
		throw std::exception();
	for (int i = 0; i < number; i++)
	{
		std::cout << "創建第" << i << "個線程 " << std::endl;
		/* std::thread temp(worker, this); 不能先構造再插入 */
		work_threads.emplace_back(worker, this);
	}
}
template <typename T>
inline threadPool<T>::~threadPool()
{
	/*世上最大 bug 就是因為我寫的這個.shit */
	//work_threads.clear();
	{
		std::unique_lock<std::mutex> lock(queue_mutex);
		stop = true;
	}
	condition.notify_all();
	for (auto &ww : work_threads)
		ww.join();
}
template <typename T>
bool threadPool<T>::append(T *request)
{
	/*操作工作隊列時一定要加鎖,因為他被所有線程共享*/
	queue_mutex.lock();
	tasks_queue.push(request);
	queue_mutex.unlock();
	condition.notify_one(); //線程池添加進去了任務,自然要通知等待的線程
	return true;
}
template <typename T>
void *threadPool<T>::worker(void *arg)
{
	threadPool *pool = (threadPool *)arg;
	pool->run();
	return pool;
}
template <typename T>
void threadPool<T>::run()
{
	while (!stop)
	{
		std::unique_lock<std::mutex> lk(this->queue_mutex);
		/* unique_lock() 出作用域會自動解鎖 */
		this->condition.wait(lk, [this] { return !this->tasks_queue.empty(); });
		//如果任務隊列不為空,就停下來等待喚醒
		if (this->tasks_queue.empty())
		{
			continue;
		}
		else
		{
			T *request = tasks_queue.front();
			tasks_queue.pop();
			if (request)
				request->process();
		}
	}
}
#endif

測試代碼:

#include "threadPool.h"
#include<string>
using namespace std;
class Task
{
	public:
	void process()
	{
		cout << "run........." << endl;
	}
};
int main(void)
{
	threadPool<Task> pool(6);
    std::string str;
	while (1)
	{
			Task *tt = new Task();
			//使用智能指針
			pool.append(tt);
            delete tt;
    }
}

我測試的時候是沒有一點毛病的哦......

第二種實現(適用於需傳參,需要將非靜態線程函數寫在類中的情況):

#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <condition_variable>
#include <memory> //unique_ptr
#include <functional>

const int MAX_THREADS = 1000; //最大線程數目

typedef std::function<void(void)> Task;

class threadPool
{
  public:
	/*默認開一個線程*/
	threadPool(int number = 1);
	~threadPool();
	/*往請求隊列<task_queue>中添加任務<T *>*/
	bool append(Task task);

  private:
	/*工作線程需要運行的函數,不斷的從任務隊列中取出並執行*/
	static void *worker(void *arg);
	void run();

  private:
	std::vector<std::thread> work_threads; /*工作線程*/
	std::queue<Task> tasks_queue;		   /*任務隊列*/

	std::mutex queue_mutex;
	std::condition_variable condition; /*必須與unique_lock配合使用*/
	bool stop;
};

threadPool::threadPool(int number) : stop(false)
{
	if (number <= 0 || number > MAX_THREADS)
		throw std::exception();
	for (int i = 0; i < number; i++)
	{
		std::cout << "創建第" << i << "個線程 " << std::endl;
		work_threads.emplace_back(threadPool::worker, this);
	}
}

inline threadPool::~threadPool()
{
	{
		std::unique_lock<std::mutex> lock(queue_mutex);
		stop = true;
	}
	condition.notify_all();
	for (auto &ww : work_threads)
		ww.join();
}

bool threadPool::append(Task task)
{
	/*操作工作隊列時一定要加鎖,因為他被所有線程共享*/
	queue_mutex.lock();
	tasks_queue.push(task);
	queue_mutex.unlock();
	condition.notify_one(); //線程池添加進去了任務,自然要通知等待的線程
	return true;
}
void *threadPool::worker(void *arg)
{
	threadPool *pool = (threadPool *)arg;
	pool->run();
	return pool;
}
void threadPool::run()
{
	while (!stop)
	{
		std::unique_lock<std::mutex> lk(this->queue_mutex);
		/* unique_lock() 出作用域會自動解鎖 */
		this->condition.wait(lk, [this] { return !this->tasks_queue.empty(); });
		//如果任務隊列不為空,就停下來等待喚醒
		if (this->tasks_queue.empty())
		{
			continue;
		}
		else
		{
			Task task = tasks_queue.front();
			tasks_queue.pop();
			task();
		}
	}
}
#endif

測試:

#include "threadPool_2.h"
using namespace std;
class Test
{
  public:
	void process_no_static_bind(const int i, const int j) /*推薦使用*/
	{
		cout << "bind: i==" << i << " "
			 << "j==" << j << endl;
	}
};
int main(void)
{
	threadPool pool(6);
	Test tt_bind;
	while (true)
	{
		pool.append(std::bind(&Test::process_no_static_bind, &tt_bind, 3, 4));
	}
}

參考:github上star與fork過千的線程池代碼,源地址如下.

https://github.com/progschj/ThreadPool


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM