一、RabbitMQ的工作模式
rabbitMQ總共有六種工作模式:simple簡單模式、work工作模式、publish/subscribe發布訂閱模式、routing路由模式、topic主題模式
routing模式:
topic主題模式:
可以看出,topic模式為一種特殊的routing模式,通過圖一可以看到,routing模式通過唯一的routingKey將交換機與隊列綁定起來,只有當消息的路由鍵routing key 與隊列的綁定鍵binging key匹配時,該消息才會進入該隊列。topic模式也叫通配符模式,除了bindingkey 與routing key匹配規則不一樣外,作用和topic模式一樣,特殊匹配原則:
* 表示一個單詞
# 表示任意個單詞
簡單使用方法如下:
application.properties:
spring.rabbitmq.host=xxx.xxx.xxx.xxx spring.rabbitmq.username=liufuqiang spring.rabbitmq.password=wapj1314 spring.rabbitmq.port=5672
RabbitmqConfig.java
新建隊列
@Bean Queue customerQueue(){ return new Queue("liufuqiang_customer"); }
交換機:
@Bean TopicExchange topicExchange() { return new TopicExchange("liufuqiang_topic_exchange"); }
將customerQueue與topicExchange綁定起來:
@Bean Binding myTopicBinding(){ Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*"); return binding; }
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; @Configuration public class RabbitmqConfig implements RabbitListenerConfigurer { @Value("${spring.rabbitmq.host}") private String rabbitmqHost; @Value("${spring.rabbitmq.username}") private String rabbitmqUsername; @Value("${spring.rabbitmq.password}") private String rabbitmqPassword; @Value("${spring.rabbitmq.port}") private int rabbitmqPort; @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory defaultMessageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); defaultMessageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return defaultMessageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } @Bean Queue customerQueue(){ return new Queue("liufuqiang_customer"); } @Bean TopicExchange topicExchange() { return new TopicExchange("liufuqiang_topic_exchange"); } @Bean Binding myTopicBinding(){ Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*"); return binding; } /*** * 延遲隊列 * @return */ @Bean Queue myDelayQueue() { return new Queue("liufuqiang_queue_delay"); } @Bean TopicExchange topicExchangeDelay() { return new TopicExchange("liufuqiang_topic_exchange_delay"); } @Bean Binding myTopicBindingDelay() { Binding binding = BindingBuilder.bind(myDelayQueue()).to(topicExchangeDelay()).with("liufuqiang_delay_routing"); return binding; } @Bean public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() { com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setUsername(rabbitmqUsername); connectionFactory.setPassword(rabbitmqPassword); connectionFactory.setHost(rabbitmqHost); connectionFactory.setPort(rabbitmqPort); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean("rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(2); factory.setConcurrentConsumers(3); // factory.setRecoveryInterval(recoveryInterval); configurer.configure(factory, connectionFactory); return factory; } }
二、生成者發送消息:
RabbitProducer.java
@Autowired private RabbitTemplate rabbitTemplate; public void firstMessage(){ Map<String, Object> message = new HashMap<>(); message.put("name", "liufuqiang"); message.put("age", "23"); rabbitTemplate.convertAndSend("liufuqiang_topic_exchange", "liufuqiang_rountingKey.random", JSONUtils.toJSONString(message)); }
調用發送消息接口,開始發送消息
可以在管理界面看到交換機已新建完成,模式為topic模式
隊列liufuqiang_customer與交換機通過liufuqiang_rountingKey.*綁定起來,
可以看到,剛才我們發送消息的rounting Key為liufuqiang_rountingKey.random,通過*匹配到,並且將消息發送給隊列。
三、消費消息
RabbitConsumer.java
@Component public class RabbitmqConsumer { private static Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class); @RabbitListener(queues = "liufuqiang_customer") @RabbitHandler //@Payload public void getRabbitmqMessage(String message){ System.out.println(message); logger.warn("獲取消息{}", message); } }
可以看到隊列liufuqiang_customer里的消息已被消費