在消息傳遞過程中,如果出現傳遞失敗的情況,發送方會執行重試,重試的過程中就有可能會產生重復的消息。對使用消息隊列的業務系統來說,如果沒有對重復消息進行處理,就有可能會導致系統的數據出現錯誤。
比如說,一個消費訂單消息,統計下單金額的微服務,如果沒有正確處理重復消息,那就會出現重復統計,導致統計結果錯誤。
你可能會問,如果消息隊列本身能保證消息不重復,那應用程序的實現不就簡單了?那有沒有消息隊列能保證消息不重復呢?
消息重復的情況必然存在
在 MQTT 協議中,給出了三種傳遞消息時能夠提供的服務質量標准,這三種服務質量從低到高依次是:
-
At most once: 至多一次。消息在傳遞時,最多會被送達一次。換一個說法就是,沒什么消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不太高的監控場景使用,比如每分鍾上報一次機房溫度數據,可以接受數據少量丟失。
-
At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。
-
Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。
這個服務質量標准不僅適用於 MQTT,對所有的消息隊列都是適用的。我們現在常用的絕大部分消息隊列提供的服務質量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是這樣。也就是說,消息隊列很難保證消息不重復。
說到這兒我知道肯定有的同學會反駁我:“你說的不對,我看過 Kafka 的文檔,Kafka 是支持 Exactly once 的。”我在這里跟這些同學解釋一下,你說的沒錯,Kafka 的確是支持 Exactly once,但是我講的也沒有問題,為什么呢?
Kafka 支持的“Exactly once”和我們剛剛提到的消息傳遞的服務質量標准“Exactly once”是不一樣的,它是 Kafka 提供的另外一個特性,Kafka 中支持的事務也和我們通常意義理解的事務有一定的差異。在 Kafka 中,事務和 Excactly once 主要是為了配合流計算使用的特性,我們在專欄“進階篇”這個模塊中,會有專門的一節課來講 Kafka 的事務和它支持的 Exactly once 特性。
稍微說一些題外話,Kafka 的團隊是一個非常善於包裝和營銷的團隊,你看他們很巧妙地用了兩個所有人都非常熟悉的概念“事務”和“Exactly once”來包裝它的新的特性,實際上它實現的這個事務和 Exactly once 並不是我們通常理解的那兩個特性,但是你深入了解 Kafka 的事務和 Exactly once 后,會發現其實它這個特性雖然和我們通常的理解不一樣,但確實和事務、Exactly once 有一定關系。
這一點上,我們都要學習 Kafka 團隊。一個優秀的開發團隊,不僅要能寫代碼,更要能寫文檔,能寫 Slide(PPT),還要能講,會分享。對於每個程序員來說,也是一樣的。
我們把話題收回來,繼續來說重復消息的問題。既然消息隊列無法保證消息不重復,就需要我們的消費代碼能夠接受“消息是可能會重復的”這一現狀,然后,通過一些方法來消除重復消息對業務的影響。
用冪等性解決重復消息問題
一般解決重復消息的辦法是,在消費端,讓我們消費消息的操作具備冪等性。
冪等(Idempotence) 本來是一個數學上的概念,它是這樣定義的:
如果一個函數 f(x) 滿足:f(f(x)) = f(x),則函數 f(x) 滿足冪等性。
這個概念被拓展到計算機領域,被用來描述一個操作、方法或者服務。一個冪等操作的特點是,其任意多次執行所產生的影響均與一次執行的影響相同。
一個冪等的方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。所以,對於冪等的方法,不用擔心重復執行會對系統造成任何改變。
我們舉個例子來說明一下。在不考慮並發的情況下,“將賬戶 X 的余額設置為 100 元”,執行一次后對系統的影響是,賬戶 X 的余額變成了 100 元。只要提供的參數 100 元不變,那即使再執行多少次,賬戶 X 的余額始終都是 100 元,不會變化,這個操作就是一個冪等的操作。
再舉一個例子,“將賬戶 X 的余額加 100 元”,這個操作它就不是冪等的,每執行一次,賬戶余額就會增加 100 元,執行多次和執行一次對系統的影響(也就是賬戶的余額)是不一樣的。
如果我們系統消費消息的業務邏輯具備冪等性,那就不用擔心消息重復的問題了,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。也就可以認為,消費多次等於消費一次。
從對系統的影響結果來說:At least once + 冪等消費 = Exactly once。
那么如何實現冪等操作呢?最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。但是,不是所有的業務都能設計成天然冪等的,這里就需要一些方法和技巧來實現冪等。
下面我給你介紹幾種常用的設計冪等操作的方法:
1. 利用數據庫的唯一約束實現冪等
例如我們剛剛提到的那個不具備冪等特性的轉賬的例子:將賬戶 X 的余額加 100 元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。
首先,我們可以限定,對於每個轉賬單每個賬戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在數據庫中建一張轉賬流水表,這個表有三個字段:轉賬單 ID、賬戶 ID 和變更金額,然后給轉賬單 ID 和賬戶 ID 這兩個字段聯合起來創建一個唯一約束,這樣對於相同的轉賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。
這樣,我們消費消息的邏輯可以變為:“在轉賬流水表中增加一條轉賬記錄,然后再根據轉賬記錄,異步操作更新用戶余額即可。”在轉賬流水表增加一條轉賬記錄這個操作中,由於我們在這個表中預先定義了“賬戶 ID 轉賬單 ID”的唯一約束,對於同一個轉賬單同一個賬戶只能插入一條記錄,后續重復的插入操作都會失敗,這樣就實現了一個冪等的操作。我們只要寫一個 SQL,正確地實現它就可以了。
基於這個思路,不光是可以使用關系型數據庫,只要是支持類似“INSERT IF NOT EXIST”語義的存儲類系統都可以用於實現冪等,比如,你可以用 Redis 的 SETNX 命令來替代數據庫中的唯一約束,來實現冪等消費。
2. 為更新的數據設置前置條件
另外一種實現冪等的思路是,給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。這樣,重復執行這個操作時,由於第一次更新數據的時候已經變更了前置條件中需要判斷的數據,不滿足前置條件,則不會重復執行更新數據操作。
比如,剛剛我們說過,“將賬戶 X 的余額增加 100 元”這個操作並不滿足冪等性,我們可以把這個操作加上一個前置條件,變為:“如果賬戶 X 當前的余額為 500 元,將余額加 100 元”,這個操作就具備了冪等性。對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的余額,在消費的時候進行判斷數據庫中,當前余額是否與消息中的余額相等,只有相等才執行變更操作。
但是,如果我們要更新的數據不是數值,或者我們要做一個比較復雜的更新操作怎么辦?用什么作為前置判斷條件呢?更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等更新。
3. 記錄並檢查操作
如果上面提到的兩種實現冪等方法都不能適用於你的場景,我們還有一種通用性最強,適用范圍最廣的實現冪等性方法:記錄並檢查操作,也稱為“Token 機制或者 GUID(全局唯一 ID)機制”,實現的思路特別簡單:在執行數據更新操作之前,先檢查一下是否執行過這個更新操作。
具體的實現方法是,在發送消息時,給每條消息指定一個全局唯一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,如果沒有消費過,才更新數據,然后將消費狀態置為已消費。
原理和實現是不是很簡單?其實一點兒都不簡單,在分布式系統中,這個方法其實是非常難實現的。首先,給每個消息指定一個全局唯一的 ID 就是一件不那么簡單的事兒,方法有很多,但都不太好同時滿足簡單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在“檢查消費狀態,然后更新數據並且設置消費狀態”中,三個操作必須作為一組操作保證原子性,才能真正實現冪等,否則就會出現 Bug。
比如說,對於同一條消息:“全局 ID 為 8,操作為:給 ID 為 666 賬戶增加 100 元”,有可能出現這樣的情況:
-
t0 時刻:Consumer A 收到條消息,檢查消息執行狀態,發現消息未處理過,開始執行“賬戶增加 100 元”;
-
t1 時刻:Consumer B 收到條消息,檢查消息執行狀態,發現消息未處理過,因為這個時刻,Consumer A 還未來得及更新消息執行狀態。
這樣就會導致賬戶被錯誤地增加了兩次 100 元,這是一個在分布式系統中非常容易犯的錯誤,一定要引以為戒。
對於這個問題,當然我們可以用事務來實現,也可以用鎖來實現,但是在分布式系統中,無論是分布式事務還是分布式鎖都是比較難解決問題。
小結
這節課我們主要介紹了通過冪等消費來解決消息重復的問題,然后我重點講了幾種實現冪等操作的方法,你可以利用數據庫的約束來防止重復更新數據,也可以為數據更新設置一次性的前置條件,來防止重復消息,如果這兩種方法都不適用於你的場景,還可以用“記錄並檢查操作”的方式來保證冪等,這種方法適用范圍最廣,但是實現難度和復雜度也比較高,一般不推薦使用。
這些實現冪等的方法,不僅可以用於解決重復消息的問題,也同樣適用於,在其他場景中來解決重復請求或者重復調用的問題。比如,我們可以將 HTTP 服務設計成冪等的,解決前端或者 APP 重復提交表單數據的問題;也可以將一個微服務設計成冪等的,解決 RPC 框架自動重試導致的重復調用問題。這些方法都是通用的,希望你能做到觸類旁通,舉一反三。