C++ 並發消息隊列


C++ 並發消息隊列

  在網上找到了一份POSIX線程顯示的並發消息隊列示例代碼:

        http://codereview.stackexchange.com/questions/41604/thread-safe-concurrent-fifo-queue-in-c

  上面的示例代碼其實是有問題的,他只能對並發Push或者並發Pop進行上鎖,二並不能保證同時Push和Pop是線程安全的,所以在鎖隊列時只能使用一個鎖。同時該代碼並不支持Windows,所以按照這篇文檔的思路想使用標准模板庫(STL)實現一份平台無關的代碼,具體實現如下所示。

  1 #include <queue>
  2 #include <mutex>
  3 #include <thread>
  4 #include <chrono>
  5 #include <memory>
  6 #include <condition_variable>
  7 
  8 typedef struct task_tag
  9 {
 10     int data;
 11     task_tag( int i ) : data(i) { }
 12 } Task, *PTask;
 13 
 14 class MessageQueue
 15 {
 16 public:
 17     MessageQueue(){}
 18     ~MessageQueue()
 19     {
 20         if ( !m_queue.empty() )
 21         {
 22             PTask pRtn = m_queue.front();
 23             delete pRtn;
 24         }
 25         
 26     }
 27 
 28     void PushTask( PTask pTask )
 29     {
 30         std::unique_lock<std::mutex> lock( m_queueMutex );
 31         m_queue.push( pTask );
 32         m_cond.notify_one();
 33     }
 34 
 35     PTask PopTask()
 36     {
 37         PTask pRtn = NULL;
 38         std::unique_lock<std::mutex> lock( m_queueMutex );
 39         while ( m_queue.empty() )
 40         {
 41             m_cond.wait_for( lock, std::chrono::seconds(1) );
 42         }
 43 
 44         if ( !m_queue.empty() )
 45         {
 46             pRtn = m_queue.front();
 47             if ( pRtn->data != 0 )
 48                 m_queue.pop();
 49         }
 50 
 51         return pRtn;
 52     }
 53 
 54 private:
 55     std::mutex m_queueMutex;
 56     std::condition_variable m_cond; 
 57     std::queue<PTask> m_queue;
 58 };
 59 
 60 void thread_fun( MessageQueue *arguments )
 61 {
 62     while ( true )
 63     {
 64         PTask data = arguments->PopTask();
 65 
 66         if (data != NULL)
 67         {
 68             printf( "Thread is: %d\n", std::this_thread::get_id() );
 69             printf("   %d\n", data->data );
 70             if ( 0 == data->data ) //Thread end.
 71                 break;
 72             else
 73                 delete data;
 74         }
 75     }
 76 
 77     return;
 78 }
 79 
 80  int main( int argc, char *argv[] )
 81 {
 82     MessageQueue cq;
 83 
 84     #define THREAD_NUM 3
 85     std::thread threads[THREAD_NUM];
 86 
 87     for ( int i=0; i<THREAD_NUM; ++i )
 88         threads[i] = std::thread( thread_fun, &cq );
 89 
 90     int i = 100000;
 91     while( i > 0 )
 92     {
 93         Task *pTask = new Task( --i );
 94         cq.PushTask( pTask );
 95     }
 96 
 97     for ( int i=0; i<THREAD_NUM; ++i) 
 98         threads[i].join();
 99 
100     //system( "pause" );
101     return 0;
102 }

  在示例代碼中,我們使主線程向公共隊列cq中Push任務,而其他的線程則負責取出任務並打印任務,由於std::cout並不支持並發線程安全,所以在打印任務時使用printf。主線程new出的任務,在其他線程中使用並銷毀,當主線程發送data為0的任務時,則規定任務發送完畢,而其他的線程獲取到data為0的任務后退出線程,data為0的任務則有消息隊列負責銷毀。整個消息隊列使用標准模板庫實現,現實跨平台。

  在最初設計std::queue<PTask>的時候,想使用std::queue<std::shared_ptr<Task>>來管理主線程new出來的任務,這樣智能指針則負責處理任務的銷毀工作,但是在多線程並發的時候程序莫名的崩潰,仔細調試了半天,還是沒有找到問題,最終我懷疑智能指針在多線程中是不是有問題呢?所以不得不放棄最初的設計。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM