kafka 數據清除機制


Log Cleanup 簡介

在Kafka中,存在數據過期的機制,稱為data expire。如何處理過期數據是根據指定的policy(策略)決定的,而處理過期數據的行為,即為log cleanup。

在Kafka中有以下幾種處理過期數據的策略:

· log.cleanup.policy=delete(Kafka中所有用戶創建的topics,默認均為此策略)

o 根據數據已保存的時間,進行刪除(默認為1周)

o 根據log的max size,進行刪除(默認為-1,也就是無限制)

· log.cleanup.policy=compact(topic __consumer_offsets 默認為此策略)

o 根據messages中的key,進行刪除操作

o 在active segment 被commit 后,會刪除掉old duplicate keys

o 無限制的時間與空間的日志保留

自動清理Kafka中的數據可以控制磁盤上數據的大小、刪除不需要的數據,同時也減少了對Kafka集群的維護成本。

那Log cleanup 在什么時候發生呢?

· 首先值得注意的是:log cleanup 在partition segment 上發生

· 更小/更多的segment,也就意味着log cleanup 發生的頻率會上升

· Log cleanup 不應該頻繁發生=> 因為它會消耗CPU與內存資源

· Cleaner的檢查會在每15秒進行一次,由log.cleaner.backoff.ms 控制

log.cleanup.policy=delete

log.cleanup.policy=delete 的策略,根據數據保留的時間、以及log的max size,對數據進行cleanup。控制數據保留時間以及log max size的參數分別為:

· log.retention.hours:指定數據保留的時常(默認為一周,168)

o 將參數調整到更高的值,也就意味着會占據更多的磁盤空間

o 更小值意味着保存的數據量會更少(假如consumer 宕機超過一周,則數據便會再未處理前即丟失)

· log.retention.bytes:每個partition中保存的最大數據量大小(默認為-1,也就是無限大)

o 再控制log的大小不超過一個閾值時,會比較有用

在到達log cleanup 的條件后,cleaner會自動根據時間或是空間的規則進行刪除,新數據仍寫入active segment:

 

 

針對於這個參數,一般有以下兩種使用場景,分別為:

· log保留周期為一周,根據log保留期進行log cleanup:

o log.retention.hours=168 以及 log.retention.bytes=-1

· log保留期為無限制,根據log大小進行進行log cleanup:

o log.retention.hours=17520以及 log.retention.bytes=524288000

其中第一個場景會更常見。

Log Compaction

Log compaction用於確保:在一個partition中,對任意一個key,它所對應的value都是最新的。

這里舉個例子:我們有個topic名為employee-salary,我們希望維護每個employee當前最新的工資情況。

左邊的是compaction前,segments中的數據,右邊為compaction 后,segments中的數據,其中有部分key對應的value有更新:

 

 

可以看到在log compaction后,相對於更新后的key-value message,舊的message被刪除。

Log Compaction 有如下特點:

· messages的順序仍然是保留的,log compaction 僅移除一些messages,但不會重新對它們進行排序

· 一條message的offset是無法改變的(immutable),如果一條message缺失,則offset會直接被跳過

· 被刪除的records在一段時間內仍然可以被consumers訪問到,這段時間由參數delete.retention.ms(默認為24小時)控制

需要注意的是:Kafka 本身是不會組織用戶發送duplicate data的。這些重復數據也僅會在一個segment在被commit 的時候做重復數據刪除,所以consumer仍會讀取到這部分重復數據(如果客戶端有發的話)。

Log Compaction也會有時失敗,compaction thread 可能會crash,所以需要確保給Kafka server 足夠的內存用於做這些操作。如果log compaction異常,則需要重啟Kafka(此為一個已知的bug)。

Log Compaction也無法通過API手動觸發(至少到現在為止是這樣),只能server端自動觸發。

下面是一個 Log Compaction過程的示意圖:

 

 

正在寫入的records仍會被寫入Active Segment,已經committed segments會自動做compaction。此過程會遍歷所有segments中的records,並移除掉所有需要被移除的messages。

Log compaction由上文提到的log.cleanup.policy=compact進行配置,其中:

· Segment.ms(默認為7天):在關閉一個active segment前,所需等待的最長時間

· Segment.bytes(默認為1G):一個segment的最大大小

· Min.compaction .lag.ms(默認為0):在一個message可以被compact前,所需等待的時間

· Delete.retention.ms(默認為24小時):在一條message被加上刪除標記后,在實際刪除前等待的時間

· Min.Cleanable.dirty.ratio(默認為0.5):若是設置的更高,則會有更高效的清理,但是更少的清理操作觸發。若是設置的更低,則清理的效率稍低,但是會有更多的清理操作被觸發


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM