C++ 並發消息隊列
在網上找到了一份POSIX線程顯示的並發消息隊列示例代碼:
http://codereview.stackexchange.com/questions/41604/thread-safe-concurrent-fifo-queue-in-c
上面的示例代碼其實是有問題的,他只能對並發Push或者並發Pop進行上鎖,二並不能保證同時Push和Pop是線程安全的,所以在鎖隊列時只能使用一個鎖。同時該代碼並不支持Windows,所以按照這篇文檔的思路想使用標准模板庫(STL)實現一份平台無關的代碼,具體實現如下所示。
1 #include <queue> 2 #include <mutex> 3 #include <thread> 4 #include <chrono> 5 #include <memory> 6 #include <condition_variable> 7 8 typedef struct task_tag 9 { 10 int data; 11 task_tag( int i ) : data(i) { } 12 } Task, *PTask; 13 14 class MessageQueue 15 { 16 public: 17 MessageQueue(){} 18 ~MessageQueue() 19 { 20 if ( !m_queue.empty() ) 21 { 22 PTask pRtn = m_queue.front(); 23 delete pRtn; 24 } 25 26 } 27 28 void PushTask( PTask pTask ) 29 { 30 std::unique_lock<std::mutex> lock( m_queueMutex ); 31 m_queue.push( pTask ); 32 m_cond.notify_one(); 33 } 34 35 PTask PopTask() 36 { 37 PTask pRtn = NULL; 38 std::unique_lock<std::mutex> lock( m_queueMutex ); 39 while ( m_queue.empty() ) 40 { 41 m_cond.wait_for( lock, std::chrono::seconds(1) ); 42 } 43 44 if ( !m_queue.empty() ) 45 { 46 pRtn = m_queue.front(); 47 if ( pRtn->data != 0 ) 48 m_queue.pop(); 49 } 50 51 return pRtn; 52 } 53 54 private: 55 std::mutex m_queueMutex; 56 std::condition_variable m_cond; 57 std::queue<PTask> m_queue; 58 }; 59 60 void thread_fun( MessageQueue *arguments ) 61 { 62 while ( true ) 63 { 64 PTask data = arguments->PopTask(); 65 66 if (data != NULL) 67 { 68 printf( "Thread is: %d\n", std::this_thread::get_id() ); 69 printf(" %d\n", data->data ); 70 if ( 0 == data->data ) //Thread end. 71 break; 72 else 73 delete data; 74 } 75 } 76 77 return; 78 } 79 80 int main( int argc, char *argv[] ) 81 { 82 MessageQueue cq; 83 84 #define THREAD_NUM 3 85 std::thread threads[THREAD_NUM]; 86 87 for ( int i=0; i<THREAD_NUM; ++i ) 88 threads[i] = std::thread( thread_fun, &cq ); 89 90 int i = 100000; 91 while( i > 0 ) 92 { 93 Task *pTask = new Task( --i ); 94 cq.PushTask( pTask ); 95 } 96 97 for ( int i=0; i<THREAD_NUM; ++i) 98 threads[i].join(); 99 100 //system( "pause" ); 101 return 0; 102 }
在示例代碼中,我們使主線程向公共隊列cq中Push任務,而其他的線程則負責取出任務並打印任務,由於std::cout並不支持並發線程安全,所以在打印任務時使用printf。主線程new出的任務,在其他線程中使用並銷毀,當主線程發送data為0的任務時,則規定任務發送完畢,而其他的線程獲取到data為0的任務后退出線程,data為0的任務則有消息隊列負責銷毀。整個消息隊列使用標准模板庫實現,現實跨平台。
在最初設計std::queue<PTask>的時候,想使用std::queue<std::shared_ptr<Task>>來管理主線程new出來的任務,這樣智能指針則負責處理任務的銷毀工作,但是在多線程並發的時候程序莫名的崩潰,仔細調試了半天,還是沒有找到問題,最終我懷疑智能指針在多線程中是不是有問題呢?所以不得不放棄最初的設計。