RabbitMQ消息冪等性問題


文章目錄

1. 什么是冪等性?1.1 消息隊列的冪等性1.2 模擬重試機制1.2.1 生產者代碼1.2.2 消費者代碼1.2.3 消費者 application.yml 配置2. 如何保證消息冪等性,不被重復消費?解決方法

1. 什么是冪等性?

在編程中一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。


HTTP方法的冪等性是指一次和多次請求某一個資源應該具有同樣的副作用。冪等性屬於語義范疇,正如編譯器只能幫助檢查語法錯誤一樣,HTTP規范也沒有辦法通過消息格式等語法手段來定義它。

簡之:一個請求,不管重復來多少次,結果是不會改變的。

1.1 消息隊列的冪等性

如同HTTP方法的冪等性,消息隊列同樣會出現冪等性問題。

消費者在消費 MQ 中的消息時,MQ 已把消息發送給消費者,消費者在給 MQ 返回 ack 時網絡中斷,故 MQ 未收到確認信息,該條消息會重新發給其他的消費者,或者在網絡重連后再次發送給該消費者,但實際上該消費者已成功消費了該條消息,造成消費者消費了重復的消息;注意,RabbitMQ 這種消息重試(補償)機制是默認的。

所以,MQ 消費者的冪等性問題,主要在於 MQ 的重試機制,因為網絡原因或客戶端延遲消費導致重復消費。

那么,如何合適選擇重試機制?我們來看兩種情況。

情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試?

需要重試

情況2: 消費者獲取到消息后,拋出數據轉換異常,是否需要重試?

不需要重試

總結:對於情況2,如果消費者代碼拋出異常是需要發布新版本才能解決的問題,那么不需要重試,重試也無濟於事。應該采用日志記錄+定時任務 job 健康檢查+人工進行補償

1.2 模擬重試機制

我們采用一種短信消費者客戶端異常的情況來模擬 RabbitMQ 的重試機制。

@RabbitListener(queues = "fanout_sms_queue")
public void process(String msg{
    System.out.println("短信消費者獲取生產者消息msg:" + msg);
    int i = 1/0;
}

如上代碼,很顯然會報錯,一擔報錯生產者的消息時不會被消費的?

@RabbitListener 底層使用 AOP 進行異常通知攔截,如果程序沒有拋出異常信息,那么就會自動提交事務;如果 AOP 異常通知攔截有捕獲到異常信息的話,就會自動實現重試(補償)機制,同時,這個補償機制的消息會緩存到 RabbitMQ 服務器端進行存放,一直重試到不拋出異常為止。

1.2.1 生產者代碼
@Component
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 發送消息
     *
     * @param queueName 隊列名稱
     */

    public void send(String queueName) {
        String msg = "my_fanout_msg:" + System.currentTimeMillis();
        Message message = MessageBuilder
                        .withBody(msg.getBytes())
                        .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                        .setContentEncoding("utf-8")
                        .setMessageId(UUID.randomUUID() + "")
                        .build();
        System.out.println(msg + ":" + msg);
        amqpTemplate.convertAndSend(queueName, message);
    }
}
1.2.2 消費者代碼
@Component
public class FanoutEamilConsumer {

    @RabbitListener(queues = "fanout_eamil_queue")
    public void process(Message message) throws Exception {
        String revMessage = Thread.currentThread().getName() 
                + ",郵件消費者獲取生產者消息msg:" 
                + new String(message.getBody(), "UTF-8")
                + ",messageId:" + message.getMessageProperties().getMessageId();
        System.out.println(revMessage);
    }
}
1.2.3 消費者 application.yml 配置
spring:
  rabbitmq:
  ####連接地址
    host: 127.0.0.1
   ####端口號   
    port: 5672
   ####賬號 
    username: guest
   ####密碼  
    password: guest
   ### 地址
    virtual-host: /admin_host
    listener:
      simple:
        retry:
          ####開啟消費者重試
          enabled: true
          ####最大重試次數
          max-attempts: 5
          ####重試間隔次數
          initial-interval: 3000

server:
  port: 8081

我們通過 RabbitMQ 配置,增加了 RabbitMQ 重試時間以及重試次數限制,在一定程度上解決了重復消費的問題,接下來看一道常問的面試題。

2. 如何保證消息冪等性,不被重復消費?

其實,這個問題也算是 MQ 面試當中經常考察的一點,因為無論是什么 MQ 都會有這個問題。

首先通過上邊我們了解了什么是“冪等性”,以及 MQ 冪等性問題的產生,所以我們要清楚為什么會出現重復性消費?在什么場景會出現重復消費?

解決方法

使用全局 MessageID 判斷消費方使用同一個,解決冪等性問題。
或者使用業務邏輯保證唯一(比如訂單號碼)

生產者關鍵代碼:

@Autowired
private AmqpTemplate amqpTemplate;

/**
 * 發送消息
 *
 * @param queueName 隊列名稱
 */

public void send(String queueName{
    String msg = "my_fanout_msg:" + System.currentTimeMillis();
    Message message = MessageBuilder
                    .withBody(msg.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "")
                    .build();
    System.out.println(msg + ":" + msg);
    amqpTemplate.convertAndSend(queueName, message);
}

如上,生產者在發送消息時(convertAndSend),給消息對象設置了唯一的 MessageID,只有該 MessageID 沒有被消費者標記方能在重試機制中再次被消費。

消費者關鍵代碼:

@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
    String revMessage = Thread.currentThread().getName()
            + ",郵件消費者獲取生產者消息msg:"
            + new String(message.getBody(), "UTF-8")
            + ",messageId:" + message.getMessageProperties().getMessageId();
    System.out.println(revMessage);
    發送郵件的邏輯XXX
}

如上,通過 message.getMessageProperties().getMessageId() 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經被消費者消費過了,如果已經消費則取消再次消費。

通常怎么判斷呢?

比如上方是一個郵件發送的消費者,在做補償時,假如上一步郵件發送成功了,我們會把該 ID 存至 redis中,下次再調用時,先去 redis 判斷是否存在該 ID 了,如果存在表明已經消費過了則直接返回,不再消費,否則消費,然后將記錄存至 redis。

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM