rabbitmq AmqpClient 使用Topic 交換機同一個channel 同時多個隊列 ,多個交換機,C++代碼示例


  1 // 消息發送
  2 bool PublishExchangeTopicMulti(const std::string &strUri)
  3 {
  4     AmqpClient::Channel::ptr_t channel = 
  5         AmqpClient::Channel::CreateFromUri(strUri);
  6 
  7     if (channel == nullptr)
  8     {
  9         return false;
 10     }
 11 
 12     // 聲明交換機,若不存在則創建 
 13     std::string strTopicExchange1 = "topic_exchange_1";
 14     std::string strTopicExchange2 = "topic_exchange_2";
 15     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
 16     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
 17 
 18     while (true)
 19     {
 20         // 可輸入例如 "topic_exchange_1 disk.info 666"
 21         // "topic_exchange_2 any.warning 123"
 22         std::cout << "請輸入[exchange] [routing_key1.routing_key2] [message]: " << std::endl;
 23     
 24         std::string strExchange;
 25         std::string severity;
 26         std::string message;
 27         std::cin >> strExchange;
 28         std::cin >> severity;
 29         std::cin >> message;
 30 
 31         channel->BasicPublish(strExchange, severity,
 32             AmqpClient::BasicMessage::Create(message));
 33 
 34         std::cout << "[X] to " << strExchange << ", send " 
 35             << severity << ": " << message << std::endl;
 36     }
 37 
 38 }
 39 
 40 
 41 void ReceiveTopicExchangeMulti(const std::string &strUri)
 42 {
 43     AmqpClient::Channel::ptr_t channel = 
 44         AmqpClient::Channel::CreateFromUri(strUri);
 45 
 46     if (channel == nullptr)
 47     {
 48         return ;
 49     }
 50 
 51     // 這里我們聲明兩個交換機,類型均為topic , 我們將通過不同的隊列從兩個交換機中取消息。
 52     // 這里交換機的名稱需要與發送端的保持一致。
 53     std::string strTopicExchange1 = "topic_exchange_1";
 54     std::string strTopicExchange2 = "topic_exchange_2";
 55     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
 56     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
 57 
 58     // 這里我們聲明了三個隊列,第一個隊列從交換機1 取數據第二三個隊列從交換機2 取數據;
 59     // 但是第二三個隊列所綁定的routing_key 有所不同。
 60     // 當然了,routing_key 也可以相同,這樣的話相同routing_key 的消息就會在兩個隊列中都出現。
 61     std::string strTopicQueue_1 = "topic_queue_1";
 62     std::string strTopicQueue_2 = "topic_queue_2";
 63     std::string strTopicQueue_3 = "topic_queue_3";
 64     // 第一個參數若為空,則系統默認生成隨機隊列名稱
 65     // 第三個參數 表明隊列是否持久的。true: 服務器重啟將會保留該Exchange,
 66     // 警告,若只設置此項,不代表消息的持久化。即不保證重啟后消息還在。
 67     channel->DeclareQueue(strTopicQueue_1, false, true, false, false);
 68     channel->DeclareQueue(strTopicQueue_2, false, true, false, false);
 69     channel->DeclareQueue(strTopicQueue_3, false, true, false, false);
 70 
 71     // 隊列綁定我們感興趣的routing_key, 表示我們只接收這些routing_key 相關的消息。
 72     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "*.info");
 73     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "disk.*");
 74     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "info.error");
 75 
 76     // 在交換機2 上面我們綁定了隊列2 和隊列3 。但是它們所關心的routing_key 不同。
 77     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "*.info");
 78     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "disk.*");
 79     channel->BindQueue(strTopicQueue_2, strTopicExchange2, "info.error");
 80 
 81     // 創建消費者標志,這個在后面會告訴 channel 我們需要哪些隊列中的相關routing_key 的消息。
 82     // BasicConsume() 第五個參數是指該消息是否以獨占的方式處理,若是則不允許第二個消費者綁定到該隊列 上,
 83     // 若否,則多個消費者同時綁定到該隊列,那么 在該隊列上的消息將隨機分配到某一個消費者。
 84     // 即,同一個消息將不會同時出現 在兩個消費者身上。
 85     std::string strFlagConsume_1 = "tab_consume_1";
 86     std::string strFlagConsume_2 = "tab_consume_2";
 87     std::string strFlagConsume_3 = "tab_consume_3";
 88     channel->BasicConsume(strTopicQueue_1, strFlagConsume_1, true, false, true);
 89     channel->BasicConsume(strTopicQueue_2, strFlagConsume_2, true, false, true);
 90     channel->BasicConsume(strTopicQueue_3, strFlagConsume_3, true, false, true);
 91     // BasicConsume() 的第4 個參數為false 表示,我們需要主動ack 服務器才將該消息清除。
 92 
 93     std::vector<std::string> vecFlagConsume;
 94     vecFlagConsume.push_back(strFlagConsume_1);
 95     vecFlagConsume.push_back(strFlagConsume_2);
 96     vecFlagConsume.push_back(strFlagConsume_3);
 97 
 98     while (true)
 99     {
100         AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(vecFlagConsume);
101 
102         std::string strExchange = envelope->Exchange();
103         std::string strFlagConsume = envelope->ConsumerTag();
104         std::string severity = envelope->RoutingKey();
105         std::string buffer = envelope->Message()->body();
106 
107         std::cout << "[Y] exchange: " << strExchange << ", flagconsume: " << strflagConsume 
108             << ", receive " << severity << ": " << buffer << std::endl;
109 
110         channel->BasicAck(envelope);
111     }
112 
113     for (size_t i = 0; i < vecFlagConsume.size(); ++i)
114         channel->BasicCancel(vecFlagConsume[i]);
115 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM