RocketMQ (三)RocketMQ 怎么保證的消息不丟失?


一、消息發送過程

我們將消息流程分為如下三大部分,每一部分都有可能會丟失數據。

  • 生產階段:Producer通過網絡將消息發送給Broker,這個發送可能會發生丟失,比如網絡延遲不可達等。

  • 存儲階段:Broker肯定是先把消息放到內存的,然后根據刷盤策略持久化到硬盤中,剛收到Producer的消息,再內存中了,但是異常宕機了,導致消息丟失。

  • 消費階段:消費失敗了其實也是消息丟失的一種變體吧。

二、Producer生產階段

Producer通過網絡將消息發送給Broker,這個發送可能會發生丟失,比如網絡延遲不可達等。

1、解決方案一

1.1、說明

有三種send方法,同步發送、異步發送、單向發送。我們可以采取同步發送的方式進行發送消息,發消息的時候會同步阻塞等待broker返回的結果,如果沒成功,則不會收到SendResult,這種是最可靠的。其次是異步發送,再回調方法里可以得知是否發送成功。單向發送(OneWay)是最不靠譜的一種發送方式,我們無法保證消息真正可達。

1.2、源碼

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
 */

// 同步發送
public SendResult send(Message msg) throws MQClientException, RemotingException,      MQBrokerException, InterruptedException {}

// 異步發送,sendCallback作為回調
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {}

// 單向發送,不關心發送結果,最不靠譜
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}

2、解決方案二

2.1、說明

發送消息如果失敗或者超時了,則會自動重試。默認是重試三次,可以根據api進行更改,比如改為10次:

producer.setRetryTimesWhenSendFailed(10);

2.2、源碼

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)}
 */

// 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()默認為2,如果是同步發送,默認重試3次,否則重試1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
      // 選擇發送的消息queue
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        try {
            // 真正的發送邏輯,sendKernelImpl。
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    // 如果發送失敗了,則continue,意味着還會再次進入for,繼續重試發送
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    // 發送成功的話,將發送結果返回給調用者
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            continue;
        } catch (...) {
            continue;
        }
    }
}

說明:

 

這里只是總結出核心的發送邏輯,並不是全代碼。可以看出如下:

 

  • 重試次數同步是1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(),其他方式默認1次。

  • this.defaultMQProducer.getRetryTimesWhenSendFailed()默認是2,我們可以手動設置producer.setRetryTimesWhenSendFailed(10);

  • 調用sendKernelImpl真正的去發送消息

  • 如果是sync同步發送,且發送失敗了,則continue,意味着還會再次進入for,繼續重試發送

  • 發送成功的話,將發送結果返回給調用者

  • 如果發送異常進入catch了,則continue繼續下次重試。

3、解決方案三

3.1、說明

假設Broker宕機了,但是生產環境一般都是多M多S的,所以還會有其他master節點繼續提供服務,這也不會影響到我們發送消息,我們消息依然可達。因為比如恰巧發送到broker的時候,broker宕機了,producer收到broker的響應發送失敗了,這時候producer會自動重試,這時候宕機的broker就被踢下線了, 所以producer會換一台broker發送消息。

4、總結

Producer怎么保證發送階段消息可達?

失敗會自動重試,即使重試N次也不行后,那客戶端也會知道消息沒成功,這也可以自己補償等,不會盲目影響到主業務邏輯。再比如即使Broker掛了,那還有其他Broker再提供服務了,高可用,不影響。

總結為幾個字就是:同步發送+自動重試機制+多個Master節點

三、Broker存儲階段

Broker肯定是先把消息放到內存的,然后根據刷盤策略持久化到硬盤中,剛收到Producer的消息,再內存中了,但是異常宕機了,導致消息丟失。

1、解決方案一

MQ持久化消息分為兩種:同步刷盤和異步刷盤。默認情況是異步刷盤,Broker收到消息后會先存到cache里然后立馬通知Producer說消息我收到且存儲成功了,你可以繼續你的業務邏輯了,然后Broker起個線程異步的去持久化到磁盤中,但是Broker還沒持久化到磁盤就宕機的話,消息就丟失了。同步刷盤的話是收到消息存到cache后並不會通知Producer說消息已經ok了,而是會等到持久化到磁盤中后才會通知Producer說消息完事了。這也保障了消息不會丟失,但是性能不如異步高。看業務場景取舍。

修改刷盤策略為同步刷盤。默認情況下是異步刷盤的,如下配置

## 默認情況為 ASYNC_FLUSH,修改為同步刷盤:SYNC_FLUSH,實際場景看業務,同步刷盤效率肯定不如異步刷盤高。
flushDiskType = SYNC_FLUSH 

對應的Java配置類如下:

package org.apache.rocketmq.store.config;

public enum FlushDiskType {
    // 同步刷盤
    SYNC_FLUSH,
    // 異步刷盤(默認)
    ASYNC_FLUSH
}

異步刷盤默認10s執行一次,源碼如下:

/*
 * {@link org.apache.rocketmq.store.CommitLog#run()}
 */

while (!this.isStopped()) {
    try {
        // 等待10s
        this.waitForRunning(10);
        // 刷盤
        this.doCommit();
    } catch (Exception e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

2、解決方案二

集群部署,主從模式,高可用。

即使Broker設置了同步刷盤策略,但是Broker刷完盤后磁盤壞了,這會導致盤上的消息全TM丟了。但是如果即使是1主1從了,但是Master刷完盤后還沒來得及同步給Slave就磁盤壞了,不也是GG嗎?沒錯!

所以我們還可以配置不僅是等Master刷完盤就通知Producer,而是等Master和Slave都刷完盤后才去通知Producer說消息ok了。

## 默認為 ASYNC_MASTER
brokerRole=SYNC_MASTER

3、總結

若想很嚴格的保證Broker存儲消息階段消息不丟失,則需要如下配置,但是性能肯定遠差於默認配置。

# master 節點配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

# slave 節點配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

上面這個配置含義是:

Producer發消息到Broker后,Broker的Master節點先持久化到磁盤中,然后同步數據給Slave節點,Slave節點同步完且落盤完成后才會返回給Producer說消息ok了。

四、Consumer消費階段

消費失敗了其實也是消息丟失的一種變體。

1、解決方案一

消費者會先把消息拉取到本地,然后進行業務邏輯,業務邏輯完成后手動進行ack確認,這時候才會真正的代表消費完成。而不是說pull到本地后消息就算消費完了。舉個例子

 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         for (MessageExt msg : msgs) {
             String str = new String(msg.getBody());
             System.out.println(str);
         }
         // ack,只有等上面一系列邏輯都處理完后,到這步CONSUME_SUCCESS才會通知broker說消息消費完成,如果上面發生異常沒有走到這步ack,則消息還是未消費狀態。而不是像比如redis的blpop,彈出一個數據后數據就從redis里消失了,並沒有等我們業務邏輯執行完才彈出。
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

2、解決方案二

消息消費失敗自動重試。如果消費消息失敗了,沒有進行ack確認,則會自動重試,重試策略和次數(默認15次)如下配置

/**
 * Broker可以配置的所有選項
 */
public class org.apache.rocketmq.store.config.MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM