52.RocketMQ 事務


今天的博客有點多,因為前幾天一直用筆記錄,今天都補上了。后面的博客先停一段時間,后面還有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等內容,只是因為最近准備跳槽,停更一段時間,等到新公司后再繼續更新。

場景1:支付寶轉1w到余額寶,支付寶扣了1w,服務掛了怎么辦?余額還沒有加上

場景2:訂單系統和庫存系統如何保持一致

如果是本地的話很好解決

  • begin transaction
      update 支付寶 - 1w;
      update 余額寶 + 1W;
    end transaction
  • 用Spring的話,方法上加 @Transaction注釋

那如果是跨系統的呢?該如何解決?

有一種思路是這樣的:

  1.  client發送轉賬請求給事務協調器
  2. 事務協調器先發送扣款請求給支付寶,返回執行結果(這時並沒有提交)
  3. 事務協調器在發送加款請求給余額寶,返回執行結果(這時也沒有提交)
  4. 事務協調器看兩個執行結果都返回OK 就執行第四步,提交2和3沒有提交的更新請求。

但是這個有個問題,那就是性能很受影響,主要卡在事務協調器這里。

RocketMQ的實現方式如下(圖片來自網絡):

 

支付寶先生成 扣款信息 --> 消息隊列 --> 余額寶消費消息

 

發送消息:

 1 import com.alibaba.rocketmq.client.exception.MQClientException;
 2 import com.alibaba.rocketmq.client.producer.SendResult;
 3 import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
 4 import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
 5 import com.alibaba.rocketmq.common.message.Message;
 6 
 7 
 8 /**
 9  * 發送事務消息例子
10  * 
11  */
12 public class TransactionProducer {
13     public static void main(String[] args) throws MQClientException, InterruptedException {
14 
15         TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
16         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
17         // 事務回查最小並發數
18         producer.setCheckThreadPoolMinSize(2);
19         // 事務回查最大並發數
20         producer.setCheckThreadPoolMaxSize(2);
21         // 隊列數
22         producer.setCheckRequestHoldMax(2000);
23         producer.setTransactionCheckListener(transactionCheckListener);
24         producer.start();
25 
26         String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
27         TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
28         for (int i = 0; i < 100; i++) {
29             try {
30                 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
31                     ("Hello RocketMQ " + i).getBytes());
32                 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
33                 System.out.println(sendResult);
34 
35                 Thread.sleep(10);
36             }
37             catch (MQClientException e) {
38                 e.printStackTrace();
39             }
40         }
41 
42         for (int i = 0; i < 100000; i++) {
43             Thread.sleep(1000);
44         }
45 
46         producer.shutdown();
47 
48     }
49 }

 

執行本地事務

 1 import java.util.concurrent.atomic.AtomicInteger;
 2 
 3 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
 4 import com.alibaba.rocketmq.client.producer.LocalTransactionState;
 5 import com.alibaba.rocketmq.common.message.Message;
 6 
 7 
 8 /**
 9  * 執行本地事務
10  */
11 public class TransactionExecuterImpl implements LocalTransactionExecuter {
12     private AtomicInteger transactionIndex = new AtomicInteger(1);
13 
14 
15     @Override
16     public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
17         int value = transactionIndex.getAndIncrement();
18 
19         if (value == 0) {
20             throw new RuntimeException("Could not find db");
21         }
22         else if ((value % 5) == 0) {
23             return LocalTransactionState.ROLLBACK_MESSAGE;
24         }
25         else if ((value % 4) == 0) {
26             return LocalTransactionState.COMMIT_MESSAGE;
27         }
28 
29         return LocalTransactionState.UNKNOW;
30     }
31 }

 

服務器回查客戶端(這個功能在開源版本中已經被咔掉了,但是我們還是要寫,不然報錯)

 1 import java.util.concurrent.atomic.AtomicInteger;
 2 
 3 import com.alibaba.rocketmq.client.producer.LocalTransactionState;
 4 import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
 5 import com.alibaba.rocketmq.common.message.MessageExt;
 6 
 7 
 8 /**
 9  * 未決事務,服務器回查客戶端
10  */
11 public class TransactionCheckListenerImpl implements TransactionCheckListener {
12     private AtomicInteger transactionIndex = new AtomicInteger(0);
13 
14 
15     @Override
16     public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
17         System.out.println("server checking TrMsg " + msg.toString());
18 
19         int value = transactionIndex.getAndIncrement();
20         if ((value % 6) == 0) {
21             throw new RuntimeException("Could not find db");
22         }
23         else if ((value % 5) == 0) {
24             return LocalTransactionState.ROLLBACK_MESSAGE;
25         }
26         else if ((value % 4) == 0) {
27             return LocalTransactionState.COMMIT_MESSAGE;
28         }
29 
30         return LocalTransactionState.UNKNOW;
31     }
32 }

 

到這就完了,為什么只介紹發送不介紹接收呢?因為一旦消息提交到MQ就不用管了, 要相信MQ會把消息送達consumer,如果消息未能被成功消費的話,那么Producer也會回滾

 

如何保證分布式系統的全局性事務?

因為阿里在3.2.6版本后,砍掉了消息回查的功能,也就是consumer端是否成功消費,Producer端並不知道,所以如果要保證全局性事務,我們要有自己的實現機制:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM