Kafka-如何保證消費者的可靠性
只有那些被提交到kafka的數據,也就是那些已經被寫入所有同步副本的數據,對消費者是可用的,這意味着消費者得到的消息已經具備了一致性。消費者唯一要做的是跟蹤哪些消息是已經讀取過的,哪些是還沒有讀取過的。這是在讀取消息時不丟失消息的關鍵。
在從分區讀取數據時,消費者會獲取一批事件,檢查這批事件里最大的偏移量,然后從這個偏移量開始讀取另外一批事件。這樣可以保證消費者總能以正確的順序獲取新數據,不會錯過任何事件。
如果一個消費者退出,另一個消費者需要知道從什么地方開始繼續處理,它需要知道前一個消費者在退出前處理的最后一個偏移量是多少。它們把當前讀取的偏移量保存起來,在退出之后,同一個群組里的其他消費者就可以接手它們的工作。如果消費者提交了偏移量卻未能處理完消息,那么就有可能造成消息丟失,這也是消費者丟失消息的主要原因。在這種情況下,如果其他消費者接手了工作,那些沒有被處理完的消息就會被忽略,永遠得不到處理。
消費者的可靠性配置
1.group.id:如果兩個消費者具有相同的group.id,並且訂閱了同一個主題,那么每個消費者會分到主題分區的一個子集,也就是說他們只能讀到所有消息的一個子集(不過群組會讀取主題所有的消息)。如果希望消費者可以看到主題的所有消息,那么需要為它們設置唯一的group.id。
2.auto.offset.reset:這個參數指定了在沒有偏移量可提交時或者請求的偏移量在broker上不存在時,消費者會做什么。這個參數有兩種配置:
- earliest:消費者會從分區的開始位置讀取數據,不管偏移量是否有效,這樣會導致消費者讀取大量的重復數據,但可以保證最少的數據丟失。
- latest:消費者會從分區的末尾開始讀取數據,這樣可以減少重復處理消息,但是很有可能會錯過一些消息。
3.enable.auto.commit:這是一個非常重要的配置參數,可以讓消費者基於任務調度自動提交偏移量,也可以在代碼里手動提交偏移量。自動提交的一個最大好處是,在實現消費者邏輯時可以少考慮一些問題。如果在消費者輪詢操作里處理所有的數據,那么自動提交可以保證只提交已經處理過的偏移量。自動提交的缺點:無法控制重復處理消息(比如消費者在自動提交偏移量之前停止處理消息),而且如果把消息交給另外一個后台線程去處理,自動提交機制可能會在消息還沒有處理完畢就提交偏移量。
4.auto.commit.interval.ms:此參數與enable.auto.commit有直接的聯系,如果選擇了自動提交偏移量,可以通過此參數配置提交的頻度,默認值是每5秒鍾提交一次。一般來說,頻繁提交會增加額外的開銷,但也會降低重復處理消息的概率。
顯式提交偏移量
1.總是在處理完事件后再提交偏移量
如果所有的處理都是在輪詢里完成,並且不需要在輪詢之間維護狀態(比如為了實現聚合操作),那么可以使用自動提交,或者在輪詢結束時進行手動提交。
2.提交頻度是性能和重復消息數量之間的權衡
即使是在最簡單的場景里,比如所有的處理都在輪詢里完成,並且不需要在輪詢之間維護狀態,仍然可以再一個循環里多次提交偏移量(甚至可以在每處理完一個事件之后),或者多個循環里只提交一次(與生產者的acks=all配置有點類似),這完全取決於在性能和重復處理消息之間做出的權衡。
3.確保對提交的偏移量心里有數
在輪詢過程中提交偏移量有一個不好的地方,就是提交的偏移量有可能是讀取到的最新偏移量,而不是處理過的最新偏移量。要記住,在處理完消息后再提交偏移量時非常關鍵的--否則會導致消費者錯過消息。
4.再均衡
在設計應用程序時要注意處理消費者的再均衡問題。一般要在分區被撤銷之前提交偏移量,並在分配到新分區時清理之前的狀態。
5.消費者可能需要重試
有時候,在進行輪詢之后,有些消息不會被完全處理,想稍后再來處理。例如,假設要把kafka的數據寫到數據庫里,不過那個時候數據庫不可用,於是想稍后重試。要注意,提交的是偏移量,而不是對消息的確認,這個與傳統的發布可訂閱消息系統不太一樣。如果記錄#30處理失敗,但記錄#31處理成功,那么不應該提交#31,否則會導致#31以內的偏移量都被提交,包括#30在內,而這可能不是想看到的結果。可以采用以下兩種模式來解決這個問題。
- 第一種模式。在遇到可重試錯誤時,提交最后一個處理成功的偏移量,然后把還沒有處理好的消息保存到緩沖區(這樣下一個輪詢就不會把它們覆蓋掉),調用消費者的pause()方法來確保其它的輪詢不會返回數據(不需要擔心在重試時緩沖區溢出),在保持輪詢的同事嘗試重新處理。如果重試成功,或者重試次數達到上限並決定放棄,那么把錯誤記錄下來並丟棄消息,然后調用resume()方法讓消費者繼續從輪詢里獲取新數據
- 第二種模式。在遇到可重試錯誤時,把錯誤寫入一個獨立的主題,然后繼續。一個獨立的消費者群組負責從該主題上讀取錯誤消息,並進行重試,或者使用其中的一個消費者同時從該主題上讀取錯誤消息並進行重試,不過在重試時需要暫停該主題。這種模式有點像其他消息系統里的dead-letter-queue。
6.消費者可能需要維護狀態
有時候希望在多個輪詢之間維護狀態,例如,想計算消息的移動平均數,希望在首次輪詢之后計算平均數,然后在后續的輪詢中更新這個結果。如果進程重啟,不僅需要從上一個偏移量開始處理數據,還需要恢復移動平均數。有一種辦法是在提交偏移量的同時把最近計算的平均數寫到一個結果主題上。消費者線程在重新啟動之后,它就可以拿到最近的平均數並接着計算。不過這並不能完全地解決問題,因為kafka並沒有提供事務支持。消費者有可能在寫入平均數之后來不及提交偏移量就崩潰了,或者反過來也一樣。建議使用KafkaStreams這個類庫,它為聚合、連接、時間窗和其他復雜的分析提供了高級的DSL API。
7.長時間處理
有時候處理數據需要很長時間:可能會從發生阻塞的外部系統獲取信息,或者把數據寫到外部系統,或者進行一個非常復雜的計算。要記住,暫停輪詢的時間不能超過幾秒鍾。即使不想獲取更多的數據,也要保持輪詢,這樣客戶端才能往broker發送心跳。在這種情況下,一種常見的做法是使用一個線程池來處理數據,因為使用多個線程可以進行並行處理,從而加快處理速度。在把數據移交給線程池去處理之后,就可以暫停消費者,然后保持輪詢,但不獲取新數據,直到工作線程處理完成。在工作線程處理完成之后,可以讓消費者繼續獲取新數據。因為消費者一直保持輪詢,心跳會正常發送,就不會發生再均衡。
8.僅一次傳遞
有些應用程序不僅僅需要至少一次(at-least-once)語義(意味着沒有數據丟失),還需要僅一次(exactly-once)語義。盡管kafka現在還不能完全支持僅一次語義,消費者還是有一些辦法可以保證kafka里的每個消息只被寫到外部系統一次(但不會處理kafka寫入數據時可能出現的重復數據)。
實現僅一次處理最簡單且最常用的辦法是把結果寫到一個支持唯一鍵的系統里,比如鍵值存儲引擎、關系型數據庫、elasticsearch或其它數據存儲引擎。在這種情況下,要么消息本身包含一個唯一鍵(通常都是這樣),要么使用主題、分區和偏移量的組合來創建唯一鍵--它們的組合可以唯一標識一個kafka記錄。如果把消息和一個唯一鍵寫入系統,然后碰巧又讀到一個相同的消息,只要把原先的鍵值覆蓋掉即可。數據存儲引擎會覆蓋已經存在的鍵值對,就像沒有出現過重復數據一樣。這個模式被叫做冪等性寫入,它是一種很常見也很有用的模式。