rocketmq源碼分析4-事務消息實現原理


為什么消息要具備事務能力

參見阿里雲消息隊列 MQ關於事務消息的文檔的圖
還是比較清晰的。簡單的說 就是在你業務邏輯過程中,需要發送一條消息給訂閱消息的人,但是期望是 此邏輯過程完全成功完成之后才能使訂閱者收到消息。
業務邏輯過程 假設是這樣的:
邏輯部分a-->發消息給MQ-->邏輯部分b
假設我們在發送消息給MQ之后執行邏輯部分b時產生了異常,那如果MQ不具備事務消息能力時,訂閱者也收到了消息。這是我們不希望見到的。

分布式事務基礎概念

  1. 關於分布式事務、兩階段提交協議、三階提交協議
  2. 理解分布式事務的兩階段提交2pc
  3. 分布式事務(一)兩階段提交及JTA
  4. 分布式系統常用思想和技術總結
  5. 【整理】腦裂問題
  6. 分布式系統的事務處理
  7. 多版本並發控制(MVCC)在分布式系統中的應用
  8. 戲說PAXOS
  9. 阿里雲消息隊列 MQ關於事務消息的文檔

rocketmq具備事務能力的demo

參見TransactionProducerDemo.java

向producer注冊的TransactionCheckListener實現並沒有用,因為返回LocalTransactionState.UNKNOW狀態時,在3.2.6版本中,是不支持此狀態下回調TransactionCheckListener的,具體參見以下兩個issue。

事務消息 LocalTransactionState.UNKNOW 狀態下不回查 #221
開源版本支持事務消息嗎 #364
測試過程中發現返回UNKNOW狀態是不能正確達到期望的,但是返回ROLLBACK_MESSAGE狀態還是能達到期望的。

實現分析入口

這個實現的入口還是比較容易找的,只要搜尋ROLLBACK_MESSAGE這個變量的引用即可發現。順着搜索查看,其實很容易發現,客戶端在收到業務邏輯返回的事務狀態時會發送一條結束事務的指令給broker。

// com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行  
RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

按broker對外部指令的常規做法,一般會有一個Processor與之對應。是EndTransactionProcessor,看BrokerController374行其注冊的地方,沒錯。

EndTransactionProcessor分析(broker側)

如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE時,EndTransactionProcessor會清空message的body的置成null,queueOffset也不會更新,那么consumer就收不到消息了。

//--EndTransactionProcessor.processRequest  200行--
if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {
    msgInner.setBody(null);
}

如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor則會照常put message。

事務消息分為兩個階段,prepare階段與commit階段。prepare階段的消息會寫入store,只是寫完之后的queueOffset(邏輯位置)為0(commit階段寫完消息后的queueOffset就不是0了。);

 

// -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 --
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queue
case MessageSysFlag.TransactionPreparedType:
case MessageSysFlag.TransactionRollbackType:
    queueOffset = 0L;
    break;
case MessageSysFlag.TransactionNotType:
case MessageSysFlag.TransactionCommitType:
default:
    break;
待分析問題列表:
1. prepare階段已經將消息發了過去,commit的時候是否還會再發送一次消息?
2. rollback的時候是否會將prepare的消息刪除?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM