一個服務發送一個消息給 MQ,即消息中間件,比如 RocketMQ、RabbitMQ、Kafka、ActiveMQ 等等。然后,另一個服務從 MQ 消費到一條消息后進行處理。這就成了基於 MQ 的異步調用。
一、可靠消息最終一致性方案的核心流程
1、上游服務投遞消息
如果要實現可靠消息最終一致性方案,一般可以自己寫一個可靠消息服務,實現一些業務邏輯。
首先,上游服務需要發送一條消息給可靠消息服務。這條消息一般就是對下游一個服務接口的調用,里面包含了對應的一些請求參數。
然后,可靠消息服務就得把這條消息存儲到自己的數據庫里面取,狀態為“待確認”。
接着,上游服務就可以執行自己本地的數據庫操作,根據自己的執行結果,再次調用可靠消息服務接口。
如果本地數據庫操作執行成功了,那么就找可靠消息服務確認那條消息。如果本地數據庫操作失敗了,那么就找可靠消息服務刪除那條消息。
此時如果是確認消息,那么可靠消息服務就把數據庫里的消息狀態更新為“已發送”,同時將消息發送給 MQ。
這里有一個很關鍵的點,就是更新數據庫里的消息狀態和投遞消息到 MQ。這倆操作,需要放在一個方法里,且開啟本地事務。
1)如果數據庫更新消息的狀態失敗了,那么就拋異常退出,就別投遞到 MQ。
2)如果投遞 MQ 失敗報錯了,那么就要拋異常讓本地數據庫事務回滾。
3)這倆操作必須一起成功,或者一起失敗。
2、下游服務接收消息
下游服務就一直等着從 MQ 消費消息就好了,如果消費到了消息,就操作本地數據庫。
如果操作成功了。就反過來通知可靠消息服務,說自己處理成功了,然后可靠消息服務就會把消息狀態設置為“已完成”。
3、確保上游服務對消息的100%可靠投遞
上面的流程的一個問題是,如果在上述投遞消息的過程中各個環節出現了問題該怎么辦?
1)如果上游服務給消息服務發送待確認消息的過程出錯來了,那上游服務感知到調用異常,就不會執行下面的流程了。
2)如果上游服務操作完本地數據庫之后,通知可靠消息服務確認消息或者刪除消息的時候,出現了問題:比如沒通知成功,或者沒有執行很高,或者是可靠消息服務器沒成功的投遞到 MQ。這些的情況下,可靠消息服務的數據庫里的狀態會一直是“待確認”。此時,我們可以在可靠消息服務里開發一個后台定時運行的線程,不停的檢查各個消息的狀態。如果一直是“待確認”狀態,就認為這個消息出了點什么問題。此時可以回調上游服務提供的接口,問問這個老是這個狀態對應的數據執行成功沒有,如果是執行成功了就將消息的發送狀態改成“已發送”,同時投遞消息到 MQ,不過這種情況下更多的可能是沒有執行成功,此時將可靠消息服務將數據庫中的這條消息刪除即可。
通過以上兩步,可以保證可靠消息服務一定會嘗試完成消息到 MQ 的投遞。
4、保證下游服務對消息的 100% 可靠接收
如果下游服務消費消息時出了問題,沒有消費到,或者是下游服務對消息的處理失敗了,怎么辦?
其實也沒有關系,在可靠消息服務里開發一個后台線程,不斷的檢查消息狀態。如果消息狀態一直是“已發送”,始終沒有變成“已完成”,那就說明下游服務始終沒有處理成功。此時可靠消息服務就可以再次嘗試重新投遞消息到 MQ,讓下游服務再次處理。只要下游服務的接口邏輯實現冪等性,保證多次處理一個消息,不會插入重復數據即可。
5、基於 MQ 實現可靠消息最終一致性方案
在上面的通用方案里,完全依賴可靠消息服務的各種自檢機制來確保:
1)如果上游服務的數據庫操作沒有成功,下游服務是不會收到任何通知的
2)如果上游服務的數據庫操作成功了,可靠消息服務會確保將一個調用消息投遞給下游服務,而且一定會確保下游服務一定會成功處理這條消息。
通過這套機制,保證了基於 MQ 的異步調用/通知的服務間的分布式事務保障。
其中,阿里開源的 RocketMQ,就是實現了可靠消息服務的所有功能,核心思想跟上面類似。只是 RocketMQ 為了保證高並發、高可用、高性能,做了比較復雜的架構實現,非常優秀。。。
二、可靠消息最終一致性方案的高可用保障生產實踐
實際落地生產的時候,如果沒有高並發場景的,完全可以參照上面的思路自己基於某個 MQ 中間件開發一個可靠消息服務,如果有高並發場景的,可以用 RocketMQ 的分布式事務支持,上面的那套流程都可以實現。
這套方案里保障高可用性最大的一個依賴點,就是 MQ 的高可用。
任何一種 MQ 中間件都有一整套的高可用保障機制,無論是 RabbitMQ、RocketMQ 還是 Kafka。
如果 MQ 集群整體故障,完全不可用時,就會導致業務系統的各個服務之間無法通過 MQ 來投遞消息,導致業務流程中斷。
MQ 服務不可用時,服務降級怎么做?
1、故障感知
比如,連續10次嘗試投遞 MQ 都是異常出錯,網絡無法連接等問題,說明 MQ 故障不可用,此時觸發降級開關。然后根據降級開關來判斷這次是寫 MQ 還是寫 Redis。
2、基於KV存儲中隊列的降級方案
用 Redis 作為消息繼續投遞的替代品。
3、下游服務消費 MQ 的降級感知
下游服務消費 MQ 也是通過判斷降級開關是不是打開了,來判斷自己是從 MQ 消費數據還是從 Redis 取數據。
4、故障恢復
如果降級開關打開以后,需要每隔一段時間嘗試給 MQ 投遞一個消息,以判斷其是否已經恢復。
如果 MQ 已經恢復可以正常投遞消息,此時就可以通過關閉降級開關,然后消息繼續投遞到 MQ,下游服務在確認 KV 存儲的各個隊列中已經沒有數據之后,就可以重新切換為從 MQ 消費消息。
5、更多
上面說的那套方案是通用的降級方案,具體可以根據業務特點來設計。
比如在投入 MQ 時,盡可能的確保數據符合規范,可以被下游服務正確消費,否則下游服務會一直出錯。這個就像是 Redis 的事務原理一樣,保證數據的正確性,那他就可以大概率的被成功執行,增加 redis 事務的成功率。
比如在服務降級時可以根據業務和代碼特點做開關,開關可以用 zookeeper 監聽,可以每次使用前從 redis 獲取降級開關值。
各種步驟,其實可以達到相同目的的,都可以 “平替”。
摘自於:https://blog.csdn.net/weixin_30978387/article/details/112166141