一、消息發送過程
我們將消息流程分為如下三大部分,每一部分都有可能會丟失數據。
-
生產階段:Producer通過網絡將消息發送給Broker,這個發送可能會發生丟失,比如網絡延遲不可達等。
-
存儲階段:Broker肯定是先把消息放到內存的,然后根據刷盤策略持久化到硬盤中,剛收到Producer的消息,再內存中了,但是異常宕機了,導致消息丟失。
-
消費階段:消費失敗了其實也是消息丟失的一種變體吧。
二、Producer生產階段
Producer通過網絡將消息發送給Broker,這個發送可能會發生丟失,比如網絡延遲不可達等。
1、解決方案一
1.1、說明
有三種send方法,同步發送、異步發送、單向發送。我們可以采取同步發送的方式進行發送消息,發消息的時候會同步阻塞等待broker返回的結果,如果沒成功,則不會收到SendResult,這種是最可靠的。其次是異步發送,再回調方法里可以得知是否發送成功。單向發送(OneWay)是最不靠譜的一種發送方式,我們無法保證消息真正可達。
1.2、源碼
/** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer} */ // 同步發送 public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {} // 異步發送,sendCallback作為回調 public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {} // 單向發送,不關心發送結果,最不靠譜 public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}
2、解決方案二
2.1、說明
發送消息如果失敗或者超時了,則會自動重試。默認是重試三次,可以根據api進行更改,比如改為10次:
producer.setRetryTimesWhenSendFailed(10);
2.2、源碼
/** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)} */ // 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()默認為2,如果是同步發送,默認重試3次,否則重試1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; for (; times < timesTotal; times++) { // 選擇發送的消息queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { try { // 真正的發送邏輯,sendKernelImpl。 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 如果發送失敗了,則continue,意味着還會再次進入for,繼續重試發送 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } // 發送成功的話,將發送結果返回給調用者 return sendResult; default: break; } } catch (RemotingException e) { continue; } catch (...) { continue; } } }
說明:
這里只是總結出核心的發送邏輯,並不是全代碼。可以看出如下:
重試次數同步是1 +
this.defaultMQProducer.getRetryTimesWhenSendFailed()
,其他方式默認1次。this.defaultMQProducer.getRetryTimesWhenSendFailed()默認是2,我們可以手動設置
producer.setRetryTimesWhenSendFailed(10);
調用sendKernelImpl真正的去發送消息
如果是sync同步發送,且發送失敗了,則continue,意味着還會再次進入for,繼續重試發送
發送成功的話,將發送結果返回給調用者
如果發送異常進入catch了,則continue繼續下次重試。
3、解決方案三
3.1、說明
假設Broker宕機了,但是生產環境一般都是多M多S的,所以還會有其他master節點繼續提供服務,這也不會影響到我們發送消息,我們消息依然可達。因為比如恰巧發送到broker的時候,broker宕機了,producer收到broker的響應發送失敗了,這時候producer會自動重試,這時候宕機的broker就被踢下線了, 所以producer會換一台broker發送消息。
4、總結
Producer怎么保證發送階段消息可達?
失敗會自動重試,即使重試N次也不行后,那客戶端也會知道消息沒成功,這也可以自己補償等,不會盲目影響到主業務邏輯。再比如即使Broker掛了,那還有其他Broker再提供服務了,高可用,不影響。
總結為幾個字就是:同步發送+自動重試機制+多個Master節點
三、Broker存儲階段
Broker肯定是先把消息放到內存的,然后根據刷盤策略持久化到硬盤中,剛收到Producer的消息,再內存中了,但是異常宕機了,導致消息丟失。
1、解決方案一
MQ持久化消息分為兩種:同步刷盤和異步刷盤。默認情況是異步刷盤,Broker收到消息后會先存到cache里然后立馬通知Producer說消息我收到且存儲成功了,你可以繼續你的業務邏輯了,然后Broker起個線程異步的去持久化到磁盤中,但是Broker還沒持久化到磁盤就宕機的話,消息就丟失了。同步刷盤的話是收到消息存到cache后並不會通知Producer說消息已經ok了,而是會等到持久化到磁盤中后才會通知Producer說消息完事了。這也保障了消息不會丟失,但是性能不如異步高。看業務場景取舍。
修改刷盤策略為同步刷盤。默認情況下是異步刷盤的,如下配置
## 默認情況為 ASYNC_FLUSH,修改為同步刷盤:SYNC_FLUSH,實際場景看業務,同步刷盤效率肯定不如異步刷盤高。
flushDiskType = SYNC_FLUSH
對應的Java配置類如下:
package org.apache.rocketmq.store.config; public enum FlushDiskType { // 同步刷盤 SYNC_FLUSH, // 異步刷盤(默認) ASYNC_FLUSH }
異步刷盤默認10s執行一次,源碼如下:
/* * {@link org.apache.rocketmq.store.CommitLog#run()} */ while (!this.isStopped()) { try { // 等待10s this.waitForRunning(10); // 刷盤 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } }
2、解決方案二
集群部署,主從模式,高可用。
即使Broker設置了同步刷盤策略,但是Broker刷完盤后磁盤壞了,這會導致盤上的消息全TM丟了。但是如果即使是1主1從了,但是Master刷完盤后還沒來得及同步給Slave就磁盤壞了,不也是GG嗎?沒錯!
所以我們還可以配置不僅是等Master刷完盤就通知Producer,而是等Master和Slave都刷完盤后才去通知Producer說消息ok了。
## 默認為 ASYNC_MASTER
brokerRole=SYNC_MASTER
3、總結
若想很嚴格的保證Broker存儲消息階段消息不丟失,則需要如下配置,但是性能肯定遠差於默認配置。
# master 節點配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER # slave 節點配置 brokerRole=slave flushDiskType = SYNC_FLUSH
上面這個配置含義是:
Producer發消息到Broker后,Broker的Master節點先持久化到磁盤中,然后同步數據給Slave節點,Slave節點同步完且落盤完成后才會返回給Producer說消息ok了。
四、Consumer消費階段
消費失敗了其實也是消息丟失的一種變體。
1、解決方案一
消費者會先把消息拉取到本地,然后進行業務邏輯,業務邏輯完成后手動進行ack確認,這時候才會真正的代表消費完成。而不是說pull到本地后消息就算消費完了。舉個例子
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { String str = new String(msg.getBody()); System.out.println(str); } // ack,只有等上面一系列邏輯都處理完后,到這步CONSUME_SUCCESS才會通知broker說消息消費完成,如果上面發生異常沒有走到這步ack,則消息還是未消費狀態。而不是像比如redis的blpop,彈出一個數據后數據就從redis里消失了,並沒有等我們業務邏輯執行完才彈出。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
2、解決方案二
消息消費失敗自動重試。如果消費消息失敗了,沒有進行ack確認,則會自動重試,重試策略和次數(默認15次)如下配置
/** * Broker可以配置的所有選項 */ public class org.apache.rocketmq.store.config.MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }