As a message broker, Kafka has to delete data according to certain rules; otherwise the growing volume of data would eventually fill up the cluster's storage.
Reference: how Apache Kafka implements deletion of data files (logs)
Kafka deletes data in two ways (the corresponding settings are sketched after this list):
- by time: messages older than a configured retention period are deleted
- by size: once a log grows past a configured size limit, the oldest data is deleted
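Both policies are driven by configuration. A sketch of the relevant broker-level settings as they would appear in server.properties (the per-topic overrides retention.ms and retention.bytes behave the same way):

```properties
# Time-based retention: segments older than this are eligible for deletion
log.retention.hours=168
# Size-based retention, applied per partition; -1 (the default) disables it
log.retention.bytes=1073741824
```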
The smallest unit of deletion in Kafka is the segment.
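Concretely, each partition's log is a sequence of segment files on disk, named after the base offset of the first message they contain, and deletion removes whole files at a time. A partition directory looks roughly like this (the offsets are invented for illustration):

```
my-topic-0/
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000368769.index
└── 00000000000000368769.log
```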
The main deletion logic, from the Kafka source (kafka.log.LogManager):
```scala
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  // Compacted logs are handled by the log cleaner, not by deletion,
  // so only non-compacted logs are garbage collected here
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    // Apply both retention policies and count the deleted segments
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
    (time.milliseconds - startMs) / 1000 + " seconds")
}
```
Kafka invokes cleanupLogs periodically (the interval is set in the broker configuration) and deletes all log data that is due for deletion.
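The interval comes from the broker configuration; in standard Kafka distributions this is log.retention.check.interval.ms:

```properties
# How often to check whether any log segment is eligible for deletion
# (default: 300000 ms, i.e. five minutes)
log.retention.check.interval.ms=300000
```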
cleanupExpiredSegments handles time-based retention:

```scala
private def cleanupExpiredSegments(log: Log): Int = {
  val startMs = time.milliseconds
  // Delete every segment whose last-modified time is older than retention.ms;
  // the argument is a predicate applied to each segment, oldest first
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
```
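As a toy illustration of the predicate, here is a standalone sketch (not Kafka code; Segment and all sample values are invented) showing how the time-based check classifies segments:

```scala
// Standalone sketch of the time-based retention predicate; all names and
// values below are made up for illustration
object TimeRetentionSketch extends App {
  case class Segment(baseOffset: Long, lastModified: Long)

  val now         = System.currentTimeMillis
  val retentionMs = 7L * 24 * 60 * 60 * 1000 // 7 days, like retention.ms

  val segments = List(
    Segment(0,      now - 10L * 24 * 60 * 60 * 1000), // 10 days old -> expired
    Segment(368769, now - 1L * 60 * 60 * 1000)        //  1 hour old -> kept
  )

  // Same shape as the predicate passed to log.deleteOldSegments above
  val expired = segments.filter(s => now - s.lastModified > retentionMs)
  println(expired.map(_.baseOffset)) // List(0)
}
```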
cleanupSegmentsToMaintainSize handles size-based retention:

```scala
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  // Nothing to do if size-based retention is disabled (retentionSize < 0)
  // or the log is still within the limit
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  // diff = how many bytes over the limit the log currently is
  var diff = log.size - log.config.retentionSize
  // Stateful predicate: delete oldest segments until the remainder fits
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}
```
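The subtlety is that shouldDelete is stateful: diff starts as the number of bytes the log is over the limit, each accepted segment shrinks it, and deletion stops as soon as the remaining log fits. A standalone sketch (again, Segment and the sizes are invented):

```scala
// Standalone sketch of the size-based retention predicate; all names and
// values below are made up for illustration
object SizeRetentionSketch extends App {
  case class Segment(baseOffset: Long, size: Long)

  val retentionBytes = 2048L // like retention.bytes
  // Oldest segment first, mirroring the order Kafka deletes in
  val segments = List(Segment(0, 1024), Segment(368769, 1024), Segment(737538, 1024))

  var diff = segments.map(_.size).sum - retentionBytes // 1024 bytes over the limit
  def shouldDelete(s: Segment) =
    if (diff - s.size >= 0) { diff -= s.size; true } else false

  // takeWhile mirrors deleteOldSegments: only a contiguous prefix of the
  // oldest segments is removed
  val deleted = segments.takeWhile(shouldDelete)
  println(deleted.map(_.baseOffset)) // List(0): dropping the oldest segment suffices
}
```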