概述:什么是線程池?
因為程序邊運行邊創建線程是比較耗時的,所以我們通過池化的思想:在程序開始運行前創建多個線程,這樣,程序在運行時,只需要從線程池中拿來用就可以了.大大提高了程序運行效率.
如何實現:
一般線程池都會有以下幾個部分構成:
1. 線程池管理器(ThreadPoolManager):用於創建並管理線程池
2. 工作線程(WorkThread): 線程池中線程
3. 任務隊列:用於存放沒有處理的任務。提供一種緩沖機制。
4. 用於添加任務的接口
總的來講,就是先創建幾個線程,然后這些線程等待任務隊列,不為空拿出任務執行即可(任務可以是對象,也可以是某個函數).
第一種實現:
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <condition_variable>
#include <memory> //unique_ptr
const int MAX_THREADS = 1000; //最大線程數目
template <typename T>
class threadPool
{
public:
/*默認開一個線程*/
threadPool(int number = 1);
~threadPool();
/*往請求隊列<task_queue>中添加任務<T *>*/
bool append(T *request);
private:
/*工作線程需要運行的函數,不斷的從任務隊列中取出並執行*/
static void *worker(void *arg);
void run();
private:
std::vector<std::thread> work_threads; /*工作線程*/
std::queue<T *> tasks_queue; /*任務隊列*/
std::mutex queue_mutex;
std::condition_variable condition; /*必須與unique_lock配合使用*/
bool stop;
};
template <typename T>
threadPool<T>::threadPool(int number) : stop(false)
{
if (number <= 0 || number > MAX_THREADS)
throw std::exception();
for (int i = 0; i < number; i++)
{
std::cout << "創建第" << i << "個線程 " << std::endl;
/* std::thread temp(worker, this); 不能先構造再插入 */
work_threads.emplace_back(worker, this);
}
}
template <typename T>
inline threadPool<T>::~threadPool()
{
/*世上最大 bug 就是因為我寫的這個.shit */
//work_threads.clear();
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto &ww : work_threads)
ww.join();
}
template <typename T>
bool threadPool<T>::append(T *request)
{
/*操作工作隊列時一定要加鎖,因為他被所有線程共享*/
queue_mutex.lock();
tasks_queue.push(request);
queue_mutex.unlock();
condition.notify_one(); //線程池添加進去了任務,自然要通知等待的線程
return true;
}
template <typename T>
void *threadPool<T>::worker(void *arg)
{
threadPool *pool = (threadPool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadPool<T>::run()
{
while (!stop)
{
std::unique_lock<std::mutex> lk(this->queue_mutex);
/* unique_lock() 出作用域會自動解鎖 */
this->condition.wait(lk, [this] { return !this->tasks_queue.empty(); });
//如果任務隊列不為空,就停下來等待喚醒
if (this->tasks_queue.empty())
{
continue;
}
else
{
T *request = tasks_queue.front();
tasks_queue.pop();
if (request)
request->process();
}
}
}
#endif
測試代碼:
#include "threadPool.h"
#include<string>
using namespace std;
class Task
{
public:
void process()
{
cout << "run........." << endl;
}
};
int main(void)
{
threadPool<Task> pool(6);
std::string str;
while (1)
{
Task *tt = new Task();
//使用智能指針
pool.append(tt);
delete tt;
}
}
我測試的時候是沒有一點毛病的哦......
第二種實現(適用於需傳參,需要將非靜態線程函數寫在類中的情況):
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <condition_variable>
#include <memory> //unique_ptr
#include <functional>
const int MAX_THREADS = 1000; //最大線程數目
typedef std::function<void(void)> Task;
class threadPool
{
public:
/*默認開一個線程*/
threadPool(int number = 1);
~threadPool();
/*往請求隊列<task_queue>中添加任務<T *>*/
bool append(Task task);
private:
/*工作線程需要運行的函數,不斷的從任務隊列中取出並執行*/
static void *worker(void *arg);
void run();
private:
std::vector<std::thread> work_threads; /*工作線程*/
std::queue<Task> tasks_queue; /*任務隊列*/
std::mutex queue_mutex;
std::condition_variable condition; /*必須與unique_lock配合使用*/
bool stop;
};
threadPool::threadPool(int number) : stop(false)
{
if (number <= 0 || number > MAX_THREADS)
throw std::exception();
for (int i = 0; i < number; i++)
{
std::cout << "創建第" << i << "個線程 " << std::endl;
work_threads.emplace_back(threadPool::worker, this);
}
}
inline threadPool::~threadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto &ww : work_threads)
ww.join();
}
bool threadPool::append(Task task)
{
/*操作工作隊列時一定要加鎖,因為他被所有線程共享*/
queue_mutex.lock();
tasks_queue.push(task);
queue_mutex.unlock();
condition.notify_one(); //線程池添加進去了任務,自然要通知等待的線程
return true;
}
void *threadPool::worker(void *arg)
{
threadPool *pool = (threadPool *)arg;
pool->run();
return pool;
}
void threadPool::run()
{
while (!stop)
{
std::unique_lock<std::mutex> lk(this->queue_mutex);
/* unique_lock() 出作用域會自動解鎖 */
this->condition.wait(lk, [this] { return !this->tasks_queue.empty(); });
//如果任務隊列不為空,就停下來等待喚醒
if (this->tasks_queue.empty())
{
continue;
}
else
{
Task task = tasks_queue.front();
tasks_queue.pop();
task();
}
}
}
#endif
測試:
#include "threadPool_2.h"
using namespace std;
class Test
{
public:
void process_no_static_bind(const int i, const int j) /*推薦使用*/
{
cout << "bind: i==" << i << " "
<< "j==" << j << endl;
}
};
int main(void)
{
threadPool pool(6);
Test tt_bind;
while (true)
{
pool.append(std::bind(&Test::process_no_static_bind, &tt_bind, 3, 4));
}
}