消息隊列RabbitMQ(三):消息確認機制


引言

RabbitMQ的模型是生產者發送信息到 Broker (代理),消費者從 Broker 中取出信息。但是生產者怎么知道消息是否真的發送到 Broker 中了呢?Broker 又怎么知道消息到底有沒有被消費者消費?

如果由於網絡原因出現故障,生產者生產的消息未到達 Broker 或者 Broker 的消息被虛假消費,而它們又不知道,就會產生很嚴重的問題,如重復消費等。

RabbitMQ的消息確認流程

image-20210519110039225

從圖中可以看出:

消息確認機制分為生產者確認和消費者確認

  • ConfirmCallback 生產者
  • ReturnCallback 生產者
  • ACK 消費者

生產者確認

  • 消息到達RabbitMQ的Exchange:Exchange向生產者發送Confirm確認。成功抑或失敗都會返回一個confirmCallback
  • 消息成功達到Exchange,但是從Exchange投遞Queue失敗:向生產者返回一個returnCallback。只有失敗才會返回

消費者確認

  • 消費者收到消息后需要對 RabbitMQ Server 進行消息 ACK 確認,RabbitMQ 根據確認信息決定是刪除隊列中的該信息還是重新發送

代碼實現

生產者確認

重點在於生產者重寫下面兩個方法

  • rabbitMQTemplate.setConfirmCallback

  • rabbitMQTemplate.setReturnCallback

  1. 開啟生產者消息確認

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        virtual-host: /
        username: root
        password: root
        #    開啟兩個模式的生產者消息確認
        publisher-confirm-type: simple
        publisher-returns: true
    
  2. 聲明交換機、隊列,綁定交換機和隊列

    @Configuration
    public class RabbitMQConfig {
    
        private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
        private static final String SB_TOPIC_QUEUE="sb_topic_queue1";
    
        // 注入交換機 topic類型
        @Bean("topicExchange")
        public Exchange topicExchange(){
            return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
                    .autoDelete().build();
        }
        // 聲明隊列
        @Bean
        public Queue queue1(){
            return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
        }
    
        // 綁定隊列和交換機
        @Bean
        public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
        }
    }
    
  3. 創建消費者

    @Component
    @RabbitListener(queues = "sb_topic_queue1")
    public class Consumer {
    
        @RabbitHandler
        public void testPublishConfirm(String msg) {
            System.out.println("收到的信息:"+msg);
        }
    }
    
  4. 創建生產者

    創建生產者發送消息到消息隊列,模擬兩種異常情況

    @SpringBootTest
    class RabiitmqSpringbootApplicationTests {
    
        @Autowired
        RabbitTemplate template;
    
        @Test
        void testConfirmTrue() {
            // 設置confirm回調函數
            template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                    if (b) System.out.println("消息發送成功");
                    else System.out.println("消息發送失敗");
                }
    
            });
            // 模擬生產者發送信息--正常情況
            template.convertAndSend("sb_topic_exchange","user.info","日志級別:info;日志模塊:user;日志信息:*****");
        }
    
        @Test
        void testConfirmFalse() {
            // 設置confirm回調函數
            template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                    if (b) System.out.println("消息發送成功");
                    else System.out.println("消息發送失敗");
                }
    
            });
            // 模擬生產者發送信息
            // 不存在的交換機--異常情況
         template.convertAndSend("sb_topic_exchange_noexist","user.info","日志級別:info;日志模塊:user;日志信息:*****");
        }
    
        @Test
        void testReturnFalse() {
            // 設置return回調函數
            template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
                    System.out.println(message.toString());
                    System.out.println(s+"*********");
                }
    
            });
            template.setMandatory(true);
            // 模擬生產者發送信息
            // 正確的交換機 錯誤的routekey -- 異常情況
         template.convertAndSend("sb_topic_exchange","noexist.user.info","日志級別:info;日志模塊:user;日志信息:*****");
        }
    }
    

消費者確認

重點在於消費者的下面兩個方法

  • channel.basicAck 消費者簽收
  • channel.basicNAck 消費者拒絕簽收
  1. 開啟消費者確認模式

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        virtual-host: /
        username: root
        password: root
    #    設置消費端手動簽收
        listener:
          direct:
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual
    
  2. 創建消費者

    /**
     * 注入消費者--手動簽到
     */
    @Component
    @RabbitListener(queues = "sb_topic_queue1")
    public class Consumer2 {
    
        @RabbitHandler
        public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
            // 消費端設置手動簽收代碼
            try {
                System.out.println(msg);
                // 正常簽收,mq收到此消息被正常簽收后即可從隊列中刪除vi信息
                // 是喲了那個channel的方法
                // 第一個參數是deliverytag 標識哪條信息 第二個參數是是否批量簽收
                // int i=2/0; 模擬異常
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
                System.out.println("消費者簽收了該信息,服務器你可以刪了");
            }catch (Exception e){
                // 異常拒絕簽收,讓mq重發此信息
                System.out.println("該信息丟了,給我重發");
           channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
               // 該信息丟了,但是不需要你重發
         // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
            }
        }
    }
    
    
  3. 創建生產者

    @SpringBootTest
    class RabiitmqSpringbootApplicationTests {
    
        @Autowired
        RabbitTemplate template;
    
        @Test
        void testConsumerAck() {
         template.convertAndSend("sb_topic_exchange","noexist.user.info","日志級別:info;日志模塊:user;日志信息:*****");
        }
    }
    

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM