分布式事務之可靠消息


什么是可靠消息?

為什么我們需要它,為什么我們要強調可靠?

生產方 消息發送出去了,如果生產方收到了消息的正常反饋,那么我們就可以知道消息的確切的狀態。 如果消息無響應 或者超時了呢? 有多個情況,

1 消息未到達mq,發送途中 就某些原因丟失了,

2 消息送達mq,但是mq處理未完成就丟失(這里又可以細分為:mq未記錄日志,已記錄日志但未落盤消息,已落盤但未來得及響應請求,已落盤但未完成推送(僅僅針對推的情況))。

3 消息送達mq,消息也已經被mq 處理完畢,但是響應在 網絡途中 丟失。

4 生產方對發送的消息設置超時時間。 雖然消息送達mq,消息也已經被mq處理,也返回來了,但是由於此時已經超時,生產方已經斷開了網絡連接,從而丟棄了響應。

盡管我們可以盡量的確保MQ可靠,讓mq 可靠的持久化消息,但是網絡 是不可靠的, 幾乎沒有辦法確保 網絡 可靠。。。 ( 網絡可靠就這么難嗎??)

如果知道是情況1、2,我們可以重新發送消息即可,也就是重試。(當然,如果網絡問題,或者mq掛掉了,重試也沒有,只有等待 這些問題回復才重試才有意義,因此,我們可以設置一個 比較長的、“按照指數爆炸” 的  “重試間隔時間”)

如果知道是情況3,如果我們不需要消息id,那么我們可以認為 消息發送成功,業務也處理成功。不用重試了!

對於前面3個情況,生產方是無法判斷 消息到底mq 是否已經處理好了, 這就顯得 “不可靠”了, 除了量子力學,沒人喜歡不確定性。 有可能1 、 2  也有可能是3,怎么辦? 或許我們可以 通過查詢mq 的方式(也就是peek 一下,但是不消費)判斷 是否是3。 

 

所以,我們期望有一個可靠消息,能夠避免任何問題,包括網絡問題。 如果消息不可靠,那么我們就需要采取其他的措施,比如之前講的 本地消息表。。。

 

分布式事務大致可以分為以下四種( 不知道是什么樣的一個分類 准則):

  • 兩階段型
  • 補償型
  • 異步確保型
  • 最大努力通知型

可靠消息, 屬於 異步確保型。 why?后面會說明。

 

可靠消息 的實現

可靠消息 可能有很多實現方式,但一般就是指事務型消息。可靠消息 一般也是基於MQ的。 前面說過了基於本地消息表的分布式事務。基於本地消息表的分布式事務 其實也可以認為是 基於MQ的分布式事務的 一種情況。

基於MQ的分布式事務:

生產方處理過程: 1 主動方應用先把消息發給消息中間件,消息狀態標記為“待確認”;
2 消息中間件收到消息后,把消息持久化到消息存儲中,但並不向被動方應用投遞消息;
3 消息中間件返回消息持久化結果(成功/失敗),主動方應用根據返回結果進行判斷如何進行業務操作處理:
      失敗:放棄業務操作處理,結束(必要時向上層返回失敗結果);
      成功:執行業務操作處理;
4 業務操作完成后,把業務操作結果(成功/失敗)發送給消息中間件;
5 消息中間件收到業務操作結果后,根據業務結果進行處理;
      失敗:刪除消息存儲中的消息,結束;
      成功:更新消息存儲中的消息狀態為“待發送(可發送)”,緊接着執行
消息投遞;

6 前面的正向流程都成功后,向被動方應用投遞消息;


消息發送一致性方案的正向流程是可行的,但異常流程怎么處理呢?
消息發送到消息中間件中能得到保障了,但消息的准確消費(投遞)又如何保障呢?
有沒有支持這種發送一致性流程的現成消息中間件?
—— 其實是有的,RocketMQ, 另外我認為, 可以消費方自己去消費,而不是推消息給 消費方,
會不會更好? 推的話 會有一些延遲,但是 這樣也降低了 MQ的壓力。
--------------------- 
作者:chenshiying007 
來源:CSDN 
原文:    https://blog.csdn.net/qq_27384769/article/details/79305402 
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

 

基於RocketMQ的分布式事務:

在RocketMQ中實現了分布式事務,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部。

下面簡單介紹一下MQ事務,如果想對其詳細了解可以參考: https://www.jianshu.com/p/453c6e7ff81c。 

 

基本流程如下: 第一階段Prepared消息,會拿到消息的地址。

第二階段執行本地事務。

第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。消息接受者就能使用這個消息。

如果確認消息失敗,在RocketMq Broker中提供了定時掃描沒有更新狀態的消息,如果有消息沒有得到確認,會向消息發送者發送消息,來判斷是否提交,在rocketmq中是以listener的形式給發送者,用來處理。 

 

如果消費超時,則需要一直重試,消息接收端需要保證冪等。如果消息消費失敗,這個就需要人工進行處理,因為這個概率較低,如果為了這種小概率時間而設計這個復雜的流程反而得不償失。

===========================================================================

上面的說明 摘抄於 ,我看了后還是有些懵。仔細看了https://blog.csdn.net/u010425776/article/details/79516298, 之后,我明白了一些。

消息生產過程的可靠性保證

在系統A處理任務A前,首先向消息中間件發送一條消息
消息中間件收到后將該條消息持久化,但並不投遞。此時下游系統B仍然不知道該條消息的存在。
消息中間件持久化成功后,便向系統A返回一個確認應答;
系統A收到確認應答后,則可以開始處理任務A;
任務A處理完成后,向消息中間件發送Commit請求。該請求發送完成后,對系統A而言,該事務的處理過程就結束了,此時它可以處理別的任務了。 
但commit消息可能會在傳輸途中丟失,從而消息中間件並不會向系統B投遞這條消息,從而系統就會出現不一致性。這個問題由消息中間件的事務回查機制完成,下文會介紹。
消息中間件收到Commit指令后,便向系統B投遞該消息,從而觸發任務B的執行;
當任務B執行完成后,系統B向消息中間件返回一個確認應答,告訴消息中間件該消息已經成功消費,此時,這個分布式事務完成。
--------------------- 
作者:凌瀾星空 
來源:CSDN 
原文:https://blog.csdn.net/u010425776/article/details/79516298 
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

 

 

上述過程中,如果任務A處理失敗,那么需要進入回滾流程,如下圖所示:  

  • 若系統A在處理任務A時失敗,那么就會向消息中間件發送Rollback請求。和發送Commit請求一樣,系統A發完之后便可以認為回滾已經完成,它便可以去做其他的事情。
  • 消息中間件收到回滾請求后,直接將該消息丟棄,而不投遞給系統B,從而不會觸發系統B的任務B。

上面所介紹的Commit和Rollback都屬於理想情況,但在實際系統中,Commit和Rollback指令都有可能在傳輸途中丟失。那么當出現這種情況的時候,消息中間件是如何保證數據一致性呢?——答案就是超時詢問機制。

 

系統A除了實現正常的業務流程外,還需提供一個事務詢問的接口,供消息中間件調用。當消息中間件收到一條事務型消息后便開始計時,如果到了超時時間也沒收到系統A發來的Commit或Rollback指令的話,就會主動調用系統A提供的事務詢問接口詢問該系統目前的狀態。該接口會返回三種結果:

    • 提交 
      若獲得的狀態是“提交”,則將該消息投遞給系統B。
    • 回滾 
      若獲得的狀態是“回滾”,則直接將條消息丟棄。
    • 處理中 
      若獲得的狀態是“處理中”,則繼續等待
消息中間件的超時詢問機制能夠防止上游系統因在傳輸過程中丟失Commit/Rollback指令而導致的系統不一致情況,而且能降低上游系統的阻塞時間,
上游系統只要發出Commit/Rollback指令后便可以處理其他任務,無需等待確認應答。而Commit/Rollback指令丟失的情況通過超時詢問機制來彌補,
這樣大大降低上游系統的阻塞時間,提升系統的並發度。
--------------------- 作者:凌瀾星空 來源:CSDN 原文:https://blog.csdn.net/u010425776/article/details/79516298 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

系統A發送消息的操作應該是同步的,因為我們需要獲取消息的地址,否則后面就無法進行消息更新和確認或取消了。 但是呢,這一步驟,如前所述,也是可能出現問題的,也就是無法區分前述情況1、2、3。 但是呢,這個也不要緊的, 因為 消息必須要確認后, 后面的系統才會進行消費。 如果出現情況3,那么我們 盡可以的把 這個待確認的消息丟棄。 而系統A 因為無法收到mq 的反饋, 不會進行下一步, 也可以保證整個系統的 一致性。

 

下面來說一說消息投遞(消費)過程的可靠性保證。

當上游系統執行完任務並向消息中間件提交了Commit指令后,便可以處理其他任務了,此時它可以認為事務已經完成,接下來消息中間件一定會保證消息被下游系統成功消費掉!那么這是怎么做到的呢?這由消息中間件的投遞流程來保證。

消息中間件向下游系統投遞完消息后便進入阻塞等待狀態,下游系統便立即進行任務的處理,任務處理完成后便向消息中間件返回應答。消息中間件收到確認應答后便認為該事務處理完畢!

如果消息在投遞過程中丟失,

 

或消息的確認應答在返回途中丟失,

 

那么消息中間件在等待確認應答超時之后就會重新投遞,直到下游消費者返回消費成功響應為止。當然,一般消息中間件可以設置消息重試的次數和時間間隔,比如:當第一次投遞失敗后,每隔五分鍾重試一次,一共重試3次。如果重試3次之后仍然投遞失敗,那么這條消息就需要人工干預。

 


有的同學可能要問:消息投遞失敗后為什么不回滾消息,而是不斷嘗試重新投遞?

這就涉及到整套分布式事務系統的實現成本問題。
我們知道,當系統A將向消息中間件發送Commit指令后,它便去做別的事情了。如果此時消息投遞失敗,需要回滾的話,就需要讓系統A事先提供回滾接口,這無疑增加了額外的開發成本,業務系統的復雜度也將提高。對於一個業務系統的設計目標是,在保證性能的前提下,最大限度地降低系統復雜度,從而能夠降低系統的運維成本。

————  如果不斷重試, 還是失敗了, 那么就需要想想其他方法了,比如發通知然后人工介入啊等等。。

 

 

不知大家是否發現,上游系統A向消息中間件提交Commit/Rollback消息采用的是異步方式,也就是當上游系統提交完消息后便可以去做別的事情,接下來提交、回滾就完全交給消息中間件來完成,並且完全信任消息中間件,認為它一定能正確地完成事務的提交或回滾。然而,消息中間件向下游系統投遞消息的過程是同步的。也就是消息中間件將消息投遞給下游系統后,它會阻塞等待,等下游系統成功處理完任務返回確認應答后才取消阻塞等待。為什么這兩者在設計上是不一致的呢?

首先,上游系統和消息中間件之間采用異步通信是為了提高系統並發度。業務系統直接和用戶打交道,用戶體驗尤為重要,因此這種異步通信方式能夠極大程度地降低用戶等待時間。此外,異步通信相對於同步通信而言,沒有了長時間的阻塞等待,因此系統的並發性也大大增加。但異步通信可能會引起Commit/Rollback指令丟失的問題,這就由消息中間件的超時詢問機制來彌補。

那么,消息中間件和下游系統之間為什么要采用同步通信呢?

異步能提升系統性能,但隨之會增加系統復雜度;而同步雖然降低系統並發度,但實現成本較低。因此,在對並發度要求不是很高的情況下,或者服務器資源較為充裕的情況下,我們可以選擇同步來降低系統的復雜度。
我們知道,消息中間件是一個獨立於業務系統的第三方中間件,它不和任何業務系統產生直接的耦合,它也不和用戶產生直接的關聯,它一般部署在獨立的服務器集群上,具有良好的可擴展性,所以不必太過於擔心它的性能,如果處理速度無法滿足我們的要求,可以增加機器來解決。而且,即使消息中間件處理速度有一定的延遲那也是可以接受的,因為前面所介紹的BASE理論就告訴我們了,我們追求的是最終一致性,而非實時一致性,因此消息中間件產生的時延導致事務短暫的不一致是可以接受的。
---------------------

 

為什么可靠消息屬於 異步確保? 我們可以看到 系統A發送commit、rollback 都是 異步發送的, 也就是直接發送,但不獲取任何反饋結果。 也大概就是為什么稱作 “異步確保” 的原因吧!

 

示例

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

            try {
                //這里執行第一次發送消息,也就是預發送,並獲取sendResult,這里包含msg的所有消息
                sendResult = this.send(msg);
            } catch (Exception var10) {
                throw new MQClientException("send message Exception", var10);
            }

            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            //根據預發送消息的狀態做不同的處理,這里主要看SEND_OK
            switch(sendResult.getSendStatus()) {
            case SEND_OK:
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }

// 這里做第二步,執行業務邏輯,即本地事物,
//具體的本地事物在LocalTransactionExecuter參數的實現類中,
//需要根據自己的業務邏輯去寫,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);會執行實
//現類中的executeLocalTransactionBranch業務。
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (Throwable var9) {
                    this.log.info("executeLocalTransactionBranch exception", var9);
                    this.log.info(msg.toString());
                    localException = var9;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }

            try {
// 這里的方法,其中的localTransactionState是第二次執行業務邏輯的結果
//可以根據這個結果,知道本地事物執行的成功還是失敗。或者是異常localException,
//這樣可以根據第一次發送消息的結果sendResult,去修改mq中第一次發送消息的狀態,完成第三步操作。
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception var8) {
                this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
            }

            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    }

    
public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

// 獲取第一次發送消息的id
        MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

    //獲取事物id
        String transactionId = sendResult.getTransactionId();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());

       //根據本地事物執行狀態localTransactionState,告知mq修改狀態

        switch(localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(8));
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(12));
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(Integer.valueOf(0));
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
//具體執行第三步完成整個事務。      
  this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
}
 
 
作者:時之令
鏈接:https://www.jianshu.com/p/8c997d0917c6
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。
View Code

從代碼看, 這個處理有些復雜,或許我們需要把rocketmq 的文檔和 api 仔細看看。

 

參考:

作者:凌瀾星空
來源:CSDN
原文:https://blog.csdn.net/u010425776/article/details/79516298
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM