c++11 boost技術交流群:296561497,歡迎大家來交流技術。
線程池可以高效的處理任務,線程池中開啟多個線程,等待同步隊列中的任務到來,任務到來多個線程會搶着執行任務,當到來的任務太多,達到上限時需要等待片刻,任務上限保證內存不會溢出。線程池的效率和cpu核數相關,多核的話效率更高,線程數一般取cpu數量+2比較合適,否則線程過多,線程切換頻繁反而會導致效率降低。
線程池有兩個活動過程:1.外面不停的往線程池添加任務;2.線程池內部不停的取任務執行。活動圖如下:
線程池中的隊列是用的上一篇博文中的同步隊列。具體代碼:
#include<vector> #include<thread> #include<functional> #include<memory> #include <atomic> #include"SyncQueue.hpp" const int MaxTaskCount = 100; class ThreadPool { public: using Task = std::function<void()>; ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); } ~ThreadPool(void) { //如果沒有停止時則主動停止線程池 Stop(); } void Stop() { std::call_once(m_flag, [this]{StopThreadGroup(); }); //保證多線程情況下只調用一次StopThreadGroup } void AddTask(Task&&task) { m_queue.Put(std::forward<Task>(task)); } void AddTask(const Task& task) { m_queue.Put(task); } private: void Start(int numThreads) { m_running = true; //創建線程組 for (int i = 0; i <numThreads; ++i) { m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this)); } } void RunInThread() { while (m_running) { //取任務分別執行 std::list<Task> list; m_queue.Take(list); for (auto& task : list) { if (!m_running) return; task(); } } } void StopThreadGroup() { m_queue.Stop(); //讓同步隊列中的線程停止 m_running = false; //置為false,讓內部線程跳出循環並退出 for (auto thread : m_threadgroup) //等待線程結束 { if (thread) thread->join(); } m_threadgroup.clear(); } std::list<std::shared_ptr<std::thread>> m_threadgroup; //處理任務的線程組 SyncQueue<Task> m_queue; //同步隊列 atomic_bool m_running; //是否停止的標志 std::once_flag m_flag; };
上面的代碼中用到了同步隊列SyncQueue,它的實現在這里。測試代碼如下:
void TestThdPool() { ThreadPool pool;bool runing = true; std::thread thd1([&pool,&runing]{ while(runing) { cout<<"produce "<<this_thread::get_id()<< endl; pool.AddTask([]{ std::cout <<"consume "<<this_thread::get_id()<< endl; }); } }); this_thread::sleep_for(std::chrono::seconds(10)); runing = false; pool.Stop(); thd1.join(); getchar(); }
上面的測試代碼中,thd1是生產者線程,線程池內部會不斷消費生產者產生的任務。在需要的時候可以提前停止線程池,只要調用Stop函數就行了。