Kafka深入理解-3:Kafka如何刪除數據(日志)文件


Kafka作為消息中間件,數據需要按照一定的規則刪除,否則數據量太大會把集群存儲空間占滿。

參考:apache Kafka是如何實現刪除數據文件(日志)的

Kafka刪除數據有兩種方式

  • 按照時間,超過一段時間后刪除過期消息
  • 按照消息大小,消息數量超過一定大小后刪除最舊的數據

Kafka刪除數據的最小單位:segment

Kafka刪除數據主邏輯:kafka源碼

def cleanupLogs() {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      debug("Garbage collecting '" + log.name + "'")
      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
    debug("Log cleanup completed. " + total + " files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }

Kafka一段時間(配置文件設置)調用一次 cleanupLogs,刪除所有應該刪除的日志數據。

cleanupExpiredSegments 負責清理超時的數據

private def cleanupExpiredSegments(log: Log): Int = {
    val startMs = time.milliseconds
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }

cleanupSegmentsToMaintainSize 負責清理超過大小的數據


private def cleanupSegmentsToMaintainSize(log: Log): Int = { if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) { diff -= segment.size true } else { false } } log.deleteOldSegments(shouldDelete) }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM