1 // 消息發送 2 bool PublishExchangeTopicMulti(const std::string &strUri) 3 { 4 AmqpClient::Channel::ptr_t channel = 5 AmqpClient::Channel::CreateFromUri(strUri); 6 7 if (channel == nullptr) 8 { 9 return false; 10 } 11 12 // 聲明交換機,若不存在則創建 13 std::string strTopicExchange1 = "topic_exchange_1"; 14 std::string strTopicExchange2 = "topic_exchange_2"; 15 channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC); 16 channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC); 17 18 while (true) 19 { 20 // 可輸入例如 "topic_exchange_1 disk.info 666" 21 // "topic_exchange_2 any.warning 123" 22 std::cout << "請輸入[exchange] [routing_key1.routing_key2] [message]: " << std::endl; 23 24 std::string strExchange; 25 std::string severity; 26 std::string message; 27 std::cin >> strExchange; 28 std::cin >> severity; 29 std::cin >> message; 30 31 channel->BasicPublish(strExchange, severity, 32 AmqpClient::BasicMessage::Create(message)); 33 34 std::cout << "[X] to " << strExchange << ", send " 35 << severity << ": " << message << std::endl; 36 } 37 38 } 39 40 41 void ReceiveTopicExchangeMulti(const std::string &strUri) 42 { 43 AmqpClient::Channel::ptr_t channel = 44 AmqpClient::Channel::CreateFromUri(strUri); 45 46 if (channel == nullptr) 47 { 48 return ; 49 } 50 51 // 這里我們聲明兩個交換機,類型均為topic , 我們將通過不同的隊列從兩個交換機中取消息。 52 // 這里交換機的名稱需要與發送端的保持一致。 53 std::string strTopicExchange1 = "topic_exchange_1"; 54 std::string strTopicExchange2 = "topic_exchange_2"; 55 channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC); 56 channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC); 57 58 // 這里我們聲明了三個隊列,第一個隊列從交換機1 取數據第二三個隊列從交換機2 取數據; 59 // 但是第二三個隊列所綁定的routing_key 有所不同。 60 // 當然了,routing_key 也可以相同,這樣的話相同routing_key 的消息就會在兩個隊列中都出現。 61 std::string strTopicQueue_1 = "topic_queue_1"; 62 std::string strTopicQueue_2 = "topic_queue_2"; 63 std::string strTopicQueue_3 = "topic_queue_3"; 64 // 第一個參數若為空,則系統默認生成隨機隊列名稱 65 // 第三個參數 表明隊列是否持久的。true: 服務器重啟將會保留該Exchange, 66 // 警告,若只設置此項,不代表消息的持久化。即不保證重啟后消息還在。 67 channel->DeclareQueue(strTopicQueue_1, false, true, false, false); 68 channel->DeclareQueue(strTopicQueue_2, false, true, false, false); 69 channel->DeclareQueue(strTopicQueue_3, false, true, false, false); 70 71 // 隊列綁定我們感興趣的routing_key, 表示我們只接收這些routing_key 相關的消息。 72 channel->BindQueue(strTopicQueue_1, strTopicExchange1, "*.info"); 73 channel->BindQueue(strTopicQueue_1, strTopicExchange1, "disk.*"); 74 channel->BindQueue(strTopicQueue_1, strTopicExchange1, "info.error"); 75 76 // 在交換機2 上面我們綁定了隊列2 和隊列3 。但是它們所關心的routing_key 不同。 77 channel->BindQueue(strTopicQueue_3, strTopicExchange2, "*.info"); 78 channel->BindQueue(strTopicQueue_3, strTopicExchange2, "disk.*"); 79 channel->BindQueue(strTopicQueue_2, strTopicExchange2, "info.error"); 80 81 // 創建消費者標志,這個在后面會告訴 channel 我們需要哪些隊列中的相關routing_key 的消息。 82 // BasicConsume() 第五個參數是指該消息是否以獨占的方式處理,若是則不允許第二個消費者綁定到該隊列 上, 83 // 若否,則多個消費者同時綁定到該隊列,那么 在該隊列上的消息將隨機分配到某一個消費者。 84 // 即,同一個消息將不會同時出現 在兩個消費者身上。 85 std::string strFlagConsume_1 = "tab_consume_1"; 86 std::string strFlagConsume_2 = "tab_consume_2"; 87 std::string strFlagConsume_3 = "tab_consume_3"; 88 channel->BasicConsume(strTopicQueue_1, strFlagConsume_1, true, false, true); 89 channel->BasicConsume(strTopicQueue_2, strFlagConsume_2, true, false, true); 90 channel->BasicConsume(strTopicQueue_3, strFlagConsume_3, true, false, true); 91 // BasicConsume() 的第4 個參數為false 表示,我們需要主動ack 服務器才將該消息清除。 92 93 std::vector<std::string> vecFlagConsume; 94 vecFlagConsume.push_back(strFlagConsume_1); 95 vecFlagConsume.push_back(strFlagConsume_2); 96 vecFlagConsume.push_back(strFlagConsume_3); 97 98 while (true) 99 { 100 AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(vecFlagConsume); 101 102 std::string strExchange = envelope->Exchange(); 103 std::string strFlagConsume = envelope->ConsumerTag(); 104 std::string severity = envelope->RoutingKey(); 105 std::string buffer = envelope->Message()->body(); 106 107 std::cout << "[Y] exchange: " << strExchange << ", flagconsume: " << strflagConsume 108 << ", receive " << severity << ": " << buffer << std::endl; 109 110 channel->BasicAck(envelope); 111 } 112 113 for (size_t i = 0; i < vecFlagConsume.size(); ++i) 114 channel->BasicCancel(vecFlagConsume[i]); 115 }
