RabbitMQ消息最終一致性解決方案
隨着分布式服務架構的流行與普及,原來在單體應用中執行的多個邏輯操作,現在被拆分成了多個服務之間的遠程調用。雖然服務化為我們的系統帶來了水平伸縮的能力,然而隨之而來挑戰就是分布式事務問題,多個服務之間使用自己單獨維護的數據庫,它們彼此之間不在同一個事務中,假如A執行成功了,B執行卻失敗了,而A的事務此時已經提交,無法回滾,那么最終就會導致兩邊數據不一致性的問題;盡管很早之前就有基於兩階段提交的XA分布式事務,但是這類方案因為需要資源的全局鎖定,導致性能極差;因此后面就逐漸衍生出了消息最終一致性、TCC等柔性事務的分布式事務方案,本文主要分析的是基於消息的最終一致性方案。
1.普通消息的處理流程
MQ消息最終一致性解決方案:
消息生成者發送消息
MQ收到消息,將消息進行持久化,在存儲中新增一條記錄
返回ACK給生產者
MQ push 消息給對應的消費者,然后等待消費者返回ACK
如果消息消費者在指定時間內成功返回ack,那么MQ認為消息消費成功,在存儲中刪除消息,即執行第6步;如果MQ在指定時間內沒有收到ACK,則認為消息消費失敗,會嘗試重新push消息,重復執行4、5、6步驟
MQ刪除消息
1.1 普通消息處理存在的一致性問題
我們以訂單創建為例,訂單系統先創建訂單(本地事務),再發送消息給下游處理;如果訂單創建成功,然而消息沒有發送出去,那么下游所有系統都無法感知到這個事件,會出現臟數據;
public void processOrder() { // 訂單處理(業務操作) orderService.process(); // 發送訂單處理成功消息(發送消息) sendBizMsg (); }
如果先發送訂單消息,再創建訂單;那么就有可能消息發送成功,但是在訂單創建的時候卻失敗了,此時下游系統卻認為這個訂單已經創建,也會出現臟數據。
public void processOrder() { // 發送訂單處理成功消息(發送消息) sendBizMsg (); // 訂單處理(業務操作) orderService.process(); }
1.2 一個錯誤的想法
此時可能有同學會想,我們可否將消息發送和業務處理放在同一個本地事務中來進行處理,如果業務消息發送失敗,那么本地事務就回滾,這樣是不是就能解決消息發送的一致性問題呢?
@Transactionnal public void processOrder() { try{ // 訂單處理(業務操作) orderService.process(); // 發送訂單處理成功消息(發送消息) sendBizMsg (); }catch(Exception e){ 事務回滾; } }
1.3 消息發送的異常情況分析
可能的情況 一致性 訂單處理成功,然后突然宕機,事務未提交,消息沒有發送出去 一致 訂單處理成功,由於網絡原因或者MQ宕機,消息沒有發送出去,事務回滾 一致 訂單處理成功,消息發送成功,但是MQ由於其他原因,導致消息存儲失敗,事務回滾 一致 訂單處理成功,消息存儲成功,但是MQ處理超時,從而ACK確認失敗,導致發送方本地事務回滾 不一致 從上面的情況分析,我們可以看到,使用普通的處理方式,無論如何,都無法保證業務處理與消息發送兩邊的一致性,其根本的原因就在於:遠程調用,結果最終可能為成功、失敗、超時;而對於超時的情況,處理方最終的結果可能是成功,也可能是失敗,調用方是無法知曉的。 筆者就曾經在項目中出現類似的情況,調用方先在本地寫數據,然后發起RPC服務調用,但是處理方由於DB數據量比較大,導致處理超時,調用方在出現超時異常后,直接回滾本地事務,從而導致調用方這邊沒數據,而處理方那邊數據卻已經寫入了,最終導致兩邊業務數據的不一致。為了保證兩邊數據的一致性,我們只能從其他地方尋找新的突破口。
事務消息:
由於傳統的處理方式無法解決消息生成者本地事務處理成功與消息發送成功兩者的一致性問題,因此事務消息就誕生了,它實現了消息生成者本地事務與消息發送的原子性,保證了消息生成者本地事務處理成功與消息發送成功的最終一致性問題。
事務消息處理的流程:
MQ消息最終一致性解決方案
事務消息與普通消息的區別就在於消息生產環節,生產者首先預發送一條消息到MQ(這也被稱為發送half消息)
MQ接受到消息后,先進行持久化,則存儲中會新增一條狀態為待發送的消息
然后返回ACK給消息生產者,此時MQ不會觸發消息推送事件
生產者預發送消息成功后,執行本地事務
執行本地事務,執行完成后,發送執行結果給MQ
MQ會根據結果刪除或者更新消息狀態為可發送
如果消息狀態更新為可發送,則MQ會push消息給消費者,后面消息的消費和普通消息是一樣的
注意點:由於MQ通常都會保證消息能夠投遞成功,因此,如果業務沒有及時返回ACK結果,那么就有可能造成MQ的重復消息投遞問題。因此,對於消息最終一致性的方案,消息的消費者必須要對消息的消費支持冪等,不能造成同一條消息的重復消費的情況。
3.1 事務消息異常情況分析
異常情況 一致性 處理異常方法 消息未存儲,業務操作未執行 一致 無 存儲待發送消息成功,但是ACK失敗,導致業務未執行(可能是MQ處理超時、網絡抖動等原因) 不一致 MQ確認業務操作結果,處理消息(刪除消息) 存儲待發送消息成功,ACK成功,業務執行(可能成功也可能失敗),但是MQ沒有收到生產者業務處理的最終結果 不一致 MQ確認業務操作結果,處理消息(根據就業務處理結果,更新消息狀態,如果業務執行成功,則投遞消息,失敗則刪除消息) 業務處理成功,並且發送結果給MQ,但是MQ更新消息失敗,導致消息狀態依舊為待發送 不一致 同上
支持事務消息的MQ:
現在目前較為主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事務消息。據筆者了解,早年阿里對MQ增加事務消息也是因為支付寶那邊因為業務上的需求而產生的。因此,如果我們希望強依賴一個MQ的事務消息來做到消息最終一致性的話,在目前的情況下,技術選型上只能去選擇RocketMQ來解決。上面我們也分析了事務消息所存在的異常情況,即MQ存儲了待發送的消息,但是MQ無法感知到上游處理的最終結果。對於RocketMQ而言,它的解決方案非常的簡單,就是其內部實現會有一個定時任務,去輪訓狀態為待發送的消息,然后給producer發送check請求,而producer必須實現一個check監聽器,監聽器的內容通常就是去檢查與之對應的本地事務是否成功(一般就是查詢DB),如果成功了,則MQ會將消息設置為可發送,否則就刪除消息。
常見的問題:
(1)問:如果預發送消息失敗,是不是業務就不執行了?
答:是的,對於基於消息最終一致性的方案,一般都會強依賴這步,如果這個步驟無法得到保證,那么最終也 就不可能做到最終一致性了。
(2)問:為什么要增加一個消息預發送機制,增加兩次發布出去消息的重試機制,為什么不在業務成功之后,發送失敗的話使用一次重試機制?
答:如果業務執行成功,再去發消息,此時如果還沒來得及發消息,業務系統就已經宕機了,系統重啟后,根本沒有記錄之前是否發送過消息,這樣就會導致業務執行成功,消息最終沒發出去的情況。
(3)問:如果consumer消費失敗,是否需要producer做回滾呢?
答:這里的事務消息,producer不會因為consumer消費失敗而做回滾,采用事務消息的應用,其所追求的是高可用和最終一致性,消息消費失敗的話,MQ自己會負責重推消息,直到消費成功。因此,事務消息是針對生產端而言的,而消費端,消費端的一致性是通過MQ的重試機制來完成的。
(4)問:如果consumer端因為業務異常而導致回滾,那么豈不是兩邊最終無法保證一致性?
答:基於消息的最終一致性方案必須保證消費端在業務上的操作沒障礙,它只允許系統異常的失敗,不允許業務上的失敗,比如在你業務上拋出個NPE之類的問題,導致你消費端執行事務失敗,那就很難做到一致了。
由於並非所有的MQ都支持事務消息,假如我們不選擇RocketMQ來作為系統的MQ,是否能夠做到消息的最終一致性呢?答案是可以的。
基於本地消息的最終一致性
MQ消息最終一致性解決方案
基於本地消息的最終一致性方案的最核心做法就是在執行業務操作的時候,記錄一條消息數據到DB,並且消息數據的記錄與業務數據的記錄必須在同一個事務內完成,這是該方案的前提核心保障。在記錄完成后消息數據后,后面我們就可以通過一個定時任務到DB中去輪訓狀態為待發送的消息,然后將消息投遞給MQ。這個過程中可能存在消息投遞失敗的可能,此時就依靠重試機制來保證,直到成功收到MQ的ACK確認之后,再將消息狀態更新或者消息清除;而后面消息的消費失敗的話,則依賴MQ本身的重試來完成,其最后做到兩邊系統數據的最終一致性。基於本地消息服務的方案雖然可以做到消息的最終一致性,但是它有一個比較嚴重的弊端,每個業務系統在使用該方案時,都需要在對應的業務庫創建一張消息表來存儲消息。針對這個問題,我們可以將該功能單獨提取出來,做成一個消息服務來統一處理,因而就衍生出了我們下面將要討論的方案。
獨立消息服務的最終一致性
MQ消息最終一致性解決方案
獨立消息服務最終一致性與本地消息服務最終一致性最大的差異就在於將消息的存儲單獨地做成了一個RPC的服務,這個過程其實就是模擬了事務消息的消息預發送過程,如果預發送消息失敗,那么生產者業務就不會去執行,因此對於生產者的業務而言,它是強依賴於該消息服務的。不過好在獨立消息服務支持水平擴容,因此只要部署多台,做成HA的集群模式,就能夠保證其可靠性。在消息服務中,還有一個單獨地定時任務,它會定期輪訓長時間處於待發送狀態的消息,通過一個check補償機制來確認該消息對應的業務是否成功,如果對應的業務處理成功,則將消息修改為可發送,然后將其投遞給MQ;如果業務處理失敗,則將對應的消息更新或者刪除即可。因此在使用該方案時,消息生產者必須同時實現一個check服務,來供消息服務做消息的確認。對於消息的消費,該方案與上面的處理是一樣,都是通過MQ自身的重發機制來保證消息被消費。
總結:上游事務提交之后,在基於MQ的場景下就不考慮回滾了。失敗的可能是由於網絡、服務宕機所導致,文章中提到說業務上執行是無障礙的。如果下游服務長時間沒有恢復,那么就應該設置告警,在這里有幾種機制來解決一些牛皮癬類型的問題,假如上游消息始終發送失敗(這種可能性基本不存在除非代碼是假的)這種情況我們可以設置報警機制比如發生異常時可以打印日志,發送短信,發送郵件,將異常訂單保存到數據庫,這些措施可以同時用於下游一些異常訂單,同時也可以在發生異常的時候新建一個異常Topic的消息提示,讓人工來介入數據訂正。實際操作和這篇文章類似:SpringBoot+RabbitMQ (保證消息100%投遞成功並被消費)
如何解決消息冪等性: MQ實現消息的冪等性
如何保證消息被投遞成功:消息如何保障投遞成功