RabbitMQ topic模式以及headers模式實踐


參考自: https://www.jianshu.com/p/e647758a7c50

https://how2j.cn/k/message/message-rabbitmq-topic/2033.html?p=78908

fanout模式: 與指定交換機綁定的所有隊列都可以接收到消息

direct模式: 將消息發送到由exchange和routingKey指定的隊列中,如果多個消息隊列有相同的routingKey,都可以接收到消息

topic模式:  例如發送到交換機test.topic的消息routingKey有routing_topic.1、routing_topic.2, 則在綁定消息隊列與路由時指定routing_topic.*就可以同時匹配這兩個routingKey, 就可以接收發送到交換機test.topic, 路由為routing_topic.1、routing_topic.2的消息。

headers模式: 根據消息的headers來匹配對應的隊列,在消息接收回調中指定headers, 可以是Map<String, Object>、String可變數組類型的keys等

添加springboot支持jar包

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.0.4.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>2.0.4.RELEASE</version>
    </dependency>

創建topic模式、headers模式、direct模式交換機,對應的消息就可以根據exchangeName、routingKey發送到指定隊列

    private void createTopicExchange(RabbitAdmin rabbitAdmin) {
        // 創建交換機,類型為 topic
        // durable 參數表示是否持久化
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        // 創建隊列
        // durable 參數表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));


        /**
         * 鏈式寫法
         */
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列
                        .to(new TopicExchange("test.topic", false, false)) // 直接創建交換機,並建立關聯關系
                        .with("routing_topic.*") // 指定路由 key
        );
    }

    private void createHeadersExchange(RabbitAdmin rabbitAdmin) {
        rabbitAdmin.declareExchange(new HeadersExchange("test.headers", false, false));

        rabbitAdmin.declareQueue(new Queue("test.headers.queue", false));

        Map<String, Object> map = new HashMap<>();
        map.put("type", "headers");

        Binding binding = BindingBuilder.bind(new Queue("test.headers.queue", false))
                .to(new HeadersExchange("test.headers", false, false))
                .whereAll(map).match();

        rabbitAdmin.declareBinding(binding);
    }

private void createDirectExchange(RabbitAdmin rabbitAdmin) {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

rabbitAdmin.declareQueue(new Queue("test.direct.queue1", false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue2", false));

rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue1", false)
).to(new DirectExchange("test.direct", false, false))
.with("test.direct.routing"));

rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue2", false)
).to(new DirectExchange("test.direct", false, false))
.with("test.direct.routing"));
}
 

RabbitMQ配置:

package com.test;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
return rabbitTemplate;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
createTopicExchange(rabbitAdmin);
createHeadersExchange(rabbitAdmin);
createDirectExchange(rabbitAdmin);
return rabbitAdmin;
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory factory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(factory);
container.setQueueNames("test.topic.queue", "test.headers.queue", "test.direct.queue1", "test.direct.queue2");
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("接收到:" + new String(message.getBody()));
}
});

return container;
}
}

springboot啟動時發送消息:

package com.test;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;

@Component
public class ExecuteRunnable implements CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void run(String... args) throws Exception {
        //topic模式
        rabbitTemplate.send("test.topic", "routing_topic.1", new Message("Message1".getBytes(), new MessageProperties()));
        rabbitTemplate.send("test.topic", "routing_topic.2", new Message("Message2".getBytes(), new MessageProperties()));
        //headers模式
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("type", "headers");
        rabbitTemplate.send("test.headers", "", new Message("Message3".getBytes(), messageProperties));
        //direct模式
        rabbitTemplate.send("test.direct", "test.direct.routing", new Message("Message4".getBytes(), new MessageProperties()));
    }
}

然后就可以在SimpleMessageListenerContainer  實例獲取方法中接收到發送的消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM