配置文件application.properties:
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
server.port=5678
RabbitMQ配置文件類(注釋的代碼可以直接刪掉):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Shared names used by the topic-exchange example.
 *
 * <p>A topic exchange is the most flexible RabbitMQ exchange type: queues can be bound to it
 * freely by routing key. This class only holds the exchange, queue and routing-key names.
 * The producer ({@code HelloSender}) publishes with the exchange and routing key, and the
 * consumers ({@code HelloReceiver}) bind their queues through {@code @RabbitListener}
 * annotations, so no {@code @Bean} definitions are kept here.
 */
public class TopicRabbitConfig {

    /** Name of the queue the example consumer listens on. */
    public static final String QUEUE_NAME = "queue.name";

    /** Routing key used both when publishing and when binding queues. */
    public static final String TEST_TOPIC_ROUTINGKEY = "test.topic.routingKey";

    /** Name of the topic exchange messages are published to. */
    public static final String TEST_EXCHANGE_HAHA = "test.exchange.haha";
}
生產者,這裡根據Exchange和Routing Key,直接發送一個字符串:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;
/**
 * Producer side of the example: publishes a greeting string to the topic exchange.
 */
@Component
@Slf4j
public class HelloSender {

    /** Template used to publish messages. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the text {@code "hello " + i}, logs it, and publishes it using
     * {@code convertAndSend(String exchange, String routingKey, Object object)} with
     * {@link TopicRabbitConfig#TEST_EXCHANGE_HAHA} and
     * {@link TopicRabbitConfig#TEST_TOPIC_ROUTINGKEY}.
     *
     * @param i number appended to the greeting
     */
    public void send(int i) {
        final String payload = "hello " + i;
        log.info("Sender : {}", payload);
        rabbitTemplate.convertAndSend(
                TopicRabbitConfig.TEST_EXCHANGE_HAHA,
                TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY,
                payload);
    }
}
消費者,綁定對應的Exchange,Queue和Routing Key,直接打印獲取的信息:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;
/**
 * Consumer side of the example: listens on a queue bound to the topic exchange and logs
 * every message it receives.
 */
@Component
@Slf4j
public class HelloReceiver {

    /**
     * Logs the text payload of each received message.
     *
     * <p>The listener binds the durable queue {@link TopicRabbitConfig#QUEUE_NAME} to the topic
     * exchange {@link TopicRabbitConfig#TEST_EXCHANGE_HAHA} with the routing key
     * {@link TopicRabbitConfig#TEST_TOPIC_ROUTINGKEY}.
     *
     * <p>NOTE(review): the method name looks like a leftover from another listener; it is
     * kept unchanged here.
     *
     * @param msg message payload
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
                    exchange = @Exchange(
                            value = TopicRabbitConfig.TEST_EXCHANGE_HAHA,
                            type = ExchangeTypes.TOPIC),
                    key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY))
    public void onOrgDeleted(@Payload String msg) {
        log.info("HelloReceiver msg : {}", msg);
    }
}
測試類,調用生產者發送信息的函數send,消費者會監聽消費:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import xy.study.rabbitmq.producer.HelloSender;
/**
 * Spring Boot test that drives the producer once so the listener can be observed
 * consuming the message.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    /** Producer under test, injected from the application context. */
    @Autowired
    private HelloSender sender;

    /**
     * Publishes a single message through {@link HelloSender#send(int)}.
     *
     * <p>No assertion is made here; the result is checked by reading the log output.
     */
    @Test
    public void testSend() {
        final int number = 666;
        sender.send(number);
    }
}
如圖,控制台日誌顯示消息已成功發送,並且被對應的消費者消費。

topic exchange 通配路由中多個消費者的情況
修改消費者的代碼如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;
/**
 * Four consumers that share the same topic exchange and routing key.
 *
 * <p>The first three listen on three different queues; the last two listen on the same queue.
 * Each listener logs the queue it is attached to together with the message text.
 */
@Component
@Slf4j
public class HelloReceiver {

    /** Second queue name, derived from the shared queue name. */
    private static final String QUEUE_NAME_TEST = TopicRabbitConfig.QUEUE_NAME + ".test";

    /** Arbitrary queue name, showing that any name can be bound to the exchange. */
    private static final String QUEUE_NAME_NUMBER = "123445";

    /**
     * Consumer on {@link TopicRabbitConfig#QUEUE_NAME}.
     *
     * @param msg message payload
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueName(@Payload String msg) {
        log.info("{}-----HelloReceiver msg : {}", TopicRabbitConfig.QUEUE_NAME, msg);
    }

    /**
     * Consumer on the {@code ".test"}-suffixed queue.
     *
     * @param msg message payload
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_TEST, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameTest(@Payload String msg) {
        log.info("{}-----HelloReceiver msg : {}", QUEUE_NAME_TEST, msg);
    }

    /**
     * Consumer on the arbitrarily named queue {@code "123445"}; it receives messages as well.
     *
     * @param msg message payload
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_NUMBER, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameNumber(@Payload String msg) {
        // Log the queue this listener is actually attached to (previously printed "123445.test").
        log.info("{}-----HelloReceiver msg : {}", QUEUE_NAME_NUMBER, msg);
    }

    /**
     * Second consumer on the same queue as {@link #queueNameNumber(String)}.
     *
     * <p>Because exchange, routing key and queue are all identical, the two listeners compete:
     * each message is handled by only one of them (RabbitMQ dispatches round-robin by default
     * among the consumers of a queue).
     *
     * @param msg message payload
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_NUMBER, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameNumberSame(@Payload String msg) {
        // Log the queue this listener is actually attached to (previously printed "123445.test").
        log.info("same+{}-----HelloReceiver msg : {}", QUEUE_NAME_NUMBER, msg);
    }
}
再次執行測試,測試結果如下:

總結:
上面四個消費者的Exchange和RoutingKey都相同,最後兩個消費者的隊列名也相同。
根據結果可知,當Exchange和RoutingKey相同、queue不同時,每個隊列都會收到一份消息,所有消費者都能消費同樣的信息;
當Exchange、RoutingKey和queue都相同時(最後兩個消費者),每條消息只會被其中一個消費者消費,其他消費者不會收到該消息。
