A.Boost線程池實現
參考自: Boost庫實現線程池實例
原理:使用boost的thread_group存儲多個線程,使用bind方法將要處理的函數轉換成線程可調用的函數進行執行;使用隊列存儲待處理任務,利用Mutex實現隊列線程安全。
#ifndef MYTHREADPOOL_H #define MYTHREADPOOL_H #include <iostream> #include <queue> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> #include <boost/noncopyable.hpp> using namespace boost; typedef boost::function<void(void)> MyTask; //任務隊列--noncopyable class MyTaskQueue : boost::noncopyable { private: std::queue<MyTask> m_taskQueue; boost::mutex m_mutex;//互斥鎖 boost::condition_variable_any m_cond;//條件變量 public: void push_Task(const MyTask& task){ //加上互斥鎖 boost::unique_lock<boost::mutex> lock(m_mutex); m_taskQueue.push(task); //通知其他線程啟動 m_cond.notify_one(); } MyTask pop_Task(){ //加上互斥鎖 boost::unique_lock<boost::mutex> lock(m_mutex); if(m_taskQueue.empty()) { //如果隊列中沒有任務,則等待互斥鎖 m_cond.wait(lock);// } //指向隊列首部 MyTask task(m_taskQueue.front()); //出隊列 m_taskQueue.pop(); return task; } int get_size() { return m_taskQueue.size(); } }; class MyThreadPool : boost::noncopyable { private: //任務隊列 MyTaskQueue m_taskQueue; //線程組 boost::thread_group m_threadGroup; int m_threadNum; /* volatile 被設計用來修飾被不同線程訪問和修改的變量。 volatile 告訴編譯器i是隨時可能發生變化的,每次使用它的時候必須從i的地址中讀取, 因而編譯器生成的可執行碼會重新從i的地址讀取數據放在k中。 volatile可以保證對特殊地址的穩定訪問,不會出錯。 */ volatile bool is_run; void run(){//線程池中線程的處理函數 while(is_run){ //一直處理線程池的任務 MyTask task = m_taskQueue.pop_Task(); task();//運行bind的函數 } } public: MyThreadPool(int num):m_threadNum(num),is_run(false)//初始化列表 { } ~MyThreadPool(){ stop(); } void init() { if(m_threadNum <= 0) return; is_run = true; for (int i=0;i<m_threadNum;i++) { //生成多個線程,綁定run函數,添加到線程組 m_threadGroup.add_thread( new boost::thread(boost::bind(&MyThreadPool::run,this))); } } //停止線程池 void stop() { is_run = false; } //添加任務 void AddNewTask(const MyTask& task){ m_taskQueue.push_Task(task); } void wait() { m_threadGroup.join_all();//等待線程池處理完成! } }; typedef void (*pFunCallBack)(int i); void CallBackFun(int i) { std::cout << i <<" call back!"<<std::endl; } void ProcFun(int ti,pFunCallBack callback) { std::cout<<"I am Task "<<ti<<std::endl; //task for (int i=0;i<ti*100000000;i++) { i*i; } if(callback != NULL)callback(ti); } void CallBackFun2(int i) { std::cout << i <<" call back! v2"<<std::endl; } int ProcFun2(int& ti) { std::cout<<"I am Task "<<ti<<std::endl; //task for (int i=0;i<ti*100000000;i++) { i*i; } return ti; } void testThreadPool() { MyThreadPool tp(2); int taskNum = 4; for (int i=0;i<taskNum;i++) { MyTask task = boost::bind(ProcFun,i+1,CallBackFun); //放到線程池中處理,bind(f , i) will produce a "nullary" function object that takes no arguments and returns f(i),調用時,可傳遞任何類型的函數及參數!!! tp.AddNewTask(task); } tp.init(); //等待線程池處理完成! tp.wait(); } #endif
B.基於線程的異步調用實現
原理:使用線程實現異步調用,將耗時的操作放在線程中執行,待其執行完成后,調用回調函數執行后續操作。
//創建一個線程,執行耗時操作,等到操作完成,調用回調函數 void testAsyncCall(int i,pFunCallBack callfun) { boost::thread th(boost::bind(ProcFun,i,callfun)); } void testAsyncCall2(int i) { //bind函數嵌套,回調函數 --bind(f, bind(g, _1))(x); // f(g(x)) boost::thread th(boost::bind(CallBackFun2,boost::bind(ProcFun2,i))); } template <class ParaType,class RetType> class MyTask2{ typedef boost::function<RetType(ParaType&)> ProcFun; typedef boost::function<void(RetType)> CallBackFun; protected: ProcFun m_procFun; CallBackFun m_callbackFun; public: MyTask2():m_procFun(NULL),m_callbackFun(NULL){ } MyTask2(ProcFun proc,CallBackFun callback):m_procFun(proc),m_callbackFun(callback){ } ~MyTask2(){ } void Run(ParaType& para){ if(m_procFun!=NULL && m_callbackFun!=NULL) { m_callbackFun(m_procFun(para)); } } }; void testAsyncCall3(int para)//使用bind注冊執行函數和回調函數 { MyTask2<int,int> tk(ProcFun2, CallBackFun2); //tk.Run(para); MyTask task = boost::bind(&MyTask2<int,int>::Run,tk,para); boost::thread th(task); //boost::thread th(boost::bind(&MyTask2<int,int>::Run,tk,para)); }