SimpleMessageListenerContainer


  • 同一個queue上有多個消費者的時候,只會有一個消費者收到消息,一般是多個消費者輪流收到消息。SimpleMessageListenerContainer可以監聽多個隊列,監聽單個多個隊列、自動啟動、自動聲明功能,container.setQueueNames的api接收的是一個字符串數組對象

  • 設置事務特性、事務管理器、事務屬性、事務並發、是否開啟事務、回滾消息等。在實際生產中,很少使用事務,基本都是采用補償機制

  • 設置消費者數量、最小最大數量、批量消費。

 container.setConcurrentConsumers(5); //設置多個並發消費者一起消費,並支持運行時動態修改
 container.setMaxConcurrentConsumers(10); //設置最多的並發消費者
  • 設置消息確認自動確認模式、是否重回隊列、異常捕獲 Handler 函數。

  • 設置消費者標簽生成策略、是否獨占模式、消費者屬性等。

    //設置消費者的consumerTag_tag
        container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
        //設置消費者的Arguments
        Map<String, Object> args = new HashMap<>();
        args.put("module","訂單模塊");
        args.put("fun","發送消息");
        container.setConsumerArguments(args);
  • 設置具體的監聽器消息轉換器等等。

    注意: SimpleMessageListenerContainer 可以進行動態設置,比如在運行中的應用可以動態的修改其消費者數量的大小、接收消息的模式等。

container.removeQueueNames("mq.#");

   示例

@Configuration
@Slf4j
public class RabbitmqConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.223.144:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange", false, false);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue("topic.queue", false);
    }

    @Bean
    public Queue topicQueueAgain() {
        return new Queue("topic.again", false);
    }

    @Bean
    public Binding topicBinding() {
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("user.#");
    }

    @Bean
    public Binding topicBindingAgain() {
        return BindingBuilder.bind(topicQueueAgain()).to(topicExchange()).with("user.#");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
        messageListenerContainer.setConcurrentConsumers(1);
        messageListenerContainer.setMaxConcurrentConsumers(5);
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
        messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            String msg = new String(message.getBody());
            log.info("receive message:" + msg);
        });

        return messageListenerContainer;
    }

}

  測試:

    @Test
    public void sendMessage(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc","自定義描述");
        messageProperties.getHeaders().put("type","自定義類型");
        messageProperties.setContentType("text/plain");
        Message message = new Message("hello message".getBytes(), messageProperties);

        rabbitTemplate.convertAndSend("topic.exchange","user.abc",message);
    }

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM