一、分布式事務的概念
1,什么是事務
事務可以看做是一次大的活動,它由不同的小活動組成,這些活動要么全部成功,要么全部失敗。
2,本地事務
- A(Atomic):原子性,構成事務的所有操作,要么都執行完成,要么全部不執行,不可能出現部分成功部分失敗的情況。
- C(Consistency):一致性,在事務執行前后,數據庫的一致性約束沒有被破壞。比如:張三向李四轉100元,轉賬前和轉賬后的數據是正確狀態這叫一致性,如果出現張三轉出100元,李四賬戶沒有增加100元這就出現了數據錯誤,就沒有達到一致性。
- I(Isolation):隔離性,數據庫中的事務一般都是並發的,隔離性是指並發的兩個事務的執行互不干擾,一個事務不能看到其他事務運行過程的中間狀態。通過配置事務隔離級別可以避臟讀、重復讀等問題。
- D(Durability):持久性,事務完成之后,該事務對數據的更改會被持久化到數據庫,且不會被回滾。
數據庫事務在實現時會將一次事務涉及的所有操作全部納入到一個不可分割的執行單元,該執行單元中的所有操作要么都成功,要么都失敗,只要其中任一操作執行失敗,都將導致整個事務的回滾。
3,分布式事務
分布式系統會把一個應用系統拆分為可獨立部署的多個服務,因此需要服務與服務之間遠程協作才能完成事務操作,這種分布式系統環境下由不同的服務之間通過網絡遠程協作完成事務稱之為分布式事務,例如用戶注冊送積分事務、創建訂單減庫存事務,銀行轉賬事務等都是分布式事務。
4,分布式事務產生的場景
- 典型的場景就是微服務架構,微服務之間通過遠程調用完成事務操作。比如:訂單微服務和庫存微服務,下單的同時訂單微服務請求庫存微服務減庫存。 簡言之:跨JVM進程產生分布式事務。
- 單體系統訪問多個數據庫實例,跨數據庫實例產生分布式事務。
- 多服務訪問同一個數據庫實例,比如:訂單微服務和庫存微服務即使訪問同一個數據庫也會產生分布式事務,原因就是跨JVM進程,兩個微服務持有了不同的數據庫鏈接進行數據庫操作,此時產生分布式事務。
二、分布式事務基礎理論
1,CAP理論
a)概念
CAP是 Consistency、Availability、Partition tolerance三個詞語的縮寫,分別表示一致性、可用性、分區容忍性。
b)組合方式
在所有分布式事務場景中不會同時具備CAP三個特性,因為在具備了P的前提下C和A是不能共存的。
- AP:放棄一致性,追求分區容忍性和可用性。這是很多分布式系統設計時的選擇。Eureka集群就是采用的AP設計思想。
- CP:放棄可用性,追求一致性和分區容錯性。zookeeper集群。
- CA:放棄分區容忍性,即不進行分區,不考慮由於網絡不通或結點掛掉的問題,則可以實現一致性和可用性。那么系統將不是一個標准的分布式系統,我們最常用的關系型數據就滿足了CA。
c)總結
CAP是一個已經被證實的理論:一個分布式系統最多只能同時滿足一致性(Consistency)、可用性(Availability)和分區容忍性(Partition tolerance)這三項中的兩項。它可以作為我們進行架構設計、技術選型的考量標准。對於多數大型互聯網應用的場景,結點眾多、部署分散,而且現在的集群規模越來越大,所以節點故障、網絡故障是常態,而且要保證服務可用性達到N個9(99.99..%),並要達到良好的響應性能來提高用戶體驗,因此一般都會做出如下選擇:保證P和A,舍棄C強一致,保證最終一致性。
2,BASE理論
a)強一致性與最終一致性
- 強一致性:CAP中的一致性要求在任何時間查詢每個結點數據都必須一致,它強調的是強一致性。
- 最終一致性:允許可以在一段時間內每個結點的數據不一致,但是經過一段時間每個結點的數據必須一致,它強調的是最終數據的一致性。
b)概念
BASE 是 Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent (最終一致性)三個短語的縮寫。BASE理論是對CAP中AP的一個擴展,通過犧牲強一致性來獲得可用性,當出現故障允許部分不可用但要保證核心功能可用,允許數據在一段時間內是不一致的,但最終達到一致狀態。滿足BASE理論的事務,我們稱之為“柔性事務”。
- 基本可用:分布式系統在出現故障時,允許損失部分可用功能,保證核心功能可用。如,電商網站交易付款出現問題了,商品依然可以正常瀏覽。
- 軟狀態:由於不要求強一致性,所以BASE允許系統中存在中間狀態(也叫軟狀態),這個狀態不影響系統可用性,如訂單的“支付中”、“數據同步中”等狀態,待數據最終一致后狀態改為“成功”狀態。
- 最終一致性:最終一致是指經過一段時間后,所有節點數據都將會達到一致。如訂單的"支付中"狀態,最終會變為“支付成功”或者"支付失敗",使訂單狀態與實際交易結果達成一致,但需要一定時間的延遲、等待。
三、解決方案之2PC
1,什么是2PC
2PC即兩階段提交協議,是將整個事務流程分為兩個階段,准備階段(Prepare phase)、提交階段(commit phase),2是指兩個階段,P是指准備階段,C是指提交階段。
- 准備階段(Prepare phase):事務管理器給每個參與者發送Prepare消息,每個數據庫參與者在本地執行事務,並寫本地的Undo/Redo日志,此時事務沒有提交。(Undo日志是記錄修改前的數據,用於數據庫回滾,Redo日志是記錄修改后的數據,用於提交事務后寫入數據文件)
-
提交階段(commit phase):如果事務管理器收到了參與者的執行失敗或者超時消息時,直接給每個參與者發送回滾(Rollback)消息;否則,發送提交(Commit)消息;參與者根據事務管理器的指令執行提交或者回滾操作,並釋放事務處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
成功情況:
失敗情況:
2,解決方案之XA
2PC的傳統方案是在數據庫層面實現的,如Oracle、MySQL都支持2PC協議,為了統一標准減少行業內不必要的對接成本,需要制定標准化的處理模型及接口標准,國際開放標准組織Open Group定義了分布式事務處理模型DTP(Distributed Transaction Processing Reference Model)。
整個2PC的事務流程涉及到三個角色AP、RM、TM。AP指的是使用2PC分布式事務的應用程序;RM指的是資源管理器,它控制着分支事務;TM指的是事務管理器,它控制着整個全局事務。
3,解決方案之Seata
a)seata的設計思想
Seata的設計目標其一是對業務無侵入,因此從業務無侵入的2PC方案着手,在傳統2PC的基礎上演進,並解決2PC方案面臨的問題。
Seata把一個分布式事務理解成一個包含了若干分支事務的全局事務。全局事務的職責是協調其下管轄的分支事務達成一致,要么一起成功提交,要么一起失敗回滾。此外,通常分支事務本身就是一個關系數據庫的本地事務,下圖是全局事務與分支事務的關系圖:
與 傳統2PC 的模型類似,Seata定義了3個組件來協議分布式事務的處理過程:
-
Transaction Coordinator (TC): 事務協調器,它是獨立的中間件,需要獨立部署運行,它維護全局事務的運行狀態,接收TM指令發起全局事務的提交與回滾,負責與RM通信協調各各分支事務的提交或回滾。
-
Transaction Manager (TM): 事務管理器,TM需要嵌入應用程序中工作,它負責開啟一個全局事務,並最終向TC發起全局提交或全局回滾的指令。
-
Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器TC的指令,驅動分支(本地)事務的提交和回滾。
b)Seata的執行流程
- 用戶服務的 TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的XID。
- 用戶服務的 RM 向 TC 注冊 分支事務,該分支事務在用戶服務執行新增用戶邏輯,並將其納入 XID 對應全局事務的管轄。
- 用戶服務執行分支事務,向用戶表插入一條記錄。
- 邏輯執行到遠程調用積分服務時(XID 在微服務調用鏈路的上下文中傳播)。積分服務的RM 向 TC 注冊分支事務,該分支事務執行增加積分的邏輯,並將其納入 XID 對應全局事務的管轄。
- 積分服務執行分支事務,向積分記錄表插入一條記錄,執行完畢后,返回用戶服務。
- 用戶服務分支事務執行完畢。
- TM 向 TC 發起針對 XID 的全局提交或回滾決議。
- TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。
c)Seata的具體實現
詳情見:Spring Cloud Alibaba Seata
4,Seata與傳統2PC
-
架構層次方面,傳統2PC方案的 RM 實際上是在數據庫層,RM 本質上就是數據庫自身,通過 XA 協議實現,而Seata的 RM 是以jar包的形式作為中間件層部署在應用程序這一側的。
-
兩階段提交方面,傳統2PC無論第二階段的決議是commit還是rollback,事務性資源的鎖都要保持到Phase2完成才釋放。而Seata的做法是在Phase1 就將本地事務提交,這樣就可以省去Phase2持鎖的時間,整體提高效率。
四、解決方案之TCC
1,什么是TCC
TCC是Try、Confirm、Cancel三個詞語的縮寫,TCC要求每個分支事務實現三個操作:預處理Try、確認Confirm、撤銷Cancel。Try操作做業務檢查及資源預留,Confirm做業務確認操作,Cancel實現一個與Try相反的操作即回滾操作。TM首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM將會發起所有分支事務的Cancel操作,若try操作全部成功,TM將會發起所有分支事務的Confifirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。
成功情況:
失敗情況:
TCC分為三個階段:
-
Try 階段是做業務檢查(一致性)及資源預留(隔離),此階段僅是一個初步操作,它和后續的Confirm 一起才能真正構成一個完整的業務邏輯。
-
Confirm 階段是做確認提交,Try階段所有分支事務執行成功后開始執行 Confirm。通常情況下,采用TCC則認為 Confifirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。若Confirm階段真的出錯了,需引入重試機制或人工處理。
-
Cancel 階段是在業務執行錯誤需要回滾的狀態下執行分支事務的業務取消,預留資源釋放。通常情況下,采用TCC則認為Cancel階段也是一定成功的。若Cancel階段真的出錯了,需引入重試機制或人工處理。
2,TCC解決方案
框架名稱 | Github地址 |
tcc-transaction
|
|
Hmily | |
ByteTCC
|
|
EasyTransaction
|
3,TCC需要注意的問題
a)空回滾
在沒有調用 TCC 資源 Try 方法的情況下,調用了二階段的 Cancel 方法,Cancel 方法需要識別出這是一個空回滾,然后直接返回成功。
出現原因:是當一個分支事務所在服務宕機或網絡異常,分支事務調用記錄為失敗,這個時候其實是沒有執行Try階段,當故障恢復后,分布式事務進行回滾則會調用二階段的Cancel方法,從而形成空回滾。
解決方法:識別出這個空回滾。需要知道一階段是否執行,如果執行了,那就是正常回滾;如果沒執行,那就是空回滾。前面已經說過TM在發起全局事務時生成全局事務記錄,全局事務ID貫穿整個分布式事務調用鏈條。再額外增加一張分支事務記錄表,其中有全局事務 ID 和分支事務 ID,第一階段 Try 方法里會插入一條記錄,表示一階段執行了。
//在cancel中cancel空回滾處理,如果try沒有執行,cancel不允許執行 if(accountInfoDao.isExistTry(transId)<=0){ log.info("bank1 空回滾處理,try沒有執行,不允許cancel執行,xid:{}",transId); return ; }
b)冪等
為了保證TCC二階段提交重試機制不會引發數據不一致,要求 TCC 的二階段 Try、Confirm 和 Cancel 接口保證冪等,這樣不會重復使用或者釋放資源。如果冪等控制沒有做好,很有可能導致數據不一致等嚴重問題。
//當前是在try中進行冪等判斷 判斷local_try_log表中是否有try日志記錄,如果有則不再執行 if(accountInfoDao.isExistTry(transId)>0){ log.info("bank1 try 已經執行,無需重復執行,xid:{}",transId); return ; }
c)懸掛
懸掛就是對於一個分布式事務,其二階段 Cancel 接口比 Try 接口先執行。
出現原因:RPC 調用分支事務try時,先注冊分支事務,再執行RPC調用,如果此時 RPC 調用的網絡發生擁堵,通常 RPC 調用是有超時時間的,RPC 超時以后,TM就會通知RM回滾該分布式事務,可能回滾完,RPC 請求才到達參與者真正執行,而一個 Try 方法預留的業務資源。
解決思路:如果二階段執行完成,那一階段就不能再繼續執行。在執行一階段事務時判斷在該全局事務下,“分支事務記錄”表中是否已經有二階段事務記錄,如果有則不執行Try。
//try懸掛處理,如果cancel、confirm有一個已經執行了,try不再執行 if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){ log.info("bank1 try懸掛處理 cancel或confirm已經執行,不允許執行try,xid:{}",transId); return ; }
4,Hmily
項目源碼:cloud-dtx-tcc
a)導入數據庫
sql文件下載地址為:dtx-tcc-sql
b)工程配置
涉及到分布式事務的工程均需要的配置
maven配置
<!-- hmily依賴 --> <dependency> <groupId>org.dromara</groupId> <artifactId>hmily‐springcloud</artifactId> <version>2.0.4‐RELEASE</version> </dependency>
application.yaml中添加hmily
org: dromara: hmily: serializer: kryo recoverDelayTime: 30 retryMax: 30 scheduledDelay: 30 scheduledThreadMax: 10 repositorySupport: db #對於發起方的時候,把此屬性設置為true。參與方為false。 started: true hmilyDbConfig: driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/hmily?useUnicode=true username: root password: 123456
注入hmily的配置Bean
@Bean public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer")); hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap; }
啟動類上添加注解
@ComponentScan({"org.dromara.hmily"})
c)調用方(bank1)實現
try: try冪等校驗 try懸掛處理 檢查余額是夠扣減金額 扣減金額 confirm: 空 cancel cancel冪等校驗 cancel空回滾處理 增加可用余額
注意:遠程調用bank2時,在feign調用的接口上加注解@Hmily
d)參與方(bank2)實現
try: 空 confirm: confirm冪等校驗 正式增加金額 cancel: 空
e)小結
五、解決方案之可靠消息最終一致性
項目源碼:cloud-dtx-txmsg
1,什么是可靠消息最終一致性
可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
可靠消息需要解決的問題:
- 本地事務與消息發送的原子性問題
-
//先發消息如果數據庫操作錯誤,消息已經發送 begin transaction; //1.發送MQ //2.數據庫操作 commit transation; //如果數據庫超時,此時數據庫回滾,但是消息可能也已經發送 begin transaction; //1.數據庫操作 //2.發送MQ commit transation;
- 事務參與方接受消息的可靠性
-
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。
- 消息重復消費的問題
-
由於網絡2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重 復消費。 要解決消息重復消費的問題就要實現事務參與方的方法冪等性。
2,RocketMQ事務消息方案
- Producer 發送事務消息 :Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。
- MQ Server回應消息發送成功 :MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
-
Producer 執行本地事務:Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。
- 消息投遞:若Producer 本地事務執行成功則自動向MQServer發送commit消息,此時MQ訂閱方(積分服務)即正常消費消息;若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息后 將刪除”增加積分消息“ 。 MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即程序執行正常則自動回應ack。
- 事務回查:如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
3,RocketMQ實現可靠消息最終一致性事務
a)SQL
bank1

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` DOUBLE NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES ( 2, '張三的賬戶', '1', '', 10000 ); DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
bank2

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` DOUBLE NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES ( 3, '李四的賬戶', '2', NULL, 0 ); CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
b)安裝RocketMQ
c)工程配置
maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
properties配置
rocketmq.producer.group = producer_bank2 rocketmq.name‐server = 127.0.0.1:9876
d)bank1
Service:AccountInfoServiceImpl
//兩個方法 //1,向mq發送轉賬消息 //2,更新賬戶,扣減金額 (通過事務id保證冪等性)
Controller:AccountInfoController
//生成事務id,調用service的發消息接口
message:ProducerTxmsgListener
//兩個方法executeLocalTransaction和checkLocalTransaction //事務消息發送后的回調方法。此時保證本地事務,調用Service扣減金額同時將消息改為COMMIT(可消費狀態),如果捕獲異常,將消息改為ROLLBACK回滾 //事務回查。查詢是否在調用方已經處理,如果已經處理需修改消息為COMMIT可消費,否則就是UNKOWN狀態。
e)bank2
Service:AccountInfoServiceImpl
//更新賬戶bank2,增加金額。(通過事務id保證冪等性)
message:TxmsgConsumer
//監聽bank1發送的消息topic,調用Service增加金額
4,總結
可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能:
- 本地事務與消息發送的原子性問題。
- 事務參與方接收消息的可靠性。
六、解決方案之最大努力通知
1,什么是最大努力通知
- 有一定的消息重復通知機制。因為接收通知方可能沒有接收到通知,此時要有一定的機制對消息重復通知。
- 消息校對機制。如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息信息來滿足需求。
2,最大努力通知與可靠消息一致性的異同
- 思想不同:可靠消息一致性,發起通知方需要保證將消息發出去,並且將消息發到接收通知方,消息的可靠性關鍵由發起通知方來保證。最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能消息接收不到,此時需要接收通知方主動調用發起通知方的接口查詢業務處理結果,通知的可靠性關鍵在接收通知方。
- 業務場景不同:可靠消息一致性關注的是交易過程的事務一致,以異步的方式完成交易。最大努力通知關注的是交易后的通知事務,即將交易結果可靠的通知出去。
- 技術解決方向不同:可靠消息一致性要解決消息從發出到接收的一致性,即消息發出並且被接收到;最大努力通知無法保證消息從發出到接收的一致性,只提供消息接收的可靠性機制。可靠機制是,最大努力的將消息通知給接收方,當消息無法被接收方接收時,由接收方主動查詢消息(業務處理結果)。
3,解決方案
a)解決方案一:
具體流程:
-
發起通知方將通知發給MQ。使用普通消息機制將通知發給MQ。
-
接收通知方監聽 MQ。
-
接收通知方接收消息,業務處理完成回應ack。
- 接收通知方若沒有回應ack則MQ會重復通知。 MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔 (如果MQ采用rocketMq,在broker中可進行配置),直到達到通知要求的時間窗口上限。
-
接收通知方可通過消息校對接口來校對消息的一致性。
b)解決方案二:
與方案1不同的是應用程序向接收通知方發送通知,如下圖:
具體流程:
- 發起通知方將通知發給MQ:使用可靠消息一致方案中的事務消息保證本地事務與消息的原子性,最終將通知先發給MQ。
- 通知程序監聽 MQ,接收MQ的消息。 通知程序若沒有回應ack則MQ會重復通知。
- 通知程序通過互聯網接口協議(如http、webservice)調用接收通知方案接口,完成通知。 通知程序調用接收通知方案接口成功就表示通知成功,即消費MQ消息成功,MQ將不再向通知程序投遞通知消息。
-
接收通知方可通過消息校對接口來校對消息的一致性。
c)兩種方案比較
- 方案1中接收通知方與MQ接口,即接收通知方案監聽 MQ,此方案主要應用與內部應用之間的通知。
- 方案2中由通知程序與MQ接口,通知程序監聽MQ,收到MQ的消息后由通知程序通過互聯網接口協議調用接收通知方。此方案主要應用於外部應用之間的通知,例如支付寶、微信的支付結果通知。
4,最大努力通知實現
a)sql

CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1_pay` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank1_pay`; /*Table structure for table `account_pay` */ DROP TABLE IF EXISTS `account_pay`; CREATE TABLE `account_pay` ( `id` varchar(64) COLLATE utf8_bin NOT NULL, `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '賬號', `pay_amount` double DEFAULT NULL COMMENT '充值余額', `result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值結果:success,fail', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_pay` */ insert into `account_pay`(`id`,`account_no`,`pay_amount`,`result`) values ('5678ef0a-1ff0-4cfd-97ac-640d749d596f','1',2,'success'),('7d7d469c-f100-4066-b927-014c0c3aa010','1',2,'success'),('947fafad-c19c-46bc-b0f0-43703a124fd4','1',2,'success');
bank1.sql

CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank1`; /*Table structure for table `account_info` */ DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'張三','1',NULL,1000); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
b)工程配置
基本配置同可靠消息一致性
c)pay支付方
Service:AccountPayServiceImpl
//兩個方法 //1,插入充值記錄。生成事務id,將事務id和充值信息發送給MQ隊列 //2,查詢充值記錄。提供給調用方查詢。
Controller:AccountPayController
//直接調用Service中的方法插入充值記錄
d)bank1
Service:AccountInfoServiceImpl
//兩個方法 //1,更新賬戶金額。根據事務id保證更新的冪等性。 //2,遠程調用pay的查詢充值結果。如果發現狀態改變同時更新當前賬號情況。
message:NotifyMsgListener
//監聽消息。調用Service的更新賬戶金額,冪等更新。
Controller:AccountInfoController
//調用Service的查詢充值結果
5,總結
最大努力通知方案是分布式事務中對一致性要求最低的一種,適用於一些最終一致性時間敏感度低的業務;最大努力通知方案需要實現如下功能:
- 消息重復通知機制。
- 消息校對機制。 主動調用接口查詢並修改。
七、四種分布式事務對比
