Sending and Receiving Messages with RabbitMQ Topic Mode


1. RabbitMQ work modes

RabbitMQ has six work modes in total: simple mode, work (work queue) mode, publish/subscribe mode, routing mode, topic mode, and RPC mode.

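Routing mode:

Topic mode:

Topic mode is a special case of routing mode. As Figure 1 (the routing-mode diagram) shows, routing mode binds the exchange to a queue with a single exact key: a message enters the queue only when its routing key equals the queue's binding key. Topic mode, also called wildcard mode, works the same way as routing mode except for how the binding key is matched against the routing key. Keys are made of words separated by dots, and a binding key may contain two wildcards: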

* matches exactly one word

# matches zero or more words
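The two wildcard rules above can be sketched in plain Java. This is only an illustration of the matching rules, not the broker's actual implementation; the class name TopicMatcher and its matches method are hypothetical names introduced for this example.

```java
/** Illustrative matcher for topic binding keys (hypothetical helper, not part of any RabbitMQ library). */
class TopicMatcher {

    /** Returns true if the routing key satisfies the binding key under the * and # rules. */
    public static boolean matches(String bindingKey, String routingKey) {
        // Both keys are sequences of words separated by dots.
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;              // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs zero words (skip it) or one more word (stay on it)
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;                      // pattern still needs a word, key has none left
        }
        // '*' accepts any single word; a literal word must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String star = "liufuqiang_rountingKey.*";
        String hash = "liufuqiang_rountingKey.#";
        System.out.println(matches(star, "liufuqiang_rountingKey.random")); // true
        System.out.println(matches(star, "liufuqiang_rountingKey.a.b"));    // false: two words after the prefix
        System.out.println(matches(star, "liufuqiang_rountingKey"));        // false: * needs exactly one word
        System.out.println(matches(hash, "liufuqiang_rountingKey"));        // true: # may match zero words
        System.out.println(matches(hash, "liufuqiang_rountingKey.a.b"));    // true
    }
}
```

The difference between the two wildcards shows up in the last four calls: * insists on exactly one word, while # also accepts none or several.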

A simple usage example follows.

application.properties:

spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.username=liufuqiang
spring.rabbitmq.password=wapj1314
spring.rabbitmq.port=5672

RabbitmqConfig.java

Declare the queue:

    @Bean
    Queue customerQueue(){
       return new Queue("liufuqiang_customer");
    }

Declare the exchange:

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("liufuqiang_topic_exchange");
    }

Bind customerQueue to topicExchange:

    @Bean
    Binding myTopicBinding(){
        Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*");
        return binding;
    }

The complete RabbitmqConfig.java:

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

@Configuration
public class RabbitmqConfig implements RabbitListenerConfigurer {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory(){
        DefaultMessageHandlerMethodFactory defaultMessageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        defaultMessageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return defaultMessageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    Queue customerQueue(){
       return new Queue("liufuqiang_customer");
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("liufuqiang_topic_exchange");
    }


    @Bean
    Binding myTopicBinding(){
        Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*");
        return binding;
    }

    /**
     * Delay queue.
     */
    @Bean
    Queue myDelayQueue() {
        return new Queue("liufuqiang_queue_delay");
    }

    @Bean
    TopicExchange topicExchangeDelay() {
        return new TopicExchange("liufuqiang_topic_exchange_delay");
    }

    @Bean
    Binding myTopicBindingDelay() {
        Binding binding = BindingBuilder.bind(myDelayQueue()).to(topicExchangeDelay()).with("liufuqiang_delay_routing");
        return binding;
    }

    @Bean
    public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean("rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
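        // Each consumer may hold at most 2 unacknowledged messages at a time.
        factory.setPrefetchCount(2);
        // Start 3 concurrent consumers for each listener container.
        factory.setConcurrentConsumers(3);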
//        factory.setRecoveryInterval(recoveryInterval);
        configurer.configure(factory,  connectionFactory);
        return factory;
    }



}

2. Producer: sending messages

RabbitProducer.java

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void firstMessage(){
        Map<String, Object> message = new HashMap<>();
        message.put("name", "liufuqiang");
        message.put("age", "23");
        rabbitTemplate.convertAndSend("liufuqiang_topic_exchange", "liufuqiang_rountingKey.random", JSONUtils.toJSONString(message));
    }

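Call the send-message endpoint to start sending messages.

In the management UI you can see that the exchange has been created and that its type is topic.

The queue liufuqiang_customer is bound to the exchange with the binding key liufuqiang_rountingKey.*.

The message we just sent used the routing key liufuqiang_rountingKey.random, which the * wildcard matches, so the exchange delivers the message to the queue.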
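The flow just described (producer publishes to the exchange, the exchange checks each binding, matching queues receive a copy) can be modelled with a small in-memory sketch. It is a toy model for illustration only and does not talk to a real broker; ToyTopicExchange, its methods, and the queue name audit_queue are hypothetical names that do not appear in the project above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a topic exchange (illustration only, not the broker's implementation). */
class ToyTopicExchange {

    // queue name -> binding keys attached to that queue
    private final Map<String, List<String>> bindingKeys = new LinkedHashMap<>();
    // queue name -> messages currently waiting in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Declares the queue if necessary and binds it to this exchange. */
    public void bind(String queueName, String bindingKey) {
        bindingKeys.computeIfAbsent(queueName, q -> new ArrayList<>()).add(bindingKey);
        queues.computeIfAbsent(queueName, q -> new ArrayList<>());
    }

    /** Copies the message into every queue with a matching binding; returns how many queues got it. */
    public int publish(String routingKey, String message) {
        String[] key = routingKey.split("\\.", -1);
        int delivered = 0;
        for (Map.Entry<String, List<String>> entry : bindingKeys.entrySet()) {
            boolean hit = entry.getValue().stream()
                    .anyMatch(pattern -> matches(pattern.split("\\.", -1), 0, key, 0));
            if (hit) {                          // at most one copy per queue
                queues.get(entry.getKey()).add(message);
                delivered++;
            }
        }
        return delivered;
    }

    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    // '*' matches exactly one word, '#' matches zero or more words.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length;
        if (p[i].equals("#")) {
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) return false;
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        ToyTopicExchange exchange = new ToyTopicExchange();
        exchange.bind("liufuqiang_customer", "liufuqiang_rountingKey.*");
        exchange.bind("audit_queue", "#");      // hypothetical catch-all queue

        exchange.publish("liufuqiang_rountingKey.random", "first");
        exchange.publish("liufuqiang_rountingKey.a.b", "second");

        System.out.println(exchange.messagesIn("liufuqiang_customer")); // [first]
        System.out.println(exchange.messagesIn("audit_queue"));         // [first, second]
    }
}
```

The second message never reaches liufuqiang_customer because its routing key has two words after the prefix, while the catch-all binding # receives both.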

3. Consuming messages

RabbitmqConsumer.java

@Component
public class RabbitmqConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class);

    // Invoked for each message that arrives in the liufuqiang_customer queue.
    @RabbitListener(queues = "liufuqiang_customer")
    @RabbitHandler
    //@Payload
    public void getRabbitmqMessage(String message){
        System.out.println(message);
        logger.warn("Received message: {}", message);
    }
}

The messages in the queue liufuqiang_customer have now been consumed.

 

