Kafka ConsumerRecord Timestamp


在監控binlog日志中,會有ts字段表示一個事務提交的時間戳,如果用這個時間戳處理數據,會出現同一個單號時間戳相同的情況。

於是考慮用kafka每條消息的時間戳來進行數據處理。

 

在消息中增加一個時間戳字段和時間戳類型,目前支持的時間戳類型有兩種:CreateTime和LogAppendTime,前者表示Producer創建這條消息的時間;后者表示broker接收到這條消息的時間(嚴格的講,是leader broker講消息寫入log的時間)

 

引入時間戳主要解決以下幾個問題:

  • 日志保存(log retention)策略:Kafka目前會定期刪除過期日志(log.retention.hours,默認是7天)。判斷的依據就是比較日志段文件(log segment file)的最新修改時間(last modification time)。倘若最近一次修改發生於7天前,那么就會視該日志段文件為過期日志,執行清除操作。但如果topic的某個分區曾經發生過分區副本的重分配(replica
    reassigment),那么就有可能會在一個新的broker上創建日志段文件,並把該文件的最新修改時間設置為最新時間,這樣設定的清除策略就無法執行了,盡管該日志段中的數據其實已經滿足可以被清除的條件了。
  • 日志切分(log rolling)策略:與日志保存是一樣的道理。當前日志段文件會根據規則對當前日志進行切分——即,創建一個新的日志段文件,並設置其為當前激活(active)日志段。其中有一條規則就是基於時間的(log.roll.hours,默認是7天),即當前日志段文件的最新一次修改發生於7天前的話,就創建一個新的日志段文件,並設置為active日志段。所以,它也有同樣的問題,即最近修改時間不是固定的,一旦發生分區副本重分配,該值就會發生變更,導致日志無法執行切分。(注意:log.retention.hours及其家族與log.rolling.hours及其家族不會沖突的,因為Kafka不會清除當前激活日志段文件)
  • 流式處理(Kafka streaming):流式處理中需要用到消息的時間戳

消息格式的變化:

  增加了timestamp字段,表示時間戳

  增加了timestamp類型字段,保存在attribute屬性低位的第四個比特上,0表示CreateTime;1表示LogAppendTime(低位前三個比特保存消息壓縮類型)

客戶端消息格式的變化:

  ProducerRecord:增加了timestamp字段,允許producer指定消息的時間戳,如果不指定的話使用producer客戶端的當前時間

  ConsumerRecord:增加了timestamp字段,允許消費消息時獲取到消息的時間戳
ProducerResponse: 增加了timestamp字段,如果是CreateTime返回-1;如果是LogAppendTime,返回寫入該條消息時broker的本地時間
 
Kafka broker config提供了一個參數:log.message.timestamp.type來統一指定集群中的所有topic使用哪種時間戳類型。用戶也可以為單個topic設置不同的時間戳類型,具體做法是創建topic時覆蓋掉全局配置:
kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 2 --topic streaming --config message.timestamp.type=LogAppendTime

 另外, producer在創建ProducerRecord時可以指定時間戳:

record = new ProducerRecord<String, String>("my-topic", null, System.currentTimeMillis(), "key", "value");

基於時間戳的功能:

  根據時間戳來定位消息:之前的索引文件是根據offset信息的,從邏輯語義上並不方便使用,引入了時間戳之后,Kafka支持根據時間戳來查找定位消息

  基於時間戳的日志切分策略

  基於時間戳的日志清除策略

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM