眾所周知,__consumer__offsets是一個內部topic,對用戶而言是透明的,除了它的數據文件以及偶爾在日志中出現這兩點之外,用戶一般是感覺不到這個topic的。不過我們的確知道它保存的是Kafka新版本consumer的位移信息。本文我們簡單梳理一下這個內部topic(以1.0.0代碼為分析對象)
一、何時被創建?
首先,我們先來看下 它是何時被創建的?__consumer_offsets創建的時機有很多種,主要包括:
- broker響應FindCoordinatorRequest請求時
- broker響應MetadataRequest顯式請求__consumer_offsets元數據時
其中以第一種最為常見,而第一種時機的表現形式可能有很多,比如用戶啟動了一個消費者組(下稱consumer group)進行消費或調用kafka-consumer-groups --describe等
二、消息種類
__consumer_offsets中保存的記錄是普通的Kafka消息,只是它的格式完全由Kafka來維護,用戶不能干預。嚴格來說,__consumer_offsets中保存三類消息,分別是:
- Consumer group組元數據消息
- Consumer group位移消息
- Tombstone消息
2.1 Consumer group組元數據消息
我們都知道__consumer_offsets是保存位移的,但實際上每個消費者組的元數據信息也保存在這個topic。這些元數據包括:
這里不詳細展開組元數據各個字段的含義。我們只需要知道組元數據消息也是保存在__consumer_offsets中即可。值得一提的是, 如果用戶使用standalone consumer(即consumer.assign(****)方法),那么就不會寫入這類消息,畢竟我們使用的是獨立的消費者,而沒有使用消費者組。
這類消息的key是一個二元組,格式是【版本+groupId】,這里的版本表征這類消息的版本號,無實際用途;而value就是上圖所有這些信息打包而成的字節數組。
2.2 Consumer group組位移提交消息
如果只允許說出__consumer_offsets的一個功能,那么我們就記住這個好了:__consumer_offsets保存consumer提交到Kafka的位移數據。這句話有兩個要點:1. 只有當consumer group向Kafka提交位移時才會向__consumer_offsets寫入這類消息。如果你的consumer壓根就不提交位移,或者你將位移保存到了外部存儲中(比如Apache Flink的檢查點機制或老版本的Storm Kafka Spout),那么__consumer_offsets中就是無位移數據;2. 這句話中的consumer既包含consumer group也包含standalone consumer。也就是說,只要你向Kafka提交位移,不論使用哪種java consumer,它都是向__consumer_offsets寫消息。
這類消息的key是一個三元組,格式是【groupId + topic + 分區號】,value則是要提交的位移信息,如下圖所示:
位移就是待提交的位移,提交時間是提交位移時的時間戳,而過期時間則是用戶指定的過期時間。由於目前consumer代碼在提交位移時並沒有明確指定過期間隔,故broker端默認設置過期時間為提交時間+offsets.retention.minutes參數值,即提交1天之后自動過期。Kafka會定期掃描__consumer_offsets中的位移消息並刪除掉那些過期的位移數據。
上圖中還有個“自定義元數據”,實際上consumer允許用戶在提交位移時指定一些特殊的自定義信息。我們不對此進行詳細展開,因為java consumer根本就沒有使用到它。相反地,Kafka Streams利用該字段來完成某些定制任務。
2.3 tombstone消息或Delete Mark消息
第三類消息成為tombstone消息或delete mark消息。這類消息只出現在源碼中而不暴露給用戶。它和第一類消息很像,key都是二元組【版本+groupId】,唯一的區別在於這類消息的消息體是null,即空消息體。何時寫入這類消息?前面說過了,Kafka會定期掃描過期位移消息並刪除之。一旦某個consumer group下已沒有任何active成員且所有的位移數據都已被刪除時,Kafka會將該group的狀態置為Dead並向__consumer__offsets對應分區寫入tombstone消息,表明要徹底刪除這個group的信息。簡單來說,這類消息就是用於徹底刪除group信息的。
三、何時寫入?
第一類消息是在組rebalance時寫入的;第二類消息是在提交位移時寫入的;第三類消息是在Kafka后台線程掃描並刪除過期位移或者__consumer_offsets分區副本重分配時寫入的。
四、消息留存策略
__consumer_offsets目前的留存策略是compact,__consumer_offsets會定期對消息內容進行compact操作——用戶也可以同時啟用兩種留存策略來減少該topic所占的磁盤空間,不過要承擔可能丟失位移數據的風險。
五、副本因子
__consumer_offest不受server.properties中num.partitions和default.replication.factor參數的制約。相反地,它的分區數和備份因子分別由offsets.topic.num.partitions和offsets.topic.replication.factor參數決定。這兩個參數的默認值分別是50和1,表示該topic有50個分區,副本因子是1。鑒於位移和group元數據等信息都保存在該topic中,實際使用過程中很多用戶都會將offsets.topic.replication.factor設置成大於1的數以增加可靠性,這是推薦的做法。不過在0.11.0.0之前,這個設置是有缺陷的:假設你設置了offsets.topic.replication.factor = 3,只要Kafka創建該topic時可用broker數<3,那么創建出來的__consumer_offsets的備份因子就是2。也就是說Kafka並沒有尊重我們設置的offsets.topic.replication.factor參數。好在這個問題在0.11.0.0版本得到了解決,現在用戶在使用時,一旦需要創建__consumer_offsets了Kafka一定會保證湊齊足量的broker才會開始創建,否則就拋出異常給用戶。
日常使用中,另一個常見的問題是如何擴展該topic的副本因子。由於它依然是一個Kafka topic,因此我們可以調用bin/kafka-reassign-partitions.sh(bat)腳本來擴展replication factor。做法如下:
1. 構造一個json文件,如下所示,其中1,2,3表示3台broker的ID
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[2,3,1]}
]}
2. 運行bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute
如果一切正常,你會發現__consumer_offsets的replication factor已然被擴展為3。
六、如何刪除group信息
首先明確一點,Kafka是會刪除consumer group信息的,既包括位移信息,也包括組元數據信息。對於位移信息而言,前面提到過每條位移消息都設置了過期時間。每個Kafka broker在后台會啟動一個線程,定期(由offsets.retention.check.interval.ms確定,默認10分鍾)掃描過期位移,並刪除之。而對組元數據而言,刪除它們的條件有兩個:1. 這個group下不能存在active成員,即所有成員都已經退出了group;2. 這個group的所有位移信息都已經被刪除了。當滿足了這兩個條件后,Kafka后台線程會刪除group運輸局信息。
好了, 我們總說刪除,那么Kafka到底是怎么刪除的呢——正是通過寫入具有相同key的tombstone消息。我們舉個例子,假設__consumer_offsets當前保存有一條位移消息,key是【testGroupid,test, 0】(三元組),value是待提交的位移信息。無論何時,只要我們向__consumer_offsets相同分區寫入一條key=【testGroupid,test, 0】,value=null的消息,那么Kafka就會認為之前的那條位移信息是可以刪除的了——即相當於我們向__consumer_offsets中插入了一個delete mark。
再次強調一下,向__consumer_offsets寫入tombstone消息僅僅是標記它之前的具有相同key的消息是可以被刪除的,但刪除操作通常不會立即開始。真正的刪除操作是由log cleaner的Cleaner線程來執行的。
鑒於目前水平有限,能想到的就這么多。有相關問題的讀者可以將問題發動評論區,如果具有較大的共性,我會添加到本文的末尾~~