類的功能
-
Task (任務基類)
該類主要實現一個任務類
virtual int doWork() = 0; -
TaskQueue (任務隊列)
該類主要針對任務的存儲、刪除、撤回等狀態做管理 -
ThreadPool (線程池)
整個線程池的核心業務處理類
代碼
- Task.h
//任務的基類
#pragma once
#include <time.h>
#include <atomic>
//任務的基類
class Task
{
public:
//構造、析構函數
Task():_id(_nRequestID++),_isCancelRequired(false),_createTime(clock()){}
~Task(){};
// 任務類虛接口,繼承這個類的必須要實現這個接口
virtual int doWork(void) = 0;
// 任務已取消回調
virtual int onCanceled(void)
{
return 1;
}
// 任務已完成
virtual int onCompleted(int)
{
return 1;
}
// 任務是否超時
virtual bool isTimeout(const clock_t& now)
{
return ((now - _createTime) > 5000);
}
// 獲取任務ID
size_t getID(void)
{
return _id;
}
//獲取任務取消狀態
bool isCancelRequired(void)
{
return _isCancelRequired;
}
//設置任務取消狀態
void setCancelRequired(void)
{
_isCancelRequired = true;
}
protected:
size_t _id; //任務的唯一標識
clock_t _createTime; //任務創建時間,非Unix時間戳
private:
static std::atomic<size_t> _nRequestID;
std::atomic<bool> _isCancelRequired; //任務取消狀態
};
//selectany可以讓我們在.h文件中初始化一個全局變量而不是只能放在.cpp中。
//這樣的代碼來初始化這個全局變量。既是該.h被多次include,鏈接器也會為我們剔除多重定義的錯誤。
__declspec(selectany) std::atomic<size_t> Task::_nRequestID = 100000;
- TaskQueue.h
#pragma once
#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <memory>
#include <thread>
//任務隊列
template<typename T>
class TaskQueue
{
public:
//向隊列的末尾插入任務,task是任務類
void put_back(std::shared_ptr<T>& task)
{
std::unique_lock<std::mutex> lock(_mutexQueue);
_queue.push_back(task);
_conditPut.notify_one();
}
//向隊列的頭部插入任務
void put_front(std::shared_ptr<T>& task)
{
std::unique_lock<std::mutex> lock(_mutexQueue);
_queue.push_front(task);
_conditPut.notify_one();
}
//獲取隊首(並將任務加到運行任務列表中),返回tase是任務類
std::shared_ptr<T> get(void) {
std::unique_lock<std::mutex> lock(_mutexQueue);
if (_queue.empty())
return nullptr;
//lock_guard取代了mutex的lock()和unlock();
std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
std::shared_ptr<T>& task = _queue.front();
_mapDoingTask.insert(std::make_pair(task->getID(), task));
_queue.pop_front();
return task;
}
//獲取雙向鏈表queue的大小
size_t size(void)
{
std::unique_lock<std::mutex> lock(_mutexQueue);
return _queue.size();
}
//釋放隊列
void release(void)
{
deleteAllTasks();
_conditPut.notify_all();
}
//刪除任務(從就緒隊列刪除,如果就緒隊列沒有,則看執行隊列有沒有,有的話置下取消狀態位)
int deleteTask(size_t nID)
{
std::unique_lock<std::mutex> lock(_mutexQueue, std::defer_lock);
lock.lock();
auto it = _queue.begin();
for (; it != _queue.end(); ++it)
{
if ((*it)->getID() == nID)
{
_queue.erase(it);
lock.unlock();
return 0;
}
}
//下面的邏輯可能會造成死鎖,這里要及時釋放
lock.unlock();
// 試圖取消正在執行的任務
{
std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
auto it_map = _mapDoingTask.find(nID);
if (it_map != _mapDoingTask.end())
it_map->second->setCancelRequired();
}
//任務執行完后再返回
while (_mapDoingTask.count(nID))
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return 0;
}
//刪除所有任務
int deleteAllTasks(void)
{
std::unique_lock<std::mutex> lock(_mutexQueue, std::defer_lock);
lock.lock();
if (!_queue.empty())
_queue.clear();//清空
{
std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
if (!_mapDoingTask.empty())
{
auto it_map = _mapDoingTask.begin();
for (; it_map != _mapDoingTask.end(); ++it_map)
it_map->second->setCancelRequired();
}
}
lock.unlock();
//任務執行完后再返回
while (!_mapDoingTask.empty())
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return 0;
}
//任務完成回調(從運行列表中刪除指定任務)
int onTaskFinished(size_t nID)
{
std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
auto it_map = _mapDoingTask.find(nID);
if (it_map != _mapDoingTask.end())
_mapDoingTask.erase(it_map);
return 0;
}
//判斷任務是否執行完畢
std::shared_ptr<T> isTaskProcessed(size_t nId)
{
std::lock_guard<std::mutex> lock_queue(_mutexQueue);
auto it = _queue.begin();
for (; it != _queue.end(); ++it) {
if ((*it)->getID() == nId)
return *it;
}
std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
auto it_map = _mapDoingTask.find(nId);
if (it_map != _mapDoingTask.end())
return it_map->second;
return nullptr;
}
//等待有任務到達(帶超時:超時自動喚醒)
bool wait(std::chrono::milliseconds millsec)
{
std::unique_lock<std::mutex> lock(_mutexConditPut);
_conditPut.wait_for(lock, millsec);
return true;
}
private:
//就緒的任務
std::mutex _mutexQueue;
std::deque<std::shared_ptr<T>> _queue;
//條件變量
std::mutex _mutexConditPut;
std::condition_variable _conditPut;
//運行的任務
std::mutex _mutexDoingTask;
std::unordered_map<size_t, std::shared_ptr<T> > _mapDoingTask;
};
- ThreadPool.h
#pragma once
#include <atomic>
#include <memory>
#include <mutex>
#include <iostream>
#include <thread>
#include "Task.h"
#include "TaskQueue.h"
class ThreadPool
{
public:
// 線程池配置參數
typedef struct tagThreadPoolConfig {
int nMaxThreadsNum; // 最大線程數量
int nMinThreadsNum; // 最小線程數量
double dbTaskAddThreadRate; // 增 最大線程任務比 (任務數量與線程數量,什么比例的時候才加)
double dbTaskSubThreadRate; // 減 最小線程任務比 (任務數量與線程數量,什么比例的時候才減)
} ThreadPoolConfig;
public:
//構造函數
ThreadPool(void):_taskQueue(new TaskQueue<Task>()), _atcCurTotalThrNum(0), _atcWorking(true){}
//析構函數
~ThreadPool(void)
{
release();
}
//初始化資源
int init(const ThreadPoolConfig& threadPoolConfig) {
// 錯誤的設置
if (threadPoolConfig.dbTaskAddThreadRate < threadPoolConfig.dbTaskSubThreadRate)
return 87;
_threadPoolConfig.nMaxThreadsNum = threadPoolConfig.nMaxThreadsNum;
_threadPoolConfig.nMinThreadsNum = threadPoolConfig.nMinThreadsNum;
_threadPoolConfig.dbTaskAddThreadRate = threadPoolConfig.dbTaskAddThreadRate;
_threadPoolConfig.dbTaskSubThreadRate = threadPoolConfig.dbTaskSubThreadRate;
int ret = 0;
// 創建線程池
if (_threadPoolConfig.nMinThreadsNum > 0)
ret = addProThreads(_threadPoolConfig.nMinThreadsNum);
return ret;
}
// 添加任務
int addTask(std::shared_ptr<Task> taskptr, bool priority=false)
{
const double& rate = getThreadTaskRate();
int ret = 0;
if (priority)
{
if (rate > 1000)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
_taskQueue->put_front(taskptr);
}
else
{
// 檢測任務數量
if (rate > 100) {
taskptr->onCanceled();
return 298;
}
// 將任務推入隊列
_taskQueue->put_back(taskptr);
}
// 檢查是否要擴展線程
if (_atcCurTotalThrNum < _threadPoolConfig.nMaxThreadsNum
&& rate > _threadPoolConfig.dbTaskAddThreadRate)
ret = addProThreads(1);
return ret;
}
// 刪除任務(從就緒隊列刪除,如果就緒隊列沒有,則看執行隊列有沒有,有的話置下取消狀態位)
int deleteTask(size_t nID)
{
return _taskQueue->deleteTask(nID);
}
// 刪除所有任務
int deleteAllTasks(void)
{
return _taskQueue->deleteAllTasks();
}
std::shared_ptr<Task> isTaskProcessed(size_t nId)
{
return _taskQueue->isTaskProcessed(nId);
}
// 釋放資源(釋放線程池、釋放任務隊列)
bool release(void)
{
// 1、停止線程池。
// 2、清楚就緒隊列。
// 3、等待執行隊列為0
releaseThreadPool();
_taskQueue->release();
int i = 0;
while (_atcCurTotalThrNum != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 異常等待
if (i++ == 10)
exit(23);
}
_atcCurTotalThrNum = 0;
return true;
}
// 獲取當前線程任務比
double getThreadTaskRate(void)
{
if (_atcCurTotalThrNum != 0)
return _taskQueue->size() * 1.0 / _atcCurTotalThrNum;
return 0;
}
// 當前線程是否需要結束
bool shouldEnd(void)
{
bool bFlag = false;
double dbThreadTaskRate = getThreadTaskRate();
// 檢查線程與任務比率
if (!_atcWorking || _atcCurTotalThrNum > _threadPoolConfig.nMinThreadsNum
&& dbThreadTaskRate < _threadPoolConfig.dbTaskSubThreadRate)
bFlag = true;
return bFlag;
}
// 釋放線程池
bool releaseThreadPool(void)
{
_threadPoolConfig.nMinThreadsNum = 0;
_threadPoolConfig.dbTaskSubThreadRate = 0;
_atcWorking = false;
return true;
}
// 添加指定數量的處理線程
int addProThreads(int nThreadsNum)
{
try {
for (; nThreadsNum > 0; --nThreadsNum)
std::thread(&ThreadPool::taskProcessThread, this).detach();
}
catch (...){
return 155;
}
return 0;
}
// 任務處理線程函數
void taskProcessThread(void)
{
int nTaskProcRet = 0;
// 線程增加
_atcCurTotalThrNum.fetch_add(1);
std::chrono::milliseconds mills_sleep(500);
std::shared_ptr<Task> pTask;
while (_atcWorking)
{
// 從任務隊列中獲取任務
pTask = _taskQueue->get(); //get會將任務添加到運行任務的map中去
if (pTask == nullptr)
{
if (shouldEnd())
break;
// 進入睡眠池
_taskQueue->wait(mills_sleep);
continue;
}
// 檢測任務取消狀態
if (pTask->isCancelRequired())
pTask->onCanceled();
else
// 處理任務
pTask->onCompleted(pTask->doWork());
// 從運行任務隊列中移除任務
_taskQueue->onTaskFinished(pTask->getID());
// 判斷線程是否需要結束
if (shouldEnd())
break;
}
// 線程個數減一
_atcCurTotalThrNum.fetch_sub(1);
}
private:
std::shared_ptr<TaskQueue<Task> > _taskQueue; //任務隊列
ThreadPoolConfig _threadPoolConfig; //線程池配置
std::atomic<bool> _atcWorking; //線程池是否被要求結束
std::atomic<int> _atcCurTotalThrNum; //當前線程個數
};
- FunTask.h
#pragma once
#include <functional>
#include "Task.h"
class FuncTask:public Task
{
public:
FuncTask(std::function<int(void)> f) : _pf(f) {}
FuncTask(void) : _pf(nullptr){}
virtual ~FuncTask(){}
template <typename F,typename... Args>
void asynBind(F(*f)(Args...), Args... args)
{
_pf = std::bind(f, args...);
}
virtual int doWork()
{
if (_pf == nullptr)
return 86;
return _pf();
}
private:
typedef std::function<int(void)> pvFunc;
pvFunc _pf;
};
- main.cpp
#pragma once
#include <time.h>
#include <iostream>
#include <memory>
#include <string>
#include "ThreadPool.h"
#include "FuncTask.h"
using namespace std;
int vFunction(void)
{
std::cout << __FUNCTION__ << std::endl;
return 0;
}
int counter(int a,int b)
{
std::cout << a << ":" << b << std::endl;
return 0;
}
int main()
{
ThreadPool::ThreadPoolConfig threadPoolConfig;
threadPoolConfig.nMaxThreadsNum = 100;
threadPoolConfig.nMinThreadsNum = 5;
threadPoolConfig.dbTaskAddThreadRate = 3;
threadPoolConfig.dbTaskSubThreadRate = 0.5;
clock_t start = clock();
{
std::shared_ptr<ThreadPool> threadPool(new ThreadPool);
threadPool->init(threadPoolConfig);
int i = 1;
while (true)
{
/*std::shared_ptr<FuncTask> request(new FuncTask(vFunction));
threadPool->addTask(request);*/
std::shared_ptr<FuncTask> request(new FuncTask);
request->asynBind(counter, i++, 1);
threadPool->addTask(request);
if (request->getID() == 110000) {
break;
}
}
threadPool->release();
}
clock_t finish = clock();
std::cout << "duration:" << finish - start << "ms" << std::endl;
cout << "main:thread" << endl;
return 0;
}