寫了一個簡易線程池,
原理簡單介紹下,就是設置一個任務隊列queue,用來放要執行的函數,還有一個線程數組vector,用來存放所有的線程。
線程創建以后就存放在相應的vector里,空閑的線程去queue里去取要執行的函數地址,在run函數中執行,假如一個線程的run函數執行好后,
發現隊列沒有任務可取,則阻塞該線程,通過conidtion_variable變量的wait()函數進行阻塞,等待新的任務被添加進來后,會有一個cond變量的notify_one()
函數來喚醒阻塞中的run函數。
現在放代碼吧!
線程池頭文件Thread_Pool.h
/******************************************** 線程池頭文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #pragma once #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include<thread> #include<queue> #include<mutex> #include<atomic> #include<vector> #include<condition_variable> typedef std::function<void()> Func;//定義線程執行函數類型,方便后面編碼使用。 //任務類 template<typename T> class Task { public: Task() {} ~Task() {} int push(T func)//添加任務; { try { tasks.emplace(func); } catch (std::exception e) { throw e; return -1; } return 1; } int getTaskNum()//獲得當前隊列中的任務數; { return tasks.size(); } T pop()//取出待執行的任務; { T temp; if (tasks.empty()) return temp; else { temp = tasks.front(); tasks.pop(); return temp; } } private: std::queue<T> tasks;//任務隊列 }; //線程池類 class Thread_Pool { public: Thread_Pool() :IsStart(false) {} ~Thread_Pool(); int addTasks(Func&& tasks);//添加任務; void start();//開啟線程池; void stop();//關閉線程池; int getTaskNum();//獲得當前隊列中的任務數; private: void run();//線程工作函數; private: static const int maxThreadNum = 3;//最大線程數為3; std::mutex mx;//鎖; std::condition_variable cond;//條件量; std::vector<std::thread*> threads;//線程向量; bool IsStart;//原子變量,判斷線程池是否運行; Task<Func> tasks;//任務變量; }; #endif
線程池實現文件 Thread_Pool.cpp
/******************************************** 線程池CPP文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #include"Thread_Pool.h" #include<iostream> int Thread_Pool::addTasks(Func&& func) { std::unique_lock<std::mutex> lock(mx); int ret = tasks.push(func); if (ret == 1) { std::cout << "添加任務成功" << std::endl; cond.notify_one(); } return ret; } void Thread_Pool::start() { if (!IsStart) { { std::unique_lock<std::mutex> lock(mx); IsStart = true; } threads.reserve(maxThreadNum); for (int i = 0; i < maxThreadNum; i++) { threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this))); } } } void Thread_Pool::run() { while (IsStart) { Func f; if (tasks.getTaskNum() == 0 && IsStart) { std::unique_lock<std::mutex> lock(mx); cond.wait(lock); } { std::unique_lock<std::mutex> lock(mx); f = tasks.pop(); } if (f) f(); } } int Thread_Pool::getTaskNum() { return tasks.getTaskNum(); } void Thread_Pool::stop() { { std::unique_lock<std::mutex> lock(mx); IsStart = false; } cond.notify_all(); for (auto T : threads) { std::cout << "線程 " << T->get_id() << " 已停止。" << std::endl; T->join(); if (T != nullptr) { delete T; T = nullptr; } } std::cout << "所有線程已停止。" << std::endl; } Thread_Pool::~Thread_Pool() { if (IsStart) { stop(); } }
測試用的main.cpp文件
#include<iostream> #include"Thread_Pool.h" using namespace std; void string_out_one() { int n = 500; while (n--) { cout << "One!" << endl; } } void string_out_two() { int n = 500; while (n--) cout << "Two!" << endl; } void string_out_three() { int n = 500; while(n--) cout << "Three!" << endl; } void string_out_four() { int n = 500; while (n--) cout << "Four!" << endl; } int main() { clock_t start, finish; double totaltime; start = clock(); { /*實驗對比代碼段1開始處*/ Thread_Pool Pool; try { Pool.start(); } catch (std::exception e) { throw e; cout << "線程池創建失敗。" << endl; } Pool.addTasks(move(string_out_one)); Pool.addTasks(move(string_out_two)); Pool.addTasks(move(string_out_three)); Pool.addTasks(move(string_out_four)); /*實驗對比代碼段1結束處*/ /*實驗對比代碼段2開始處*/ //string_out_one(); //string_out_two(); //string_out_three(); //string_out_four(); /*實驗對比代碼段2結束處*/ finish = clock(); totaltime = (double)(finish - start) / CLOCKS_PER_SEC; cout << "\n此程序的運行時間為" << totaltime << "秒!" << endl; getchar(); } getchar(); return 0; }
總結下這個線程池,主要是要注意鎖的防止位置,放太多,就變成單線程,執行效率還不如單線程,放太少,可能會造成多線程之間的誤讀,放的位置不對,會造成死鎖。。。是真的麻煩。
兩個想改進的地方,
一、希望可以在添加任務時確定任務的類型,而不是在Thread_Pool類中就確定task的類型,並且能支持傳入函數形參。
二、程序的優化做的不是很好吧,雖然不知道但是覺得肯定還有優化空間。
若有不足之處歡迎指出。