參考自: https://www.jianshu.com/p/e647758a7c50
https://how2j.cn/k/message/message-rabbitmq-topic/2033.html?p=78908
fanout模式: 與指定交換機綁定的所有隊列都可以接收到消息
direct模式: 將消息發送到由exchange和routingKey指定的隊列中,如果多個消息隊列有相同的routingKey,都可以接收到消息
topic模式: 例如發送到交換機test.topic的消息routingKey有routing_topic.1、routing_topic.2, 則在綁定消息隊列與路由時指定routing_topic.*就可以同時匹配這兩個routingKey, 就可以接收發送到交換機test.topic, 路由為routing_topic.1、routing_topic.2的消息。
headers模式: 根據消息的headers來匹配對應的隊列,在消息接收回調中指定headers, 可以是Map<String, Object>、String可變數組類型的keys等
添加springboot支持jar包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.0.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.0.4.RELEASE</version> </dependency>
創建topic模式、headers模式、direct模式交換機,對應的消息就可以根據exchangeName、routingKey發送到指定隊列
private void createTopicExchange(RabbitAdmin rabbitAdmin) { // 創建交換機,類型為 topic // durable 參數表示是否持久化 rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); // 創建隊列 // durable 參數表示是否持久化 rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); /** * 鏈式寫法 */ rabbitAdmin.declareBinding( BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列 .to(new TopicExchange("test.topic", false, false)) // 直接創建交換機,並建立關聯關系 .with("routing_topic.*") // 指定路由 key ); } private void createHeadersExchange(RabbitAdmin rabbitAdmin) { rabbitAdmin.declareExchange(new HeadersExchange("test.headers", false, false)); rabbitAdmin.declareQueue(new Queue("test.headers.queue", false)); Map<String, Object> map = new HashMap<>(); map.put("type", "headers"); Binding binding = BindingBuilder.bind(new Queue("test.headers.queue", false)) .to(new HeadersExchange("test.headers", false, false)) .whereAll(map).match(); rabbitAdmin.declareBinding(binding); }
private void createDirectExchange(RabbitAdmin rabbitAdmin) {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue1", false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue2", false));
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue1", false)
).to(new DirectExchange("test.direct", false, false))
.with("test.direct.routing"));
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue2", false)
).to(new DirectExchange("test.direct", false, false))
.with("test.direct.routing"));
}
RabbitMQ配置:
package com.test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
createTopicExchange(rabbitAdmin);
createHeadersExchange(rabbitAdmin);
createDirectExchange(rabbitAdmin);
return rabbitAdmin;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory factory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(factory);
container.setQueueNames("test.topic.queue", "test.headers.queue", "test.direct.queue1", "test.direct.queue2");
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("接收到:" + new String(message.getBody()));
}
});
return container;
}
}
springboot啟動時發送消息:
package com.test; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.HashMap; @Component public class ExecuteRunnable implements CommandLineRunner { @Autowired private RabbitTemplate rabbitTemplate; public void run(String... args) throws Exception { //topic模式 rabbitTemplate.send("test.topic", "routing_topic.1", new Message("Message1".getBytes(), new MessageProperties())); rabbitTemplate.send("test.topic", "routing_topic.2", new Message("Message2".getBytes(), new MessageProperties())); //headers模式 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("type", "headers"); rabbitTemplate.send("test.headers", "", new Message("Message3".getBytes(), messageProperties)); //direct模式 rabbitTemplate.send("test.direct", "test.direct.routing", new Message("Message4".getBytes(), new MessageProperties())); } }
然后就可以在SimpleMessageListenerContainer 實例獲取方法中接收到發送的消息