基於C++11實現的線程池


1.C++11中引入了lambada表達式,很好的支持異步編程

2.C++11中引入了std::thread,可以很方便的構建線程,更方便的可移植特性

3.C++11中引入了std::mutex,可以很方便的構建線程鎖互斥訪問,更方便的可移植特性

4.C++11中引入了std::condition_variable,可以不依賴於win32 api實現自己的消費者生產者模型

5.利用改進版本的shared_ptr,可以很好的解決多線程生命周期的棘手問題

  1 /************************************************************************/
  2 /*                                                                      */
  3 /************************************************************************/
  4 
  5 #ifndef __CARBON_THREAD_POOL_H
  6 #define __CARBON_THREAD_POOL_H
  7 
  8 #include <vector>
  9 #include <memory>
 10 #include <thread>
 11 #include <mutex>
 12 #include <condition_variable>
 13 #include <future>
 14 #include <functional>
 15 #include <stdexcept>
 16 #include <string>
 17 #include <sstream>
 18 #include <deque>
 19 
 20 namespace CARBON {
 21 
 22     //************************************
 23     // Method:    Create
 24     // Returns:   std::shared_ptr
 25     // Qualifier: 用於創建智能指針實例
 26     // Parameter: args, 可變參數,接受任意個數的參數,傳遞給T的構造函數
 27     //************************************
 28     template<typename T, typename... ARG>
 29     std::shared_ptr<T> Create(ARG&&... args)
 30     {
 31         struct TEnableShared : public T
 32         {
 33             TEnableShared(ARG&&... args)
 34                 : T(std::forward<ARG>(args)...)
 35             {}
 36         };
 37 
 38         return std::make_shared<TEnableShared>(std::forward<ARG>(args)...);
 39     }
 40 
 41     class ThreadPool : public std::enable_shared_from_this<ThreadPool>
 42     {
 43     protected:
 44         ThreadPool()
 45             : _stop(false)
 46         {}
 47 
 48         virtual ~ThreadPool()
 49         {
 50             {
 51                 std::unique_lock<std::mutex> lock(_lock);
 52                 _stop = true;
 53             }
 54             _condition.notify_all();
 55             for (std::thread &worker : _workers)
 56                 worker.join();
 57         }
 58 
 59     public:
 60         // initialize thread pool with number of threads
 61         bool InitializePool(size_t threads)
 62         {
 63             if (!_workers.empty()) return true;
 64 
 65             for (size_t i = 0; i < threads; ++i)
 66             {
 67                 std::weak_ptr<ThreadPool> _wtp = this->shared_from_this();
 68                 auto th = [](std::weak_ptr<ThreadPool> wtp) {
 69                     for (;;)
 70                     {
 71                         std::function<void()> task;
 72 
 73                         {
 74                             std::shared_ptr<ThreadPool> stp = wtp.lock();
 75                             if (!stp)
 76                                 return;
 77 
 78                             std::unique_lock<std::mutex> lock(stp->_lock);
 79                             auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); };
 80                             stp->_condition.wait(lock, shipment);
 81                             if (stp->_stop)
 82                                 return;
 83                             if (stp->_tasks.empty())
 84                                 continue;
 85                             task = std::move(stp->_tasks.front()).task;
 86                             stp->_tasks.pop_front();
 87                         }
 88 
 89                         task();
 90                     }
 91                 };
 92                 _workers.emplace_back(th, _wtp);
 93             }
 94 
 95             return !_workers.empty();
 96         }
 97 
 98         //************************************
 99         // Method:    EnqueueTask
100         // Returns:   std::future, 值類型由functor f指定
101         // Qualifier: 可以借由返回的std::future獲取結果,但是更建議在functor中做異步通知
102         // Parameter: taskid 用於接受任務的id描述
103         // Parameter: functor f, 函數對象,用於執行任務
104         // Parameter: args, 可變參數,接受任意個數的參數,傳遞給functor f
105         //************************************
106         template<class F, class... Args>
107         auto EnqueueTask(std::string& taskid, F&& f, Args&&... args)
108             ->std::future<typename std::result_of<F(Args...)>::type>
109         {
110             if (_workers.empty())
111                 throw std::runtime_error("ThreadPool not initialized yet");
112 
113             using return_type = typename std::result_of<F(Args...)>::type;
114 
115             auto task = std::make_shared<std::packaged_task<return_type()>>(
116                 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
117                 );
118 
119             std::future<return_type> res = task->get_future();
120             {
121                 std::unique_lock<std::mutex> lock(_lock);
122 
123                 // don't allow enqueueing after stopping the pool
124                 if (_stop)
125                     throw std::runtime_error("enqueue on stopped ThreadPool");
126 
127                 stThreadTask st;
128                 std::stringstream ss;
129                 ss << (void*)task.get();
130                 ss >> taskid;
131                 st.taskid = taskid;
132                 st.task = [task]() { (*task)(); };
133                 _tasks.push_back(st);
134             }
135             _condition.notify_one();
136             return res;
137         }
138 
139         //************************************
140         // Method:    GetTasksSize
141         // Returns:   size_t
142         // Qualifier: 獲取等待任務隊列的任務數,正在執行的任務已經彈出隊列,所以不參與計算
143         //************************************
144         size_t GetTasksSize()
145         {
146             std::unique_lock<std::mutex> lock(_lock);
147             return _tasks.size();
148         }
149 
150         //************************************
151         // Method:    RemoveTask
152         // Returns:   bool, 找到任務並移除則返回true,否則返回false
153         // Qualifier: 正在執行的任務已經彈出任務隊列,應該在其它地方通知任務退出
154         // Qualifier: 執行完成的任務已經彈出任務隊列,無法移除不存在的任務
155         // Qualifier: 該接口只能移除處在等待中的任務
156         // Parameter: taskid是任務的唯一標示,由EnqueueTask返回
157         //************************************
158         bool RemoveTask(const std::string& taskid)
159         {
160             std::unique_lock<std::mutex> lock(_lock);
161             for (auto& t = _tasks.begin(); t != _tasks.end(); ++t)
162             {
163                 if (taskid == t->taskid)
164                 {
165                     _tasks.erase(t);
166                     return true;
167                 }
168             }
169 
170             return false;
171         }
172 
173     private:
174         typedef struct stThreadTask
175         {
176             std::function<void()> task;
177             std::string taskid;
178         }stThreadTask;
179 
180         // need to keep track of threads so we can join them
181         std::vector< std::thread > _workers;
182         // the task queue
183         std::deque< stThreadTask > _tasks;
184 
185         // synchronization
186         std::mutex _lock;
187         std::condition_variable _condition;
188         bool _stop;
189     };
190 }
191 
192 #endif

使用enable_shared_from_this來確保內部線程訪問指針時,不會因為指針失效造成的非法訪問

weak_ptr很好的保證了ThreadPool的生命周期安全性和實效性

由於使用了share_from_this,將初始化代碼整體拿出來放到InitializePool中實現

ThreadPool的構造函數和析構函數聲明為protected,用於保證外部不要直接生成ThreadPool實例

應該使用Create函數來生成ThreadPool實例

測試代碼如下:

 1 namespace {
 2     std::condition_variable _exit_cv;
 3 }
 4 
 5 void func(int n)
 6 {
 7     std::cout << "func with n " << n << std::endl;
 8 }
 9 
10 using CARBON::ThreadPool;
11 
12 std::string taskid;
13 std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>();
14 std::weak_ptr<ThreadPool> _wtp = stp;
15 stp->InitializePool(2);
16 
17 
18 stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int {
19         std::cout << "task1\n";
20 
21         for (int i = 0; i < 5; ++i) {
22             std::mutex mtx;
23             std::unique_lock<std::mutex> lck(mtx);
24             if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout)
25                 break;
26 
27             if (cbf) cbf(i);
28             if (wtp.expired())
29                 break;
30         }
31 
32         return 5;
33     }, func, _wtp);

當需要中斷線程執行時,應該在外部通知線程中的任務自行退出

例子中可以在主線程中這么做

    _exit_cv.notify_all();
_exit_cv用於模擬sleep操作
func用於模擬任務結果的異步通知,這里為了省事使用了函數指針,實際工作中應該使用functor來傳遞,以保證生命周期的有效性
比如std::bind和shared_ptr一起構造的functor對象


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM