RabbitMQ:消息發送確認 與 消息接收確認(ACK)


默認情況下如果一個 Message 被消費者所正確接收則會被從 Queue 中移除

如果一個 Queue 沒被任何消費者訂閱,那么這個 Queue 中的消息會被 Cache(緩存),當有消費者訂閱時則會立即發送,當 Message 被消費者正確接收時,就會被從 Queue 中移除

消息發送確認

發送的消息怎么樣才算失敗或成功?如何確認?

  • 當消息無法路由到隊列時,確認消息路由失敗。消息成功路由時,當需要發送的隊列都發送成功后,進行確認消息,對於持久化隊列意味着寫入磁盤,對於鏡像隊列意味着所有鏡像接收成功

消息接收確認

消息消費者如何通知 Rabbit 消息消費成功?

  • 消息通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
  • 自動確認會在消息發送給消費者后立即確認,但存在丟失消息的可能,如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那么就相當於丟失了消息
  • 如果消息已經被處理,但后續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也同樣造成了實際意義的消息丟失
  • 如果手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確認可以在業務失敗后進行一些操作,如果消息未被 ACK 則會發送到下一個消費者
  • 如果某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,因為 RabbitMQ 認為該服務的處理能力有限
  • ACK 機制還可以起到限流作用,比如在接收到某條消息時休眠幾秒鍾
  • 消息確認模式有:
    • AcknowledgeMode.NONE:自動確認
    • AcknowledgeMode.AUTO:根據情況確認
    • AcknowledgeMode.MANUAL:手動確認

確認消息(局部方法處理消息)

  • 默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
  • 或在 RabbitListenerContainerFactory 中進行開啟手動 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //開啟手動 ack
    return factory;
}

  

  • 確認消息
@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    System.out.println(message);
    try {
        channel.basicAck(tag,false);            // 確認消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 需要注意的 basicAck 方法需要傳遞兩個參數
    • deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的范圍僅限於 Channel
    • multiple:為了減少網絡流量,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有消息

手動否認、拒絕消息

  • 發送一個 header 中包含 error 屬性的消息
 
hducA.png
  • 消費者獲取消息時檢查到頭部包含 error 則 nack 消息
@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
    System.out.println(message);
    if (map.get("error")!= null){
        System.out.println("錯誤的消息");
        try {
            channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否認消息
            return;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    try {
        channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //確認消息
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 此時控制台重復打印,說明該消息被 nack 后一直重新入隊列然后一直重新消費
hello
錯誤的消息
hello
錯誤的消息
hello
錯誤的消息
hello
錯誤的消息
  • 也可以拒絕該消息,消息會被丟棄,不會重回隊列
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒絕消息

確認消息 三種類型(全局處理消息)

  • 自動確認 NONE(涉及到一個問題就是如果在處理消息的時候拋出異常,消息處理失敗,但是因為自動確認而導致 Rabbit 將該消息刪除了,造成消息丟失)
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");                 // 監聽的隊列
    container.setAcknowledgeMode(AcknowledgeMode.NONE);     // NONE 代表自動確認
    container.setMessageListener((MessageListener) message -> {         //消息監聽處理
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        //相當於自己的一些消費邏輯拋錯誤
        throw new NullPointerException("consumer fail");
    });
    return container;
}
  • 手動確認消息 MANUAL
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // 監聽的隊列
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        // 手動確認
    container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {      //消息處理
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        if(message.getMessageProperties().getHeaders().get("error") == null){
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("消息已經確認");
        }else {
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("消息拒絕");
        }

    });
    return container;
}
  • 根據情況確認消息 AUTO
  • AcknowledgeMode 除了 NONE 和 MANUAL 之外還有 AUTO ,它會根據方法的執行情況來決定是否確認還是拒絕(是否重新入queue)
    • 如果消息成功被消費(成功的意思是在消費的過程中沒有拋出異常),則自動確認

    • 當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且 requeue = false(不重新入隊列)

    • 當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認

    • 其他的異常,則消息會被拒絕,且 requeue = true(如果此時只有一個消費者監聽該隊列,則有發生死循環的風險,多消費端也會造成資源的極大浪費,這個在開發過程中一定要避免的)。可以通過 setDefaultRequeueRejected(默認是true)去設置

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // 監聽的隊列
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);     // 根據情況確認消息
    container.setMessageListener((MessageListener) (message) -> {
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        //拋出NullPointerException異常則重新入隊列
        //throw new NullPointerException("消息消費失敗");
        //當拋出的異常是AmqpRejectAndDontRequeueException異常的時候,則消息會被拒絕,且requeue=false
        //throw new AmqpRejectAndDontRequeueException("消息消費失敗");
        //當拋出ImmediateAcknowledgeAmqpException異常,則消費者會被確認
        throw new ImmediateAcknowledgeAmqpException("消息消費失敗");
    });
    return container;
}

消息可靠總結

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 消息確認
    • 啟動消費返回(@ReturnList注解,生產者就可以知道哪些消息沒有發出去)
    • 生產者和Server(broker)之間的消息確認
    • 消費者和Server(broker)之間的消息確認

原文:https://www.jianshu.com/p/2c5eebfd0e95


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM