在我們的工作中,我們經常需要異步執行一些任務,下面介紹的這個可伸縮多線程隊列,可滿足我們的需求。
出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下幾個功能:
1、任務隊列是多線程,許多任務可以異步進行,任務隊列使用線程池來執行任務。
2、任務隊列支持優先級,優先級高的任務優先執行(即使是后來添加的)
3、任務隊列可以被暫停,但是用戶還是可以添加任務,當任務隊列被喚醒時,任務可以繼續執行下去
4、在運行過程中,任務隊列使用的線程池,用戶可以自行增加和減少
大體框架主要由3個類構成
1、CJob,任務類,用戶需要從該類派生來實現自身需要完成的任務
2、CJobExecuter,任務執行類,任務均由該類來調用執行,每一個類相當於對應一個線程
3、CMThreadedJobQ,多線程任務隊列,添加任務已經任務的分發均由該類完成,該類維護一個任務隊列和一個完成隊列的線程池。
類圖如下:
該例子中,CJobExecuter和CMThreadJobQ這兩個類的調用關系是非常值得我們學習的,同時,CJob作為一個基類,子類派生可以實現不同的任務,可擴展性也不錯。源代碼解析如下:
Job.h文件:
class CJob { public: CJob(); virtual ~CJob(); BOOL m_Completed; //任務是否完成:TRUE 完成,FALSE 未完成 static long lastUsedID; //最后的ID //================================================================================================ //函數名: setPriority //函數描述: 設置任務優先級 //輸入: [in] priority 優先級別 //輸出: 無 //返回: 無 //================================================================================================ void setPriority(int priority); //================================================================================================ //函數名: getPriority //函數描述: 返回任務優先級 //輸入: 無 //輸出: 無 //返回: 任務優先級 //================================================================================================ int getPriority(); //================================================================================================ //函數名: getID //函數描述: 返回任務ID //輸入: 無 //輸出: 無 //返回: 任務ID //================================================================================================ long getID(); //================================================================================================ //函數名: setAutoDelete //函數描述: 設置完成任務后是否刪除任務 //輸入: [in] autoDeleteFlag //輸出: 無 //返回: 無 //================================================================================================ void setAutoDelete(BOOL autoDeleteFlag = TRUE); //================================================================================================ //函數名: AutoDelete //函數描述: 返回刪除任務標記 //輸入: 無 //輸出: 無 //返回: 任務標記 //================================================================================================ BOOL AutoDelete(); //================================================================================================ //函數名: execute //函數描述: 任務真正工作的函數,純虛函數,需要子類化實現 //輸入: 無 //輸出: 無 //返回: 任務ID //================================================================================================ virtual void execute() = 0; private: long m_ID; //任務ID BOOL m_autoDeleteFlag; //是否自動刪除任務標記,TRUE 刪除,FALSE 不刪除,默認為TRUE int m_priority; //任務優先級,默認為5 };
Job.cpp文件:
long CJob::lastUsedID = 0; CJob::CJob() { this->m_ID = InterlockedIncrement(&lastUsedID); this->m_autoDeleteFlag = TRUE; this->m_priority = 5; this->m_Completed= FALSE; } CJob::~CJob() { } BOOL CJob::AutoDelete() { return m_autoDeleteFlag; } void CJob::setAutoDelete(BOOL autoDeleteFlag) { m_autoDeleteFlag = autoDeleteFlag; } long CJob::getID() { return this->m_ID; } int CJob::getPriority() { return this->m_priority; } void CJob::setPriority(int priority) { this->m_priority = priority; }
JobExecuter.h文件:
//一個對象對應一個線程,執行任務Job class CJobExecuter { public: CJobExecuter(CMThreadedJobQ *pJobQ); virtual ~CJobExecuter(); //================================================================================================ //函數名: stop //函數描述: 停止執行任務 //輸入: 無 //輸出: 無 //返回: 無 //================================================================================================ void stop(); //================================================================================================ //函數名: execute //函數描述: 執行一個任務 //輸入: [in] pJob 任務指針 //輸出: 無 //返回: 無 //================================================================================================ void execute(CJob* pJob); static UINT ThreadFunction(LPVOID pParam); //線程函數 CMThreadedJobQ* m_pJobQ; //指向線程任務隊列指針 CJob* m_pJob2Do; //指向正在執行任務的指針 int m_flag; //線程執行標記 CWinThread* m_pExecuterThread; //線程標識符 };
JobExecuter.cpp文件:
#define STOP_WORKING -1 #define KEEP_WORKING 0 CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ) { this->m_pJobQ= pJobQ; this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this); this->m_pJob2Do = NULL; this->m_flag = KEEP_WORKING; } CJobExecuter::~CJobExecuter() { if(this->m_pExecuterThread!= NULL ) { this->m_pExecuterThread->ExitInstance(); delete m_pExecuterThread; } } UINT CJobExecuter::ThreadFunction(LPVOID pParam) { CJobExecuter *pExecuter = (CJobExecuter *)pParam; pExecuter->m_flag = 1; ::Sleep(1); CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); while(pExecuter->m_flag !=STOP_WORKING ) { if(pExecuter->m_pJob2Do!= NULL) { pExecuter->m_pJob2Do->execute(); pExecuter->m_pJob2Do->m_Completed = TRUE; if(pExecuter->m_pJob2Do->AutoDelete()) delete pExecuter->m_pJob2Do; pExecuter->m_pJob2Do = NULL; } if(pExecuter->m_pJobQ == NULL) break; CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); singleLock.Lock(); if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter個數大於最大值,自動銷毀 { pExecuter->stop(); singleLock.Unlock(); } else { pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter); //完成任務后,添加到CMThreadedJobQ的空閑隊列中 singleLock.Unlock(); pExecuter->m_pJobQ->m_pObserverThread->ResumeThread(); pExecuter->m_pExecuterThread->SuspendThread(); } } if(pExecuter->m_pJobQ != NULL) { pExecuter->m_pJobQ->deleteJobExecuter(pExecuter); } else { delete pExecuter; } return 0; } void CJobExecuter::execute(CJob* pJob) { this->m_pJob2Do = pJob; ::Sleep(0); this->m_pExecuterThread->ResumeThread(); } void CJobExecuter::stop() { this->m_flag = STOP_WORKING; this->m_pExecuterThread->ResumeThread(); }
MThreadedJobQ.h文件:
typedef CTypedPtrList< CPtrList ,CJob*>CJobQList; //線程池任務隊列 class CMThreadedJobQ { public: typedef struct THNODE { CJobExecuter* pExecuter; THNODE * pNext ; } THNODE; CMThreadedJobQ(); virtual ~CMThreadedJobQ(); //================================================================================================ //函數名: deleteJobExecuter //函數描述: 刪除一個JobExecuter對象 //輸入: [in] pEx //輸出: 無 //返回: 無 //================================================================================================ void deleteJobExecuter(CJobExecuter *pEx); //================================================================================================ //函數名: setMaxNoOfExecuter //函數描述: 設置CJobExecuter的個數 //輸入: [in] value //輸出: 無 //返回: 無 //================================================================================================ void setMaxNoOfExecuter(int value); //================================================================================================ //函數名: addJobExecuter //函數描述: 添加一個CJobExecuter //輸入: [in] pEx //輸出: 無 //返回: 無 //================================================================================================ void addJobExecuter(CJobExecuter *pEx); //================================================================================================ //函數名: getJobExecuter //函數描述: 返回一個CJobExecuter //輸入: 無 //輸出: 無 //返回: 處理任務的指針 //================================================================================================ CJobExecuter* getJobExecuter(); //================================================================================================ //函數名: addFreeJobExecuter //函數描述: 添加一個CJobExecuter //輸入: [in] pEx //輸出: 無 //返回: 無 //================================================================================================ void addFreeJobExecuter(CJobExecuter *pEx); //================================================================================================ //函數名: addJob //函數描述: 添加一個任務 //輸入: [in] pJob //輸出: 無 //返回: 無 //================================================================================================ void addJob(CJob *pJob); //================================================================================================ //函數名: getMaxNoOfExecuter //函數描述: 獲取CJobExecuter個數的最大值 //輸入: 無 //輸出: 無 //返回: 無 //================================================================================================ int getMaxNoOfExecuter(); //================================================================================================ //函數名: getNoOfExecuter //函數描述: 獲取當前CJobExecuter的個數 //輸入: 無 //輸出: 無 //返回: 無 //================================================================================================ int getNoOfExecuter(); static UINT JobObserverThreadFunction(LPVOID); //================================================================================================ //函數名: pause //函數描述: 掛起JobObserverThread線程 //輸入: 無 //輸出: 無 //返回: 無 //================================================================================================ void pause(); //================================================================================================ //函數名: resume //函數描述: 喚醒JobObserverThread線程 //輸入: 無 //輸出: 無 //返回: 無 //================================================================================================ void resume(); CWinThread* m_pObserverThread; //向空閑的executer線程添加任務的線程 CCriticalSection m_cs; //關鍵代碼段,用於互斥 CJobQList m_jobQList; //任務隊列 private : BOOL m_pause; //JobObserverThread線程運行標記 int m_MaxNoOfExecuter; //CJobExecuter最大個數 int m_NoOfExecuter; //當前CJobExecuter個數 THNODE* m_pFreeEList; //維護空閑處理任務線程的隊列 THNODE* m_pAllEList; //維護所有處理任務線程的隊列 };
MThreadedJobQ.cpp文件:
CMThreadedJobQ::CMThreadedJobQ() { m_MaxNoOfExecuter = 2; m_pause = FALSE; m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this); m_pFreeEList =NULL; m_NoOfExecuter =0; m_pAllEList = NULL; } CMThreadedJobQ::~CMThreadedJobQ() { THNODE* pTempNode; while (m_pAllEList != NULL) { pTempNode = m_pAllEList->pNext; delete m_pAllEList->pExecuter; delete m_pAllEList; m_pAllEList = pTempNode; } while (m_pFreeEList != NULL) { pTempNode = m_pFreeEList->pNext; delete m_pFreeEList; m_pFreeEList = pTempNode; } m_pObserverThread->ExitInstance(); delete m_pObserverThread; } void CMThreadedJobQ::pause() { this->m_pause = TRUE; } void CMThreadedJobQ::resume() { this->m_pause = FALSE; this->m_pObserverThread->ResumeThread(); } UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam) { CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam; CJobExecuter *pJExecuter; while(TRUE) { Sleep(100); if(pMTJQ->m_pause != TRUE) { while(!pMTJQ->m_jobQList.IsEmpty() ) { pJExecuter = pMTJQ->getJobExecuter(); if( pJExecuter!=NULL) { pMTJQ->m_cs.Lock(); pJExecuter->execute(pMTJQ->m_jobQList.GetHead()); pMTJQ->m_jobQList.RemoveHead(); AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST); pMTJQ->m_cs.Unlock(); } else { break; } if(pMTJQ->m_pause == TRUE) break; } } pMTJQ->m_pObserverThread->SuspendThread(); } return 0; } int CMThreadedJobQ::getNoOfExecuter() { return this->m_NoOfExecuter; } int CMThreadedJobQ::getMaxNoOfExecuter() { return this->m_MaxNoOfExecuter; } void CMThreadedJobQ::addJob(CJob *pJob) { CJob * pTempJob; CSingleLock sLock(&this->m_cs); sLock.Lock(); POSITION pos,lastPos; pos = this->m_jobQList.GetHeadPosition(); lastPos = pos; if(pos != NULL) pTempJob =this->m_jobQList.GetHead(); while(pos != NULL ) { if( pJob->getPriority() > pTempJob->getPriority()) break; lastPos = pos; pTempJob = this->m_jobQList.GetNext(pos); } if(pos == NULL) this->m_jobQList.AddTail(pJob); else this->m_jobQList.InsertBefore(lastPos,pJob); this->m_pObserverThread->ResumeThread(); sLock.Unlock(); } void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx) { m_cs.Lock(); THNODE* node = new THNODE; node->pExecuter = pEx; node->pNext = this->m_pFreeEList; this->m_pFreeEList = node; m_cs.Unlock(); } CJobExecuter* CMThreadedJobQ::getJobExecuter() { THNODE *pTemp; CJobExecuter *pEx=NULL; m_cs.Lock(); if(this->m_pFreeEList != NULL) //有空閑CJobExecuter,就返回 { pTemp = this->m_pFreeEList; this->m_pFreeEList = this->m_pFreeEList->pNext; pEx = pTemp->pExecuter; delete pTemp ; m_cs.Unlock(); return pEx; } if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //沒有空閑CJobExecuter,並且當前CJobExecuter小於最大值,就生成一個新的CJobExecuter { pEx = new CJobExecuter(this); this->addJobExecuter(pEx); this->m_NoOfExecuter++; m_cs.Unlock(); return pEx; } m_cs.Unlock(); return NULL; } void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx) { m_cs.Lock(); THNODE* node = new THNODE; node->pExecuter= pEx; node->pNext = this->m_pAllEList; this->m_pAllEList = node; m_cs.Unlock(); } void CMThreadedJobQ::setMaxNoOfExecuter(int value) { this->m_cs.Lock(); if(value >1 && value <11) this->m_MaxNoOfExecuter = value; m_pObserverThread->ResumeThread(); this->m_cs.Unlock(); } void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx) { THNODE* pNode,*pNodeP; CSingleLock singleLock(&m_cs); singleLock.Lock(); if(this->m_pAllEList != NULL) { pNode = this->m_pAllEList; if(pNode->pExecuter == pEx ) { this->m_pAllEList = pNode->pNext; delete pNode; } else { pNodeP =pNode; pNode = pNode->pNext ; while(pNode != NULL ) { if(pNode->pExecuter== pEx ) break; pNodeP = pNode; pNode = pNode->pNext ; } if(pNode!= NULL) { pNodeP->pNext = pNode->pNext; delete pNode; } } } this->m_NoOfExecuter--; singleLock.Unlock(); pEx->stop(); Sleep(1); delete pEx; }
以上,就是該可伸縮多線程任務的主體框架,當我們工作需要實現類似這樣的需要:異步執行多個不同的任務時,這個例子就是一個很好的參考例子,我研究這些代碼只是為了讓我在遇到這種問題的時候,可以有一個思路去思考,而不至於無從下手,僅此而已。