Log Cleanup 簡介
在Kafka中,存在數據過期的機制,稱為data expire。如何處理過期數據是根據指定的policy(策略)決定的,而處理過期數據的行為,即為log cleanup。
在Kafka中有以下幾種處理過期數據的策略:
· log.cleanup.policy=delete(Kafka中所有用戶創建的topics,默認均為此策略)
o 根據數據已保存的時間,進行刪除(默認為1周)
o 根據log的max size,進行刪除(默認為-1,也就是無限制)
· log.cleanup.policy=compact(topic __consumer_offsets 默認為此策略)
o 根據messages中的key,進行刪除操作
o 在active segment 被commit 后,會刪除掉old duplicate keys
o 無限制的時間與空間的日志保留
自動清理Kafka中的數據可以控制磁盤上數據的大小、刪除不需要的數據,同時也減少了對Kafka集群的維護成本。
那Log cleanup 在什么時候發生呢?
· 首先值得注意的是:log cleanup 在partition segment 上發生
· 更小/更多的segment,也就意味着log cleanup 發生的頻率會上升
· Log cleanup 不應該頻繁發生=> 因為它會消耗CPU與內存資源
· Cleaner的檢查會在每15秒進行一次,由log.cleaner.backoff.ms 控制
log.cleanup.policy=delete
log.cleanup.policy=delete 的策略,根據數據保留的時間、以及log的max size,對數據進行cleanup。控制數據保留時間以及log max size的參數分別為:
· log.retention.hours:指定數據保留的時常(默認為一周,168)
o 將參數調整到更高的值,也就意味着會占據更多的磁盤空間
o 更小值意味着保存的數據量會更少(假如consumer 宕機超過一周,則數據便會再未處理前即丟失)
· log.retention.bytes:每個partition中保存的最大數據量大小(默認為-1,也就是無限大)
o 再控制log的大小不超過一個閾值時,會比較有用
在到達log cleanup 的條件后,cleaner會自動根據時間或是空間的規則進行刪除,新數據仍寫入active segment:

針對於這個參數,一般有以下兩種使用場景,分別為:
· log保留周期為一周,根據log保留期進行log cleanup:
o log.retention.hours=168 以及 log.retention.bytes=-1
· log保留期為無限制,根據log大小進行進行log cleanup:
o log.retention.hours=17520以及 log.retention.bytes=524288000
其中第一個場景會更常見。
Log Compaction
Log compaction用於確保:在一個partition中,對任意一個key,它所對應的value都是最新的。
這里舉個例子:我們有個topic名為employee-salary,我們希望維護每個employee當前最新的工資情況。
左邊的是compaction前,segments中的數據,右邊為compaction 后,segments中的數據,其中有部分key對應的value有更新:

可以看到在log compaction后,相對於更新后的key-value message,舊的message被刪除。
Log Compaction 有如下特點:
· messages的順序仍然是保留的,log compaction 僅移除一些messages,但不會重新對它們進行排序
· 一條message的offset是無法改變的(immutable),如果一條message缺失,則offset會直接被跳過
· 被刪除的records在一段時間內仍然可以被consumers訪問到,這段時間由參數delete.retention.ms(默認為24小時)控制
需要注意的是:Kafka 本身是不會組織用戶發送duplicate data的。這些重復數據也僅會在一個segment在被commit 的時候做重復數據刪除,所以consumer仍會讀取到這部分重復數據(如果客戶端有發的話)。
Log Compaction也會有時失敗,compaction thread 可能會crash,所以需要確保給Kafka server 足夠的內存用於做這些操作。如果log compaction異常,則需要重啟Kafka(此為一個已知的bug)。
Log Compaction也無法通過API手動觸發(至少到現在為止是這樣),只能server端自動觸發。
下面是一個 Log Compaction過程的示意圖:

正在寫入的records仍會被寫入Active Segment,已經committed segments會自動做compaction。此過程會遍歷所有segments中的records,並移除掉所有需要被移除的messages。
Log compaction由上文提到的log.cleanup.policy=compact進行配置,其中:
· Segment.ms(默認為7天):在關閉一個active segment前,所需等待的最長時間
· Segment.bytes(默認為1G):一個segment的最大大小
· Min.compaction .lag.ms(默認為0):在一個message可以被compact前,所需等待的時間
· Delete.retention.ms(默認為24小時):在一條message被加上刪除標記后,在實際刪除前等待的時間
· Min.Cleanable.dirty.ratio(默認為0.5):若是設置的更高,則會有更高效的清理,但是更少的清理操作觸發。若是設置的更低,則清理的效率稍低,但是會有更多的清理操作被觸發