1. 保證消息被消費
即使消息發送到了消息隊列,消息也不會萬無一失,還是會面臨丟失的風險。
我們以 Kafka 為例,消息在Kafka 中是存儲在本地磁盤上的, 為了減少消息存儲對磁盤的隨機 I/O,一般我們會將消息寫入到操作系統的 Page Cache 中,然后在合適的時間將消息刷新到磁盤上。
例如,Kafka 可以配置當達到某一時間間隔,或者累積一定的消息數量的時候再刷盤,也就是所謂的異步刷盤。
不過,如果發生機器掉電或者機器異常重啟,那么 Page Cache 中還沒有來得及刷盤的消息就會丟失了。那么怎么解決呢?你可能會把刷盤的間隔設置很短,或者設置累積一條消息就就刷盤。
但這樣頻繁刷盤會對性能有比較大的影響,而且從經驗來看,出現機器宕機或者掉電的幾率也不高,所以我不建議你這樣做。
如果你的電商系統對消息丟失的容忍度很低,那么你可以考慮以集群方式部署 Kafka 服務,通過部署多個副本備份數據,保證消息盡量不丟失。
那么它是怎么實現的呢?Kafka 集群中有一個 Leader 負責消息的寫入和消費,可以有多個 Follower 負責數據的備份。Follower 中有一個特殊的集合叫做 ISR(in-sync replicas),當 Leader 故障時,新選舉出來的 Leader 會從 ISR 中選擇,默認 Leader 的數據會異步地復制給 Follower,這樣在 Leader 發生掉電或者宕機時,Kafka 會從 Follower 中消費消息,減少消息丟失的可能。
由於默認消息是異步地從 Leader 復制到 Follower 的,所以一旦 Leader 宕機,那些還沒有來得及復制到 Follower 的消息還是會丟失。
為了解決這個問題,Kafka 為生產者提供一個選項叫做“acks”,當這個選項被設置為“all”時,生產者發送的每一條消息除了發給 Leader 外還會發給所有的 ISR,並且必須得到 Leader 和所有 ISR 的確認后才被認為發送成功。這樣,只有 Leader 和所有的 ISR 都掛了,消息才會丟失。
從上面這張圖來看,當設置“acks=all”時,需要同步執行 1,3,4 三個步驟,對於消息生產的性能來說也是有比較大的影響的,所以你在實際應用中需要仔細地權衡考量。這里建議是:
-
如果你需要確保消息一條都不能丟失,那么建議不要開啟消息隊列的同步刷盤,而是需要使用集群的方式來解決,可以配置當所有 ISR Follower 都接收到消息才返回成功。
-
如果對消息的丟失有一定的容忍度,那么建議不部署集群,即使以集群方式部署,也建議配置只發送給一個 Follower 就可以返回成功了。
-
我們的業務系統一般對於消息的丟失有一定的容忍度,比如說以上面的紅包系統為例,如果紅包消息丟失了,我們只要后續給沒有發送紅包的用戶補發紅包就好了。
2.保證被消費一次
如何保證消息只被消費一次
從上面的分析中,你能發現,為了避免消息丟失,我們需要付出兩方面的代價:一方面是性能的損耗;一方面可能造成消息重復消費。
性能的損耗我們還可以接受,因為一般業務系統只有在寫請求時才會有發送消息隊列的操作,而一般系統的寫請求的量級並不高,但是消息一旦被重復消費,就會造成業務邏輯處理的錯誤。那么我們要如何避免消息的重復呢?
想要完全的避免消息重復的發生是很難做到的,因為網絡的抖動、機器的宕機和處理的異常都是比較難以避免的,在工業上並沒有成熟的方法,因此我們會把要求放寬,只要保證即使消費到了重復的消息,從消費的最終結果來看和只消費一次是等同的就好了,也就是保證在消息的生產和消費的過程是“冪等”的。
1. 什么是冪等
冪等是一個數學上的概念,它的含義是多次執行同一個操作和執行一次操作,最終得到的結果是相同的,說起來可能有些抽象,我給你舉個例子:
比如,男生和女生吵架,女生抓住一個點不放,傳遞“你不在乎我了嗎?”(生產消息)的信息。那么當多次埋怨“你不在乎我了嗎?”的時候(多次生產相同消息),她不知道的是,男生的耳朵(消息處理)會自動把 N 多次的信息屏蔽,就像只聽到一次一樣,這就是冪等性。
如果我們消費一條消息的時候,要給現有的庫存數量減 1,那么如果消費兩條相同的消息就會給庫存數量減 2,這就不是冪等的。而如果消費一條消息后,處理邏輯是將庫存的數量設置為 0,或者是如果當前庫存數量是 10 時則減 1,這樣在消費多條消息時,所得到的結果就是相同的,這就是冪等的。
說白了,你可以這么理解“冪等”:一件事兒無論做多少次都和做一次產生的結果是一樣的,那么這件事兒就具有冪等性。
2. 在生產、消費過程中增加消息冪等性的保證
消息在生產和消費的過程中都可能會產生重復,所以你要做的是,在生產過程和消費過程中增加消息冪等性的保證,這樣就可以認為從“最終結果上來看”,消息實際上是只被消費了一次的。
在消息生產過程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻譯過來就是生產過程的冪等性,這種特性保證消息雖然可能在生產端產生重復,但是最終在消息隊列存儲時只會存儲一份。
它的做法是給每一個生產者一個唯一的 ID,並且為生產的每一條消息賦予一個唯一 ID,消息隊列的服務端會存儲 < 生產者 ID,最后一條消息 ID> 的映射。當某一個生產者產生新的消息時,消息隊列服務端會比對消息 ID 是否與存儲的最后一條 ID 一致,如果一致,就認為是重復的消息,服務端會自動丟棄。
而在消費端,冪等性的保證會稍微復雜一些,你可以從通用層和業務層兩個層面來考慮。
你可以看到,無論是生產端的冪等性保證方式,還是消費端通用的冪等性保證方式,它們的共同特點都是為每一個消息生成一個唯一的 ID,然后在使用這個消息的時候,先比對這個 ID 是否已經存在,如果存在,則認為消息已經被使用過。
所以這種方式是一種標准的實現冪等的方式,你在項目之中可以拿來直接使用,它在邏輯上的偽代碼就像下面這樣:
-
boolean isIDExisted = selectByID(ID); // 判斷ID是否存在
-
if(isIDExisted) {
-
return; //存在則直接返回
-
} else {
-
process(message); //不存在,則處理消息
-
saveID(ID); //存儲ID
-
}
不過這樣會有一個問題:如果消息在處理之后,還沒有來得及寫入數據庫,消費者宕機了重啟之后發現數據庫中並沒有這條消息,還是會重復執行兩次消費邏輯。
這時你就需要引入事務機制,保證消息處理和寫入數據庫必須同時成功或者同時失敗,但是這樣消息處理的成本就更高了,所以,如果對於消息重復沒有特別嚴格的要求,可以直接使用這種通用的方案,而不考慮引入事務。
在業務層面怎么處理呢?這里有很多種處理方式,其中有一種是增加樂觀鎖的方式。比如,你的消息處理程序需要給一個人的賬號加錢,那么你可以通過樂觀鎖的方式來解決。
具體的操作方式是這樣的:你給每個人的賬號數據中增加一個版本號的字段,在生產消息時先查詢這個賬戶的版本號,並且將版本號連同消息一起發送給消息隊列。消費端在拿到消息和版本號后,在執行更新賬戶金額 SQL 的時候帶上版本號,類似於執行:
-
update user set amount = amount + 20, version=version+1
-
where userId=1 and version=1;
你看,我們在更新數據時給數據加了樂觀鎖,這樣在消費第一條消息時,version 值為 1,SQL 可以執行成功,並且同時把 version 值改為了 2;在執行第二條相同的消息時,由於 version 值不再是 1,所以這條 SQL 不能執行成功,也就保證了消息的冪等性。