今天的博客有點多,因為前幾天一直用筆記錄,今天都補上了。后面的博客先停一段時間,后面還有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等內容,只是因為最近准備跳槽,停更一段時間,等到新公司后再繼續更新。
場景1:支付寶轉1w到余額寶,支付寶扣了1w,服務掛了怎么辦?余額還沒有加上
場景2:訂單系統和庫存系統如何保持一致
如果是本地的話很好解決
- begin transaction
update 支付寶 - 1w;
update 余額寶 + 1W;
end transaction - 用Spring的話,方法上加 @Transaction注釋
那如果是跨系統的呢?該如何解決?
有一種思路是這樣的:
- client發送轉賬請求給事務協調器
- 事務協調器先發送扣款請求給支付寶,返回執行結果(這時並沒有提交)
- 事務協調器在發送加款請求給余額寶,返回執行結果(這時也沒有提交)
- 事務協調器看兩個執行結果都返回OK 就執行第四步,提交2和3沒有提交的更新請求。
但是這個有個問題,那就是性能很受影響,主要卡在事務協調器這里。
RocketMQ的實現方式如下(圖片來自網絡):
支付寶先生成 扣款信息 --> 消息隊列 --> 余額寶消費消息
發送消息:
1 import com.alibaba.rocketmq.client.exception.MQClientException; 2 import com.alibaba.rocketmq.client.producer.SendResult; 3 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 4 import com.alibaba.rocketmq.client.producer.TransactionMQProducer; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 發送事務消息例子 10 * 11 */ 12 public class TransactionProducer { 13 public static void main(String[] args) throws MQClientException, InterruptedException { 14 15 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); 16 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); 17 // 事務回查最小並發數 18 producer.setCheckThreadPoolMinSize(2); 19 // 事務回查最大並發數 20 producer.setCheckThreadPoolMaxSize(2); 21 // 隊列數 22 producer.setCheckRequestHoldMax(2000); 23 producer.setTransactionCheckListener(transactionCheckListener); 24 producer.start(); 25 26 String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; 27 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); 28 for (int i = 0; i < 100; i++) { 29 try { 30 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, 31 ("Hello RocketMQ " + i).getBytes()); 32 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); 33 System.out.println(sendResult); 34 35 Thread.sleep(10); 36 } 37 catch (MQClientException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 for (int i = 0; i < 100000; i++) { 43 Thread.sleep(1000); 44 } 45 46 producer.shutdown(); 47 48 } 49 }
執行本地事務
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; 4 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 執行本地事務 10 */ 11 public class TransactionExecuterImpl implements LocalTransactionExecuter { 12 private AtomicInteger transactionIndex = new AtomicInteger(1); 13 14 15 @Override 16 public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { 17 int value = transactionIndex.getAndIncrement(); 18 19 if (value == 0) { 20 throw new RuntimeException("Could not find db"); 21 } 22 else if ((value % 5) == 0) { 23 return LocalTransactionState.ROLLBACK_MESSAGE; 24 } 25 else if ((value % 4) == 0) { 26 return LocalTransactionState.COMMIT_MESSAGE; 27 } 28 29 return LocalTransactionState.UNKNOW; 30 } 31 }
服務器回查客戶端(這個功能在開源版本中已經被咔掉了,但是我們還是要寫,不然報錯)
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 4 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 5 import com.alibaba.rocketmq.common.message.MessageExt; 6 7 8 /** 9 * 未決事務,服務器回查客戶端 10 */ 11 public class TransactionCheckListenerImpl implements TransactionCheckListener { 12 private AtomicInteger transactionIndex = new AtomicInteger(0); 13 14 15 @Override 16 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { 17 System.out.println("server checking TrMsg " + msg.toString()); 18 19 int value = transactionIndex.getAndIncrement(); 20 if ((value % 6) == 0) { 21 throw new RuntimeException("Could not find db"); 22 } 23 else if ((value % 5) == 0) { 24 return LocalTransactionState.ROLLBACK_MESSAGE; 25 } 26 else if ((value % 4) == 0) { 27 return LocalTransactionState.COMMIT_MESSAGE; 28 } 29 30 return LocalTransactionState.UNKNOW; 31 } 32 }
到這就完了,為什么只介紹發送不介紹接收呢?因為一旦消息提交到MQ就不用管了, 要相信MQ會把消息送達consumer,如果消息未能被成功消費的話,那么Producer也會回滾
如何保證分布式系統的全局性事務?
因為阿里在3.2.6版本后,砍掉了消息回查的功能,也就是consumer端是否成功消費,Producer端並不知道,所以如果要保證全局性事務,我們要有自己的實現機制: