rocketmq事務消息
參考:
https://blog.csdn.net/u011686226/article/details/78106215
https://yq.aliyun.com/articles/55630
https://my.oschina.net/u/2950586/blog/760677
https://blog.csdn.net/chunlongyu/article/details/53844393
分布式消息隊列RocketMQ--事務消息--解決分布式事務的最佳實踐
說到分布式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分布處於2個不同的DB,或者說2個不同的子系統里面,A要扣錢,B要加錢,如何保證原子性?
一般的思路都是通過消息中間件來實現“最終一致性”:A系統扣錢,然后發條消息給中間件,B系統接收此消息,進行加錢。
但這里面有個問題:A是先update DB,后發送消息呢? 還是先發送消息,后update DB?
假設先update DB成功,發送消息網絡失敗,重發又失敗,怎么辦?
假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎么辦?
所以,這里下個結論: 只要發送消息和update DB這2個操作不是原子的,無論誰先誰后,都是有問題的。
那這個問題怎么解決呢??
錯誤的方案0
有人可能想到了,我可以把“發送消息”這個網絡調用和update DB放在同1個事務里面,如果發送消息失敗,update DB自動回滾。這樣不就保證2個操作的原子性了嗎?
這個方案看似正確,其實是錯誤的,原因有2:
(1)網絡的2將軍問題:發送消息失敗,發送方並不知道是消息中間件真的沒有收到消息呢?還是消息已經收到了,只是返回response的時候失敗了?
如果是已經收到消息了,而發送端認為沒有收到,執行update db的回滾操作。則會導致A賬號的錢沒有扣,B賬號的錢卻加了。
(2)把網絡調用放在DB事務里面,可能會因為網絡的延時,導致DB長事務。嚴重的,會block整個DB。這個風險很大。
基於以上分析,我們知道,這個方案其實是錯誤的!
方案1–業務方自己實現
假設消息中間件沒有提供“事務消息”功能,比如你用的是Kafka。那如何解決這個問題呢?
解決方案如下:
(1)Producer端准備1張消息表,把update DB和insert message這2個操作,放在一個DB事務里面。
(2)准備一個后台程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。允許消息重復,但消息不會丟,順序也不會打亂。
(3)Consumer端准備一個判重表。處理過的消息,記在判重表里面。實現業務的冪等。但這里又涉及一個原子性問題:如果保證消息消費 + insert message到判重表這2個操作的原子性?
消費成功,但insert判重表失敗,怎么辦?關於這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過討論。
通過上面3步,我們基本就解決了這里update db和發送網絡消息這2個操作的原子性問題。
但這個方案的一個缺點就是:需要設計DB消息表,同時還需要一個后台任務,不斷掃描本地消息。導致消息的處理和業務邏輯耦合額外增加業務方的負擔。
方案2 – RocketMQ 事務消息
為了能解決該問題,同時又不和業務耦合,RocketMQ提出了“事務消息”的概念。
具體來說,就是把消息的發送分成了2個階段:Prepare階段和確認階段。
具體來說,上面的2個步驟,被分解成3個步驟:
(1) 發送Prepared消息
(2) update DB
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。
可能有人會問了,前2步執行成功了,最后1步失敗了怎么辦?這里就涉及到了RocketMQ的關鍵點:RocketMQ會定期(默認是1分鍾)掃描所有的Prepared消息,詢問發送方,到底是要確認這條消息發出去?還是取消此條消息?
具體代碼實現如下:
也就是定義了一個checkListener,RocketMQ會回調此Listener,從而實現上面所說的方案。
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,相當於示例中檢查Bob賬戶並扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();
public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.如果消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }
總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把“掃描消息表”這個事情,不讓業務方做,而是消息中間件幫着做了。
至於消息表,其實還是沒有省掉。因為消息中間件要詢問發送方,事物是否執行成功,還是需要一個“變相的本地消息表”,記錄事物執行狀態。
人工介入
可能有人又要說了,無論方案1,還是方案2,發送端把消息成功放入了隊列,但消費端消費失敗怎么辦?
消費失敗了,重試,還一直失敗怎么辦?是不是要自動回滾整個流程?
答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是非常巨大的,不但實現復雜,還會引入新的問題。比如自動回滾失敗,又怎么處理?
對應這種極低概率的case,采取人工處理,會比實現一個高復雜的自動化回滾系統,更加可靠,也更加簡單。
rocketmq事務消息的理解
http://www.cnblogs.com/wxd0108/p/6038543.html
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
如果endTransaction方法執行失敗,導致數據沒有發送到broker,broker會有回查線程定時(默認1分鍾)掃描每個存儲事務狀態的表格文件,如果是已經提交或者回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法,最后調用endTransactionOneway讓broker來更新消息的最終狀態。
再回到轉賬的例子,如果Bob的賬戶的余額已經減少,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程中有可能會出現消息重復的問題,按照前面的思路解決即可。
本質上還是個二階段提交
重復消費冪等性要自己做
RocketMQ 事務消息
源代碼版本是3.2.6,還是直接跑源代碼。rocketmq事務消息是發生在Producer和Broker之間,是二階段提交。
二階段提交過程看圖:

第一階段是:步驟1,2,3。
第二階段是:步驟4,5。
具體說明:
只有在消息發送成功,並且本地操作執行成功時,才發送提交事務消息,做事務提交。
其他的情況,例如消息發送失敗,直接發送回滾消息,進行回滾,或者發送消息成功,但是執行本地操作失敗,也是發送回滾消息,進行回滾。
事務消息原理實現過程:
一階段:
Producer向Broker發送1條類型為TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,然后返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,通過該變量可以直接定位到消息本身),由於該類型的消息在保存的時候,commitLogOffset沒有被保存到consumerQueue中,此時客戶端通過consumerQueue取不到commitLogOffset,所以該類型的消息無法被取到,導致不會被消費。
一階段的過程中,Broker保存了1條消息。
二階段:
Producer端的TransactionExecuterImpl執行本地操作,返回本地事務的狀態,然后發送一條類型為TransactionCommitType或者TransactionRollbackType的消息到Broker確認提交或者回滾,Broker通過Request中的commitLogOffset,獲取到上面狀態為TransactionPreparedType的消息(簡稱消息A),然后重新構造一條與消息A內容相同的消息B,設置狀態為TransactionCommitType或者TransactionRollbackType,然后保存。其中TransactionCommitType類型的,會放commitLogOffset到consumerQueue中,TransactionRollbackType類型的,消息體設置為空,不會放commitLogOffset到consumerQueue中。
二階段的過程中,Broker也保存了1條消息。
總結:事務消息過程中,broker一共保存2條消息。
貼代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
|
TransactionCheckListenerImpl.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
本地操作類TransactionExecuterImpl.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
Producer類:TransactionProducer.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
|
RocketMQ 事務消息
RocketMQ將事務拆分成小事務異步執行的方式來執行。
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。RocketMQ會定期掃描消息集群中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
RocketMQ事務消息:
TransactionCheckListenerImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * Created by Aaron Sheng on 10/19/16. * TransactionCheckListenerImpl handle transaction unsettled. * Broker will notify producer to check local transaction. */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("checkLocalTransactionState"); System.out.println("topic: " + messageExt.getTopic()); System.out.println("body: " + messageExt.getBody()); return LocalTransactionState.ROLLBACK_MESSAGE; } }
TransactionExecuterImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }
TransactionProducer:
package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
RocketMQConsumer:
package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
