kafka日志作為日志段的容器,重點分析kafka日志是如何加載日志段。
Log源碼結構
Log源碼位於kakfa core工程的log包下,對應的文件名為Log.scala。文件中中包含了與log有關的10個class或者object,見下圖所示。
模塊概述
LogAppendInfo(class)
保存了一組待寫入消息的各種元數據信息,包含位移值或者最大消息的時間戳等,后續會進行詳細介紹
LogAppendInfo(object)
我們可以理解為定義了一些工廠方法,用來創建LogAppendInfo對象
Log(class)
Log源碼中最最核心的代碼
Log(object)
Log伴生類的工廠方法,里面有很多的輔助方法
RollParams(C)
定義了日志段是否需要切分的數據結構
RollParams(object)
RollParams的伴生類
LogMetricNames
定義了Log對象的監控指標
LogOffsetSnapshot
封裝分區所有位移元數據的容器類
LogReadInfo
封裝讀取日志返回的數據和元數據
CompletedTxn
記錄已完成事務的元數據,用來構建事務索引
Log Class&Log Object
下面我會一一介紹他們,我們都知道Scala的派生類一般存儲的是靜態變量或者靜態工廠方法等,考慮到派生類的閱讀比較容易,我們先分析下Log Object相關信息。我們先看下里面都定義了那些常量,見下圖所示
object Log { /** a log file */ val LogFileSuffix = ".log" /** an index file */ val IndexFileSuffix = ".index" /** a time index file */ val TimeIndexFileSuffix = ".timeindex" val ProducerSnapshotFileSuffix = ".snapshot" /** an (aborted) txn index */ val TxnIndexFileSuffix = ".txnindex" /** a file that is scheduled to be deleted */ val DeletedFileSuffix = ".deleted" /** A temporary file that is being used for log cleaning */ val CleanedFileSuffix = ".cleaned" /** A temporary file used when swapping files into the log */ val SwapFileSuffix = ".swap" /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher. * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be * avoided by passing in the recovery point, however finding the correct position to do this * requires accessing the offset index which may not be safe in an unclean shutdown. * For more information see the discussion in PR#2104 */ val CleanShutdownFile = ".kafka_cleanshutdown" /** a directory that is scheduled to be deleted */ val DeleteDirSuffix = "-delete" /** a directory that is used for future partition */ val FutureDirSuffix = "-future" }
我們發現這些常量 中包含了很多種的文件類型,除了大家熟知的index、timeindex、txnindex、log等,還有不認識的其他文件類型,我們對這些不認識的進行匯總說明。
snapshot:是kafka對冪等型或者事務型producer所生成的快照文件。事務型冪等先不做介紹。
deleted:刪除日志段操作時所創建的文件,目前刪除日志段文件的操作是異步的,
cleaned和swarp是compaction操作的產物,后續會進行解釋
delete 是應用於文件夾的,當我們刪除一個主題的時候,主題的分區文件夾會被加上這個后綴
當然Log object中還定義了很多的工具類方法,比如下圖,基於偏移量計算出日志段的文件名,kafak日志段的固定長度是20位。
/** * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros * so that ls sorts the files numerically. * * @param offset The offset to use in the file name * @return The filename */ def filenamePrefixFromOffset(offset: Long): String = { val nf = NumberFormat.getInstance() nf.setMinimumIntegerDigits(20) nf.setMaximumFractionDigits(0) nf.setGroupingUsed(false) nf.format(offset) }
Log class 詳細介紹
Log源碼中最重要的就是這個Log class了,代碼量還是比較大的,我們先從這個類定義的入參開始講起,見下圖所示。
class Log(@volatile private var _dir: File, @volatile var config: LogConfig, @volatile var logStartOffset: Long, @volatile var recoveryPoint: Long, scheduler: Scheduler, brokerTopicStats: BrokerTopicStats, val time: Time, val maxProducerIdExpirationMs: Int, val producerIdExpirationCheckIntervalMs: Int, val topicPartition: TopicPartition, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { ....... }
我們發現這里面的字段比較多,重點關注以下兩個字段,分別是dir和logStartOffset。
dir是主題分區的路徑
logStartOffset是日志的最早的位移
此外我們還需要關注一下幾個屬性
nextOffsetmetadata:封裝了下一條待插入消息的位移值
highWatermarkMetadata:用來區分日志的高水位值,即已提交事務和未提交事務的分界
segment:保存了分區日志下所有的日志段信息,Map結構,key是日志段的起始位移,value是日志段本身對象
大概了解了上文之后,我們一起看看Log類是如何初始化的。