消費者組與消費者偏移量
消費者組
具有相同組ID的消費者就屬於同一個消費者組,它有如下特點:
-
一個組可以有多個消費者
-
主題中的消息只能被同一個組中的一個消費者消費
-
一個主題可以被多個消費者組消費
消費者組的概念主要是為了實現點對點隊列模式和發布訂閱模式,它是如何實現的呢?
-
隊列模式:所有消費者屬於同一個組
-
發布訂閱模式:消費者屬於不同組,也就是有不同消費者組
另外消費者組模式具有高伸縮性,通常有幾個分區那么消費者組中就包含幾個消費者,這樣形成一對一模式,每個消費者都順序的消費對應的分區(注意這里是分區消息順序性並不是全局順序性),如果某個消費者崩潰了,那么它消費的分區就會被分配給其他消費者,這樣保證繼續工作,這個過程叫做rebalance,這也是消費者組的一個好處,不過凡事有利有弊,rebalance頻繁發生也是災難。
消費者偏移量
__consumer_offsets主題
這里說的消費者偏移量指的是消費者消費某個主題的消費進度,消費者自己包括Kafka集群都需要知道這個進度,因為無論消費者是啟動中還是重啟后都需要知道自己應該從哪里繼續消費。如何上報這個信息呢?其實就是位移提交,默認消費客戶端是自動提交的,通常我們都會設置為手動提交。
切記:這個消費者offset指的是消費者要消費的下一條消息的位移,而不是當前消費到哪里了。
消費者偏移量其實並不復雜,無非就是記錄消費者針對某個主題的消費進度,所以這個東西就是鍵值:
-
鍵:Group id + 主題 + 分區號
-
值:offset值
老版本的消費者偏移量都是放在Zookeeper上,新版本的消費者偏移量是放在單獨的一個主題上,叫做__consumer_offsets,默認有50個分區,每個分區默認1個副本,如下圖:
我這里是3台的kafka集群,所以這50個分區被分布在3台上。
offsets.topic.replication.factor=3
設置這個broker參數,默認為1,那么創建該主題的時候就會是3個副本。
在第一次搭建集群的時候不會創建這個主題,只有在第一次啟動消費者程序之后kafka才會自動創建這個主題。
這個主題除了放消費者消費偏移量之外還會存放其他類型消息,保存消費者組的注冊消息和刪除Group過期位移消息,而刪除其實就是根據鍵來保留最近的消息。
# 新版本查看消費者偏移量
kafka-consumer-groups.sh --bootstrap-server 172.16.100.10:9092 --describe --group TestGroup
# 老版本查看消費者偏移量
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect [ZOOKEEPER_IPADDRESS]:[ZOOKEEPER_PORT] --group [CONSUMER_GROUP]
說明:50個__consumer_offsets
分區,消費者到底要使用哪一個分區呢?這個就是就是找Coordinator組件,在后面會有這個組件的說明。
自動提交和手動提交的選擇
自動提交:消費者第一次poll消息的時候會根據auto.offset.reset策略來決定從哪里開始消費,因為消費是個循環,就會不停的調用poll,至於什么時候提交呢?這是由auto.commit.interval.ms來決定的,它會定期去提交。如果出現Rebalance的情況,那么在Rebalance之后消費者則從上一次提交位移的地方繼續消費。所以基於上面的特點,自動提交不會丟失消息(也就是消息都會被消費)但是會發生重復消費的情況。比如你已經消費了消息並執行了業務邏輯,但是還沒有到達提交周期,這時候線程掛了,重啟后還是從上次的位置消費這就出現了重復消費,當然也有可能出現消息丟失的情況。
手動提交:靈活度高不過這里有2種也就是同步提交和異步提交。兩種各有利弊,同步提交會阻塞但是可以實現重試機制;異步提交雖然不會阻塞但是無法重試,因為重試時的位移可能已經不是之前的位移了。
這里要說一下語義保證:
-
最多一次:消息可能會丟失,但是不會出現重復消費的情況
-
最少一次:消息不會丟失,但是可能會出現重復消費情況
-
精確一次:消息一定會被處理且只會被處理一次
如果業務對於消息的丟失或者重復消費沒有要求那么可以使用自動提交;如果要求精確一次的語義那么就要使用手動提交。
獨立消費者
消費者組消費通常我們叫做subscribe而獨立消費者也就是不使用組的都是需要手動分配要消費的分區以及手動管理偏移量,這種方式我們稱作assign,但是這兩種方式有區別:
subscribe()
和assign()
的區別:如果使用前者則必須使用消費者組因為在一個組內消費者消費的分區是自動分配的,這個是Kafka內部通過算法來完成所以有時候組內成員變化或者分區數量變化會出現reblance的情況,但總之會均衡分配給組內的消費者。而后者是手動指定主題的分區,所以這種消費方式無需消費者組,就算指定了也不起作用。至於偏移量的話使用消費者的形式偏移量無論是手動提交還是自動提交都會保存在broker上,下次可以繼續消費;但是assign的方式都是客戶端自己維護偏移量,而且我們是無法查看它的偏移量的,無論是命令行還是其他監控形式。除了特殊場景外一般都是使用消費者的形式。我在工作中的確遇到過需要手動指定分區消費的場景。
真實案例:公司一個程序它會有多個實例,每個實例使用消費組方式來消費同一個主題,但是每個程序的消費者組除了前綴一樣,后面都會隨機生成一個字符串,這就變成了其實每個程序都是屬於一個單獨的組,而且程序重啟后組也就變了。這里面有幾個邏輯。消費者偏移量提交毫無意義,因為組變了,所以使用消費者組也毫無意義。大量沒用組的臟數據留在集群上。但他們的需求是每個程序都全量消費所有分區,而且配置的是從最新點位消費。不過這里存在一個問題就是,如果4個程序,同時宕機1分鍾,然后啟動,就會存在丟失消息的情況,因為中途1分鍾的消息它們是不會拿到的。因為組名變量,位移丟失了。
解決方案是:不使用消費組,改用assign的方式,首先獲取指定主題的分區,然后通過assign分配分區,seek_to_end,剩下的就和普通消費一樣了。不需要提交位移,在程序運行期間客戶端會自己在內存中維護位移以實現持續消費,但是重啟后就不行了。這樣就滿足了上面業務的場景要求。不過需要注意的是需要關注分區變化,如果變化
了就需要重新assign一下。另外assign方式消費是無法監控的。只能客戶端自己打日志了。assign的方式一般用在流計算或者某些類廣播場景。正常情況下,不建議使用assign的。
消費者組重平衡(rebalance)
rebalance的本質就是讓某一個消費者組中的成員可以均衡的消費主題,所以這是一個分配過程,引發這個rebalance的條件通常有3個:
-
消費者組成員變化(新加入成員,已有消費者崩潰,消費者在一定時間內沒有完成消息處理也會被視為崩潰)
-
消費者組所消費的主題分區增加(分區不允許減少)
-
組訂閱的主題數量發生變化
雖然這個機制是為了保證組內消費者均衡的消費主題而不出現忙閑不均的情況,不過這個機制最遭人詬病的就是STW,JVM垃圾回收機制導致,在垃圾回收期間所有應用線程都會停止工作,所以你想想。而且如果你的消費者組有很多消費者,這個rebalance的過程還會消耗大量時間,而且它還會影響消費者的TPS。總之經常發生rebalance不是一件好事情,所以我們要盡量避免,注意是盡量避免因為你無法做到不發生而且。
我們這里說的避免是說非人為干預的,如果我們人為的增加了消費者組成員那一定會進行rebalance。
在消費者程序中我們可以設置如下參數來盡量避免發生rebalance,下面以Python客戶端為例,對於Java客戶端也是一樣的:
# 默認10000毫秒,回話超時時長,Coordinator在10秒內沒有收到Group下某個
# Consumer的心跳,則該Consumer被認為掛了,那么broker則開始rebalance
session_timeout_ms
# 消費者向Coordinator發送心跳的頻率,默認3秒。
heartbeat_interval_ms
# 消費者兩次poll的最大時間間隔,默認30秒。表示在這個時間內消費者無法完成poll方法的返回,那么消費者則被認為掛了,也會進行rebalance
max_poll_interval_ms
session_timeout_ms
和heartbeat_interval_ms
並不沖突,通常后者要小於前者,后者是周期,前者是N個周期。一般我們把heartbeat_interval_ms
設置為2000,把session_timeout_ms
設置為6000,也就是 Coordinator在6秒內(3個心跳周期)收不到心跳則認為消費者掛了。所以這兩個參數是避免因為網絡抖動而心跳信息沒有及時發出導致超時而引發的rebalance。不過session_timeout_ms
也不能設置太長,因為這樣就無法及時發現真正掛了的消費者。
而max_poll_interval_ms
解決的是因為消息處理時間過長其實消費者並沒有掛但是達到了max_poll_interval_ms
設置的值從而被誤認為是消費者掛了而引發的rebalance。所以這個參數到底設置多少你要去評估一下每次獲取消息的條數以及處理這些消息大約話費多長時間。
關於Coordinator
這是一個協調組件,它負責執行消費者注冊、組成員管理包括執行rebalance和元數據操作。消費者提交位移也是向Coordinator所在的broker提交。每一個broker都有Coordinator,那么消費者如何找到屬於自己的Coordinator呢?這里就是靠__consumer_offsets
主題來完成,主要分2步:
-
kafka會計算group.id的哈希值
-
計算
__consumer_offsets
的分區數量,默認是50個 -
abs(哈希值 % 50) 也就是取模求絕對值,這個值就是該消費者要使用的
__consumer_offsets
主題的分區編號
當知道了__consumer_offsets
的分區編號,那么該分區的Leader副本所在的broker就是該消費者要使用的Coordinator。
如何監控消費者消費進度
通過命令查看
kafka-consumer-groups.sh --bootstrap-server 192.168.5.138:9092 --describe --group TestGroup
另外就是通過JMX監控,這種形式可以集成到監控系統中。這里有2個指標比較重要:
-
records-lag-max:這個表示積壓量,值越大表示消費者越落后生產者。
-
records-lead-min:表示消費者最新消息位移與分區當前第一條消息位移的差值。
這個被監控的JMX是消費者程序而不是Kafka服務器的,而且只能是JAVA消費者程序。