消息為什么會丟失
消息從被寫入到消息隊列,到被消費者消費完成,這個鏈路上會有哪些地方存在丟失消息的可能呢?其實,主要存在三個場景:
- 消息從生產者寫入到消息隊列的過程。
- 消息在消息隊列中的存儲場景。
- 消息被消費者消費的過程。

1.在消息生產的過程中丟失消息
在這個環節中主要有兩種情況。
首先,消息的生產者一般是我們的業務服務器,消息隊列是獨立部署在單獨的服務器上的。兩者之間的網絡雖然是內網,但是也會存在抖動的可能,而一旦發生抖動,消息就有可能因為網絡的錯誤而丟失。
針對這種情況,我建議你采用的方案是消息重傳:也就是當你發現發送超時后你就將消息重新發一次,但是你也不能無限制地重傳消息。一般來說,如果不是消息隊列發生故障,或者是到消息隊列的網絡斷開了,重試2~3次就可以了。
不過,這種方案可能會造成消息的重復,從而導致在消費的時候會重復消費同樣的消息。比方說,消息生產時由於消息隊列處理慢或者網絡的抖動,導致雖然最終寫入消息隊列成功,但在生產端卻超時了,生產者重傳這條消息就會形成重復的消息,那么針對上面的例子,直觀顯示在你面前的就會是你收到了兩個現金紅包。
2.在消息隊列中丟失消息
拿Kafka舉例,消息在Kafka中是存儲在本地磁盤上,而為了減少消息存儲時對磁盤的隨機I/O,一般會將消息先寫入到操作系統的Page Cache中,然后再找合適時機刷新到磁盤上。
比如,Kafka可以配置當達到某一時間間隔,或者累積一定的消息數量的時候再刷盤,也就是所說的異步刷盤。
來看一個形象的比喻:假如你經營一個圖書館,讀者每還一本書你都要去把圖書歸位,不僅工作量大而且效率低下,但是如果你可以選擇每隔3小時,或者圖書達到一定數量的時候再把圖書歸位,這樣可以把同一類型的書一起歸位,節省了查找圖書位置的時間,這樣就可以提高效率了。
不過,如果發生機器掉電或者機器異常重啟,那么Page Cache中還沒有來得及刷盤的消息就會丟失了。
那么怎么解決呢?
你可能會把刷盤的間隔設置很短,或者設置累積一條消息就就刷盤,但這樣頻繁刷盤會對性能有比較大的影響,而且從經驗來看,出現機器宕機或者掉電的幾率也不高,所以我不建議你這樣做。

如果你的電商系統對消息丟失的容忍度很低,那么你可以考慮以集群方式部署Kafka服務,通過部署多個副本備份數據,保證消息盡量不丟失。
那么它是怎么實現的呢?
Kafka集群中有一個Leader負責消息的寫入和消費,可以有多個Follower負責數據的備份。Follower中有一個特殊的集合叫做ISR(in-sync replicas),當Leader故障時,新選舉出來的Leader會從ISR中選擇,默認Leader的數據會異步地復制給Follower,這樣在Leader發生掉電或者宕機時,Kafka會從Follower中消費消息,減少消息丟失的可能。
由於默認消息是異步地從Leader復制到Follower的,所以一旦Leader宕機,那些還沒有來得及復制到Follower的消息還是會丟失。為了解決這個問題,Kafka為生產者提供一個選項叫做“acks”,當這個選項被設置為“all”時,生產者發送的每一條消息除了發給Leader外還會發給所有的ISR,並且必須得到Leader和所有ISR的確認后才被認為發送成功。這樣,只有Leader和所有的ISR都掛了,消息才會丟失

從上面這張圖來看,當設置“acks=all”時,需要同步執行1,3,4三個步驟,對於消息生產的性能來說也是有比較大的影響的,所以你在實際應用中需要仔細地權衡考量。我給你的建議是:
1.如果你需要確保消息一條都不能丟失,那么建議不要開啟消息隊列的同步刷盤,而是需要使用集群的方式來解決,可以配置當所有ISR Follower都接收到消息才返回成功。
2.如果對消息的丟失有一定的容忍度,那么建議不部署集群,即使以集群方式部署,也建議配置只發送給一個Follower就可以返回成功了。
3.我們的業務系統一般對於消息的丟失有一定的容忍度,比如說以上面的紅包系統為例,如果紅包消息丟失了,我們只要后續給沒有發送紅包的用戶補發紅包就好了。
3.在消費的過程中存在消息丟失的可能
我還是以Kafka為例來說明。一個消費者消費消息的進度是記錄在消息隊列集群中的,而消費的過程分為三步:接收消息、處理消息、更新消費進度。
這里面接收消息和處理消息的過程都可能會發生異常或者失敗,比如說,消息接收時網絡發生抖動,導致消息並沒有被正確的接收到;處理消息時可能發生一些業務的異常導致處理流程未執行完成,這時如果更新消費進度,那么這條失敗的消息就永遠不會被處理了,也可以認為是丟失了。
所以,在這里你需要注意的是,一定要等到消息接收和處理完成后才能更新消費進度,但是這也會造成消息重復的問題,比方說某一條消息在處理之后,消費者恰好宕機了,那么因為沒有更新消費進度,所以當這個消費者重啟之后,還會重復地消費這條消息。
如何保證消息只被消費一次
只要保證即使消費到了重復的消息,從消費的最終結果來看和只消費一次是等同的就好了,也就是保證在消息的生產和消費的過程是“冪等”的。
1.什么是冪等
如果我們消費一條消息的時候,要給現有的庫存數量減1,那么如果消費兩條相同的消息就會給庫存數量減2,這就不是冪等的。而如果消費一條消息后,處理邏輯是將庫存的數量設置為0,或者是如果當前庫存數量是10時則減1,這樣在消費多條消息時,所得到的結果就是相同的,這就是冪等的。
說白了,你可以這么理解“冪等”:一件事兒無論做多少次都和做一次產生的結果是一樣的,那么這件事兒就具有冪等性。
2.在生產、消費過程中增加消息冪等性的保證
在消息生產過程中,在Kafka0.11版本和Pulsar中都支持“producer idempotency”的特性,翻譯過來就是生產過程的冪等性,這種特性保證消息雖然可能在生產端產生重復,但是最終在消息隊列存儲時只會存儲一份。
它的做法是給每一個生產者一個唯一的ID,並且為生產的每一條消息賦予一個唯一ID,消息隊列的服務端會存儲<生產者ID,最后一條消息ID>的映射。當某一個生產者產生新的消息時,消息隊列服務端會比對消息ID是否與存儲的最后一條ID一致,如果一致,就認為是重復的消息,服務端會自動丟棄。
而在消費端,冪等性的保證會稍微復雜一些,你可以從通用層和業務層兩個層面來考慮。
在通用層面,你可以在消息被生產的時候,使用發號器給它生成一個全局唯一的消息ID,消息被處理之后,把這個ID存儲在數據庫中,在處理下一條消息之前,先從數據庫里面查詢這個全局ID是否被消費過,如果被消費過就放棄消費。
不過這樣會有一個問題:如果消息在處理之后,還沒有來得及寫入數據庫,消費者宕機了重啟之后發現數據庫中並沒有這條消息,還是會重復執行兩次消費邏輯,這時你就需要引入事務機制,保證消息處理和寫入數據庫必須同時成功或者同時失敗,但是這樣消息處理的成本就更高了,所以,如果對於消息重復沒有特別嚴格的要求,可以直接使用這種通用的方案,而不考慮引入事務。
在業務層面怎么處理呢?這里有很多種處理方式,其中有一種是增加樂觀鎖的方式。比如,你的消息處理程序需要給一個人的賬號加錢,那么你可以通過樂觀鎖的方式來解決。
具體的操作方式是這樣的:你給每個人的賬號數據中增加一個版本號的字段,在生產消息時先查詢這個賬戶的版本號,並且將版本號連同消息一起發送給消息隊列。消費端在拿到消息和版本號后,在執行更新賬戶金額SQL的時候帶上版本號,類似於執行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
在更新數據時給數據加了樂觀鎖,這樣在消費第一條消息時,version值為1,SQL可以執行成功,並且同時把version值改為了2;在執行第二條相同的消息時,由於version值不再是1,所以這條SQL不能執行成功,也就保證了消息的冪等性。
