Message Queue(后文簡寫成MQ或消息隊列)是boost庫中用來封裝進程間通信的一種實現,同一台機器上的進程或線程可以通過消息隊列來進行通迅。消息隊列中的消息由優先級、消息長度、消息數據三部分組成。這里需要注意的事,MQ只是簡單的將要發送的數據在內存中進行拷貝,所以我們在發送復雜結構或對象時,我們需要將其序列化后再發送,接收端接收時要反序列化,也就是說我們要自己去
定義區分一條消息(就是自定義網絡通迅協議)。在MQ中,我們可以使用三模式去發送和接收消息:
- 阻塞:在發送消息時,若消息隊列滿了,那么發送接口將會阻塞直到隊列沒有滿。在接收消息時,若隊列為空,那么接收接口也會阻塞直到隊列不空。
- 超時:用戶可以自定義超時時間,在超時時間到了,那么發送接口或接收接口都會返回,無論隊列滿或空
- Try:在隊列為空或滿時,都能立即返回
MQ使用命名的共享內存來實現進程間通信。共享內存換句話來說,就是用戶可以指定一個名稱來創建一塊共享內存,然后像打一個文件一樣去打開這塊共享內存,同樣別的進程也可以根據這個名稱來打開這塊共享內存,這樣一個進程向共享內存中寫,另一個進程就可以從共享內存中讀。這里兩個進程的讀寫就涉及到同步問題。另外,
在創建一個MQ時,我們需要指定MQ的最大消息數量以及消息的最大size。
- //Create a message_queue. If the queue
- //exists throws an exception
- message_queue mq
- (create_only //only create
- ,"message_queue" //name
- ,100 //max message number
- ,100 //max message size
- );
- using boost::interprocess;
- //Creates or opens a message_queue. If the queue
- //does not exist creates it, otherwise opens it.
- //Message number and size are ignored if the queue
- //is opened
- message_queue mq
- (open_or_create //open or create
- ,"message_queue" //name
- ,100 //max message number
- ,100 //max message size
- );
- using boost::interprocess;
- //Opens a message_queue. If the queue
- //does not exist throws an exception.
- message_queue mq
- (open_only //only open
- ,"message_queue" //name
- );
使用message_queue::remove("message_queue");來移除一個指定的消息隊列。
接下來,我們看一個使用消息隊列的生產者與消息者的例子。第一個進程做為生產者,第二個進程做為消費者。
生產者進程:
- #include <boost/interprocess/ipc/message_queue.hpp>
- #include <iostream>
- #include <vector>
- using namespace boost::interprocess;
- int main ()
- {
- try{
- //Erase previous message queue
- message_queue::remove("message_queue");
- //Create a message_queue.
- message_queue mq
- (create_only //only create
- ,"message_queue" //name
- ,100 //max message number
- ,sizeof(int) //max message size
- );
- //Send 100 numbers
- for(int i = 0; i < 100; ++i){
- mq.send(&i, sizeof(i), 0);
- }
- }
- catch(interprocess_exception &ex){
- std::cout << ex.what() << std::endl;
- return 1;
- }
- return 0;
- }
消費者進程:
- #include <boost/interprocess/ipc/message_queue.hpp>
- #include <iostream>
- #include <vector>
- using namespace boost::interprocess;
- int main ()
- {
- try{
- //Open a message queue.
- message_queue mq
- (open_only //only create
- ,"message_queue" //name
- );
- unsigned int priority;
- message_queue::size_type recvd_size;
- //Receive 100 numbers
- for(int i = 0; i < 100; ++i){
- int number;
- mq.receive(&number, sizeof(number), recvd_size, priority);
- if(number != i || recvd_size != sizeof(number))
- return 1;
- }
- }
- catch(interprocess_exception &ex){
- message_queue::remove("message_queue");
- std::cout << ex.what() << std::endl;
- return 1;
- }
- message_queue::remove("message_queue");
- return 0;
- }