發送到topic的消息不能有任意的綁定鍵,綁定鍵的規則:必須由(.)分割的單詞列表。比如apple.banana.orange
綁定鍵也有兩個特殊字符:
* 可以代替一個單詞。
# 可以替代零個或多個單詞。
比如:apple.# *.banana.*

生產者:
package com.example.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 有選擇的接受消息 */ public class TopicSend { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 連接工廠 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 獲取連接 Channel channel = connection.createChannel(); // topic類型 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String[] msg = {"666的兔子","懶惰的大象"}; // 第二個參數為綁定鍵 channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg[0].getBytes()); channel.basicPublish(EXCHANGE_NAME, "lazy.write.elephant", null, msg[1].getBytes()); System.out.println("PS-Send:" + msg.toString()); channel.close(); connection.close(); } }
消費者:
package com.example.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TopicReceive { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 連接工廠 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 獲取連接 Channel channel = connection.createChannel(); // 聲明一個topic交換類型 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 當聲明隊列,不加任何參數,產生的將是一個臨時隊列,getQueue返回的是隊列名稱 String queueA = channel.queueDeclare().getQueue(); String queueB = channel.queueDeclare().getQueue(); System.out.println("臨時隊列:" + queueA); System.out.println("臨時隊列:" + queueB); // 第三個參數為“綁定建” // * 可以代替一個單詞。 // # 可以替代零個或多個單詞。 channel.queueBind(queueA, EXCHANGE_NAME, "*.orange.*"); channel.queueBind(queueB, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueB, EXCHANGE_NAME, "lazy.#"); Consumer consumerA = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Direct-Receive-A:" + recv); } }; Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Direct-Receive-B:" + recv); } }; channel.basicConsume(queueA, true, consumerA); channel.basicConsume(queueB, true, consumerB); } }
先啟動消費者:再啟動生產者:控制台

..

第一條消息,匹配A和B
第二條消息,只匹配B
