-
同一個queue上有多個消費者的時候,只會有一個消費者收到消息,一般是多個消費者輪流收到消息。
SimpleMessageListenerContainer可以監聽多個隊列,監聽單個或多個隊列、自動啟動、自動聲明功能,container.setQueueNames的api接收的是一個字符串數組對象。 -
設置事務特性、事務管理器、事務屬性、事務並發、是否開啟事務、回滾消息等。在實際生產中,很少使用事務,基本都是采用補償機制。
-
設置消費者數量、最小最大數量、批量消費。
container.setConcurrentConsumers(5); //設置多個並發消費者一起消費,並支持運行時動態修改
container.setMaxConcurrentConsumers(10); //設置最多的並發消費者
-
設置消息確認和自動確認模式、是否重回隊列、異常捕獲 Handler 函數。
-
設置消費者標簽生成策略、是否獨占模式、消費者屬性等。
//設置消費者的consumerTag_tag container.setConsumerTagStrategy(queue -> "order_queue_"+(++count)); //設置消費者的Arguments Map<String, Object> args = new HashMap<>(); args.put("module","訂單模塊"); args.put("fun","發送消息"); container.setConsumerArguments(args);
-
設置具體的監聽器、消息轉換器等等。
注意: SimpleMessageListenerContainer 可以進行動態設置,比如在運行中的應用可以動態的修改其消費者數量的大小、接收消息的模式等。
container.removeQueueNames("mq.#");
示例
@Configuration @Slf4j public class RabbitmqConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("192.168.223.144:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange", false, false); } @Bean public Queue topicQueue() { return new Queue("topic.queue", false); } @Bean public Queue topicQueueAgain() { return new Queue("topic.again", false); } @Bean public Binding topicBinding() { return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("user.#"); } @Bean public Binding topicBindingAgain() { return BindingBuilder.bind(topicQueueAgain()).to(topicExchange()).with("user.#"); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); messageListenerContainer.setQueues(topicQueue(), topicQueueAgain()); messageListenerContainer.setConcurrentConsumers(1); messageListenerContainer.setMaxConcurrentConsumers(5); messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString()); messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { String msg = new String(message.getBody()); log.info("receive message:" + msg); }); return messageListenerContainer; } }
測試:
@Test public void sendMessage(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc","自定義描述"); messageProperties.getHeaders().put("type","自定義類型"); messageProperties.setContentType("text/plain"); Message message = new Message("hello message".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic.exchange","user.abc",message); }
