-
同一个queue上有多个消费者的时候,只会有一个消费者收到消息,一般是多个消费者轮流收到消息。
SimpleMessageListenerContainer
可以监听多个队列,监听单个或多个队列、自动启动、自动声明功能,container.setQueueNames
的api接收的是一个字符串数组对象。 -
设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。在实际生产中,很少使用事务,基本都是采用补偿机制。
-
设置消费者数量、最小最大数量、批量消费。
container.setConcurrentConsumers(5); //设置多个并发消费者一起消费,并支持运行时动态修改
container.setMaxConcurrentConsumers(10); //设置最多的并发消费者
-
设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数。
-
设置消费者标签生成策略、是否独占模式、消费者属性等。
//设置消费者的consumerTag_tag container.setConsumerTagStrategy(queue -> "order_queue_"+(++count)); //设置消费者的Arguments Map<String, Object> args = new HashMap<>(); args.put("module","订单模块"); args.put("fun","发送消息"); container.setConsumerArguments(args);
-
设置具体的监听器、消息转换器等等。
注意: SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。
container.removeQueueNames("mq.#");
示例
@Configuration @Slf4j public class RabbitmqConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("192.168.223.144:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange", false, false); } @Bean public Queue topicQueue() { return new Queue("topic.queue", false); } @Bean public Queue topicQueueAgain() { return new Queue("topic.again", false); } @Bean public Binding topicBinding() { return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("user.#"); } @Bean public Binding topicBindingAgain() { return BindingBuilder.bind(topicQueueAgain()).to(topicExchange()).with("user.#"); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); messageListenerContainer.setQueues(topicQueue(), topicQueueAgain()); messageListenerContainer.setConcurrentConsumers(1); messageListenerContainer.setMaxConcurrentConsumers(5); messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString()); messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { String msg = new String(message.getBody()); log.info("receive message:" + msg); }); return messageListenerContainer; } }
测试:
@Test public void sendMessage(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc","自定义描述"); messageProperties.getHeaders().put("type","自定义类型"); messageProperties.setContentType("text/plain"); Message message = new Message("hello message".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic.exchange","user.abc",message); }