Kafka 源代碼分析之Log


這里分析Log對象本身的源代碼.

Log類是一個topic分區的基礎類.一個topic分區的所有基本管理動作.都在這個對象里完成.類源代碼文件為Log.scala.在源代碼log目錄下.

Log類是LogSegment的集合和管理封裝.首先看看初始化代碼.

class Log(val dir: File,                           //log的實例化對象在LogManager分析中已經介紹過.這里可以對照一下.
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  import kafka.log.Log._    //這里是把同文件下的object加載進來.代碼在文件的末尾.

  /* A lock that guards all modifications to the log */
  private val lock = new Object             //鎖對象

  /* last time it was flushed */
  private val lastflushedTime = new AtomicLong(time.milliseconds)  //最后log刷新到磁盤的時間,這個變量貫穿整個管理過程.

  /* the actual segments of the log */
//這個對象是這個topic下所有分片的集合.這個集合貫徹整個log管理過程.之后所有動作都依賴此集合. private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() //將topic所有的分片加載到segments集合了.並做一些topic分片文件檢查工作. /* Calculate the offset of the next message */ @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt) //activeSegment表示當前最后一個分片.因為分片是按大小分布.最大的就是最新的.也就是活躍的分片.這里生成下一個offsetmetadata val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) //獲取topic名稱和分區. info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString) //監控度量的映射標簽.
//下面全是通過metrics做的一些監控. newGauge(
"NumLogSegments", new Gauge[Int] { def value = numberOfSegments }, tags) newGauge("LogStartOffset", new Gauge[Long] { def value = logStartOffset }, tags) newGauge("LogEndOffset", new Gauge[Long] { def value = logEndOffset }, tags) newGauge("Size", new Gauge[Long] { def value = size }, tags) /** The name of this log */ def name = dir.getName()

 

  上面是Log class初始化的部分.這個部分最重要的就是聲明了幾個貫穿全過程的對象,並且將分片文件加載到內存對象中.

  下面看看主要的加載函數loadSegments.

private def loadSegments() {
    // create the log directory if it doesn't exist
    dir.mkdirs()       //這里是創建topic目錄的.本身的注釋也說明了這個.
    
    // first do a pass through the files in the log directory and remove any temporary files 
    // and complete any interrupted swap operations
    for(file <- dir.listFiles if file.isFile) {            //這個for循環是用來檢查分片是否是要被清除或者刪除的.
      if(!file.canRead)
        throw new IOException("Could not read file " + file)
      val filename = file.getName
      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
        // if the file ends in .deleted or .cleaned, delete it
        file.delete()
      } else if(filename.endsWith(SwapFileSuffix)) {       //這里檢查是不是有swap文件存在.根據不同情況刪除或重命名swap文件.
        // we crashed in the middle of a swap operation, to recover:
        // if a log, swap it in and delete the .index file
        // if an index just delete it, it will be rebuilt
        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        if(baseName.getPath.endsWith(IndexFileSuffix)) {
          file.delete()
        } else if(baseName.getPath.endsWith(LogFileSuffix)){
          // delete the index
          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
          index.delete()
          // complete the swap operation
          val renamed = file.renameTo(baseName)
          if(renamed)
            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
          else
            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
        }
      }
    }

    // now do a second pass and load all the .log and .index files
    for(file <- dir.listFiles if file.isFile) {   //這個for循環是加載和檢查log分片是否存在的.
      val filename = file.getName
      if(filename.endsWith(IndexFileSuffix)) {
        // if it is an index file, make sure it has a corresponding .log file
        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
        if(!logFile.exists) {   //這里是如果只有index文件沒有對應的log文件.就把index文件清理掉.
          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
          file.delete()
        }
      } else if(filename.endsWith(LogFileSuffix)) {   //這里是創建LogSegment對象的地方.
        // if its a log file, load the corresponding log segment
        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
        val hasIndex = Log.indexFilename(dir, start).exists  //確認對應的index文件是否存在.
        val segment = new LogSegment(dir = dir, 
                                     startOffset = start,
                                     indexIntervalBytes = config.indexInterval, 
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time)
        if(!hasIndex) {
          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
          segment.recover(config.maxMessageSize)  //對應log文件的index不存在的話,進行recover.這個地方就是平常碰見kafka index出錯需要重新建立的時候管理員刪除了對應的index會引起的動作.
        }
        segments.put(start, segment) //將segment對象添加到總集里.
      }
    }

    if(logSegments.size == 0) {  //這里判斷是否是一個新的topic分區.尚不存在分片文件.所以創建一個空的分片文件對象.
      // no existing segments, create a new mutable segment beginning at offset 0
      segments.put(0L, new LogSegment(dir = dir,
                                     startOffset = 0,
                                     indexIntervalBytes = config.indexInterval, 
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time))
    } else {
      recoverLog()  //這里是topic分片不為空的話.就為檢查點設置新offset值.
      // reset the index size of the currently active log segment to allow more entries
      activeSegment.index.resize(config.maxIndexSize)
    }

    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
    for (s <- logSegments)
      s.index.sanityCheck()  //index文件檢查.
  }

   看看recoverLog是做了哪些工作.

private def recoverLog() {
    // if we have the clean shutdown marker, skip recovery
    if(hasCleanShutdownFile) {    //看看是否有cleanshutdownfile存在.hasCleanShutdownFile函數就是判斷這個文件存不存在
      this.recoveryPoint = activeSegment.nextOffset //存在則直接把恢復檢查點設置成最后一個分片的最新offset值
      return
    }

    // okay we need to actually recovery this log
    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator //這個是獲取檢查點到最大值之間是否還有其他的分片.也就是檢查檢查點是不是就是最后一個分片文件.
    while(unflushed.hasNext) { //如果不是最后一個分片.就獲取這個分片.然后調用這個對象的recover函數如果函數返回錯誤就刪除這個分片.
      val curr = unflushed.next 
      info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
      val truncatedBytes = 
        try {
          curr.recover(config.maxMessageSize)
        } catch {
          case e: InvalidOffsetException => 
            val startOffset = curr.baseOffset
            warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                 "creating an empty one with starting offset " + startOffset)
            curr.truncateTo(startOffset)
        }
      if(truncatedBytes > 0) {
        // we had an invalid message, delete all remaining log
        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
        unflushed.foreach(deleteSegment)
      }
    }
  }

    這個函數的處理動作.包裝的是LogSegment對同名對象.LogSegment的分析會在后續文章里繼續分析.現在接着看看Log對象的其他方法,即被LogManager函數里封裝的兩個個功能函數deleteOldSegment和flush.

  首先是deleteOldSegment函數

def deleteOldSegments(predicate: LogSegment => Boolean): Int = {  //這個函數就是在LogManager類中被用來處理清除log的函數.函數的參數是一個匿名函數
    // find any segments that match the user-supplied predicate UNLESS it is the final segment 
    // and it is empty (since we would just end up re-creating it
    val lastSegment = activeSegment //activeSegment 在上面也提到過是最后一個分片對象.
//這里predicate函數在LogManager類中是判斷是否超過大小限制和時間限制的.這里遍歷判斷每個分片是否達到限制,並且不是第一個分片或者分片不是空的. val deletable
= logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) val numToDelete = deletable.size //這里獲得有多少個分片要被刪除. if(numToDelete > 0) { //如果有分片要被刪除則執行這個 lock synchronized { //這是一個同步塊. // we must always have at least one segment, so if we are going to delete all the segments, create a new one first if(segments.size == numToDelete) //如果要刪除的分片是這個topic下的所有分片的話.需要先通過roll創建新的分片. roll() // remove the segments for lookups deletable.foreach(deleteSegment(_)) //遍歷所有要被刪除的分片.將之刪除. } } numToDelete //返回刪除的分片個數. }

 

  上面函數可以看的很清楚,刪除一個分片所面臨的動作.下面貼上關於這個函數的一些相關被調用函數的解析.

  activeSegment,logSegments,deleteSegment函數.

def activeSegment = segments.lastEntry.getValue  //可以看見這個被多次使用的函數.就是分片集里的最后一個分片.
  
  /**
   * All the log segments in this log ordered from oldest to newest
   */
  def logSegments: Iterable[LogSegment] = {    //這是在上面deleteOldSegment函數中被調用的函數.是分片集的一個值集.
    import JavaConversions._
    segments.values
  }
  
  /**
   * Get all segments beginning with the segment that includes "from" and ending with the segment
   * that includes up to "to-1" or the end of the log (if to > logEndOffset)
   */
  def logSegments(from: Long, to: Long): Iterable[LogSegment] = {  //這是在recoverLog函數里被調用來查找檢查點記錄的offset是否是最后一個分片.
    import JavaConversions._
    lock synchronized {
      val floor = segments.floorKey(from)  //獲取最后一個分片.或者返回給出的值到Long.maxvalue之間的所有分片對象.
      if(floor eq null)
        segments.headMap(to).values
      else
        segments.subMap(floor, true, to, false).values
    }
  }

  再接着看看deleteSegment函數.這個函數是主要的刪除函數.並且這個函數也調用了其他函數來刪除分片對象.

private def deleteSegment(segment: LogSegment) {
    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
    lock synchronized {   //這里進行同步,
      segments.remove(segment.baseOffset)  //從集合里清楚這個分片對象.
      asyncDeleteSegment(segment)   //調用異步刪除方法來清理對象的相關文件.
    }
  }
  
  /**
   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
   * @throws KafkaStorageException if the file can't be renamed and still exists 
   */
  private def asyncDeleteSegment(segment: LogSegment) {
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
      segment.delete()
    }
//可以看見這里是調用了構建對象的時候傳遞進來的最初由KafkaServer.start里最開始初始化的線程池.然后把經過包裝的LogSegment.delete方法提交到線程池中運行. scheduler.schedule(
"delete-file", deleteSeg, delay = config.fileDeleteDelayMs) }

  上面的部分講解了LogManager.cleanupLogs函數的封裝函數具體處理工作.下面看看LogManager.flushDirtyLogs里的Log.flush是如何工作的.

def flush(): Unit = flush(this.logEndOffset)  //這是在logmanager中被調用的函數.

  /**
   * Flush log segments for all offsets up to offset-1
   * @param offset The offset to flush up to (non-inclusive); the new recovery point
   */
  def flush(offset: Long) : Unit = {  //這是真正工作的函數.
    if (offset <= this.recoveryPoint)  //首先判斷現在的offset是否是在檢查點offset范圍內的.如果是則不做任何動作.
      return
    debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
          time.milliseconds + " unflushed = " + unflushedMessages)
    for(segment <- logSegments(this.recoveryPoint, offset))  //找到檢查點offset到最新offset之間的所有分片.為這些分片調用LogSegment.flush函數
      segment.flush()   //通過這個函數刷新到磁盤.具體動作如何.會在后續LogSegment的文章里分析.
    lock synchronized {  //同步方法.
      if(offset > this.recoveryPoint) { 
        this.recoveryPoint = offset    //刷新之后更新最新的檢查點offset.
        lastflushedTime.set(time.milliseconds)  //更新最新刷新時間.
      }
    }
  }

  上面說完了核心的管理函數和加載函數.下面看看讀取和寫入相關的函數.read和append.

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {  //將消息添加到分片尾
    val appendInfo = analyzeAndValidateMessageSet(messages) //這里驗證log信息和創建logappendinfo.
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)  //如果消息為空的話,就直接返回信息.
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)  //這個函數將消息里多余的部分截掉.

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset  //這里開始分配offset值.即上最后一個分片的最后一個offset值.

        if(assignOffsets) {
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset) //創建新的offset
          try {
            validMessages = validMessages.assignOffsets(offset, appendInfo.codec) //使用ByteBufferMessageSet類中的分配方法分配offset值.
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1 //因為offset被assignOffsets方法累加過.所以最后要減1.
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { //這里判斷每一個消息是否大於配置的消息最大長度.
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

        // check messages set size may be exceed config.segmentSize
        if(validMessages.sizeInBytes > config.segmentSize) {  //判斷要寫入的消息集大小是否超過配置的分片大小.
          throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
            .format(validMessages.sizeInBytes, config.segmentSize))
        }


        // maybe roll the log if this segment is full
        val segment = maybeRoll(validMessages.sizeInBytes)  //這里是判斷是否需要滾動分片.

        // now append to the log
        segment.append(appendInfo.firstOffset, validMessages)  //這里真正調用LogSegment對象寫入消息.

        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1) //更新lastoffset.

        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

        if(unflushedMessages >= config.flush)  //判斷是否需要刷新到磁盤
          flush()

        appendInfo
      }
    } catch {
      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
    }
  }

   由於涉及到消息寫入檢查等等動作.所以其中有很多操作需要看見message目錄下的關於message的具體實現才能了解.關於Message的具體解析會在后續的篇章里繼續分析.先簡略看看對應的驗證和處理函數analyzeAndValidateMessageSet和trimInvalidBytes

private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset, lastOffset = -1L
    var codec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    for(messageAndOffset <- messages.shallowIterator) { //這里是遍歷所有message對象.
      // update the first offset if on the first message
      if(firstOffset < 0)
        firstOffset = messageAndOffset.offset //設置第一個offset
      // check that offsets are monotonically increasing
      if(lastOffset >= messageAndOffset.offset)  //判斷最后offset是否失效.
        monotonic = false
      // update the last offset seen
      lastOffset = messageAndOffset.offset //設置最后一個offset

      val m = messageAndOffset.message  //具體message消息.

      // Check if the message sizes are valid.
      val messageSize = MessageSet.entrySize(m)
      if(messageSize > config.maxMessageSize) { //判斷消息大小時候超過配置最大消息大小.
        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
          .format(messageSize, config.maxMessageSize))
      }

      // check the validity of the message by checking CRC
      m.ensureValid() //校驗消息是否完整.

      shallowMessageCount += 1 //統計驗證完成的消息總個數.
      validBytesCount += messageSize //統計總大小.
      
      val messageCodec = m.compressionCodec  //是否啟用壓縮.
      if(messageCodec != NoCompressionCodec)
        codec = messageCodec
    }
//返回一個LogAppendInfo的對象. LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic) }
/** * Trim any invalid bytes from the end of this message set (if there are any) * @param messages The message set to trim * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. */ private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = { val messageSetValidBytes = info.validBytes if(messageSetValidBytes < 0) //查看消息是不是正常. throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") if(messageSetValidBytes == messages.sizeInBytes) { //消息正好正常.就直接返回 messages } else { // trim invalid bytes val validByteBuffer = messages.buffer.duplicate() validByteBuffer.limit(messageSetValidBytes) //不正常則通過從新設置limit把大小設置在驗證的大小上.丟棄后續部分. new ByteBufferMessageSet(validByteBuffer) //返回新的消息. } }

 

  這兩個函數是在append函數里被調用的預處理方法.里面涉及了message的操作.具體會在message的篇幅里分析.  

  寫入日志里做了日志管理中的另外一個工作.就是滾動日志分片.maybeRoll用來跟配置文件對照看是否需要創建新分片.

private def maybeRoll(messagesSize: Int): LogSegment = {
    val segment = activeSegment
//這里判斷是否需要滾動log分片.
if (segment.size > config.segmentSize - messagesSize || segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs || segment.index.isFull) { debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." .format(name, segment.size, config.segmentSize, segment.index.entries, segment.index.maxEntries, time.milliseconds - segment.created, config.segmentMs - segment.rollJitterMs)) roll() //調用roll函數完成. } else { segment //不需要則直接返回當前分片. } } /** * Roll the log over to a new active segment starting with the current logEndOffset. * This will trim the index to the exact size of the number of entries it currently contains. * @return The newly rolled segment */ def roll(): LogSegment = { val start = time.nanoseconds lock synchronized { val newOffset = logEndOffset //以最后offset當作新分片文件名 val logFile = logFilename(dir, newOffset) //log文件名 val indexFile = indexFilename(dir, newOffset) //index文件名. for(file <- List(logFile, indexFile); if file.exists) { //判斷這兩個文件是否存在 warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") file.delete() //存在就刪除. } segments.lastEntry() match { case null => case entry => entry.getValue.index.trimToValidSize() } val segment = new LogSegment(dir, //創建一個新分片. startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time) val prev = addSegment(segment) //將新分片添加到集合中. if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) // schedule an asynchronous flush of the old segment scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) //提交一個刷新任務到線程池中. info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0))) segment } }

  上面討論了寫入一段消息.下面看看讀取一段消息.

def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
    trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

    // check if the offset is valid and in range
    val next = nextOffsetMetadata.messageOffset
    if(startOffset == next) //如果讀取的消息就是最新的消息.返回一個空消息對象.
      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
    
    var entry = segments.floorEntry(startOffset) //獲取對應的offset分片對象.
      
    // attempt to read beyond the log end offset is an error
    if(startOffset > next || entry == null)
      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
    
    // do the read on the segment with a base offset less than the target offset
    // but if that segment doesn't contain any messages with an offset greater than that
    // continue to read from successive segments until we get some messages or we reach the end of the log
    while(entry != null) {
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)  //調用logsegment的read方法獲取消息.
      if(fetchInfo == null) {
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo  //成功返回新消息對象.
      }
    }
    
    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
    // this can happen when all messages with offset larger than start offsets have been deleted.
    // In this case, we will return the empty set with log end offset metadata
    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty) //失敗返回空消息對象.
  }

   讀取一段消息,依賴LogSegment的實現.具體將在后續篇章里分析.

到這里已經將Log類中的主要功能,方法都分析過了.關於Log的分析就到此結束了.

關於在Log中使用到的一些常量,以及常量方法.Log Object的內容就直接貼在下面部分了.

object Log {
  //這里就是一些常量和一些組合函數.
  /** a log file */
  val LogFileSuffix = ".log"
    
  /** an index file */
  val IndexFileSuffix = ".index"
    
  /** a file that is scheduled to be deleted */
  val DeletedFileSuffix = ".deleted"
    
  /** A temporary file that is being used for log cleaning */
  val CleanedFileSuffix = ".cleaned"
    
  /** A temporary file used when swapping files into the log */
  val SwapFileSuffix = ".swap"

  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
    * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
  /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
  val CleanShutdownFile = ".kafka_cleanshutdown"

  /**
   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
   * so that ls sorts the files numerically.
   * @param offset The offset to use in the file name
   * @return The filename
   */
  def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }
  
  /**
   * Construct a log file name in the given dir with the given base offset
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   */
  def logFilename(dir: File, offset: Long) = 
    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
  
  /**
   * Construct an index file name in the given dir using the given base offset
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   */
  def indexFilename(dir: File, offset: Long) = 
    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
  

  /**
   * Parse the topic and partition out of the directory name of a log
   */
  def parseTopicPartitionName(name: String): TopicAndPartition = {
    val index = name.lastIndexOf('-')
    TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
  }
}

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM