Boost庫實現線程池學習及線程實現的異步調用


A.Boost線程池實現

參考自: Boost庫實現線程池實例

原理:使用boost的thread_group存儲多個線程,使用bind方法將要處理的函數轉換成線程可調用的函數進行執行;使用隊列存儲待處理任務,利用Mutex實現隊列線程安全。

#ifndef MYTHREADPOOL_H
#define MYTHREADPOOL_H

#include <iostream>
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
using namespace boost;

typedef boost::function<void(void)> MyTask;
//任務隊列--noncopyable
class MyTaskQueue : boost::noncopyable
{
private:
    std::queue<MyTask> m_taskQueue;
    boost::mutex m_mutex;//互斥鎖
    boost::condition_variable_any m_cond;//條件變量
public:
    void push_Task(const MyTask& task){
        //加上互斥鎖
        boost::unique_lock<boost::mutex> lock(m_mutex);
        m_taskQueue.push(task);
        //通知其他線程啟動
        m_cond.notify_one();
    }

    MyTask pop_Task(){
        //加上互斥鎖
        boost::unique_lock<boost::mutex> lock(m_mutex);
        if(m_taskQueue.empty())
        {
            //如果隊列中沒有任務,則等待互斥鎖 
            m_cond.wait(lock);//
        }
        //指向隊列首部
        MyTask task(m_taskQueue.front());
        //出隊列  
        m_taskQueue.pop();
        return task;
    }
    int get_size()
    {
        return m_taskQueue.size();
    }
};

class MyThreadPool : boost::noncopyable
{
private:
    //任務隊列
    MyTaskQueue m_taskQueue;
    //線程組 
    boost::thread_group m_threadGroup;
    int m_threadNum;
    /*
    volatile 被設計用來修飾被不同線程訪問和修改的變量。
    volatile 告訴編譯器i是隨時可能發生變化的,每次使用它的時候必須從i的地址中讀取,
    因而編譯器生成的可執行碼會重新從i的地址讀取數據放在k中。 
    volatile可以保證對特殊地址的穩定訪問,不會出錯。
    */
    volatile bool is_run;
    void run(){//線程池中線程的處理函數
        while(is_run){
            //一直處理線程池的任務
            MyTask task = m_taskQueue.pop_Task();
            task();//運行bind的函數
        }
    }
public:
    MyThreadPool(int num):m_threadNum(num),is_run(false)//初始化列表
    {

    }
    ~MyThreadPool(){
        stop();
    }
    void init()
    {
        if(m_threadNum <= 0) return;
        is_run = true;
        for (int i=0;i<m_threadNum;i++)
        {
            //生成多個線程,綁定run函數,添加到線程組
            m_threadGroup.add_thread(
                new boost::thread(boost::bind(&MyThreadPool::run,this)));
        }
    }
    //停止線程池
    void stop()
    {
        is_run = false;
    }
    //添加任務
    void AddNewTask(const MyTask& task){
        m_taskQueue.push_Task(task);
    }
    void wait()
    {
        m_threadGroup.join_all();//等待線程池處理完成!
    }
};

typedef void (*pFunCallBack)(int i);
void CallBackFun(int i)
{
    std::cout << i <<" call back!"<<std::endl;
}

void ProcFun(int ti,pFunCallBack callback)
{
     std::cout<<"I am Task "<<ti<<std::endl; 
     //task
     for (int i=0;i<ti*100000000;i++)
     {
         i*i;
     }
     if(callback != NULL)callback(ti);
}


void CallBackFun2(int i)
{
    std::cout << i <<" call back! v2"<<std::endl;
}

int ProcFun2(int& ti)
{
    std::cout<<"I am Task "<<ti<<std::endl; 
    //task
    for (int i=0;i<ti*100000000;i++)
    {
        i*i;
    }
    return ti;
}


void testThreadPool()
{
    MyThreadPool tp(2);
    int taskNum = 4;
    for (int i=0;i<taskNum;i++)
    {
        MyTask task = boost::bind(ProcFun,i+1,CallBackFun);
        //放到線程池中處理,bind(f , i) will produce a "nullary" function object that takes no arguments and returns f(i),調用時,可傳遞任何類型的函數及參數!!!
        tp.AddNewTask(task);
    }

    tp.init();
    //等待線程池處理完成!  
    tp.wait();
}

#endif

 B.基於線程的異步調用實現

原理:使用線程實現異步調用,將耗時的操作放在線程中執行,待其執行完成后,調用回調函數執行后續操作。

//創建一個線程,執行耗時操作,等到操作完成,調用回調函數
void testAsyncCall(int i,pFunCallBack callfun)
{
    boost::thread th(boost::bind(ProcFun,i,callfun));
}
void testAsyncCall2(int i)
{
    //bind函數嵌套,回調函數 --bind(f, bind(g, _1))(x); // f(g(x))
    boost::thread th(boost::bind(CallBackFun2,boost::bind(ProcFun2,i)));
}

template <class ParaType,class RetType>
class MyTask2{
    typedef boost::function<RetType(ParaType&)> ProcFun;
    typedef boost::function<void(RetType)> CallBackFun;
protected:
    ProcFun m_procFun;
    CallBackFun m_callbackFun;
public:
    MyTask2():m_procFun(NULL),m_callbackFun(NULL){

    }
    MyTask2(ProcFun proc,CallBackFun callback):m_procFun(proc),m_callbackFun(callback){

    }
    ~MyTask2(){

    }
    void Run(ParaType& para){
        if(m_procFun!=NULL && m_callbackFun!=NULL)
        {
            m_callbackFun(m_procFun(para));
        }
    }
};


void testAsyncCall3(int para)//使用bind注冊執行函數和回調函數
{
    MyTask2<int,int> tk(ProcFun2, CallBackFun2);
    //tk.Run(para);
    MyTask task = boost::bind(&MyTask2<int,int>::Run,tk,para);
    boost::thread th(task);
    //boost::thread th(boost::bind(&MyTask2<int,int>::Run,tk,para));
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM