Kafka中的每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序號,用於partition唯一標識一條消息。
Offset記錄着下一條將要發送給Consumer的消息的序號。
Offset從語義上來看擁有兩種:Current Offset 和 Committed Offset。
Current Offset
Current Offset保存在Consumer客戶端中,它表示Consumer希望收到的下一條消息的序號。它僅僅在poll()方法中使用。例如,Consumer第一次調用poll()方法后收到了20條消息,那么Current Offset就被設置為20。這樣Consumer下一次調用poll()方法時,Kafka就知道應該從序號為21的消息開始讀取。這樣就能夠保證每次Consumer poll消息時,都能夠收到不重復的消息。
Committed Offset
Committed Offset保存在Broker上,它表示Consumer已經確認消費過的消息的序號。主要通過 commitSync
和 co
mmitAsync
API來操作。舉個例子,Consumer通過poll() 方法收到20條消息后,此時Current Offset就是20,經過一系列的邏輯處理后,並沒有調用consumer.commitAsync()
或consumer.commitSync()
來提交Committed Offset,那么此時Committed Offset依舊是0。
Committed Offset主要用於Consumer Rebalance。
總結一下,Current Offset是針對Consumer的poll過程的,它可以保證每次poll都返回不重復的消息;而Committed Offset是用於Consumer Rebalance過程的,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重復消費。
在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目錄中(zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作)。而在0.9之后,所有的offset信息都保存在了Broker上的一個名為__consumer_offsets的topic中。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。
Group Coordinator
Group Coordinator是運行在Kafka集群中每一個Broker內的一個進程。它主要負責Consumer Group的管理,Offset位移管理以及Consumer Rebalance。
對於每一個Consumer Group,Group Coordinator都會存儲以下信息:
-
訂閱的topics列表
-
Consumer Group配置信息,包括session timeout等
-
組中每個Consumer的元數據。包括主機名,consumer id
-
每個Group正在消費的topic partition的當前offsets
-
Partition的ownership元數據,包括consumer消費的partitions映射關系
Consumer Group如何確定自己的coordinator是誰呢?簡單來說分為兩步:
1. 確定Consumer Group offset信息將要寫入__consumers_offsets topic的哪個分區。具體計算公式:
2. 該分區leader所在的broker就是被選定的coordinator
Offset存儲模型
由於一個partition只能固定的交給一個消費者組中的一個消費者消費,因此Kafka保存offset時並不直接為每個消費者保存,而是以groupid-topic-partition -> offset的方式保存。如圖所示:
group-offset.png
Kafka在保存Offset的時候,實際上是將Consumer Group和partition對應的offset以消息的方式保存在__consumers_offsets這個topic中。
__consumers_offsets默認擁有50個partition,可以通過
的方式來查詢某個Consumer Group的offset信息保存在__consumers_offsets的哪個partition中。下圖展示了__consumers_offsets中保存的offset消息的格式:
__consumers_offsets.png
__consumers_offsets_data.png
如圖所示,一條offset消息的格式為groupid-topic-partition -> offset。因此consumer poll消息時,已知groupid和topic,
Offset查詢
前面我們已經描述過offset的存儲模型,它是按照groupid-topic-partition -> offset的方式存儲的。然而Kafka只提供了根據offset讀取消息的模型,並不支持根據key讀取消息的方式。那么Kafka是如何支持Offset的查詢呢?
答案就是Offsets Cache!!
Offsets Cache.JPG
如圖所示,Consumer提交offset時,Kafka Offset Manager會首先追加一條條新的commit消息到__consumers_offsets topic中,然后更新對應的緩存。讀取offset時從緩存中讀取,而不是直接讀取__consumers_offsets這個topic。
Log Compaction
我們已經知道,Kafka使用groupid-topic-partition -> offset*的消息格式,將Offset信息存儲在__consumers_offsets topic中。請看下面一個例子:
__consumers_offsets.JPG
如圖,對於audit-consumer這個Consumer Group來說,上面的存儲了兩條具有相同key的記錄:PageViewEvent-0 -> 240
和PageViewEvent-0 -> 323
。事實上,這就是一種無用的冗余。因為對於一個partition來說,我們實際上只需要它當前最新的Offsets。因此這條舊的PageViewEvent-0 -> 240
記錄事實上是無用的。
為了消除這樣的過期數據,Kafka為__consumers_offsets topic設置了Log Compaction功能。Log Compaction意味着對於有相同key的的不同value值,只保留最后一個版本。如果應用只關心key對應的最新value值,可以開啟Kafka的Log Compaction功能,Kafka會定期將相同key的消息進行合並,只保留最新的value值。
這張圖片生動的闡述了Log Compaction的過程:
Log Compaction.JPG
下圖闡釋了__consumers_offsets topic中的數據在Log Compaction下的變化:
Log Compaction for __consumers_offsets.JPG
在新建topic時添加
log.cleanup.policy=compact
參數就可以為topic開啟Log Compaction功能。
auto.offset.reset參數
auto.offset.reset
表示如果Kafka中沒有存儲對應的offset信息的話(有可能offset信息被刪除),消費者從何處開始消費消息。它擁有三個可選值:
-
earliest:從最早的offset開始消費
-
latest:從最后的offset開始消費
-
none:直接拋出exception給consumer
看一下下面兩個場景:
1、Consumer消費了5條消息后宕機了,重啟之后它讀取到對應的partition的Committed Offset為5,因此會直接從第6條消息開始讀取。此時完全依賴於Committed Offset機制,和auto.offset.reset
配置完全無關。
2、新建了一個新的Group,並添加了一個Consumer,它訂閱了一個已經存在的Topic。此時Kafka中還沒有這個Consumer相應的Offset信息,因此此時Kafka就會根據auto.offset.reset
配置來決定這個Consumer從何處開始消費消息。