Kafka 0.8: 多日志文件夾機制


kafka 0.7.2 中對log.dir的定義如下:

log.dir none Specifies the root directory in which all log data is kept.

在kafka 0.8 中將log.dir 修改為 log.dirs,官方文檔說明如下:

log.dirs /tmp/kafka-logs

A comma-separated list of one or more directories in which Kafka data is stored. Each new partition that is created will be placed in the directory which currently has the fewest partitions.

從0.8開始,支持配置多個日志文件夾,文件夾之間使用逗號隔開即可,這樣做在實際項目中有非常大的好處,那就是支持多硬盤。

下面從源碼着手來淺析一下多日志文件夾是怎么工作的

1. 首先broker啟動時會加載指定的配置文件,並把property對象傳入KafkaConfig對象中

object Kafka extends Logging {
    try {
      val props = Utils.loadProps(args(0))
      val serverConfig = new KafkaConfig(props)

 

2. 在kafkaConfig 中會解析log.dirs字符串,將其通過逗號隔開,形成Set,調用split方法時傳入"\\s*,\\s*",表示逗號前后的空格都會被忽略

 /* the directories in which the log data is kept */
  val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs")))
  require(logDirs.size > 0)
  /**
   * Parse a comma separated string into a sequence of strings.
   * Whitespace surrounding the comma will be removed.
   */
  def parseCsvList(csvList: String): Seq[String] = {
    if(csvList == null || csvList.isEmpty)
      Seq.empty[String]
    else {
      csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
    }
  }

 

3. 在KafkaServer中生成LogManager對象時傳入 [(dir_path_1,File(dir_path_1)), (dir_path_2,File(dir_path_2)) ]

    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
                   topicConfigs = configs,
                   defaultConfig = defaultLogConfig,
                   cleanerConfig = cleanerConfig,
                   flushCheckMs = config.logFlushSchedulerIntervalMs,
                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                   retentionCheckMs = config.logCleanupIntervalMs,
                   scheduler = kafkaScheduler,
                   time = time)

 

4.LogManager首先對傳入的dir進行下列驗證:是否存在相同的文件夾、文件夾是否存在(不存在則創建)、是否為可讀的文件夾

  /**
   * Create and check validity of the given directories, specifically:
   * <ol>
   * <li> Ensure that there are no duplicates in the directory list
   * <li> Create each directory if it doesn't exist
   * <li> Check that each path is a readable directory 
   * </ol>
   */
  private def createAndValidateLogDirs(dirs: Seq[File]) {
    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
    for(dir <- dirs) {
      if(!dir.exists) {
        info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
        val created = dir.mkdirs()
        if(!created)
          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
      }
      if(!dir.isDirectory || !dir.canRead)
        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
    }
  }

 

5. LogManager 對所有的文件夾獲取文件鎖,防止其他進行對該文件夾進行操作

  /**
   * Lock all the given directories
   */
  private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
    dirs.map { dir =>
      val lock = new FileLock(new File(dir, LockFile))
      if(!lock.tryLock())
        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + 
                               ". A Kafka instance in another process or thread is using this directory.")
      lock
    }
  }

 

6. 通過文件夾下面的recovery-point-offset-checkpoint 恢復加載每個目錄下面的partition文件

  /**
   * Recover and load all logs in the given data directories
   */
  private def loadLogs(dirs: Seq[File]) {
    for(dir <- dirs) {
      val recoveryPoints = this.recoveryPointCheckpoints(dir).read
      /* load the logs */
      val subDirs = dir.listFiles()
      if(subDirs != null) {
        //當kafka退出時,正常關閉的日志文件都會在該日志文件下生成.kafka_cleanshutdown為后綴的文件,該文件的作用是,在下次啟動時,此日志文件可以不進行恢復流程
        val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
        if(cleanShutDownFile.exists())
          info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
        for(dir <- subDirs) {
          if(dir.isDirectory) {
            info("Loading log '" + dir.getName + "'")
            val topicPartition = Log.parseTopicPartitionName(dir.getName)
            val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
            val log = new Log(dir, 
                              config,
                              recoveryPoints.getOrElse(topicPartition, 0L),
                              scheduler,
                              time)
            val previous = this.logs.put(topicPartition, log)
            if(previous != null)
              throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
        cleanShutDownFile.delete()
      }
    }
  }

 

7. 當需要創建新的日志文件時,會在日志文件比較少的文件夾下去創建,源碼中的注釋很詳細

  /**
   * Choose the next directory in which to create a log. Currently this is done
   * by calculating the number of partitions in each directory and then choosing the
   * data directory with the fewest partitions.
   */
  private def nextLogDir(): File = {
    if(logDirs.size == 1) {
      logDirs(0)
    } else {
      // count the number of logs in each parent directory (including 0 for empty directories
      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
      val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
      //下面代碼的主要作用是,對沒有日志文件的文件夾設置size為0
      var dirCounts = (zeros ++ logCounts).toBuffer
    
      // choose the directory with the least logs in it
      val leastLoaded = dirCounts.sortBy(_._2).head
      new File(leastLoaded._1)
    }
  }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM