Kafka日志壓縮剖析


1.概述

最近有些同學在學習Kafka時,問到Kafka的日志壓縮(Log Compaction)問題,對於Kafka的日志壓縮有些疑惑,今天筆者就為大家來剖析一下Kafka的日志壓縮的相關內容。

2.內容

2.1 日志壓縮是什么?

Kafka是一個基於Log的流處理系統,一個Topic可以有若干個Partition,Partition是復制的基本單元,在一個Broker節點上,一個Partition的數據文件可以存儲在若干個獨立磁盤目錄中,每個Partition的日志文件存儲的時候又會被分成一個個的Segment,默認的Segment的大小是1GB,有屬性offsets.topic.segment.bytes來控制。Segment是日志清理的基本單元,當前正在使用的Segment是不會被清理的,對於每一個Partition的日志,以Segment為單位,都會被分為兩部分,已清理和未清理的部分。同時,未清理的那部分又分為可以清理和不可清理。日志壓縮是Kafka的一種機制,可以提供較為細粒度的記錄保留,而不是基於粗粒度的基於時間的保留。

Kafka中的每一條數據都包含Key和Value,數據存儲在磁盤上,一般不會永久保留,而是在數據達到一定的量或者時間后,對最早寫入的數據進行刪除。日志壓縮在默認的刪除規則之外提供了另一種刪除過時數據(或者說是保留有價值的數據)的方式,就是對於具有相同的Key,而數據不同,值保留最后一條數據,前面的數據在合適的情況下刪除。

2.2 日志壓縮的應用場景

日志壓縮特性,就實時計算來說,可以在異常容災方面有很好的應用途徑。比如,我們在Spark、Flink中做實時計算時,需要長期在內存里面維護一些數據,這些數據可能是通過聚合了一天或者一周的日志得到的,這些數據一旦由於異常因素(內存、網絡、磁盤等)崩潰了,從頭開始計算需要很長的時間。一個比較有效可行的方式就是定時將內存里的數據備份到外部存儲介質中,當崩潰出現時,再從外部存儲介質中恢復並繼續計算。

使用日志壓縮來替代這些外部存儲有哪些優勢及好處呢?這里為大家列舉並總結了幾點:

  • Kafka即是數據源又是存儲工具,可以簡化技術棧,降低維護成本
  • 使用外部存儲介質的話,需要將存儲的Key記錄下來,恢復的時候再使用這些Key將數據取回,實現起來有一定的工程難度和復雜度。使用Kafka的日志壓縮特性,只需要把數據寫進Kafka,等異常出現恢復任務時再讀回到內存就可以了
  • Kafka對於磁盤的讀寫做了大量的優化工作,比如磁盤順序讀寫。相對於外部存儲介質沒有索引查詢等工作量的負擔,可以實現高性能。同時,Kafka的日志壓縮機制可以充分利用廉價的磁盤,不用依賴昂貴的內存來處理,在性能相似的情況下,實現非常高的性價比(這個觀點僅僅針對於異常處理和容災的場景來說)

2.3 日志壓縮方式的實現細節

當Topic中的cleanup.policy(默認為delete)設置為compact時,Kafka的后台線程會定時將Topic遍歷兩次,第一次將每個Key的哈希值最后一次出現的offset記錄下來,第二次檢查每個offset對應的Key是否在較為后面的日志中出現過,如果出現了就刪除對應的日志。

日志壓縮是允許刪除的,這個刪除標記將導致刪除任何先前帶有該Key的消息,但是刪除標記的特殊之處在於,它們將在一段時間后從日志中清理,以釋放空間。這些需要注意的是,日志壓縮是針對Key的,所以在使用時應注意每個消息的Key值不為NULL。

壓縮是在Kafka后台通過定時的重新打開Segment來完成的,Segment的壓縮細節如下圖所示:

 

 日志壓縮可以確保的內容,這里筆者總結了以下幾點:

  • 任何保持在日志頭部以內的使用者都將看到所寫的每條消息,這些消息將具有順序偏移量。可以使用Topic的min.compaction.lag.ms屬性來保證消息在被壓縮之前必須經過的最短時間。也就是說,它為每個消息在(未壓縮)頭部停留的時間提供了一個下限。可以使用Topic的max.compaction.lag.ms屬性來保證從編寫消息到消息符合壓縮條件之間的最大延時
  • 消息始終保持順序,壓縮永遠不會重新排序消息,只是刪除一些而已
  • 消息的偏移量永遠不會改變,它是日志中位置的永久標識符
  • 從日志開始的任何使用者將至少看到所有記錄的最終狀態,按記錄的順序寫入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的時間內到達日志的頭部,則會看到已刪除記錄的所有delete標記。保留時間默認是24小時。

2.4 日志壓縮核心代碼實現

日志壓縮的核心實現代碼大部分的功能在CleanerThread中,核心實現邏輯在Cleaner中的clean方法中,實現細節如下:

 /**
   * Clean the given log
   *
   * @param cleanable The log to be cleaned
   *
   * @return The first offset not cleaned and the statistics for this round of cleaning
   */
  private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
    // figure out the timestamp below which it is safe to remove delete tombstones
    // this position is defined to be a configurable time beneath the last modified time of the last clean segment
    val deleteHorizonMs =
      cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
        case None => 0L
        case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
    }

    doClean(cleanable, deleteHorizonMs)
  }

  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
    info("Beginning cleaning of log %s.".format(cleanable.log.name))

    val log = cleanable.log
    val stats = new CleanerStats()

    // build the offset map
    info("Building offset map for %s...".format(cleanable.log.name))
    val upperBoundOffset = cleanable.firstUncleanableOffset
    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
    val endOffset = offsetMap.latestOffset + 1
    stats.indexDone()

    // determine the timestamp up to which the log will be cleaned
    // this is the lower of the last active segment and the compaction lag
    val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)

    // group the segments and clean the groups
    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
    val transactionMetadata = new CleanedTransactionMetadata

    val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
      log.config.maxIndexSize, cleanable.firstUncleanableOffset)
    for (group <- groupedSegments)
      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)

    // record buffer utilization
    stats.bufferUtilization = offsetMap.utilization

    stats.allDone()

    (endOffset, stats)
  }

日志壓縮通過兩次遍歷所有的數據來實現,兩次遍歷之間交流的通道就是一個OffsetMap,下面是OffsetMap的內容:

trait OffsetMap {
  def slots: Int
  def put(key: ByteBuffer, offset: Long): Unit
  def get(key: ByteBuffer): Long
  def updateLatestOffset(offset: Long): Unit
  def clear(): Unit
  def size: Int
  def utilization: Double = size.toDouble / slots
  def latestOffset: Long
}

這基本就是一個普通的MuTable Map,在Kafka代碼中,它的實現只有一個叫做SkimpyOffsetMap

2.4.1 PUT方法

PUT方法會為每個Key生成一份信息,默認使用MD5方法生成一個Byte,根據這個信息在Byte中哈希的到一個下標,如果這個下標已經被別的占用,則線性查找到下個空余的下標為止,然后對應位置插入該Key的Offset,實現代碼如下:

 /**
   * Associate this offset to the given key.
   * @param key The key
   * @param offset The offset
   */
  override def put(key: ByteBuffer, offset: Long): Unit = {
    require(entries < slots, "Attempt to add a new entry to a full offset map.")
    lookups += 1
    hashInto(key, hash1)
    // probe until we find the first empty slot
    var attempt = 0
    var pos = positionOf(hash1, attempt)  
    while(!isEmpty(pos)) {
      bytes.position(pos)
      bytes.get(hash2)
      if(Arrays.equals(hash1, hash2)) {
        // we found an existing entry, overwrite it and return (size does not change)
        bytes.putLong(offset)
        lastOffset = offset
        return
      }
      attempt += 1
      pos = positionOf(hash1, attempt)
    }
    // found an empty slot, update it--size grows by 1
    bytes.position(pos)
    bytes.put(hash1)
    bytes.putLong(offset)
    lastOffset = offset
    entries += 1
  }

2.4.2 GET方法

GET方法使用和PUT同樣的算法獲取Key的信息,通過信息獲得Offset的存儲位置,實現代碼如下:

/**
   * Get the offset associated with this key.
   * @param key The key
   * @return The offset associated with this key or -1 if the key is not found
   */
  override def get(key: ByteBuffer): Long = {
    lookups += 1
    hashInto(key, hash1)
    // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
    var attempt = 0
    var pos = 0
    //we need to guard against attempt integer overflow if the map is full
    //limit attempt to number of slots once positionOf(..) enters linear search mode
    val maxAttempts = slots + hashSize - 4
    do {
     if(attempt >= maxAttempts)
        return -1L
      pos = positionOf(hash1, attempt)
      bytes.position(pos)
      if(isEmpty(pos))
        return -1L
      bytes.get(hash2)
      attempt += 1
    } while(!Arrays.equals(hash1, hash2))
    bytes.getLong()
  }

3.配置實踐注意事項

默認情況下,啟動日志清理器,若需要啟動特定Topic的日志清理,請添加特定的屬性。配置日志清理器,這里為大家總結了以下幾點:

  • log.cleanup.policy設置為compact,該策略屬性是在Broker中配置,它會影響到集群中所有的Topic。
  • log.cleaner.min.compaction.lag.ms這個屬性用來防止對更新超過最小消息進行壓縮,如果沒有設置,除最后一個Segment之外,所有Segment都有資格進行壓縮
  • log.cleaner.max.compaction.lag.ms這個可以用來防止低生產速率的日志在無限制的時間內不適合壓縮

4.總結

Kafka的日志壓縮原理並不復雜,就是定時把所有的日志讀取兩遍,寫一遍,而CPU的速度超過磁盤完全不是問題,只要日志的量對應的讀取兩遍和寫入一遍的時間在可接受的范圍內,那么它的性能就是可以接受的。

另外,筆者開源的一款Kafka監控關系系統Kafka-Eagle,喜歡的同學可以Star一下,進行關注。

Kafka Eagle源代碼地址:https://github.com/smartloli/kafka-eagle

5.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM