關於Kafka日志留存(log retention)策略的介紹,網上已有很多文章。不過目前其策略已然發生了一些變化,故本文針對較新版本的Kafka做一次統一的討論。如果沒有顯式說明,本文一律以Kafka 1.0.0作為分析對象。
所謂日志留存策略,就是Kafka保存topic數據的規則,我將按照以下幾個方面分別介紹留存策略:
- 留存策略類型
- 留存機制及其工作原理
一、留存策略類型
目前,與日志留存方式相關的策略類型主要有兩種:delete和compact。這兩種留存方式的機制完全不同。本文主要討論針對delete類型的留存策略。用戶可以通過設置broker端參數log.cleanup.policy來指定集群上所有topic默認的策略類型。另外也可以通過topic級別參數cleanup.policy來為某些topic設置不同於默認值的策略類型。當前log.cleanup.policy參數的默認值是[delete,compact],這是一個list類型的參數,表示集群上所有topic會同時開啟delete和compact兩種留存策略——這是0.10.1.0新引入的功能,在0.10.1.0之前,該參數只能兩選一,不能同時兼顧,但在實際使用中很多用戶都抱怨compact類型的topic存在過期key消息未刪除的情況,故社區修改了該參數配置,允許一個topic同時開啟兩種留存策略。
再次強調下, 本文只討論delete類型的留存策略。
二、留存機制及其工作原理
在開始詳細介紹各種留存機制之前,先簡要說下Kafka是如何處理日志留存的。每個Kafka broker啟動時,都會在后台開啟一個定時任務,定期地去檢查並執行所有topic日志留存,這個定時任務觸發的時間周期由broker端參數log.retention.check.interval.ms控制,默認是5分鍾,即每台broker每5分鍾都會嘗試去檢查一下是否有可以刪除的日志。因此如果你要縮短這個間隔,只需要調小log.retention.check.interval.ms即可。
鑒於日志留存和日志刪除實際上是一個問題的兩個方面,因而我們下面討論的是關於Kafka根據什么規則來刪除日志。但有一點要強調一下,待刪除的標的是日志段,即LogSegment,也就是以.log結尾的一個個文件,而非整個文件夾。另外還有一點也很重要,當前日志段(active logsegment)是永遠不會被刪除的,不管用戶配置了哪種留存機制。
當前留存機制共有3種:
- 基於空間維度
- 基於時間維度
- 基於起始位移維度
前兩種策略相信大家已經耳熟能詳,而第三種策略由於新加入的時間不長,目前網上對其的介紹並不多。我們一個一個來看。
2.1 基於空間維度
也稱size-based retention,指的是Kafka定期為那些超過磁盤空間閾值的topic進行日志段的刪除。這個閾值由broker端參數log.retention.bytes和topic級別參數retention.bytes控制,默認是-1,表示Kafka當前未開啟這個留存機制,即不管topic日志量漲到多少,Kafka都不視其為“超過閾值”。如果用戶要開啟這種留存機制,必須顯式設置log.retention.bytes(或retention.bytes)。
一旦用戶設置了閾值,那么Kafka就會在定時任務中嘗試比較當前日志量總大小是否超過閾值至少一個日志段的大小。這里所說的總大小是指所有日志段文件的大小,不包括索引文件的大小!如果是則會嘗試從最老的日志段文件開始刪起。注意這里的“超過閾值至少一個日志段的大小”,這就是說超過閾值的部分必須要大於一個日志段的大小,否則不會進行刪除的,原因就是因為刪除的標的是日志段文件——即文件只能被當做一個整體進行刪除,無法刪除部分內容。
舉個例子來說明,假設日志段大小是700MB,當前分區共有4個日志段文件,大小分別是700MB,700MB,700MB和1234B——顯然1234B那個文件就是active日志段。此時該分區總的日志大小是3*700MB+1234B=2100MB+1234B,如果閾值設置為2000MB,那么超出閾值的部分就是100MB+1234B,小於日志段大小700MB,故Kafka不會執行任何刪除操作,即使總大小已經超過了閾值;反之如果閾值設置為1400MB,那么超過閾值的部分就是700MB+1234B > 700MB,此時Kafka會刪除最老的那個日志段文件。
2.2 基於時間維度
也稱time-based retention,指的是Kafka定期未那些超過時間閾值的topic進行日志段刪除操作。這個閾值由broker端參數log.retention.ms、log.retention.mintues、log.retention.hours以及topic級別參數retention.ms控制。如果同時設置了log.retention.ms、log.retention.mintues、log.retention.hours,以log.retention.ms優先級為最高,log.retention.mintues次之,log.retention.hours最次。當前這三個參數的默認值依次是null, null和168,故Kafka為每個topic默認保存7天的日志。
這里需要討論下這“7天”是如何界定的?在0.10.0.0之前,Kafka每次檢查時都會將當前時間與每個日志段文件的最新修改時間做比較,如果兩者的差值超過了上面設定的閾值(比如上面說的7天),那么Kafka就會嘗試刪除該文件。不過這種界定方法是有問題的,因為文件的最新修改時間是可變動的——比如用戶在終端通過touch命令查看該日志段文件或Kafka對該文件切分時都可能導致最新修改時間的變化從而擾亂了該規則的判定,因此自0.10.0.0版本起,Kafka在消息體中引入了時間戳字段(當然不是單純為了修復這個問題),並且為每個日志段文件都維護一個最大時間戳字段。通過將當前時間與該最大時間戳字段進行比較來判定是否過期。使用當前最大時間戳字段的好處在於它對用戶是透明的,用戶在外部無法直接修改它,故不會造成判定上的混亂。
最大時間戳字段的更新機制也很簡單,每次日志段寫入新的消息時,都會嘗試更新該字段。因為消息時間戳通常是遞增的,故每次寫入操作時都會保證最大時間戳字段是會被更新的,而一旦一個日志段寫滿了被切分之后它就不再接收任何新的消息,其最大時間戳字段的值也將保持不變。倘若該值距離當前時間超過了設定的閾值,那么該日志段文件就會被刪除。
2.3 基於起始位移維度
用戶對前兩種留存機制實際上是相當熟悉的,下面我們討論下第三種留存機制:基於日志起始位移(log start offset)。這實際上是0.11.0.0版本新增加的功能。其實增加這個功能的初衷主要是為了Kafka流處理應用——在流處理應用中存在着大量的中間消息,這些消息可能已經被處理過了,但依然保存在topic日志中,占用了大量的磁盤空間。如果通過設置基於時間維度的機制來刪除這些消息就需要用戶設置很小的時間閾值,這可能導致這些消息尚未被下游操作算子(operator)處理就被刪除;如果設置得過大,則極大地增加了空間占用。故社區在0.11.0.0引入了第三種留存機制:基於起始位移
所謂起始位移,就是指分區日志的當前起始位移——注意它是分區級別的值,而非日志段級別。故每個分區都只維護一個起始位移值。該值在初始化時被設置為最老日志段文件的基礎位移(base offset),隨着日志段的不斷刪除,該值會被更新當前最老日志段的基礎位移。另外Kafka提供提供了一個腳本命令幫助用戶設置指定分區的起始位移:kafka-delete-records.sh。
該留存機制是默認開啟的,不需要用戶任何配置。Kafka會為每個日志段做這樣的檢查:1. 獲取日志段A的下一個日志段B的基礎位移;2. 如果該值小於分區當前起始位移則刪除此日志段A。
依然拿例子還說明,假設我有一個topic,名字是test,該topic只有1個分區,該分區下有5個日志段文件,分別是A1.log, A2.log, A3.log, A4.log和A5.log,其中A5.log是active日志段。這5個日志段文件中消息范圍分別是0~9999,10000~19999,20000~29999,30000~39999和40000~43210(A5未寫滿)。如果此時我確信前3個日志段文件中的消息已經被處理過了,於是想刪除這3個日志段,此時我應該怎么做呢?由於我無法預知這些日志段文件產生的速度以及被消費的速度,因此不管是基於時間的刪除機制還是基於空間的刪除機制都是不適用的。此時我便可以使用kafka-delete-records.sh腳本將該分區的起始位移設置為A4.log的起始位移,即40000。為了做到這點,我需要首先創建一個JSON文件a.json,內容如下:
{"partitions":[{"topic": "test", "partition": 0,"offset": 40000}],"version":1}
然后執行下列命令:
bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file a.json
如果一切正常,應該可以看到類似於這樣的輸出:
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 40000
此時test的分區0的起始位移被手動調整為40000,那么理論上所有最大消息位移< 40000的日志段都可以被刪除了。有了這個機制,用戶可以實現更為靈活的留存策略。
以上就是關於當前Kafka針對於delete留存類型的topic的3種留存機制。也許在未來社區會增加更多的留存策略,我們拭目以待~