使用 C++11 編寫可復用多線程任務池


類的功能

  • Task (任務基類)
    該類主要實現一個任務類
    virtual int doWork() = 0;

  • TaskQueue (任務隊列)
    該類主要針對任務的存儲、刪除、撤回等狀態做管理

  • ThreadPool (線程池)
    整個線程池的核心業務處理類

代碼

  • Task.h
//任務的基類
#pragma once

#include <time.h>
#include <atomic>

//任務的基類
class Task
{
public:
	//構造、析構函數
	Task():_id(_nRequestID++),_isCancelRequired(false),_createTime(clock()){}
	~Task(){};

	// 任務類虛接口,繼承這個類的必須要實現這個接口
	virtual int doWork(void) = 0;
	
	// 任務已取消回調
	virtual int onCanceled(void)
	{
		return  1;
	}
	// 任務已完成
	virtual int onCompleted(int)
	{
		return 1;
	}
	// 任務是否超時
	virtual bool isTimeout(const clock_t& now)
	{
		return ((now - _createTime) > 5000);
	}
	// 獲取任務ID
	size_t getID(void)
	{
		return _id;
	}
	//獲取任務取消狀態
	bool isCancelRequired(void)
	{
		return _isCancelRequired;
	}
	//設置任務取消狀態
	void setCancelRequired(void)
	{
		_isCancelRequired = true;
	}

	
	
protected:
	size_t _id;	//任務的唯一標識
	clock_t _createTime;	//任務創建時間,非Unix時間戳
	
private:
	static std::atomic<size_t> _nRequestID;
	std::atomic<bool> _isCancelRequired;	//任務取消狀態
};

//selectany可以讓我們在.h文件中初始化一個全局變量而不是只能放在.cpp中。
//這樣的代碼來初始化這個全局變量。既是該.h被多次include,鏈接器也會為我們剔除多重定義的錯誤。
__declspec(selectany) std::atomic<size_t> Task::_nRequestID = 100000;

  • TaskQueue.h
#pragma once

#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <memory>
#include <thread>

//任務隊列
template<typename T>
class TaskQueue
{
public:
	//向隊列的末尾插入任務,task是任務類
	void put_back(std::shared_ptr<T>& task)
	{
		std::unique_lock<std::mutex> lock(_mutexQueue);
		_queue.push_back(task);
		_conditPut.notify_one();
	}
	
	//向隊列的頭部插入任務
	void put_front(std::shared_ptr<T>& task)
	{
		std::unique_lock<std::mutex> lock(_mutexQueue);
		_queue.push_front(task);
		_conditPut.notify_one();
	}
	
	//獲取隊首(並將任務加到運行任務列表中),返回tase是任務類
	std::shared_ptr<T> get(void) {
		std::unique_lock<std::mutex> lock(_mutexQueue);
		if (_queue.empty())
			return nullptr;
		//lock_guard取代了mutex的lock()和unlock();
		std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
		
		std::shared_ptr<T>& task = _queue.front();
		
		_mapDoingTask.insert(std::make_pair(task->getID(), task));
		
		_queue.pop_front();
		return task;
	}
	
	//獲取雙向鏈表queue的大小
	size_t size(void)
	{
		std::unique_lock<std::mutex> lock(_mutexQueue);
		return _queue.size();
	}
	
	//釋放隊列
	void release(void)
	{
		deleteAllTasks();
		_conditPut.notify_all();
	}

	//刪除任務(從就緒隊列刪除,如果就緒隊列沒有,則看執行隊列有沒有,有的話置下取消狀態位)
	int deleteTask(size_t nID)
	{
		std::unique_lock<std::mutex> lock(_mutexQueue, std::defer_lock);
		lock.lock();

		auto it = _queue.begin();
		for (; it != _queue.end(); ++it) 
		{
			if ((*it)->getID() == nID) 
			{
				_queue.erase(it);
				lock.unlock();
				return 0;
			}
		}
		//下面的邏輯可能會造成死鎖,這里要及時釋放
		lock.unlock();

		// 試圖取消正在執行的任務
		{
			std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);

			auto it_map = _mapDoingTask.find(nID);
			if (it_map != _mapDoingTask.end())
				it_map->second->setCancelRequired();
		}
		
		//任務執行完后再返回
		while (_mapDoingTask.count(nID))
			std::this_thread::sleep_for(std::chrono::milliseconds(20));

		return 0;
	}


	
	//刪除所有任務
	int deleteAllTasks(void)
	{
		std::unique_lock<std::mutex> lock(_mutexQueue, std::defer_lock);
		lock.lock();

		if (!_queue.empty())
			_queue.clear();//清空
		
		{
			std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
			if (!_mapDoingTask.empty()) 
			{
				auto it_map = _mapDoingTask.begin();
				for (; it_map != _mapDoingTask.end(); ++it_map)
					it_map->second->setCancelRequired();
			}
		}

		lock.unlock();

		//任務執行完后再返回
		while (!_mapDoingTask.empty())
			std::this_thread::sleep_for(std::chrono::milliseconds(50));
		return 0;
	}

	//任務完成回調(從運行列表中刪除指定任務)
	int onTaskFinished(size_t nID)
	{
		std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
		auto it_map = _mapDoingTask.find(nID);
		if (it_map != _mapDoingTask.end())
			_mapDoingTask.erase(it_map);
		return 0;
	}

	//判斷任務是否執行完畢
	std::shared_ptr<T> isTaskProcessed(size_t nId)
	{
		std::lock_guard<std::mutex> lock_queue(_mutexQueue);
		auto it = _queue.begin();
		for (; it != _queue.end(); ++it) {

			if ((*it)->getID() == nId)
				return *it;

		}
		std::lock_guard<std::mutex> lock_doing_task(_mutexDoingTask);
		
		auto it_map = _mapDoingTask.find(nId);
		if (it_map != _mapDoingTask.end())
			return it_map->second;
		return nullptr;
	}

	//等待有任務到達(帶超時:超時自動喚醒)
	bool wait(std::chrono::milliseconds millsec)
	{
		std::unique_lock<std::mutex> lock(_mutexConditPut);
		_conditPut.wait_for(lock, millsec);
		return true;
	}

private:
	//就緒的任務
	std::mutex _mutexQueue;
	std::deque<std::shared_ptr<T>> _queue;
	//條件變量
	std::mutex _mutexConditPut;
	std::condition_variable _conditPut;

	//運行的任務
	std::mutex _mutexDoingTask;
	std::unordered_map<size_t, std::shared_ptr<T> > _mapDoingTask;
	
};
  • ThreadPool.h
#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <iostream>
#include <thread>

#include "Task.h"
#include "TaskQueue.h"

class ThreadPool
{
public:
    // 線程池配置參數
    typedef struct tagThreadPoolConfig {
        int nMaxThreadsNum; // 最大線程數量
        int nMinThreadsNum; // 最小線程數量
        double dbTaskAddThreadRate; // 增 最大線程任務比 (任務數量與線程數量,什么比例的時候才加)
        double dbTaskSubThreadRate; // 減 最小線程任務比 (任務數量與線程數量,什么比例的時候才減)
    } ThreadPoolConfig;

public:
	//構造函數
    ThreadPool(void):_taskQueue(new TaskQueue<Task>()), _atcCurTotalThrNum(0), _atcWorking(true){}

	//析構函數
	~ThreadPool(void)
    {
        release();
    }

    //初始化資源
    int init(const ThreadPoolConfig& threadPoolConfig) {
        // 錯誤的設置
        if (threadPoolConfig.dbTaskAddThreadRate < threadPoolConfig.dbTaskSubThreadRate)
            return 87;


        _threadPoolConfig.nMaxThreadsNum = threadPoolConfig.nMaxThreadsNum;
        _threadPoolConfig.nMinThreadsNum = threadPoolConfig.nMinThreadsNum;
        _threadPoolConfig.dbTaskAddThreadRate = threadPoolConfig.dbTaskAddThreadRate;
        _threadPoolConfig.dbTaskSubThreadRate = threadPoolConfig.dbTaskSubThreadRate;


        int ret = 0;
        // 創建線程池
        if (_threadPoolConfig.nMinThreadsNum > 0)
            ret = addProThreads(_threadPoolConfig.nMinThreadsNum);
        return ret;
    }

    // 添加任務
    int addTask(std::shared_ptr<Task> taskptr, bool priority=false)
	{
        const double& rate = getThreadTaskRate();
        int ret = 0;
        if (priority) 
        {
            if (rate > 1000)
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            _taskQueue->put_front(taskptr);
        }
        else 
        {
            // 檢測任務數量
            if (rate > 100) {
                taskptr->onCanceled();
                return 298;
            }
            // 將任務推入隊列
            _taskQueue->put_back(taskptr);

        }
        // 檢查是否要擴展線程
        if (_atcCurTotalThrNum < _threadPoolConfig.nMaxThreadsNum
            && rate > _threadPoolConfig.dbTaskAddThreadRate)
            ret = addProThreads(1);
        return ret;
    }

    // 刪除任務(從就緒隊列刪除,如果就緒隊列沒有,則看執行隊列有沒有,有的話置下取消狀態位)
    int deleteTask(size_t nID)
	{
        return _taskQueue->deleteTask(nID);
    }

    // 刪除所有任務
    int deleteAllTasks(void)
	{
        return _taskQueue->deleteAllTasks();
    }

    std::shared_ptr<Task> isTaskProcessed(size_t nId)
	{
        return _taskQueue->isTaskProcessed(nId);
    }

    // 釋放資源(釋放線程池、釋放任務隊列)
    bool release(void)
	{
        // 1、停止線程池。
    	// 2、清楚就緒隊列。
    	// 3、等待執行隊列為0
        releaseThreadPool();
        _taskQueue->release();

        int i = 0;
        while (_atcCurTotalThrNum != 0) 
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            // 異常等待
            if (i++ == 10)
                exit(23);
        }

