這個類非常強大,我們可以對他做很多設置,對於消費者的配置項,這個類都可以滿足
監聽隊列(多個隊列)、自動啟動、自動聲明功能
可以設置事務特性、事務管理器、事務屬性、事務容量(並發)、是否開啟事務、回滾消息等
可以設置消費者數量、最大最小數量、批量消費
設置消息確認和自動確認模式、是否重回隊列、異常捕獲handler函數
設置消費者標簽生成策略、是否獨占模式、消費者屬性等
設置具體的轉換器、消息轉換器等
很多基於RabbitMQ的自制定化后端管控台在進行動態配置的時候,也是根據這一特性去實現的。
注意:SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態
修改其消費者數量的大小、接收消息的模式等
SimpleMessageListenerContainer為什么可以進行動態感知設置變更?
package com.dwz.spring; import java.util.UUID; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; @Configuration @ComponentScan("com.dwz.spring.*") public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setVirtualHost("/vhost_dwz"); connectionFactory.setUsername("root_dwz"); connectionFactory.setPassword("123456"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); System.err.println("RabbitAdmin啟動了。。。"); //設置啟動spring容器時自動加載這個類(這個參數現在默認已經是true,可以不用設置) rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * 針對消費者的配置 * 1.設置交換機的類型 * 2.將隊列綁定到交換機 * FanoutExchange:將消息分發到所有綁定的隊列,無routingkey的概念 * TopicExchange:多關鍵字匹配 * HeadersExchange:通過添加屬性key-value匹配 * DirectExchange:按照routingkey分發到指定隊列 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true);//隊列持久化 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true);//隊列持久化 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public TopicExchange exchange003() { return new TopicExchange("topic003", true, false); } @Bean public Queue queue003() { return new Queue("queue003", true);//隊列持久化 } @Bean public Binding binding003() { return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true);//隊列持久化 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true);//隊列持久化 } /* * 簡單消息監聽容器 */ @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //同時監聽多個隊列 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); //設置當前的消費者數量 container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); //設置是否重回隊列 container.setDefaultRequeueRejected(false); //設置自動簽收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //設置監聽外露 container.setExposeListenerChannel(true); //設置消費端標簽策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //設置消息監聽 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody(), "utf-8"); System.out.println("-----------消費者:" + msg); } }); return container; } }
自定義消費端標簽策略效果圖: