QT 線程池 + TCP 小試(一)線程池的簡單實現


*免分資源鏈接點擊打開鏈接http://download.csdn.net/detail/goldenhawking/4492378

    很久以前做過ACE + MFC/QT 的中輕量級線程池應用,大概就是利用線程池執行客戶機上的運算需求,將結果返回。ACE是跨平台重量級的通信中間件,與常見的應用程序框架需要精心契合,才能不出問題。最近想到既然QT框架本身就已經具有各類功能,何不玩一玩呢吐舌頭,那就開搞!這個實驗的代碼可以從我的資源內下載。

    第一步打算實現的模式,我們需要一個設置為CPU核心數的線程池,這個線程池可以異步接受N個數據生產者傳入的數據,均衡的分配處理任務,處理后的數據返回給某1個或者幾個消費者。有兩種均衡方法。一種是生產者粒度的均衡。同一個生產者的各批數據FIFO順序不被打破,這需要判斷,當處理線程隊列中還有該生產者的數據時,不改變當前處理線程。第二種是數據粒度的並行,某個生產者傳來的數據被分配到不同的線程,不保證后到的數據后被處理(也可能先到的處理的慢,后到的快)。

    這種異步隊列機制如果在MFC、WinAPI中,需要手工使用 Mutex 同步隊列,更可惡的是分配的數據對象的生存期非常微妙,一不小心就會出紅叉叉。QT首先為我們提供了信號和槽的機制,且該機制原生支持跨線程。假設我們在16核心服務器上,則使用 15個 QThread對象管理15組工作線程(留一個給主界面)。但是,如果仔細看了QT的文檔,就會發現QThread的信號事件循環默認是在創建者中(很多時候就是主線程!),所以,要想讓槽在子線程運行,一般是派生一個QObject的類,並把對象MoveToThread到某個QThread管理的線程上去。這樣,信號和槽就是全異步FIFO了。其次,QT提供了引用計數的QByteArray封裝,這個東西在參數傳遞的時候,速度很快,很少出現memcpy,生存期也特別容易控制。雖然C++11里有 shared_ptr<T>,但是那個東西還是需要在一開始new 一個int8型的存儲區,很討厭。

說了這么多,上關鍵代碼。

 先是線程池的封裝qghthreadengine.h

[cpp]  view plain  copy
 
  1. #ifndef QGHTHREADENGINE_H  
  2. #define QGHTHREADENGINE_H  
  3.   
  4. #include <QObject>  
  5. #include <QThread>  
  6. #include <QVector>  
  7. #include <QList>  
  8. #include <QMap>  
  9. #include <QMutex>  
  10. #include "qghthreadtaskitem.h"  
  11. #include "qghthreadobject.h"  
  12.   
  13. //線程池引擎,幫助用戶進行動態平衡  
  14. class QGHThreadEngine : public QObject  
  15. {  
  16.     Q_OBJECT  
  17. public:  
  18.     QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads = 2,bool bFIFOKeep = true);  
  19.     ~QGHThreadEngine();  
  20. protected:  
  21.     QVector<QThread *> m_ThreadPool;  
  22.     QVector<QGHThreadObject *> m_ThreadObjs;  
  23.     QGHThreadTaskItem * m_pThreadTaskItem;  
  24.     int m_nThreads;  
  25.     bool m_bFIFOKeep;  
  26. private:  
  27.     //各個m_ThreadPool\m_ThreadObjs的任務數  
  28.     QMap<QObject *,qint32> m_map_Tasks;         
  29.     //m_bFIFOKeep == true 時,下面兩個成員將保證非空閑的單個 data_source 將始終在單一線程處理  
  30.     //各個data_source 目前的處理線程  
  31.     QMap<QObject *,QObject *> m_map_busy_source_task;   
  32.     //各個data_source 目前的排隊數目  
  33.     QMap<QObject *,int> m_map_busy_source_counter;          
  34. public:  
  35.     void SetThreadTaskItem(QGHThreadTaskItem * pTaskItem);  
  36.     QList<qint32> CurrentLoad()  
  37.     {  
  38.         return m_map_Tasks.values();  
  39.     }  
  40. public slots:  
  41.     void append_new(QObject * data_source, const QByteArray & data);  
  42.     //捕獲QGHThreadObject::sig_process_finished, 以便管理data_source的 FIFO 順序  
  43.     void on_sig_process_finished(QObject * data_source);  
  44. signals:  
  45.     //************************************  
  46.     // Method:    do_task  
  47.     // FullName:  QGHThreadEngine::do_task  
  48.     // Access:    public   
  49.     // Returns:   void  
  50.     // Qualifier:  
  51.     // Parameter: QObject *     任務來源 (相同任務源的任務,在隊列非空時會被安排到同一個線程處理,以確保對相同源的FIFO)  
  52.     // Parameter: QByteArray    任務體   
  53.     // Parameter: QObject *     處理任務的線程對象(QGHThreadObject)  
  54.     //************************************  
  55.     void do_task(QObject *, const QByteArray &,QObject *);  
  56. };  
  57.   
  58. #endif // QGHTHREADENGINE_H  



 
        

 實現qghthreadengine.cpp:

 

 

[cpp]  view plain  copy
 
  1. #include "qghthreadengine.h"  
  2. #include <assert.h>  
  3. QGHThreadEngine::QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads,bool bFIFOKeep)  
  4.     : QObject(parent),  
  5.     m_nThreads(nThreads),  
  6.     m_pThreadTaskItem(pTaskItem),  
  7.     m_bFIFOKeep(bFIFOKeep)  
  8. {  
  9.     assert(nThreads>0 && nThreads<512 && pTaskItem!=NULL);  
  10.     //創建固定數目的線程  
  11.     for (int i=0;i<nThreads;i++)  
  12.     {  
  13.         QThread * pNewThread = new QThread(this);  
  14.         QGHThreadObject * pNewObject = new QGHThreadObject(0,pTaskItem);  
  15.         //記錄下來  
  16.         m_ThreadPool.push_back(pNewThread);  
  17.         m_ThreadObjs.push_back(pNewObject);  
  18.         m_map_Tasks[pNewObject] = 0;  
  19.         pNewThread->start();  
  20.         //把QGHThreadObject的信號、曹處理搬移到子線程內  
  21.         pNewObject->moveToThread(pNewThread);  
  22.         //連接處理完成消息  
  23.         connect(pNewObject,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));  
  24.         //連接處理新任務消息  
  25.         connect(this,SIGNAL(do_task(QObject *, const QByteArray &,QObject *)),pNewObject,SLOT(process(QObject *, const QByteArray &,QObject *)));  
  26.   
  27.     }  
  28. }  
  29.   
  30. QGHThreadEngine::~QGHThreadEngine()  
  31. {  
  32.     foreach(QGHThreadObject * obj,m_ThreadObjs)  
  33.     {  
  34.         disconnect(obj,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));  
  35.         obj->deleteLater();  
  36.     }  
  37.     foreach(QThread * th ,m_ThreadPool)  
  38.     {  
  39.         disconnect(this,SIGNAL(do_task(QObject *, QByteArray,QObject *)),th,SLOT(process(QObject *, QByteArray,QObject *)));  
  40.         th->exit(0);  
  41.         th->wait();  
  42.     }  
  43. }  
  44.   
  45. //負載均衡添加任務,生產者的信號要掛接到這個槽上  
  46. void QGHThreadEngine::append_new(QObject * data_source, const QByteArray &  data)  
  47. {  
  48.     QObject * pMinObj = 0;  
  49.     //對一批來自同一數據源的數據,使用同樣的數據源處理,以免發生多線程擾亂FIFO對單個data_source的完整性  
  50.     if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()&& m_bFIFOKeep==true)  
  51.     {  
  52.         m_map_busy_source_counter[data_source]++;  
  53.         pMinObj = m_map_busy_source_task[data_source];  
  54.     }  
  55.     else  
  56.     {  
  57.         qint32 nMinCost = 0x7fffffff;  
  58.         //尋找現在最空閑的一個線程  
  59.         for (QMap<QObject *,qint32>::iterator p = m_map_Tasks.begin();p!=m_map_Tasks.end();p++)  
  60.         {  
  61.             if (p.value()< nMinCost)  
  62.             {  
  63.                 nMinCost = p.value();  
  64.                 pMinObj = p.key();  
  65.             }  
  66.         }  
  67.         if (pMinObj)  
  68.         {  
  69.             m_map_busy_source_counter[data_source] = 1;  
  70.             m_map_busy_source_task[data_source] = pMinObj;  
  71.         }  
  72.     }  
  73.     if (pMinObj)  
  74.     {  
  75.         m_map_Tasks[pMinObj]++;  
  76.         emit do_task(data_source,data,pMinObj);  
  77.     }  
  78. }  
  79. void QGHThreadEngine::on_sig_process_finished(QObject * data_source)  
  80. {  
  81.     if (m_map_Tasks.find(sender())!=m_map_Tasks.end())  
  82.     {  
  83.         m_map_Tasks[sender()]--;  
  84.     }  
  85.     if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end())  
  86.     {  
  87.         m_map_busy_source_counter[data_source]--;  
  88.         if (m_map_busy_source_counter[data_source]<=0)  
  89.         {  
  90.             m_map_busy_source_counter.remove(data_source);  
  91.             m_map_busy_source_task.remove(data_source);  
  92.         }  
  93.     }  
  94. }     

用於綁定的 qghthreadobject.h

 

 

[cpp]  view plain  copy
 
  1. #ifndef QGHTHREADOBJECT_H  
  2. #define QGHTHREADOBJECT_H  
  3. #include <QObject>  
  4. #include "qghthreadtaskitem.h"  
  5. //用於在子線程內具體承擔事件循環的類,用戶無需重載  
  6. class QGHThreadObject:public QObject  
  7. {  
  8.     Q_OBJECT  
  9.   
  10. public:  
  11.     QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem);  
  12.     ~QGHThreadObject();  
  13. public:  
  14.     void SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem);  
  15. public slots:  
  16.     //************************************  
  17.     // Method:    process  
  18.     // FullName:  QGHThreadObject::process  
  19.     // Access:    public   
  20.     // Returns:   void  
  21.     // Qualifier:  
  22.     // Parameter: QObject *     任務來源 (相同任務源的任務,在隊列非空時會被安排到同一個線程處理,以確保對相同源的FIFO)  
  23.     // Parameter: QByteArray    任務體   
  24.     // Parameter: QObject *     處理任務的線程對象(QGHThreadObject)  
  25.     //************************************  
  26.     void process(QObject * data_source, const QByteArray &data,QObject * target);  
  27. private:  
  28.     QGHThreadTaskItem * m_pThreadTaskItem;  
  29. signals:  
  30.     //信號,表示一次處理已經完成。QGHThreadEngine捕獲該信號,管理data_source的 FIFO 順序  
  31.     void sig_process_finished(QObject * data_source);  
  32. };  
  33. #endif  


相應實現qghthreadobject.cpp

 

 

[cpp]  view plain  copy
 
  1. #include "qghthreadobject.h"  
  2. #include <assert.h>  
  3.   
  4. QGHThreadObject::QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem)  
  5.     : QObject(parent),  
  6.     m_pThreadTaskItem(pThreadTaskItem)  
  7. {  
  8.     assert(pThreadTaskItem!=NULL);  
  9.   
  10. }  
  11.   
  12. QGHThreadObject::~QGHThreadObject()  
  13. {  
  14. }  
  15. void QGHThreadObject::process(QObject * data_source, const QByteArray &data,QObject * target)  
  16. {  
  17.     if (target==this)  
  18.     {  
  19.         m_pThreadTaskItem->run(data_source,data);  
  20.         emit sig_process_finished(data_source);  
  21.     }  
  22. }  
  23.   
  24. void QGHThreadObject::SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem)  
  25. {  
  26.     assert(pThreadTaskItem!=NULL);  
  27.     m_pThreadTaskItem = pThreadTaskItem;  
  28. }  


最后,是供用戶重載的實際處理方法的純虛基類qghthreadtaskitem.h

 

 

[cpp]  view plain  copy
 
  1. #ifndef QGHTHREADTASKITEM_H  
  2. #define QGHTHREADTASKITEM_H  
  3. #include <QObject>  
  4. //用戶重載該類,實現自定義方法的線程池調用  
  5. class QGHThreadTaskItem:public QObject  
  6. {  
  7.     Q_OBJECT  
  8.   
  9. public:  
  10.     QGHThreadTaskItem(QObject *parent);  
  11.     ~QGHThreadTaskItem();  
  12. public:  
  13.     virtual void run(QObject * task_source, const QByteArray & data_array) = 0;  
  14.   
  15. };  
  16. #endif  


下次,繼續寫如何實現一個TCP鏈路,讓這個線程池活起來

http://blog.csdn.net/goldenhawking/article/details/7854413


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM