Kafka如何加載日志段


kafka日志作為日志段的容器,重點分析kafka日志是如何加載日志段。

Log源碼結構

  Log源碼位於kakfa core工程的log包下,對應的文件名為Log.scala。文件中中包含了與log有關的10個class或者object,見下圖所示。

  

 

 

  模塊概述

  LogAppendInfo(class)

    保存了一組待寫入消息的各種元數據信息,包含位移值或者最大消息的時間戳等,后續會進行詳細介紹

  LogAppendInfo(object)

    我們可以理解為定義了一些工廠方法,用來創建LogAppendInfo對象

  Log(class)

    Log源碼中最最核心的代碼

  Log(object)

    Log伴生類的工廠方法,里面有很多的輔助方法

  RollParams(C)

    定義了日志段是否需要切分的數據結構

  RollParams(object)

    RollParams的伴生類

  LogMetricNames

    定義了Log對象的監控指標

  LogOffsetSnapshot

    封裝分區所有位移元數據的容器類

  LogReadInfo

    封裝讀取日志返回的數據和元數據

  CompletedTxn

    記錄已完成事務的元數據,用來構建事務索引

  Log Class&Log Object

    下面我會一一介紹他們,我們都知道Scala的派生類一般存儲的是靜態變量或者靜態工廠方法等,考慮到派生類的閱讀比較容易,我們先分析下Log Object相關信息。我們先看下里面都定義了那些常量,見下圖所示

    

object Log {

  /** a log file */
  val LogFileSuffix = ".log"

  /** an index file */
  val IndexFileSuffix = ".index"

  /** a time index file */
  val TimeIndexFileSuffix = ".timeindex"

  val ProducerSnapshotFileSuffix = ".snapshot"

  /** an (aborted) txn index */
  val TxnIndexFileSuffix = ".txnindex"

  /** a file that is scheduled to be deleted */
  val DeletedFileSuffix = ".deleted"

  /** A temporary file that is being used for log cleaning */
  val CleanedFileSuffix = ".cleaned"

  /** A temporary file used when swapping files into the log */
  val SwapFileSuffix = ".swap"

  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
   * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
   * avoided by passing in the recovery point, however finding the correct position to do this
   * requires accessing the offset index which may not be safe in an unclean shutdown.
   * For more information see the discussion in PR#2104
   */
  val CleanShutdownFile = ".kafka_cleanshutdown"

  /** a directory that is scheduled to be deleted */
  val DeleteDirSuffix = "-delete"

  /** a directory that is used for future partition */
  val FutureDirSuffix = "-future"
}

  我們發現這些常量 中包含了很多種的文件類型,除了大家熟知的index、timeindex、txnindex、log等,還有不認識的其他文件類型,我們對這些不認識的進行匯總說明。

  snapshot:是kafka對冪等型或者事務型producer所生成的快照文件。事務型冪等先不做介紹。

  deleted:刪除日志段操作時所創建的文件,目前刪除日志段文件的操作是異步的,

  cleaned和swarp是compaction操作的產物,后續會進行解釋

  delete 是應用於文件夾的,當我們刪除一個主題的時候,主題的分區文件夾會被加上這個后綴

  當然Log object中還定義了很多的工具類方法,比如下圖,基於偏移量計算出日志段的文件名,kafak日志段的固定長度是20位。

  

/**
   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
   * so that ls sorts the files numerically.
   *
   * @param offset The offset to use in the file name
   * @return The filename
   */
  def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

  Log class 詳細介紹

   Log源碼中最重要的就是這個Log class了,代碼量還是比較大的,我們先從這個類定義的入參開始講起,見下圖所示。

  

class Log(@volatile private var _dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
        .......
}    

  我們發現這里面的字段比較多,重點關注以下兩個字段,分別是dir和logStartOffset。

  dir是主題分區的路徑

  logStartOffset是日志的最早的位移

  此外我們還需要關注一下幾個屬性

  nextOffsetmetadata:封裝了下一條待插入消息的位移值

  highWatermarkMetadata:用來區分日志的高水位值,即已提交事務和未提交事務的分界

  segment:保存了分區日志下所有的日志段信息,Map結構,key是日志段的起始位移,value是日志段本身對象

  大概了解了上文之后,我們一起看看Log類是如何初始化的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM