什么是可靠消息?
為什么我們需要它,為什么我們要強調可靠?
生產方 消息發送出去了,如果生產方收到了消息的正常反饋,那么我們就可以知道消息的確切的狀態。 如果消息無響應 或者超時了呢? 有多個情況,
1 消息未到達mq,發送途中 就某些原因丟失了,
2 消息送達mq,但是mq處理未完成就丟失(這里又可以細分為:mq未記錄日志,已記錄日志但未落盤消息,已落盤但未來得及響應請求,已落盤但未完成推送(僅僅針對推的情況))。
3 消息送達mq,消息也已經被mq 處理完畢,但是響應在 網絡途中 丟失。
4 生產方對發送的消息設置超時時間。 雖然消息送達mq,消息也已經被mq處理,也返回來了,但是由於此時已經超時,生產方已經斷開了網絡連接,從而丟棄了響應。
盡管我們可以盡量的確保MQ可靠,讓mq 可靠的持久化消息,但是網絡 是不可靠的, 幾乎沒有辦法確保 網絡 可靠。。。 ( 網絡可靠就這么難嗎??)
如果知道是情況1、2,我們可以重新發送消息即可,也就是重試。(當然,如果網絡問題,或者mq掛掉了,重試也沒有,只有等待 這些問題回復才重試才有意義,因此,我們可以設置一個 比較長的、“按照指數爆炸” 的 “重試間隔時間”)
如果知道是情況3,如果我們不需要消息id,那么我們可以認為 消息發送成功,業務也處理成功。不用重試了!
對於前面3個情況,生產方是無法判斷 消息到底mq 是否已經處理好了, 這就顯得 “不可靠”了, 除了量子力學,沒人喜歡不確定性。 有可能1 、 2 也有可能是3,怎么辦? 或許我們可以 通過查詢mq 的方式(也就是peek 一下,但是不消費)判斷 是否是3。
所以,我們期望有一個可靠消息,能夠避免任何問題,包括網絡問題。 如果消息不可靠,那么我們就需要采取其他的措施,比如之前講的 本地消息表。。。
分布式事務大致可以分為以下四種( 不知道是什么樣的一個分類 准則):
- 兩階段型
- 補償型
- 異步確保型
- 最大努力通知型
可靠消息, 屬於 異步確保型。 why?后面會說明。
可靠消息 的實現
可靠消息 可能有很多實現方式,但一般就是指事務型消息。可靠消息 一般也是基於MQ的。 前面說過了基於本地消息表的分布式事務。基於本地消息表的分布式事務 其實也可以認為是 基於MQ的分布式事務的 一種情況。
基於MQ的分布式事務:
生產方處理過程: 1 主動方應用先把消息發給消息中間件,消息狀態標記為“待確認”; 2 消息中間件收到消息后,把消息持久化到消息存儲中,但並不向被動方應用投遞消息; 3 消息中間件返回消息持久化結果(成功/失敗),主動方應用根據返回結果進行判斷如何進行業務操作處理: 失敗:放棄業務操作處理,結束(必要時向上層返回失敗結果); 成功:執行業務操作處理; 4 業務操作完成后,把業務操作結果(成功/失敗)發送給消息中間件; 5 消息中間件收到業務操作結果后,根據業務結果進行處理; 失敗:刪除消息存儲中的消息,結束; 成功:更新消息存儲中的消息狀態為“待發送(可發送)”,緊接着執行 消息投遞; 6 前面的正向流程都成功后,向被動方應用投遞消息;
消息發送一致性方案的正向流程是可行的,但異常流程怎么處理呢?
消息發送到消息中間件中能得到保障了,但消息的准確消費(投遞)又如何保障呢?
有沒有支持這種發送一致性流程的現成消息中間件?
—— 其實是有的,RocketMQ, 另外我認為, 可以消費方自己去消費,而不是推消息給 消費方,會不會更好? 推的話 會有一些延遲,但是 這樣也降低了 MQ的壓力。
--------------------- 作者:chenshiying007 來源:CSDN 原文: https://blog.csdn.net/qq_27384769/article/details/79305402 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
基於RocketMQ的分布式事務:
在RocketMQ中實現了分布式事務,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部。
下面簡單介紹一下MQ事務,如果想對其詳細了解可以參考: https://www.jianshu.com/p/453c6e7ff81c。
基本流程如下: 第一階段Prepared消息,會拿到消息的地址。
第二階段執行本地事務。
第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。消息接受者就能使用這個消息。
如果確認消息失敗,在RocketMq Broker中提供了定時掃描沒有更新狀態的消息,如果有消息沒有得到確認,會向消息發送者發送消息,來判斷是否提交,在rocketmq中是以listener的形式給發送者,用來處理。
如果消費超時,則需要一直重試,消息接收端需要保證冪等。如果消息消費失敗,這個就需要人工進行處理,因為這個概率較低,如果為了這種小概率時間而設計這個復雜的流程反而得不償失。
===========================================================================
上面的說明 摘抄於 ,我看了后還是有些懵。仔細看了https://blog.csdn.net/u010425776/article/details/79516298, 之后,我明白了一些。
消息生產過程的可靠性保證
在系統A處理任務A前,首先向消息中間件發送一條消息 消息中間件收到后將該條消息持久化,但並不投遞。此時下游系統B仍然不知道該條消息的存在。 消息中間件持久化成功后,便向系統A返回一個確認應答; 系統A收到確認應答后,則可以開始處理任務A; 任務A處理完成后,向消息中間件發送Commit請求。該請求發送完成后,對系統A而言,該事務的處理過程就結束了,此時它可以處理別的任務了。 但commit消息可能會在傳輸途中丟失,從而消息中間件並不會向系統B投遞這條消息,從而系統就會出現不一致性。這個問題由消息中間件的事務回查機制完成,下文會介紹。 消息中間件收到Commit指令后,便向系統B投遞該消息,從而觸發任務B的執行; 當任務B執行完成后,系統B向消息中間件返回一個確認應答,告訴消息中間件該消息已經成功消費,此時,這個分布式事務完成。 --------------------- 作者:凌瀾星空 來源:CSDN 原文:https://blog.csdn.net/u010425776/article/details/79516298 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
上述過程中,如果任務A處理失敗,那么需要進入回滾流程,如下圖所示:
- 若系統A在處理任務A時失敗,那么就會向消息中間件發送Rollback請求。和發送Commit請求一樣,系統A發完之后便可以認為回滾已經完成,它便可以去做其他的事情。
- 消息中間件收到回滾請求后,直接將該消息丟棄,而不投遞給系統B,從而不會觸發系統B的任務B。
上面所介紹的Commit和Rollback都屬於理想情況,但在實際系統中,Commit和Rollback指令都有可能在傳輸途中丟失。那么當出現這種情況的時候,消息中間件是如何保證數據一致性呢?——答案就是超時詢問機制。
系統A除了實現正常的業務流程外,還需提供一個事務詢問的接口,供消息中間件調用。當消息中間件收到一條事務型消息后便開始計時,如果到了超時時間也沒收到系統A發來的Commit或Rollback指令的話,就會主動調用系統A提供的事務詢問接口詢問該系統目前的狀態。該接口會返回三種結果:
- 提交
若獲得的狀態是“提交”,則將該消息投遞給系統B。 - 回滾
若獲得的狀態是“回滾”,則直接將條消息丟棄。 - 處理中
若獲得的狀態是“處理中”,則繼續等待
消息中間件的超時詢問機制能夠防止上游系統因在傳輸過程中丟失Commit/Rollback指令而導致的系統不一致情況,而且能降低上游系統的阻塞時間,
上游系統只要發出Commit/Rollback指令后便可以處理其他任務,無需等待確認應答。而Commit/Rollback指令丟失的情況通過超時詢問機制來彌補,
這樣大大降低上游系統的阻塞時間,提升系統的並發度。 --------------------- 作者:凌瀾星空 來源:CSDN 原文:https://blog.csdn.net/u010425776/article/details/79516298 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
系統A發送消息的操作應該是同步的,因為我們需要獲取消息的地址,否則后面就無法進行消息更新和確認或取消了。 但是呢,這一步驟,如前所述,也是可能出現問題的,也就是無法區分前述情況1、2、3。 但是呢,這個也不要緊的, 因為 消息必須要確認后, 后面的系統才會進行消費。 如果出現情況3,那么我們 盡可以的把 這個待確認的消息丟棄。 而系統A 因為無法收到mq 的反饋, 不會進行下一步, 也可以保證整個系統的 一致性。
下面來說一說消息投遞(消費)過程的可靠性保證。
當上游系統執行完任務並向消息中間件提交了Commit指令后,便可以處理其他任務了,此時它可以認為事務已經完成,接下來消息中間件一定會保證消息被下游系統成功消費掉!那么這是怎么做到的呢?這由消息中間件的投遞流程來保證。
消息中間件向下游系統投遞完消息后便進入阻塞等待狀態,下游系統便立即進行任務的處理,任務處理完成后便向消息中間件返回應答。消息中間件收到確認應答后便認為該事務處理完畢!
如果消息在投遞過程中丟失,
或消息的確認應答在返回途中丟失,
那么消息中間件在等待確認應答超時之后就會重新投遞,直到下游消費者返回消費成功響應為止。當然,一般消息中間件可以設置消息重試的次數和時間間隔,比如:當第一次投遞失敗后,每隔五分鍾重試一次,一共重試3次。如果重試3次之后仍然投遞失敗,那么這條消息就需要人工干預。
有的同學可能要問:消息投遞失敗后為什么不回滾消息,而是不斷嘗試重新投遞?
這就涉及到整套分布式事務系統的實現成本問題。
我們知道,當系統A將向消息中間件發送Commit指令后,它便去做別的事情了。如果此時消息投遞失敗,需要回滾的話,就需要讓系統A事先提供回滾接口,這無疑增加了額外的開發成本,業務系統的復雜度也將提高。對於一個業務系統的設計目標是,在保證性能的前提下,最大限度地降低系統復雜度,從而能夠降低系統的運維成本。
———— 如果不斷重試, 還是失敗了, 那么就需要想想其他方法了,比如發通知然后人工介入啊等等。。
不知大家是否發現,上游系統A向消息中間件提交Commit/Rollback消息采用的是異步方式,也就是當上游系統提交完消息后便可以去做別的事情,接下來提交、回滾就完全交給消息中間件來完成,並且完全信任消息中間件,認為它一定能正確地完成事務的提交或回滾。然而,消息中間件向下游系統投遞消息的過程是同步的。也就是消息中間件將消息投遞給下游系統后,它會阻塞等待,等下游系統成功處理完任務返回確認應答后才取消阻塞等待。為什么這兩者在設計上是不一致的呢?
首先,上游系統和消息中間件之間采用異步通信是為了提高系統並發度。業務系統直接和用戶打交道,用戶體驗尤為重要,因此這種異步通信方式能夠極大程度地降低用戶等待時間。此外,異步通信相對於同步通信而言,沒有了長時間的阻塞等待,因此系統的並發性也大大增加。但異步通信可能會引起Commit/Rollback指令丟失的問題,這就由消息中間件的超時詢問機制來彌補。
那么,消息中間件和下游系統之間為什么要采用同步通信呢?
異步能提升系統性能,但隨之會增加系統復雜度;而同步雖然降低系統並發度,但實現成本較低。因此,在對並發度要求不是很高的情況下,或者服務器資源較為充裕的情況下,我們可以選擇同步來降低系統的復雜度。
我們知道,消息中間件是一個獨立於業務系統的第三方中間件,它不和任何業務系統產生直接的耦合,它也不和用戶產生直接的關聯,它一般部署在獨立的服務器集群上,具有良好的可擴展性,所以不必太過於擔心它的性能,如果處理速度無法滿足我們的要求,可以增加機器來解決。而且,即使消息中間件處理速度有一定的延遲那也是可以接受的,因為前面所介紹的BASE理論就告訴我們了,我們追求的是最終一致性,而非實時一致性,因此消息中間件產生的時延導致事務短暫的不一致是可以接受的。
---------------------
為什么可靠消息屬於 異步確保? 我們可以看到 系統A發送commit、rollback 都是 異步發送的, 也就是直接發送,但不獲取任何反饋結果。 也大概就是為什么稱作 “異步確保” 的原因吧!
示例

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", (Throwable)null); } else { Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; MessageAccessor.putProperty(msg, "TRAN_MSG", "true"); MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup()); try { //這里執行第一次發送消息,也就是預發送,並獲取sendResult,這里包含msg的所有消息 sendResult = this.send(msg); } catch (Exception var10) { throw new MQClientException("send message Exception", var10); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; //根據預發送消息的狀態做不同的處理,這里主要看SEND_OK switch(sendResult.getSendStatus()) { case SEND_OK: try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } // 這里做第二步,執行業務邏輯,即本地事物, //具體的本地事物在LocalTransactionExecuter參數的實現類中, //需要根據自己的業務邏輯去寫,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);會執行實 //現類中的executeLocalTransactionBranch業務。 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { this.log.info("executeLocalTransactionBranch return {}", localTransactionState); this.log.info(msg.toString()); } } catch (Throwable var9) { this.log.info("executeLocalTransactionBranch exception", var9); this.log.info(msg.toString()); localException = var9; } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; } try { // 這里的方法,其中的localTransactionState是第二次執行業務邏輯的結果 //可以根據這個結果,知道本地事物執行的成功還是失敗。或者是異常localException, //這樣可以根據第一次發送消息的結果sendResult,去修改mq中第一次發送消息的狀態,完成第三步操作。 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception var8) { this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; } } public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { // 獲取第一次發送消息的id MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } //獲取事物id String transactionId = sendResult.getTransactionId(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); //根據本地事物執行狀態localTransactionState,告知mq修改狀態 switch(localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(Integer.valueOf(8)); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(Integer.valueOf(12)); break; case UNKNOW: requestHeader.setCommitOrRollback(Integer.valueOf(0)); } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null; //具體執行第三步完成整個事務。 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout()); } 作者:時之令 鏈接:https://www.jianshu.com/p/8c997d0917c6 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。
從代碼看, 這個處理有些復雜,或許我們需要把rocketmq 的文檔和 api 仔細看看。
參考:
作者:凌瀾星空
來源:CSDN
原文:https://blog.csdn.net/u010425776/article/details/79516298
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!