C++多線程框架--------- 消息隊列


之前,多線程一些基本的東西,包括線程創建,互斥鎖,信號量,我們都已經封裝,下面來看看消息隊列
 
我們盡量少用系統自帶的消息隊列(比如Linux的sys/msgqueue),那樣移植性不是很強,我們希望的消息隊列,在消息打包和提取都是用的標准的C++數據結構,當然,你也可以用鏈表或者是FIFO,那樣得先寫個鏈表或者FIFO出來。
 
 
我比較懶,直接用的C++的STL的deque,即雙端口隊列,這樣可靠性有保證,當然,速度可能沒有自己寫的鏈表快,但是沒關系,使用雙端口隊列還可以根據你自己的需要將數據插入到隊列頭或者隊列尾,這樣在消息有優先級的情況下還是有用的。
 
消息隊列的核心作用其實很簡單,一個或多個線程往一個隊列后面堆數據,另外的一個線程從隊列前面取數據處理,基本操作也只有兩個,一個發,一個收,所以,我們定義消息隊列基類為:
 
[cpp]  view plain  copy
 
 print?
  1. class CMsgQueue  
  2. {  
  3.     public:  
  4.         CMsgQueue(const char *pName=NULL);  
  5.         ~CMsgQueue();  
  6.         //revice data from message queue  
  7.         virtual bool recvMsg(unsigned int &m_msg_code,void *&p_msg)=0;  
  8.         //send data to message queue  
  9.         virtual bool sendMsg(unsigned int m_msg_code,void *p_msg)=0;  
  10.         const char * getName(voidconst {  
  11.                 return msg_queue_name;  
  12.             }         
  13.     private:  
  14.         char *msg_queue_name;  
  15. };  
 
然后記得在COperratingSystemFactory里加上創建消息隊列的方法:
[cpp]  view plain  copy
 
 print?
  1. class COperatingSystemFactory  
  2. {  
  3.     public:   
  4.         static COperatingSystem *newOperatingSystem();  
  5.         static CCountingSem  *newCountingSem(unsigned int init);  
  6.         static CMutex           *newMutex(const char *pName=NULL);  
  7.         static CMsgQueue     *newMsgQueue(const char *pName=NULL);  
  8.   
  9. };  
 
最后,從CMsgQueue繼承一個CLinuxMsgQueue,然后把recvMsg和sendMsg實現吧,實現的時候注意一下。
 
單純的操作雙端口FIFO不行,我們希望是接收消息的時候如果沒有消息,線程阻塞在那里等待消息直到有消息到來才接着運行,所以,接收消息的時候我們用了信號量,阻塞在信號量那里,發送消息的時候操作完隊列,發送一個信號量出去。
 
其次,對於隊列的操作,我們希望是原子性的,不然一個正在收一個正在發就亂了,所以操作隊列的時候我們用互斥鎖來鎖一下,保證基本的原子性。
 
對應到具體的程序就是
1.為每個消息隊列申請一個鎖,一個信號量
[cpp]  view plain  copy
 
 print?
  1. CLinuxMsgQueue::CLinuxMsgQueue(const char *pName):  
  2. CMsgQueue(pName)  
  3. {  
  4.     p_mutex=COperatingSystemFactory::newMutex("Msg Mutex");  
  5.     p_sem=COperatingSystemFactory::newCountingSem(0);  
  6. }  
 
接收消息的時候:
[cpp]  view plain  copy
 
 print?
  1. bool CLinuxMsgQueue::recvMsg(unsigned int &m_msg_code,void *&p_msg)  
  2. {  
  3.     bool result;  
  4.         Elements queue_element;  
  5.     p_sem->Get();  //通過信號量阻塞在這里,有消息到達了才接着往下走  
  6.     p_mutex->Lock();  //鎖定,保證原子性  
  7.         //操作隊列  
  8.     if (m_queue.empty()) {  
  9.                 p_mutex-> UnLock ();   
  10.             return false;     
  11.     }  
  12.     queue_element = m_queue.front();  
  13.     m_queue.pop_front();  
  14.     m_msg_code = queue_element.msg_code;  
  15.     p_msg = queue_element.p_message;  
  16.         //操作隊列結束  
  17.     p_mutex->UnLock(); //解除鎖定  
  18.         return true;  
  19. }  
 
發送的時候也是類似的方式進行,這樣,一個最簡單消息隊列就完成了。如果我們要使用消息隊列的話,很簡單,在main.cpp中
[cpp]  view plain  copy
 
 print?
  1. int main()  
  2. {  
  3.         //首先,新建一個消息隊列  
  4.         CMsgQueue *q=COperatingSystemFactory::newMsgQueue("B to A message Queue");  
  5.         //新建兩個線程,TestThread和TestThreadB都是從CThread繼承下來的線程類  
  6.     TestThread *a=new TestThread("A");  
  7.     TestThreadB *b=new TestThreadB("B");  
  8.         //將消息隊列放到兩個線程實體的局部變量中  
  9.     a->setMsgQueue(q);  
  10.     b->setMsgQueue(q);  
  11.         //啟動線程  
  12.     a->run();  
  13.     b->run();  
  14. }  
 
當要在mainloop中發送消息的時候,只需要調用
     
[cpp]  view plain  copy
 
 print?
  1. p_msg_send->sendMsg(code, (void *)p_msg);   //其中p_msg_send是b線程的局部變量,實際指向的是之前新建的消息隊列q  
在a線程中需要接受消息,調用方法也類似,具體可以之間看代碼。
 
這樣,一個最簡單的C++多線程框架就完成了。具體的代碼可以直接到github上下載
 
 
 
寫在后面的話:
 
當然,這個代碼還非常不完整,整個代碼量也沒有多少行,在這里,我只是提供一個代碼框架的方法,作為一個demo給大家參考,如果真的需要實際使用還有很多很多地方需要修改的,github上我的代碼也不能在生產軟件中實際使用,在實際的項目中,我也實現了一個沒有任何第三方的線程庫,比這個復雜多了,還包括事件處理,等待超時,消息廣播,消息訂閱等模塊,而且能運行在linux,ecos等多個平台上,基本做到平台無關了,但由於各種原因我也沒辦法將代碼都公布出來,這里所說的這個框架只是項目中線程庫提取出來的非常少的一部分,同樣,也只是提供一種編程的設計思想,后面的東西希望大家各自有各自的發掘和完善,也許你看了以后,會提出更加強大和簡潔的框架。
 
另外,github上的代碼我會繼續完善,將其他模塊陸續加上,如果大家感興趣也可以跟我一起來完善,我盡量不使用之前實現過的線程庫的代碼,避免不必要的麻煩


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM