可伸縮多線程任務隊列


  在我們的工作中,我們經常需要異步執行一些任務,下面介紹的這個可伸縮多線程隊列,可滿足我們的需求。

  出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下幾個功能:

    1、任務隊列是多線程,許多任務可以異步進行,任務隊列使用線程池來執行任務。

    2、任務隊列支持優先級,優先級高的任務優先執行(即使是后來添加的)

    3、任務隊列可以被暫停,但是用戶還是可以添加任務,當任務隊列被喚醒時,任務可以繼續執行下去

    4、在運行過程中,任務隊列使用的線程池,用戶可以自行增加和減少

  大體框架主要由3個類構成

    1、CJob,任務類,用戶需要從該類派生來實現自身需要完成的任務

    2、CJobExecuter,任務執行類,任務均由該類來調用執行,每一個類相當於對應一個線程

    3、CMThreadedJobQ,多線程任務隊列,添加任務已經任務的分發均由該類完成,該類維護一個任務隊列和一個完成隊列的線程池。

  類圖如下:

  該例子中,CJobExecuter和CMThreadJobQ這兩個類的調用關系是非常值得我們學習的,同時,CJob作為一個基類,子類派生可以實現不同的任務,可擴展性也不錯。源代碼解析如下:

  Job.h文件:

class CJob
{
public:
    CJob();
    virtual ~CJob();
    
    BOOL m_Completed;         //任務是否完成:TRUE 完成,FALSE 未完成
    static long lastUsedID;   //最后的ID
    
    //================================================================================================
    //函數名:                  setPriority
    //函數描述:                設置任務優先級
    //輸入:                    [in] priority 優先級別
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void setPriority(int priority);

    //================================================================================================
    //函數名:                  getPriority
    //函數描述:                返回任務優先級
    //輸入:                    無
    //輸出:                    無
    //返回:                    任務優先級
    //================================================================================================
    int getPriority();
    
    //================================================================================================
    //函數名:                  getID
    //函數描述:                返回任務ID
    //輸入:                    無
    //輸出:                    無
    //返回:                    任務ID
    //================================================================================================
    long getID();
    
    //================================================================================================
    //函數名:                  setAutoDelete
    //函數描述:                設置完成任務后是否刪除任務
    //輸入:                    [in] autoDeleteFlag
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void setAutoDelete(BOOL autoDeleteFlag = TRUE);

    //================================================================================================
    //函數名:                  AutoDelete
    //函數描述:                返回刪除任務標記
    //輸入:                    無
    //輸出:                    無
    //返回:                    任務標記
    //================================================================================================
    BOOL AutoDelete();

    //================================================================================================
    //函數名:                  execute
    //函數描述:                任務真正工作的函數,純虛函數,需要子類化實現
    //輸入:                    無
    //輸出:                    無
    //返回:                    任務ID
    //================================================================================================
    virtual void execute() = 0;    
private:
    long m_ID;               //任務ID
    BOOL m_autoDeleteFlag;   //是否自動刪除任務標記,TRUE 刪除,FALSE 不刪除,默認為TRUE
    int m_priority;          //任務優先級,默認為5

};

  Job.cpp文件:

long CJob::lastUsedID = 0;

CJob::CJob()
{
    this->m_ID = InterlockedIncrement(&lastUsedID);
    this->m_autoDeleteFlag = TRUE;
    this->m_priority = 5;
    this->m_Completed= FALSE;
}

CJob::~CJob()
{
}

BOOL CJob::AutoDelete()
{
    return m_autoDeleteFlag;
}

void CJob::setAutoDelete(BOOL autoDeleteFlag)
{
    m_autoDeleteFlag = autoDeleteFlag;
}

long CJob::getID()
{
    return this->m_ID;
}

int CJob::getPriority()
{
    return this->m_priority;    
}

void CJob::setPriority(int priority)
{
    this->m_priority = priority;
}

  JobExecuter.h文件:

//一個對象對應一個線程,執行任務Job
class CJobExecuter
{
public:
    CJobExecuter(CMThreadedJobQ *pJobQ);
    virtual ~CJobExecuter();
    
    //================================================================================================
    //函數名:                  stop
    //函數描述:                停止執行任務
    //輸入:                    無
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void stop();
    
    //================================================================================================
    //函數名:                  execute
    //函數描述:                執行一個任務
    //輸入:                    [in] pJob 任務指針
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void execute(CJob* pJob);
    
    static UINT ThreadFunction(LPVOID pParam); //線程函數
    
    CMThreadedJobQ* m_pJobQ;                   //指向線程任務隊列指針
    CJob* m_pJob2Do;                           //指向正在執行任務的指針
    int m_flag;                                //線程執行標記
    CWinThread* m_pExecuterThread;             //線程標識符
};

  JobExecuter.cpp文件:

#define STOP_WORKING -1
#define KEEP_WORKING  0

CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
{
    this->m_pJobQ= pJobQ;
    this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
    this->m_pJob2Do = NULL;
    this->m_flag = KEEP_WORKING;
}

CJobExecuter::~CJobExecuter()
{
    if(this->m_pExecuterThread!= NULL )    
    {
        this->m_pExecuterThread->ExitInstance();
        delete m_pExecuterThread;    
    }
}

UINT CJobExecuter::ThreadFunction(LPVOID pParam)
{    
    CJobExecuter *pExecuter = (CJobExecuter *)pParam;
    pExecuter->m_flag = 1;
    ::Sleep(1);
    CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
    while(pExecuter->m_flag !=STOP_WORKING )
    {
        if(pExecuter->m_pJob2Do!=  NULL)
        {
            pExecuter->m_pJob2Do->execute();
            pExecuter->m_pJob2Do->m_Completed = TRUE;    
            if(pExecuter->m_pJob2Do->AutoDelete())
                delete pExecuter->m_pJob2Do;
            pExecuter->m_pJob2Do = NULL;
        }

        if(pExecuter->m_pJobQ == NULL) break;
        
        CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
        singleLock.Lock();
        if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter個數大於最大值,自動銷毀
        {
            pExecuter->stop();    
            singleLock.Unlock();    
        }
        else
        {
            pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任務后,添加到CMThreadedJobQ的空閑隊列中
            singleLock.Unlock();    
            pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();        
            pExecuter->m_pExecuterThread->SuspendThread();        
        }                
    }
    
    if(pExecuter->m_pJobQ != NULL)
    {
        pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
    }
    else
    {
        delete pExecuter;
    }

    return 0;
}

void CJobExecuter::execute(CJob* pJob)
{
    this->m_pJob2Do = pJob;
    ::Sleep(0);
    this->m_pExecuterThread->ResumeThread();
}

void CJobExecuter::stop()
{
    this->m_flag = STOP_WORKING;
    this->m_pExecuterThread->ResumeThread();
}

  MThreadedJobQ.h文件:

typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;

//線程池任務隊列
class CMThreadedJobQ
{

public:
    typedef struct THNODE
    {
        CJobExecuter* pExecuter;
        THNODE * pNext ;
    } THNODE;
    
    CMThreadedJobQ();
    virtual ~CMThreadedJobQ();

    //================================================================================================
    //函數名:                  deleteJobExecuter
    //函數描述:                刪除一個JobExecuter對象
    //輸入:                    [in] pEx
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void deleteJobExecuter(CJobExecuter *pEx);
    
    //================================================================================================
    //函數名:                  setMaxNoOfExecuter
    //函數描述:                設置CJobExecuter的個數
    //輸入:                    [in] value
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void setMaxNoOfExecuter(int value);

    //================================================================================================
    //函數名:                  addJobExecuter
    //函數描述:                添加一個CJobExecuter
    //輸入:                    [in] pEx
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void addJobExecuter(CJobExecuter *pEx);
    
    //================================================================================================
    //函數名:                  getJobExecuter
    //函數描述:                返回一個CJobExecuter
    //輸入:                    無
    //輸出:                    無
    //返回:                    處理任務的指針
    //================================================================================================
    CJobExecuter* getJobExecuter();

    //================================================================================================
    //函數名:                  addFreeJobExecuter
    //函數描述:                添加一個CJobExecuter
    //輸入:                    [in] pEx
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void addFreeJobExecuter(CJobExecuter *pEx);

    //================================================================================================
    //函數名:                  addJob
    //函數描述:                添加一個任務
    //輸入:                    [in] pJob
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void addJob(CJob *pJob);
    
    //================================================================================================
    //函數名:                  getMaxNoOfExecuter
    //函數描述:                獲取CJobExecuter個數的最大值
    //輸入:                    無
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    int getMaxNoOfExecuter();
    
    //================================================================================================
    //函數名:                  getNoOfExecuter
    //函數描述:                獲取當前CJobExecuter的個數
    //輸入:                    無
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    int getNoOfExecuter();

    static UINT JobObserverThreadFunction(LPVOID);

    //================================================================================================
    //函數名:                  pause
    //函數描述:                掛起JobObserverThread線程
    //輸入:                    無
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void pause();

    //================================================================================================
    //函數名:                  resume
    //函數描述:                喚醒JobObserverThread線程
    //輸入:                    無
    //輸出:                    無
    //返回:                    無
    //================================================================================================
    void resume();    
        
    CWinThread* m_pObserverThread; //向空閑的executer線程添加任務的線程
    CCriticalSection m_cs;         //關鍵代碼段,用於互斥
    CJobQList m_jobQList;          //任務隊列
private :
    BOOL m_pause;                  //JobObserverThread線程運行標記
    int m_MaxNoOfExecuter;         //CJobExecuter最大個數
    int m_NoOfExecuter;            //當前CJobExecuter個數
    THNODE* m_pFreeEList;          //維護空閑處理任務線程的隊列
    THNODE* m_pAllEList;           //維護所有處理任務線程的隊列
};

  MThreadedJobQ.cpp文件:

CMThreadedJobQ::CMThreadedJobQ()
{
    m_MaxNoOfExecuter = 2;
    m_pause = FALSE;
    m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
    m_pFreeEList =NULL;
    m_NoOfExecuter =0;
    m_pAllEList = NULL;
}

CMThreadedJobQ::~CMThreadedJobQ()
{
    THNODE* pTempNode;
    while (m_pAllEList != NULL) 
    {    
        pTempNode = m_pAllEList->pNext;
        delete m_pAllEList->pExecuter;        
        delete m_pAllEList;        
        m_pAllEList = pTempNode;    
    }    

    while (m_pFreeEList != NULL) 
    {    pTempNode = m_pFreeEList->pNext;        
        delete m_pFreeEList;        
        m_pFreeEList = pTempNode;    
    }    

    m_pObserverThread->ExitInstance();    
    delete m_pObserverThread;
}


void CMThreadedJobQ::pause()
{
    this->m_pause = TRUE;
}

void CMThreadedJobQ::resume()
{
    this->m_pause = FALSE;
    this->m_pObserverThread->ResumeThread();
}

UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
{
    CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
    CJobExecuter *pJExecuter;

    while(TRUE)
    {
        Sleep(100);
        if(pMTJQ->m_pause != TRUE)
        {
            while(!pMTJQ->m_jobQList.IsEmpty() )
            {
                pJExecuter = pMTJQ->getJobExecuter();
                if( pJExecuter!=NULL)
                {                
                    pMTJQ->m_cs.Lock();
                    pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
                    pMTJQ->m_jobQList.RemoveHead();
                    AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
                    pMTJQ->m_cs.Unlock();
                }
                else
                {
                    break;
                }
                if(pMTJQ->m_pause == TRUE)
                    break;
            }
        }
        pMTJQ->m_pObserverThread->SuspendThread();
    }
    return 0;
}

int CMThreadedJobQ::getNoOfExecuter()
{
    return this->m_NoOfExecuter;
}

int CMThreadedJobQ::getMaxNoOfExecuter()
{
    return this->m_MaxNoOfExecuter;
}

void CMThreadedJobQ::addJob(CJob *pJob)
{
    CJob * pTempJob;
    CSingleLock sLock(&this->m_cs);
    sLock.Lock();    
    POSITION pos,lastPos;
    pos = this->m_jobQList.GetHeadPosition();    
    lastPos = pos;
    if(pos != NULL)
        pTempJob =this->m_jobQList.GetHead();
    while(pos != NULL )
    {        
        if( pJob->getPriority() > pTempJob->getPriority())
            break;
        lastPos = pos;
        pTempJob =     this->m_jobQList.GetNext(pos);        
    }    
    if(pos == NULL)    
        this->m_jobQList.AddTail(pJob);
    else
        this->m_jobQList.InsertBefore(lastPos,pJob);
    this->m_pObserverThread->ResumeThread();
    sLock.Unlock();
}

void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter = pEx;
    node->pNext = this->m_pFreeEList;
    this->m_pFreeEList = node;
    m_cs.Unlock();
}

CJobExecuter* CMThreadedJobQ::getJobExecuter()
{
    THNODE *pTemp;
    CJobExecuter *pEx=NULL;
    m_cs.Lock();

    if(this->m_pFreeEList != NULL)  //有空閑CJobExecuter,就返回
    {
        pTemp = this->m_pFreeEList;
        this->m_pFreeEList = this->m_pFreeEList->pNext;
        pEx = pTemp->pExecuter;
        delete pTemp ;
        m_cs.Unlock();
        return pEx;
    }

    if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //沒有空閑CJobExecuter,並且當前CJobExecuter小於最大值,就生成一個新的CJobExecuter
    {
        pEx =  new CJobExecuter(this);
        this->addJobExecuter(pEx);
        this->m_NoOfExecuter++;
        m_cs.Unlock();
        return pEx;
    }
    m_cs.Unlock();
    return NULL;
}

void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter= pEx;
    node->pNext = this->m_pAllEList;
    this->m_pAllEList = node;
    m_cs.Unlock();
}

void CMThreadedJobQ::setMaxNoOfExecuter(int value)
{
    this->m_cs.Lock();
    if(value >1 && value <11)
        this->m_MaxNoOfExecuter = value;
    m_pObserverThread->ResumeThread();
    this->m_cs.Unlock();
}

void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
{
    THNODE* pNode,*pNodeP;
    CSingleLock singleLock(&m_cs);    
    singleLock.Lock();    
    if(this->m_pAllEList != NULL)
    {
        pNode = this->m_pAllEList;
        if(pNode->pExecuter == pEx )    
        {
          this->m_pAllEList = pNode->pNext;
          delete pNode;          
        }
        else
        {
            pNodeP =pNode;
            pNode  = pNode->pNext ;            
            while(pNode != NULL )
            {
                if(pNode->pExecuter== pEx ) break;
                pNodeP = pNode;
                pNode  = pNode->pNext ;            
            }
            if(pNode!= NULL)
            {
                pNodeP->pNext = pNode->pNext;
                delete pNode;
            }
        }
    }
    this->m_NoOfExecuter--;
    singleLock.Unlock();
    pEx->stop();
    Sleep(1);
    delete pEx;
}

  以上,就是該可伸縮多線程任務的主體框架,當我們工作需要實現類似這樣的需要:異步執行多個不同的任務時,這個例子就是一個很好的參考例子,我研究這些代碼只是為了讓我在遇到這種問題的時候,可以有一個思路去思考,而不至於無從下手,僅此而已。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM