MQ 分布式事務 -- 微服務應用


1、背景

友情鏈接:https://www.cnblogs.com/Agui520/p/11187972.html

    https://blog.csdn.net/fd2025/article/details/79863390

以支付、電商下單為例子。一個電商系統包含了好幾大類模塊,就比如有用戶模塊、商品模塊、庫存模塊、訂單模塊、支付模塊、物流模塊,活動模塊等,以下就先列舉幾個最基礎常見的模塊(用戶模塊、商品模塊、庫存模塊、訂單模塊、支付模塊)。

用戶流程如下:

 

2、問題

如果系統規模較小,數據表都在一個數據庫實例上,項目服務端也都在同一個項目,那上面的問題基本不是問題,直接用本地事務(一致性,原子性、隔離性、持久性)解決,比如支付轉賬(A->B)模塊肯定會出現A賬戶減少,B賬戶增加,程序操作加個事務管理就解決。但是如果系統規模較大,比如支付寶賬戶表和余額寶賬戶表顯然不會在同一個數據庫實例上,他們往往分布在不同的物理節點上,又比如商品模塊,訂單模塊不會在同一個數據庫中或是在同一個項目中,這時本地事務就已經失去用武之地了。

3、場景

1.場景1

支付寶轉1萬元到余額寶,如果支付寶扣除1萬后,如果系統掛掉了,余額寶並沒有增加1萬,數據出現不一致情況。

2.場景2

電商系統中,當有用戶下單后,除了在訂單表插入一條記錄外,對應商品表的這個商品數量必須 減1,怎么保證??在 搜索廣告系統中,當用戶點擊某廣告后,除了在點擊事件表中 增加一條
記錄外 ,還得去商家賬戶表中找到這個商家扣除廣告費吧,怎么保證??

不拆分服務最常見的解決方案:

本地事務:

Begin transaction
update A set amount = amount - 10000 where userId = 1
update B set amount = amount + 10000 where userId = 1
End transaction
commit;

4、MQ實現分布式事務

當 支付寶賬戶扣除1萬后,我們只要生成一個憑證(消息)即可,這個憑證(消息)上寫着“讓余額寶賬戶增加一萬”,只要這個憑證(消息)能可靠保證,我們最終是可以拿着這個憑證(消息)讓余額寶賬戶 增加1萬的,即我們能依靠這個憑證(消息)完成最終一致性。

4.1 如何可靠保存憑證

業務與消息耦合的方式

支付寶在完成扣款的同時,同時記錄消息數據,這個消息數據與業務數據保存在同一數據庫實例里(消息記錄表名為message).

Begin transtration

update A set amount = amount -10000 where userId = 1;

insert into message(userId,amount,status) values (1,10000,1);

End transaction

commit;
上述事務能保證只要支付寶賬戶里被扣了錢,消息一定能保存下來。

當上述事務提交成功后,我們通過實時消息服務將此消息通知余額寶,余額寶處理成功后發送回復成功消息后,支付寶收到回復后刪除該條消息數據。

業務與消息解耦

為了解耦,可以采取以下方式:

1、支付寶在扣款事務提交之前,向實時消息服務請求發送消息,實時消息服務只記錄消息數據,而不真正發送(只是知道有這一條消息),只有消息發送成功后才會提交事務。

 

//支付寶 - 10000 (業務需求)

//先把(支付寶-10000)封裝成一個消息(new Message()))

//然后把這個消息提交到MQ服務器上send(producer.send(new Message(),callback(里面處理本地事務)))

//在callback處理本地事務:在callback方法里:

update A set amount = amount - 10000 where userId = 1;

..............

//當本地事務操作完成了以后

1.要么成功:(給MQ一個標識:COMMIT)

2.要么失敗:(給MQ一個標識:ROLLBACK)
2. 當支付寶扣款事務被提交成功后,向實時 消息服務確認發送,只有在得到確認發送指令后,實時消息服務才真正發送該消息。

3. 當支付寶扣款事務提交失敗回滾后,向實時 消息服務取消發送。在得到取消發送指令后, 該消息將不會被發送。

4. 對於那些未確認的消息或者取消的消息,需要有一個消息狀態確認系統定時去支付寶系統查詢這個消息的 狀態進行更新。為什么需要這一個步驟:假設在支付寶扣款事務被成功提交后,系統掛了,此時消息狀態並未被更新為“確認發送”,從而導致消息不能被發送。

優點:消息數據獨立存儲 ,降低業務系統與消息系統之間的耦合。

缺點:一次消息發送需要兩次請求;業務處理服務需要實現消息狀態回查接口

 

 

 綜合上述的描述,RabbitMQ 做了這么三件事:

1,先發送需要發送的消息到消息中間件broker,並獲取到該message的 transactionId。在第一次發送的時候,該消息的狀態為LocalTransactionState.UNKNOW
2,處理本地事物。
3,根據本地事物的執行結果,結合 transactionId,找到該消息的位置,在mq中標志該消息的最終處理結果。
 

5、Rabbit MQ 介紹

相關鏈接:https://www.cnblogs.com/duanxz/p/3542320.html

 

RabbitMQ的結構圖如下:

 

1、幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。

    Exchange類型

    A. direct exchange:將與routing key 比配的消息,直接推入相對應的隊列,創建隊列時,默認就創建同名的routing key。

    B. fanout exchange:是一種廣播模式,忽略routingkey的規則。

    C. topic exchange:應用主題,根據key進行模式匹配路由,例如:若為abc*則推入到所有abc*相對應的queue;若為abc.#則推入到abc.xx.one ,abc.yy.two對應的queue。


Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。是基於Connection之上建立通信通道,因為每次Connection建立TCP協議通信開銷及性能消耗較大,所以一次建立Connection后,使用多個Channel通道通信減少開銷和提高性能。

 

2、消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符 號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還 有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

 

 

 

6、代碼實現

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM