1.C++11中引入了lambada表達式,很好的支持異步編程
2.C++11中引入了std::thread,可以很方便的構建線程,更方便的可移植特性
3.C++11中引入了std::mutex,可以很方便的構建線程鎖互斥訪問,更方便的可移植特性
4.C++11中引入了std::condition_variable,可以不依賴於win32 api實現自己的消費者生產者模型
5.利用改進版本的shared_ptr,可以很好的解決多線程生命周期的棘手問題
1 /************************************************************************/ 2 /* */ 3 /************************************************************************/ 4 5 #ifndef __CARBON_THREAD_POOL_H 6 #define __CARBON_THREAD_POOL_H 7 8 #include <vector> 9 #include <memory> 10 #include <thread> 11 #include <mutex> 12 #include <condition_variable> 13 #include <future> 14 #include <functional> 15 #include <stdexcept> 16 #include <string> 17 #include <sstream> 18 #include <deque> 19 20 namespace CARBON { 21 22 //************************************ 23 // Method: Create 24 // Returns: std::shared_ptr 25 // Qualifier: 用於創建智能指針實例 26 // Parameter: args, 可變參數,接受任意個數的參數,傳遞給T的構造函數 27 //************************************ 28 template<typename T, typename... ARG> 29 std::shared_ptr<T> Create(ARG&&... args) 30 { 31 struct TEnableShared : public T 32 { 33 TEnableShared(ARG&&... args) 34 : T(std::forward<ARG>(args)...) 35 {} 36 }; 37 38 return std::make_shared<TEnableShared>(std::forward<ARG>(args)...); 39 } 40 41 class ThreadPool : public std::enable_shared_from_this<ThreadPool> 42 { 43 protected: 44 ThreadPool() 45 : _stop(false) 46 {} 47 48 virtual ~ThreadPool() 49 { 50 { 51 std::unique_lock<std::mutex> lock(_lock); 52 _stop = true; 53 } 54 _condition.notify_all(); 55 for (std::thread &worker : _workers) 56 worker.join(); 57 } 58 59 public: 60 // initialize thread pool with number of threads 61 bool InitializePool(size_t threads) 62 { 63 if (!_workers.empty()) return true; 64 65 for (size_t i = 0; i < threads; ++i) 66 { 67 std::weak_ptr<ThreadPool> _wtp = this->shared_from_this(); 68 auto th = [](std::weak_ptr<ThreadPool> wtp) { 69 for (;;) 70 { 71 std::function<void()> task; 72 73 { 74 std::shared_ptr<ThreadPool> stp = wtp.lock(); 75 if (!stp) 76 return; 77 78 std::unique_lock<std::mutex> lock(stp->_lock); 79 auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); }; 80 stp->_condition.wait(lock, shipment); 81 if (stp->_stop) 82 return; 83 if (stp->_tasks.empty()) 84 continue; 85 task = std::move(stp->_tasks.front()).task; 86 stp->_tasks.pop_front(); 87 } 88 89 task(); 90 } 91 }; 92 _workers.emplace_back(th, _wtp); 93 } 94 95 return !_workers.empty(); 96 } 97 98 //************************************ 99 // Method: EnqueueTask 100 // Returns: std::future, 值類型由functor f指定 101 // Qualifier: 可以借由返回的std::future獲取結果,但是更建議在functor中做異步通知 102 // Parameter: taskid 用於接受任務的id描述 103 // Parameter: functor f, 函數對象,用於執行任務 104 // Parameter: args, 可變參數,接受任意個數的參數,傳遞給functor f 105 //************************************ 106 template<class F, class... Args> 107 auto EnqueueTask(std::string& taskid, F&& f, Args&&... args) 108 ->std::future<typename std::result_of<F(Args...)>::type> 109 { 110 if (_workers.empty()) 111 throw std::runtime_error("ThreadPool not initialized yet"); 112 113 using return_type = typename std::result_of<F(Args...)>::type; 114 115 auto task = std::make_shared<std::packaged_task<return_type()>>( 116 std::bind(std::forward<F>(f), std::forward<Args>(args)...) 117 ); 118 119 std::future<return_type> res = task->get_future(); 120 { 121 std::unique_lock<std::mutex> lock(_lock); 122 123 // don't allow enqueueing after stopping the pool 124 if (_stop) 125 throw std::runtime_error("enqueue on stopped ThreadPool"); 126 127 stThreadTask st; 128 std::stringstream ss; 129 ss << (void*)task.get(); 130 ss >> taskid; 131 st.taskid = taskid; 132 st.task = [task]() { (*task)(); }; 133 _tasks.push_back(st); 134 } 135 _condition.notify_one(); 136 return res; 137 } 138 139 //************************************ 140 // Method: GetTasksSize 141 // Returns: size_t 142 // Qualifier: 獲取等待任務隊列的任務數,正在執行的任務已經彈出隊列,所以不參與計算 143 //************************************ 144 size_t GetTasksSize() 145 { 146 std::unique_lock<std::mutex> lock(_lock); 147 return _tasks.size(); 148 } 149 150 //************************************ 151 // Method: RemoveTask 152 // Returns: bool, 找到任務並移除則返回true,否則返回false 153 // Qualifier: 正在執行的任務已經彈出任務隊列,應該在其它地方通知任務退出 154 // Qualifier: 執行完成的任務已經彈出任務隊列,無法移除不存在的任務 155 // Qualifier: 該接口只能移除處在等待中的任務 156 // Parameter: taskid是任務的唯一標示,由EnqueueTask返回 157 //************************************ 158 bool RemoveTask(const std::string& taskid) 159 { 160 std::unique_lock<std::mutex> lock(_lock); 161 for (auto& t = _tasks.begin(); t != _tasks.end(); ++t) 162 { 163 if (taskid == t->taskid) 164 { 165 _tasks.erase(t); 166 return true; 167 } 168 } 169 170 return false; 171 } 172 173 private: 174 typedef struct stThreadTask 175 { 176 std::function<void()> task; 177 std::string taskid; 178 }stThreadTask; 179 180 // need to keep track of threads so we can join them 181 std::vector< std::thread > _workers; 182 // the task queue 183 std::deque< stThreadTask > _tasks; 184 185 // synchronization 186 std::mutex _lock; 187 std::condition_variable _condition; 188 bool _stop; 189 }; 190 } 191 192 #endif
使用enable_shared_from_this來確保內部線程訪問指針時,不會因為指針失效造成的非法訪問
weak_ptr很好的保證了ThreadPool的生命周期安全性和實效性
由於使用了share_from_this,將初始化代碼整體拿出來放到InitializePool中實現
ThreadPool的構造函數和析構函數聲明為protected,用於保證外部不要直接生成ThreadPool實例
應該使用Create函數來生成ThreadPool實例
測試代碼如下:
1 namespace { 2 std::condition_variable _exit_cv; 3 } 4 5 void func(int n) 6 { 7 std::cout << "func with n " << n << std::endl; 8 } 9 10 using CARBON::ThreadPool; 11 12 std::string taskid; 13 std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>(); 14 std::weak_ptr<ThreadPool> _wtp = stp; 15 stp->InitializePool(2); 16 17 18 stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int { 19 std::cout << "task1\n"; 20 21 for (int i = 0; i < 5; ++i) { 22 std::mutex mtx; 23 std::unique_lock<std::mutex> lck(mtx); 24 if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout) 25 break; 26 27 if (cbf) cbf(i); 28 if (wtp.expired()) 29 break; 30 } 31 32 return 5; 33 }, func, _wtp);
當需要中斷線程執行時,應該在外部通知線程中的任務自行退出
例子中可以在主線程中這么做
_exit_cv.notify_all();
_exit_cv用於模擬sleep操作
func用於模擬任務結果的異步通知,這里為了省事使用了函數指針,實際工作中應該使用functor來傳遞,以保證生命周期的有效性
比如std::bind和shared_ptr一起構造的functor對象