最近碰到了消息時間戳的問題,於是花了一些功夫研究了一下,特此記錄一下。
Kafka消息的時間戳
在消息中增加了一個時間戳字段和時間戳類型。目前支持的時間戳類型有兩種:
CreateTime 和 LogAppendTime 前者表示producer創建這條消息的時間;后者表示broker接收到這條消息的時間(嚴格來說,是leader broker將這條消息寫入到log的時間)
為什么要加入時間戳?
引入時間戳主要解決3個問題:
- 日志保存(log retention)策略:Kafka目前會定期刪除過期日志(log.retention.hours,默認是7天)。判斷的依據就是比較日志段文件(log segment file)的最新修改時間(last modification time)。倘若最近一次修改發生於7天前,那么就會視該日志段文件為過期日志,執行清除操作。但如果topic的某個分區曾經發生過分區副本的重分配(replica reassigment),那么就有可能會在一個新的broker上創建日志段文件,並把該文件的最新修改時間設置為最新時間,這樣設定的清除策略就無法執行了,盡管該日志段中的數據其實已經滿足可以被清除的條件了。
- 日志切分(log rolling)策略:與日志保存是一樣的道理。當前日志段文件會根據規則對當前日志進行切分——即,創建一個新的日志段文件,並設置其為當前激活(active)日志段。其中有一條規則就是基於時間的(log.roll.hours,默認是7天),即當前日志段文件的最新一次修改發生於7天前的話,就創建一個新的日志段文件,並設置為active日志段。所以,它也有同樣的問題,即最近修改時間不是固定的,一旦發生分區副本重分配,該值就會發生變更,導致日志無法執行切分。(注意:log.retention.hours及其家族與log.rolling.hours及其家族不會沖突的,因為Kafka不會清除當前激活日志段文件)
- 流式處理(Kafka streaming):流式處理中需要用到消息的時間戳
消息格式的變化
1 增加了timestamp字段,表示時間戳
2 增加了timestamp類型字段,保存在attribute屬性低位的第四個比特上,0表示CreateTime;1表示LogAppendTime(低位前三個比特保存消息壓縮類型)
客戶端消息格式的變化
ProducerRecord:增加了timestamp字段,允許producer指定消息的時間戳,如果不指定的話使用producer客戶端的當前時間
ConsumerRecord:增加了timestamp字段,允許消費消息時獲取到消息的時間戳
ProducerResponse: 增加了timestamp字段,如果是CreateTime返回-1;如果是LogAppendTime,返回寫入該條消息時broker的本地時間
如何使用時間戳?
Kafka broker config提供了一個參數:log.message.timestamp.type來統一指定集群中的所有topic使用哪種時間戳類型。用戶也可以為單個topic設置不同的時間戳類型,具體做法是創建topic時覆蓋掉全局配置:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --config message.timestamp.type=LogAppendTime
另外, producer在創建ProducerRecord時可以指定時間戳:
record = new ProducerRecord<String, String>("my-topic", null, System.currentTimeMillis(), "key", "value");
Kafka內部如何處理時間戳?
說起來太麻煩,直接上圖吧:

值得一提的是上圖中的”指定閾值“ —— 有時候我們需要實現這樣的場景:比如某條消息如果在5分鍾內還不能被創建出來那么就不再需要創建了,直接丟棄之。Kafka提供了log.message.timestamp.difference.max.ms和message.timestamp.difference.max.ms參數來實現這樣的需求,當然只對CreateTime類型的時間戳有效,如果是LogAppendTime則該參數無效。
基於時間戳的功能
1 根據時間戳來定位消息:之前的索引文件是根據offset信息的,從邏輯語義上並不方便使用,引入了時間戳之后,Kafka支持根據時間戳來查找定位消息
2 基於時間戳的日志切分策略
3 基於時間戳的日志清除策略
個人認為,第2,3點其實是引入時間戳的初衷,而第1點可以看做是時間戳的另一個功能應用。
基於時間戳的消息定位
自0.10.0.1開始,Kafka為每個topic分區增加了新的索引文件:基於時間的索引文件:<segment基礎位移>.timeindex,索引項間隔由index.interval.bytes確定。
具體的格式是時間戳+位移
時間戳記錄的是該日志段當前記錄的最大時間戳
位移信息記錄的是插入新的索引項時的消息位移信息
該索引文件中的每一行元組(時間戳T,位移offset)表示:該日志段中比T晚的所有消息的位移都比offset大。
由於創建了額外的索引文件,所需的操作系統文件句柄平均要增加1/3(原來需要2個文件,現在需要3個),所以有可能需要調整文件句柄的參數。