kafka日志作為日志段的容器,重點分析kafka日志是如何加載日志段。
Log源碼結構
Log源碼位於kakfa core工程的log包下,對應的文件名為Log.scala。文件中中包含了與log有關的10個class或者object,見下圖所示。

模塊概述
LogAppendInfo(class)
保存了一組待寫入消息的各種元數據信息,包含位移值或者最大消息的時間戳等,后續會進行詳細介紹
LogAppendInfo(object)
我們可以理解為定義了一些工廠方法,用來創建LogAppendInfo對象
Log(class)
Log源碼中最最核心的代碼
Log(object)
Log伴生類的工廠方法,里面有很多的輔助方法
RollParams(C)
定義了日志段是否需要切分的數據結構
RollParams(object)
RollParams的伴生類
LogMetricNames
定義了Log對象的監控指標
LogOffsetSnapshot
封裝分區所有位移元數據的容器類
LogReadInfo
封裝讀取日志返回的數據和元數據
CompletedTxn
記錄已完成事務的元數據,用來構建事務索引
Log Class&Log Object
下面我會一一介紹他們,我們都知道Scala的派生類一般存儲的是靜態變量或者靜態工廠方法等,考慮到派生類的閱讀比較容易,我們先分析下Log Object相關信息。我們先看下里面都定義了那些常量,見下圖所示
object Log {
/** a log file */
val LogFileSuffix = ".log"
/** an index file */
val IndexFileSuffix = ".index"
/** a time index file */
val TimeIndexFileSuffix = ".timeindex"
val ProducerSnapshotFileSuffix = ".snapshot"
/** an (aborted) txn index */
val TxnIndexFileSuffix = ".txnindex"
/** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
/** A temporary file that is being used for log cleaning */
val CleanedFileSuffix = ".cleaned"
/** A temporary file used when swapping files into the log */
val SwapFileSuffix = ".swap"
/** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point, however finding the correct position to do this
* requires accessing the offset index which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*/
val CleanShutdownFile = ".kafka_cleanshutdown"
/** a directory that is scheduled to be deleted */
val DeleteDirSuffix = "-delete"
/** a directory that is used for future partition */
val FutureDirSuffix = "-future"
}
我們發現這些常量 中包含了很多種的文件類型,除了大家熟知的index、timeindex、txnindex、log等,還有不認識的其他文件類型,我們對這些不認識的進行匯總說明。
snapshot:是kafka對冪等型或者事務型producer所生成的快照文件。事務型冪等先不做介紹。
deleted:刪除日志段操作時所創建的文件,目前刪除日志段文件的操作是異步的,
cleaned和swarp是compaction操作的產物,后續會進行解釋
delete 是應用於文件夾的,當我們刪除一個主題的時候,主題的分區文件夾會被加上這個后綴
當然Log object中還定義了很多的工具類方法,比如下圖,基於偏移量計算出日志段的文件名,kafak日志段的固定長度是20位。
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
*
* @param offset The offset to use in the file name
* @return The filename
*/
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
Log class 詳細介紹
Log源碼中最重要的就是這個Log class了,代碼量還是比較大的,我們先從這個類定義的入參開始講起,見下圖所示。
class Log(@volatile private var _dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
.......
}
我們發現這里面的字段比較多,重點關注以下兩個字段,分別是dir和logStartOffset。
dir是主題分區的路徑
logStartOffset是日志的最早的位移
此外我們還需要關注一下幾個屬性
nextOffsetmetadata:封裝了下一條待插入消息的位移值
highWatermarkMetadata:用來區分日志的高水位值,即已提交事務和未提交事務的分界
segment:保存了分區日志下所有的日志段信息,Map結構,key是日志段的起始位移,value是日志段本身對象
大概了解了上文之后,我們一起看看Log類是如何初始化的。