        _atcCurTotalThrNum = 0;
        return true;
    }

    // 獲取當前線程任務比
    double getThreadTaskRate(void)
	{
        if (_atcCurTotalThrNum != 0)
            return _taskQueue->size() * 1.0 / _atcCurTotalThrNum;
        return 0;
    }
    // 當前線程是否需要結束
    bool shouldEnd(void)
	{

        bool bFlag = false;
        double dbThreadTaskRate = getThreadTaskRate();

        // 檢查線程與任務比率
        if (!_atcWorking || _atcCurTotalThrNum > _threadPoolConfig.nMinThreadsNum
            && dbThreadTaskRate < _threadPoolConfig.dbTaskSubThreadRate)
            bFlag = true;

        return bFlag;
    }
	
    // 釋放線程池
    bool releaseThreadPool(void)
	{
        _threadPoolConfig.nMinThreadsNum = 0;
        _threadPoolConfig.dbTaskSubThreadRate = 0;
        _atcWorking = false;
        return true;
    }

	
    // 添加指定數量的處理線程
    int addProThreads(int nThreadsNum)
	{
    	try {
            for (; nThreadsNum > 0; --nThreadsNum)
                std::thread(&ThreadPool::taskProcessThread, this).detach();
        }
        catch (...){
            return 155;
        }

        return 0;
    }

    // 任務處理線程函數
    void taskProcessThread(void)
	{
        int nTaskProcRet = 0;
        // 線程增加
        _atcCurTotalThrNum.fetch_add(1);
        std::chrono::milliseconds mills_sleep(500);


        std::shared_ptr<Task> pTask;
        while (_atcWorking) 
        {
            // 從任務隊列中獲取任務
            pTask = _taskQueue->get();  //get會將任務添加到運行任務的map中去
            if (pTask == nullptr) 
            {
                if (shouldEnd())
                    break;

                // 進入睡眠池
                _taskQueue->wait(mills_sleep);
                continue;
            }

            // 檢測任務取消狀態
            if (pTask->isCancelRequired())
                pTask->onCanceled();
            else
                // 處理任務
                pTask->onCompleted(pTask->doWork());


            // 從運行任務隊列中移除任務
            _taskQueue->onTaskFinished(pTask->getID());


            // 判斷線程是否需要結束
            if (shouldEnd())
                break;

        }

        // 線程個數減一
        _atcCurTotalThrNum.fetch_sub(1);
    }
	
private:
	std::shared_ptr<TaskQueue<Task> > _taskQueue;	//任務隊列
    ThreadPoolConfig _threadPoolConfig; //線程池配置
    std::atomic<bool> _atcWorking;  //線程池是否被要求結束
    std::atomic<int> _atcCurTotalThrNum;    //當前線程個數
};
  • FunTask.h
#pragma once

#include <functional>
#include "Task.h"

class FuncTask:public Task
{
public:
	FuncTask(std::function<int(void)> f) : _pf(f) {}
	FuncTask(void) : _pf(nullptr){}

	virtual ~FuncTask(){}

	template <typename F,typename... Args>
	void asynBind(F(*f)(Args...), Args... args)
	{
		_pf = std::bind(f, args...);
	}

	virtual int doWork()
	{
		if (_pf == nullptr)
			return 86;
		return _pf();
	}

private:
	typedef std::function<int(void)> pvFunc;
	pvFunc _pf;
};
  • main.cpp
#pragma once

#include <time.h>
#include <iostream>
#include <memory>
#include <string>
#include "ThreadPool.h"
#include "FuncTask.h"

using namespace std;

int vFunction(void)
{
	std::cout << __FUNCTION__ << std::endl;
	return 0;
}

int counter(int a,int b)
{
	std::cout << a << ":" << b << std::endl;
	return 0;
}


int main()
{
	
	ThreadPool::ThreadPoolConfig threadPoolConfig;
	threadPoolConfig.nMaxThreadsNum = 100;
	threadPoolConfig.nMinThreadsNum = 5;
	threadPoolConfig.dbTaskAddThreadRate = 3;
	threadPoolConfig.dbTaskSubThreadRate = 0.5;

	clock_t start = clock();
	{
		std::shared_ptr<ThreadPool> threadPool(new ThreadPool);
		threadPool->init(threadPoolConfig);

		int i = 1;
		while (true)
		{
			/*std::shared_ptr<FuncTask> request(new FuncTask(vFunction));
			threadPool->addTask(request);*/
			
			std::shared_ptr<FuncTask> request(new FuncTask);
			request->asynBind(counter, i++, 1);
			threadPool->addTask(request);
            if (request->getID() == 110000) {
				break;
            }
		}

		threadPool->release();
	}
	
	clock_t finish = clock();
	std::cout << "duration:" << finish - start << "ms" << std::endl;
	
	cout << "main:thread" << endl;
	return 0;
}

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM