SimpleMessageListenerContainer


  • 同一个queue上有多个消费者的时候,只会有一个消费者收到消息,一般是多个消费者轮流收到消息。SimpleMessageListenerContainer可以监听多个队列,监听单个多个队列、自动启动、自动声明功能,container.setQueueNames的api接收的是一个字符串数组对象

  • 设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。在实际生产中,很少使用事务,基本都是采用补偿机制

  • 设置消费者数量、最小最大数量、批量消费。

 container.setConcurrentConsumers(5); //设置多个并发消费者一起消费,并支持运行时动态修改
 container.setMaxConcurrentConsumers(10); //设置最多的并发消费者
  • 设置消息确认自动确认模式、是否重回队列、异常捕获 Handler 函数。

  • 设置消费者标签生成策略、是否独占模式、消费者属性等。

    //设置消费者的consumerTag_tag
        container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
        //设置消费者的Arguments
        Map<String, Object> args = new HashMap<>();
        args.put("module","订单模块");
        args.put("fun","发送消息");
        container.setConsumerArguments(args);
  • 设置具体的监听器消息转换器等等。

    注意: SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。

container.removeQueueNames("mq.#");

   示例

@Configuration
@Slf4j
public class RabbitmqConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.223.144:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange", false, false);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue("topic.queue", false);
    }

    @Bean
    public Queue topicQueueAgain() {
        return new Queue("topic.again", false);
    }

    @Bean
    public Binding topicBinding() {
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("user.#");
    }

    @Bean
    public Binding topicBindingAgain() {
        return BindingBuilder.bind(topicQueueAgain()).to(topicExchange()).with("user.#");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
        messageListenerContainer.setConcurrentConsumers(1);
        messageListenerContainer.setMaxConcurrentConsumers(5);
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
        messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            String msg = new String(message.getBody());
            log.info("receive message:" + msg);
        });

        return messageListenerContainer;
    }

}

  测试:

    @Test
    public void sendMessage(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc","自定义描述");
        messageProperties.getHeaders().put("type","自定义类型");
        messageProperties.setContentType("text/plain");
        Message message = new Message("hello message".getBytes(), messageProperties);

        rabbitTemplate.convertAndSend("topic.exchange","user.abc",message);
    }

 

  


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM