概述
日志段及其相關代碼是 Kafka 服務器源碼中最為重要的組件代碼之一。你可能會非常關心,在 Kafka 中,消息是如何被保存和組織在一起的。畢竟,不管是學習任何消息引擎,弄明白消息建模方式都是首要的問題。因此,你非常有必要學習日志段這個重要的子模塊的源碼實現。今天,我會帶你詳細看下日志段部分的源碼。不過在此之前,你需要先了解一下 Kafka 的日志結構日志是 Kafka 服務器端代碼的重要組件之一,很多其他的核心組件都是以日志為基礎的,比如后面要講到的狀態管理機和副本管理器等。
總的來說,Kafka 日志對象由多個日志段對象組成,而每個日志段對象會在磁盤上創建一組文件,包括消息日志文件(.log)、位移索引文件(.index)、時間戳索引文件(.timeindex)以及已中止(Aborted)事務的索引文件(.txnindex)。當然,如果你沒有使用 Kafka 事務,已中止事務的索引文件是不會被創建出來的。圖中的一串數字 0 是該日志段的起始位移值(Base Offset),也就是該日志段中所存的第一條消息的位移值。
日志段核心代碼
日志段源碼位於 Kafka 的 core 工程下,具體文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。實際上,所有日志結構部分的源碼都在 core 的 kafka.log 包下。該文件下定義了三個 Scala 對象:LogSegment class;LogSegment object;LogFlushStats object。LogFlushStats 結尾有個 Stats,它是做統計用的,主要負責為日志落盤進行計時。每個日志段由兩個核心組件構成:日志和索引。當然,這里的索引泛指廣義的索引文件。另外,這段注釋還給出了一個重要的事實:每個日志段都有一個起始位移值(Base Offset),而該位移值是此日志段所有消息中最小的位移值,同時,該值卻又比前面任何日志段中消息的位移值都大。
下面,我分批次給出比較關鍵的代碼片段,並對其進行解釋。首先,我們看下 LogSegment 的定義:
1 class LogSegment private[log] (val log: FileRecords, 2 val lazyOffsetIndex: LazyIndex[OffsetIndex], 3 val lazyTimeIndex: LazyIndex[TimeIndex], 4 val txnIndex: TransactionIndex, 5 val baseOffset: Long, 6 val indexIntervalBytes: Int, 7 val rollJitterMs: Long, 8 val time: Time) extends Logging { … }
就像我前面說的,一個日志段包含消息日志文件、位移索引文件、時間戳索引文件、已中止事務索引文件等。這里的 FileRecords 就是實際保存 Kafka 消息的對象。專欄后面我將專門討論 Kafka 是如何保存具體消息的,也就是 FileRecords 及其家族的實現方式。同時,我還會給你介紹一下社區在持久化消息這塊是怎么演進的,你一定不要錯過那部分的內容。下面的 lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分別對應於剛才所說的 3 個索引文件。不過,在實現方式上,前兩種使用了延遲初始化的原理,降低了初始化時間成本。后面我們在談到索引的時候再詳細說。
每個日志段對象保存自己的起始位移 baseOffset——這是非常重要的屬性!事實上,你在磁盤上看到的文件名就是 baseOffset 的值。每個 LogSegment 對象實例一旦被創建,它的起始位移就是固定的了,不能再被更改。
對於一個日志段而言,最重要的方法就是寫入消息和讀取消息了,它們分別對應着源碼中的 append 方法和 read 方法。另外,recover 方法同樣很關鍵,它是 Broker 重啟后恢復日志段的操作邏輯。
append 方法
我們先來看 append 方法,了解下寫入消息的具體操作。append 方法接收 4 個參數,分別表示待寫入消息批次中消息的最大位移值、最大時間戳、最大時間戳對應消息的位移以及真正要寫入的消息集合。下面這張圖展示了 append 方法的完整執行流程:
第一步:在源碼中,首先調用 log.sizeInBytes 方法判斷該日志段是否為空,如果是空的話, Kafka 需要記錄要寫入消息集合的最大時間戳,並將其作為后面新增日志段倒計時的依據。
第二步:代碼調用 ensureOffsetInRange 方法確保輸入參數最大位移值是合法的。那怎么判斷是不是合法呢?標准就是看它與日志段起始位移的差值是否在整數范圍內,即 largestOffset - baseOffset 的值是不是介於 [0,Int.MAXVALUE] 之間。在極個別的情況下,這個差值可能會越界,這時,append 方法就會拋出異常,阻止后續的消息寫入。一旦你碰到這個問題,你需要做的是升級你的 Kafka 版本,因為這是由已知的 Bug 導致的。
第三步:待這些做完之后,append 方法調用 FileRecords 的 append 方法執行真正的寫入。前面說過了,專欄后面我們會詳細介紹 FileRecords 類。這里你只需要知道它的工作是將內存中的消息對象寫入到操作系統的頁緩存就可以了。
第四步:再下一步,就是更新日志段的最大時間戳以及最大時間戳所屬消息的位移值屬性。每個日志段都要保存當前最大時間戳信息和所屬消息的位移信息。還記得 Broker 端提供定期刪除日志的功能嗎?比如我只想保留最近 7 天的日志,沒錯,當前最大時間戳這個值就是判斷的依據;而最大時間戳對應的消息的位移值則用於時間戳索引項。雖然后面我會詳細介紹,這里我還是稍微提一下:時間戳索引項保存時間戳與消息位移的對應關系。在這步操作中,Kafka 會更新並保存這組對應關系。
第五步:append 方法的最后一步就是更新索引項和寫入的字節數了。我在前面說過,日志段每寫入 4KB 數據就要寫入一個索引項。當已寫入字節數超過了 4KB 之后,append 方法會調用索引對象的 append 方法新增索引項,同時清空已寫入字節數,以備下次重新累積計算。
read 方法
好了,append 方法我就解釋完了。下面我們來看 read 方法,了解下讀取日志段的具體操作。
read 方法接收 4 個輸入參數。startOffset:要讀取的第一條消息的位移;maxSize:能讀取的最大字節數;maxPosition :能讀到的最大文件位置;minOneMessage:是否允許在消息體過大時至少返回第一條消息。前 3 個參數的含義很好理解,我重點說下第 4 個。當這個參數為 true 時,即使出現消息體字節數超過了 maxSize 的情形,read 方法依然能返回至少一條消息。引入這個參數主要是為了確保不出現消費餓死的情況。
邏輯很簡單,我們一步步來看下。第一步是調用 translateOffset 方法定位要讀取的起始文件位置 (startPosition)。輸入參數 startOffset 僅僅是位移值,Kafka 需要根據索引信息找到對應的物理文件位置才能開始讀取消息。待確定了讀取起始位置,日志段代碼需要根據這部分信息以及 maxSize 和 maxPosition 參數共同計算要讀取的總字節數。舉個例子,假設 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能讀取 50 字節,因為 maxPosition - startPosition = 50。我們把它和 maxSize 參數相比較,其中的最小值就是最終能夠讀取的總字節數。最后一步是調用 FileRecords 的 slice 方法,從指定位置讀取指定大小的消息集合。
recover 方法
除了 append 和 read 方法,LogSegment 還有一個重要的方法需要我們關注,它就是 recover 方法,用於恢復日志段。下面的代碼是 recover 方法源碼。什么是恢復日志段呢?其實就是說, Broker 在啟動時會從磁盤上加載所有日志段信息到內存中,並創建相應的 LogSegment 對象實例。在這個過程中,它需要執行一系列的操作。
recover 開始時,代碼依次調用索引對象的 reset 方法清空所有的索引文件,之后會開始遍歷日志段中的所有消息集合或消息批次(RecordBatch)。對於讀取到的每個消息集合,日志段必須要確保它們是合法的,這主要體現在兩個方面:該集合中的消息必須要符合 Kafka 定義的二進制格式;該集合中最后一條消息的位移值不能越界,即它與日志段起始位移的差值必須是一個正整數值。
校驗完消息集合之后,代碼會更新遍歷過程中觀測到的最大時間戳以及所屬消息的位移值。同樣,這兩個數據用於后續構建索引項。再之后就是不斷累加當前已讀取的消息字節數,並根據該值有條件地寫入索引項。最后是更新事務型 Producer 的狀態以及 Leader Epoch 緩存。不過,這兩個並不是理解 Kafka 日志結構所必需的組件,因此,我們可以忽略它們。遍歷執行完成后,Kafka 會將日志段當前總字節數和剛剛累加的已讀取字節數進行比較,如果發現前者比后者大,說明日志段寫入了一些非法消息,需要執行截斷操作,將日志段大小調整回合法的數值。同時, Kafka 還必須相應地調整索引文件的大小。把這些都做完之后,日志段恢復的操作也就宣告結束了。
日志Log基礎知識
你可以認為,日志是日志段的容器,里面定義了很多管理日志段的操作。坦率地說,如果看 Kafka 源碼卻不看 Log,就跟你買了這門課卻不知道作者是誰一樣。在我看來,Log 對象是 Kafka 源碼(特別是 Broker 端)最核心的部分,沒有之一。
Log 源碼結構
Log 源碼位於 Kafka core 工程的 log 源碼包下,文件名是 Log.scala。總體上,該文件定義了 10 個類和對象,如下圖所示:
圖中括號里的 C 表示 Class,O 表示 Object。還記得我在上節課提到過的伴生對象嗎?沒錯,同時定義同名的 Class 和 Object,就屬於 Scala 中的伴生對象用法。我們先來看伴生對象,也就是 LogAppendInfo、Log 和 RollParams。
LogAppendInfo
LogAppendInfo(C):保存了一組待寫入消息的各種元數據信息。比如,這組消息中第一條消息的位移值是多少、最后一條消息的位移值是多少;再比如,這組消息中最大的消息時間戳又是多少。總之,這里面的數據非常豐富(下節課我再具體說說)。LogAppendInfo(O): 可以理解為其對應伴生類的工廠方法類,里面定義了一些工廠方法,用於創建特定的 LogAppendInfo 實例。
Log
Log(C): Log 源碼中最核心的代碼。這里我先賣個關子,一會兒細聊。Log(O):同理,Log 伴生類的工廠方法,定義了很多常量以及一些輔助方法。
RollParams
RollParams(C):定義用於控制日志段是否切分(Roll)的數據結構。
RollParams(O):同理,RollParams 伴生類的工廠方法。
除了這 3 組伴生對象之外,還有 4 類源碼。LogMetricNames:定義了 Log 對象的監控指標。LogOffsetSnapshot:封裝分區所有位移元數據的容器類。LogReadInfo:封裝讀取日志返回的數據及其元數據。CompletedTxn:記錄已完成事務的元數據,主要用於構建事務索引。
Log Class & Object
下面,我會按照這些類和對象的重要程度,對它們一一進行拆解。首先,咱們先說說 Log 類及其伴生對象。考慮到伴生對象多用於保存靜態變量和靜態方法(比如靜態工廠方法等),因此我們先看伴生對象(即 Log Object)的實現。
1 object Log { 2 val LogFileSuffix = ".log" 3 val IndexFileSuffix = ".index" 4 val TimeIndexFileSuffix = ".timeindex" 5 val ProducerSnapshotFileSuffix = ".snapshot" 6 val TxnIndexFileSuffix = ".txnindex" 7 val DeletedFileSuffix = ".deleted" 8 val CleanedFileSuffix = ".cleaned" 9 val SwapFileSuffix = ".swap" 10 val CleanShutdownFile = ".kafka_cleanshutdown" 11 val DeleteDirSuffix = "-delete" 12 val FutureDirSuffix = "-future" 13 …… 14 }
這是 Log Object 定義的所有常量。如果有面試官問你 Kafka 中定義了多少種文件類型,你可以自豪地把這些說出來。耳熟能詳的.log、.index、.timeindex 和.txnindex 我就不解釋了,我們來了解下其他幾種文件類型。
.snapshot 是 Kafka 為冪等型或事務型 Producer 所做的快照文件。鑒於我們現在還處於閱讀源碼的初級階段,事務或冪等部分的源碼我就不詳細展開講了。
.deleted 是刪除日志段操作創建的文件。目前刪除日志段文件是異步操作,Broker 端把日志段文件從.log 后綴修改為.deleted 后綴。如果你看到一大堆.deleted 后綴的文件名,別慌,這是 Kafka 在執行日志段文件刪除。
.cleaned 和.swap 都是 Compaction 操作的產物,等我們講到 Cleaner 的時候再說。
-delete 則是應用於文件夾的。當你刪除一個主題的時候,主題的分區文件夾會被加上這個后綴。
-future 是用於變更主題分區文件夾地址的,屬於比較高階的用法。
總之,記住這些常量吧。記住它們的主要作用是,以后不要被面試官唬住!開玩笑,其實這些常量最重要的地方就在於,它們能夠讓你了解 Kafka 定義的各種文件類型。Log Object 還定義了超多的工具類方法。由於它們都很簡單,這里我只給出一個方法的源碼,我們一起讀一下。
1 def filenamePrefixFromOffset(offset: Long): String = { 2 val nf = NumberFormat.getInstance() 3 nf.setMinimumIntegerDigits(20) 4 nf.setMaximumFractionDigits(0) 5 nf.setGroupingUsed(false) 6 nf.format(offset) 7 }
這個方法的作用是通過給定的位移值計算出對應的日志段文件名。Kafka 日志文件固定是 20 位的長度,filenamePrefixFromOffset 方法就是用前面補 0 的方式,把給定位移值擴充成一個固定 20 位長度的字符串。
下面我們來看 Log 源碼部分的重頭戲:Log 類。這是一個 2000 多行的大類。放眼整個 Kafka 源碼,像 Log 這么大的類也不多見,足見它的重要程度。我們先來看這個類的定義:
1 class Log(@volatile var dir: File, 2 @volatile var config: LogConfig, 3 @volatile var logStartOffset: Long, 4 @volatile var recoveryPoint: Long, 5 scheduler: Scheduler, 6 brokerTopicStats: BrokerTopicStats, 7 val time: Time, 8 val maxProducerIdExpirationMs: Int, 9 val producerIdExpirationCheckIntervalMs: Int, 10 val topicPartition: TopicPartition, 11 val producerStateManager: ProducerStateManager, 12 logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { 13 …… 14 }
看着好像有很多屬性,但其實,你只需要記住兩個屬性的作用就夠了:dir 和 logStartOffset。dir 就是這個日志所在的文件夾路徑,也就是主題分區的路徑。而 logStartOffset,表示日志的當前最早位移。dir 和 logStartOffset 都是 volatile var 類型,表示它們的值是變動的,而且可能被多個線程更新。你可能聽過日志的當前末端位移,也就是 Log End Offset(LEO),它是表示日志下一條待插入消息的位移值,而這個 Log Start Offset 是跟它相反的,它表示日志當前對外可見的最早一條消息的位移值。我用一張圖來標識它們的區別:
有意思的是,Log End Offset 可以簡稱為 LEO,但 Log Start Offset 卻不能簡稱為 LSO。因為在 Kafka 中,LSO 特指 Log Stable Offset,屬於 Kafka 事務的概念。
其實,除了 Log 類簽名定義的這些屬性之外,Log 類還定義了一些很重要的屬性,比如下面這段代碼:
1 @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ 2 @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset) 3 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] 4 @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
第一個屬性 nextOffsetMetadata,它封裝了下一條待插入消息的位移值,你基本上可以把這個屬性和 LEO 等同起來。
第二個屬性 highWatermarkMetadata,是分區日志高水位值。
第三個屬性 segments,我認為這是 Log 類中最重要的屬性。它保存了分區日志下所有的日志段信息,只不過是用 Map 的數據結構來保存的。Map 的 Key 值是日志段的起始位移值,Value 則是日志段對象本身。Kafka 源碼使用 ConcurrentNavigableMap 數據結構來保存日志段對象,就可以很輕松地利用該類提供的線程安全和各種支持排序的方法,來管理所有日志段對象。
第四個屬性是 Leader Epoch Cache 對象。Leader Epoch 是社區於 0.11.0.0 版本引入源碼中的,主要是用來判斷出現 Failure 時是否執行日志截斷操作(Truncation)。之前靠高水位來判斷的機制,可能會造成副本間數據不一致的情形。這里的 Leader Epoch Cache 是一個緩存類數據,里面保存了分區 Leader 的 Epoch 值與對應位移值的映射關系,我建議你查看下 LeaderEpochFileCache 類,深入地了解下它的實現原理.
LOG對象基礎操作
我一般習慣把 Log 的常見操作分為 4 大部分。高水位管理操作:高水位的概念在 Kafka 中舉足輕重,對它的管理,是 Log 最重要的功能之一。日志段管理:Log 是日志段的容器。高效組織與管理其下轄的所有日志段對象,是源碼要解決的核心問題。關鍵位移值管理:日志定義了很多重要的位移值,比如 Log Start Offset 和 LEO 等。確保這些位移值的正確性,是構建消息引擎一致性的基礎。讀寫操作:所謂的操作日志,大體上就是指讀寫日志。讀寫操作的作用之大,不言而喻。
高水位管理操作
源碼中日志對象定義高水位的語句只有一行:
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
這行語句傳達了兩個重要的事實:高水位值是 volatile(易變型)的。因為多個線程可能同時讀取它,因此需要設置成 volatile,保證內存可見性。另外,由於高水位值可能被多個線程同時修改,因此源碼使用 Java Monitor 鎖來確保並發修改的線程安全。高水位值的初始值是 Log Start Offset 值。每個 Log 對象都會維護一個 Log Start Offset 值。當首次構建高水位時,它會被賦值成 Log Start Offset 值。你可能會關心 LogOffsetMetadata 是什么對象。因為它比較重要,我們一起來看下這個類的定義:
1 case class LogOffsetMetadata(messageOffset: Long, 2 segmentBaseOffset: Long = Log.UnknownOffset, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
顯然,它就是一個 POJO 類,里面保存了三個重要的變量。
messageOffset:消息位移值,這是最重要的信息。我們總說高水位值,其實指的就是這個變量的值。
segmentBaseOffset:保存該位移值所在日志段的起始位移。日志段起始位移值輔助計算兩條消息在物理磁盤文件中位置的差值,即兩條消息彼此隔了多少字節。這個計算有個前提條件,即兩條消息必須處在同一個日志段對象上,不能跨日志段對象。否則它們就位於不同的物理文件上,計算這個值就沒有意義了。這里的 segmentBaseOffset,就是用來判斷兩條消息是否處於同一個日志段的。
relativePositionSegment:保存該位移值所在日志段的物理磁盤位置。這個字段在計算兩個位移值之間的物理磁盤位置差值時非常有用。你可以想一想,Kafka 什么時候需要計算位置之間的字節數呢?答案就是在讀取日志的時候。假設每次讀取時只能讀 1MB 的數據,那么,源碼肯定需要關心兩個位移之間所有消息的總字節數是否超過了 1MB。
LogOffsetMetadata 類的所有方法,都是圍繞這 3 個變量展開的工具輔助類方法,非常容易理解。我會給出一個方法的詳細解釋,剩下的你可以舉一反三。
1 def onSameSegment(that: LogOffsetMetadata): Boolean = { 2 if (messageOffsetOnly) 3 throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") 4 5 this.segmentBaseOffset == that.segmentBaseOffset 6 }
看名字我們就知道了,這個方法就是用來判斷給定的兩個 LogOffsetMetadata 對象是否處於同一個日志段的。判斷方法很簡單,就是比較兩個 LogOffsetMetadata 對象的 segmentBaseOffset 值是否相等。
獲取和設置高水位值,關於獲取高水位值的方法,其實很好理解,我就不多說了。設置高水位值的方法,也就是 Setter 方法更復雜一些,為了方便你理解,我用注釋的方式來解析它的作用。
1 // getter method:讀取高水位的位移值 2 def highWatermark: Long = highWatermarkMetadata.messageOffset 3 4 // setter method:設置高水位值 5 private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = { 6 if (newHighWatermark.messageOffset < 0) // 高水位值不能是負數 7 throw new IllegalArgumentException("High watermark offset should be non-negative") 8 9 lock synchronized { // 保護Log對象修改的Monitor鎖 10 highWatermarkMetadata = newHighWatermark // 賦值新的高水位值 11 producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 處理事務狀態管理器的高水位值更新邏輯,忽略它…… 12 maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事務機制的一部分,忽略它…… 13 } 14 trace(s"Setting high watermark $newHighWatermark") 15 }
更新高水位值,除此之外,源碼還定義了兩個更新高水位值的方法:updateHighWatermark 和 maybeIncrementHighWatermark。從名字上來看,前者是一定要更新高水位值的,而后者是可能會更新也可能不會。我們分別看下它們的實現原理。
其實,這兩個方法有着不同的用途。updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。一旦拿到新的消息,就必須要更新高水位值;而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。需要注意的是,Leader 副本高水位值的更新是有條件的——某些情況下會更新高水位值,某些情況下可能不會。就像我剛才說的,Follower 副本成功拉取 Leader 副本的消息后必須更新高水位值,但 Producer 端向 Leader 副本寫入消息時,分區的高水位值就可能不需要更新——因為它可能需要等待其他 Follower 副本同步的進度。因此,源碼中定義了兩個更新的方法,它們分別應用於不同的場景。
讀取高水位值關於高水位值管理的最后一個操作是 fetchHighWatermarkMetadata 方法。它不僅僅是獲取高水位值,還要獲取高水位的其他元數據信息,即日志段起始位移和物理位置信息。
日志段管理
前面我反復說過,日志是日志段的容器,那它究竟是如何承擔起容器一職的呢?
1 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
可以看到,源碼使用 Java 的 ConcurrentSkipListMap 類來保存所有日志段對象。ConcurrentSkipListMap 有 2 個明顯的優勢。它是線程安全的,這樣 Kafka 源碼不需要自行確保日志段操作過程中的線程安全;它是鍵值(Key)可排序的 Map。Kafka 將每個日志段的起始位移值作為 Key,這樣一來,我們就能夠很方便地根據所有日志段的起始位移值對它們進行排序和比較,同時還能快速地找到與給定位移值相近的前后兩個日志段。
關鍵位移值管理
Log 對象維護了一些關鍵位移值數據,比如 Log Start Offset、LEO 等。其實,高水位值也算是關鍵位移值,只不過它太重要了,所以,我單獨把它拎出來作為獨立的一部分來講了。
Log 對象中的 LEO 永遠指向下一條待插入消息,也就是說,LEO 值上面是沒有消息的!源碼中定義 LEO 的語句很簡單:這里的 nextOffsetMetadata 就是我們所說的 LEO,它也是 LogOffsetMetadata 類型的對象。Log 對象初始化的時候,源碼會加載所有日志段對象,並由此計算出當前 Log 的下一條消息位移值。之后,Log 對象將此位移值賦值給 LEO。
實際上,LEO 對象被更新的時機有 4 個。Log 對象初始化時:當 Log 對象初始化時,我們必須要創建一個 LEO 對象,並對其進行初始化。寫入新消息時:這個最容易理解。以上面的圖為例,當不斷向 Log 對象插入新消息時,LEO 值就像一個指針一樣,需要不停地向右移動,也就是不斷地增加。Log 對象發生日志切分(Log Roll)時:日志切分是啥呢?其實就是創建一個全新的日志段對象,並且關閉當前寫入的日志段對象。這通常發生在當前日志段對象已滿的時候。一旦發生日志切分,說明 Log 對象切換了 Active Segment,那么,LEO 中的起始位移值和段大小數據都要被更新,因此,在進行這一步操作時,我們必須要更新 LEO 對象。日志截斷(Log Truncation)時:這個也是顯而易見的。日志中的部分消息被刪除了,自然可能導致 LEO 值發生變化,從而要更新 LEO 對象。
現在,我們再來思考一下,Kafka 什么時候需要更新 Log Start Offset 呢?我們一一來看下。Log 對象初始化時:和 LEO 類似,Log 對象初始化時要給 Log Start Offset 賦值,一般是將第一個日志段的起始位移值賦值給它。日志截斷時:同理,一旦日志中的部分消息被刪除,可能會導致 Log Start Offset 發生變化,因此有必要更新該值。Follower 副本同步時:一旦 Leader 副本的 Log 對象的 Log Start Offset 值發生變化。為了維持和 Leader 副本的一致性,Follower 副本也需要嘗試去更新該值。刪除日志段時:這個和日志截斷是類似的。凡是涉及消息刪除的操作都有可能導致 Log Start Offset 值的變化。刪除消息時:嚴格來說,這個更新時機有點本末倒置了。在 Kafka 中,刪除消息就是通過抬高 Log Start Offset 值來實現的,因此,刪除消息時必須要更新該值。
讀寫操作
最后,我重點說說針對 Log 對象的讀寫操作。
寫操作
在 Log 中,涉及寫操作的方法有 3 個:appendAsLeader、appendAsFollower 和 append。appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法。
Kafka 消息格式經歷了兩次大的變遷,目前是 0.11.0.0 版本引入的 Version 2 消息格式。我們沒有必要詳細了解這些格式的變遷,你只需要知道,在 0.11.0.0 版本之后,lastOffset 和 lastOffsetOfFirstBatch 都是指向消息集合的最后一條消息即可。它們的區別主要體現在 0.11.0.0 之前的版本。
讀操作
read 方法的流程相對要簡單一些,首先來看它的方法簽名:
1 def read(startOffset: Long, 2 maxLength: Int, 3 isolation: FetchIsolation, 4 minOneMessage: Boolean): FetchDataInfo = { 5 ...... 6 }
它接收 4 個參數,含義如下:startOffset,即從 Log 對象的哪個位移值開始讀消息。maxLength,即最多能讀取多少字節。isolation,設置讀取隔離級別,主要控制能夠讀取的最大位移值,多用於 Kafka 事務。minOneMessage,即是否允許至少讀一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。read 方法的返回值是 FetchDataInfo 類,也是一個 POJO 類,里面最重要的數據就是讀取的消息集合,其他數據還包括位移等元數據信息。
日志中的索引應用
在 Kafka 源碼中,跟索引相關的源碼文件有 5 個,它們都位於 core 包的 /src/main/scala/kafka/log 路徑下。我們一一來看下。AbstractIndex.scala:它定義了最頂層的抽象類,這個類封裝了所有索引類型的公共操作。LazyIndex.scala:它定義了 AbstractIndex 上的一個包裝類,實現索引項延遲加載。這個類主要是為了提高性能。OffsetIndex.scala:定義位移索引,保存“< 位移值,文件磁盤物理位置 >”對。TimeIndex.scala:定義時間戳索引,保存“< 時間戳,位移值 >”對。TransactionIndex.scala:定義事務索引,為已中止事務(Aborted Transcation)保存重要的元數據信息。只有啟用 Kafka 事務后,這個索引才有可能出現。這些類的關系如下圖所示:
其中,OffsetIndex、TimeIndex 和 TransactionIndex 都繼承了 AbstractIndex 類,而上層的 LazyIndex 僅僅是包裝了一個 AbstractIndex 的實現類,用於延遲加載。就像我之前說的,LazyIndex 的作用是為了提升性能,並沒有什么功能上的改進。
AbstractIndex 代碼結構
AbstractIndex 定義了 4 個屬性字段。由於是一個抽象基類,它的所有子類自動地繼承了這 4 個字段。也就是說,Kafka 所有類型的索引對象都定義了這些屬性。我先給你解釋下這些屬性的含義。
索引文件(file)。每個索引對象在磁盤上都對應了一個索引文件。你可能注意到了,這個字段是 var 型,說明它是可以被修改的。難道索引對象還能動態更換底層的索引文件嗎?是的。自 1.1.0 版本之后,Kafka 允許遷移底層的日志路徑,所以,索引文件自然要是可以更換的。
起始位移值(baseOffset)。索引對象對應日志段對象的起始位移值。舉個例子,如果你查看 Kafka 日志路徑的話,就會發現,日志文件和索引文件都是成組出現的。比如說,如果日志文件是 00000000000000000123.log,正常情況下,一定還有一組索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。這里的“123”就是這組文件的起始位移值,也就是 baseOffset 值。
索引文件最大字節數(maxIndexSize)。它控制索引文件的最大長度。Kafka 源碼傳入該參數的值是 Broker 端參數 segment.index.bytes 的值,即 10MB。這就是在默認情況下,所有 Kafka 索引文件大小都是 10MB 的原因。
索引文件打開方式(writable)。“True”表示以“讀寫”方式打開,“False”表示以“只讀”方式打開。如果我沒記錯的話,這個參數應該是我加上去的,就是為了修復我剛剛提到的那個 Bug。
AbstractIndex 是抽象的索引對象類。可以說,它是承載索引項的容器,而每個繼承它的子類負責定義具體的索引項結構。比如,OffsetIndex 的索引項是 < 位移值,物理磁盤位置 > 對,TimeIndex 的索引項是 < 時間戳,位移值 > 對。基於這樣的設計理念,AbstractIndex 類中定義了一個抽象方法 entrySize 來表示不同索引項的大小。
子類實現該方法時需要給定自己索引項的大小,對於 OffsetIndex 而言,該值就是 8;對於 TimeIndex 而言,該值是 12。說到這兒,你肯定會問,為什么是 8 和 12 呢?我來解釋一下。在 OffsetIndex 中,位移值用 4 個字節來表示,物理磁盤位置也用 4 個字節來表示,所以總共是 8 個字節。你可能會說,位移值不是長整型嗎,應該是 8 個字節才對啊。還記得 AbstractIndex 已經保存了 baseOffset 了嗎?這里的位移值,實際上是相對於 baseOffset 的相對位移值,即真實位移值減去 baseOffset 的值。下節課我會給你重點講一下它,這里你只需要知道使用相對位移值能夠有效地節省磁盤空間就行了。而 Broker 端參數 log.segment.bytes 是整型,這說明,Kafka 中每個日志段文件的大小不會超過 2^32,即 4GB,這就說明同一個日志段文件上的位移值減去 baseOffset 的差值一定在整數范圍內。因此,源碼只需要 4 個字節保存就行了。同理,TimeIndex 中的時間戳類型是長整型,占用 8 個字節,位移依然使用相對位移值,占用 4 個字節,因此總共需要 12 個字節。
如果有人問你,Kafka 中的索引底層的實現原理是什么?你可以大聲地告訴他:內存映射文件,即 Java 中的 MappedByteBuffer。使用內存映射文件的主要優勢在於,它有很高的 I/O 性能,特別是對於索引這樣的小文件來說,由於文件內存被直接映射到一段虛擬內存上,訪問內存映射文件的速度要快於普通的讀寫文件速度。另外,在很多操作系統中(比如 Linux),這段映射的內存區域實際上就是內核的頁緩存(Page Cache)。這就意味着,里面的數據不需要重復拷貝到用戶態空間,避免了很多不必要的時間、空間消耗。在 AbstractIndex 中,這個 MappedByteBuffer 就是名為 mmap 的變量。
二分查找算法
到目前為止,從已排序數組中尋找某個數字最快速的算法就是二分查找了,它能做到 O(lgN) 的時間復雜度。Kafka 的索引組件就應用了二分查找算法。我先給出原版的實現算法代碼。
1 private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { 2 // 第1步:如果當前索引為空,直接返回<-1,-1>對 3 if(_entries == 0) 4 return (-1, -1) 5 6 7 // 第2步:要查找的位移值不能小於當前最小位移值 8 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) 9 return (-1, 0) 10 11 12 // binary search for the entry 13 // 第3步:執行二分查找算法 14 var lo = 0 15 var hi = _entries - 1 16 while(lo < hi) { 17 val mid = ceil(hi/2.0 + lo/2.0).toInt 18 val found = parseEntry(idx, mid) 19 val compareResult = compareIndexEntry(found, target, searchEntity) 20 if(compareResult > 0) 21 hi = mid - 1 22 else if(compareResult < 0) 23 lo = mid 24 else 25 return (mid, mid) 26 } 27 28 29 (lo, if (lo == _entries - 1) -1 else lo + 1)
這段代碼的核心是,第 3 步的二分查找算法。熟悉 Binary Search 的話,你對這段代碼一定不會感到陌生。講到這里,似乎一切很完美:Kafka 索引應用二分查找算法快速定位待查找索引項位置,之后調用 parseEntry 來讀取索引項。不過,這真的就是無懈可擊的解決方案了嗎?
改進版二分查找算法
大多數操作系統使用頁緩存來實現內存映射,而目前幾乎所有的操作系統都使用 LRU(Least Recently Used)或類似於 LRU 的機制來管理頁緩存。Kafka 寫入索引文件的方式是在文件末尾追加寫入,而幾乎所有的索引查詢都集中在索引的尾部。這么來看的話,LRU 機制是非常適合 Kafka 的索引訪問場景的。但,這里有個問題是,當 Kafka 在查詢索引的時候,原版的二分查找算法並沒有考慮到緩存的問題,因此很可能會導致一些不必要的缺頁中斷(Page Fault)。此時,Kafka 線程會被阻塞,等待對應的索引項從物理磁盤中讀出並放入到頁緩存中。下面我舉個例子來說明一下這個情況。假設 Kafka 的某個索引占用了操作系統頁緩存 13 個頁(Page),如果待查找的位移值位於最后一個頁上,也就是 Page 12,那么標准的二分查找算法會依次讀取頁號 0、6、9、11 和 12。
通常來說,一個頁上保存了成百上千的索引項數據。隨着索引文件不斷被寫入,Page #12 不斷地被填充新的索引項。如果此時索引查詢方都來自 ISR 副本或 Lag 很小的消費者,那么這些查詢大多集中在對 Page #12 的查詢,因此,Page #0、6、9、11、12 一定經常性地被源碼訪問。也就是說,這些頁一定保存在頁緩存上。后面當新的索引項填滿了 Page #12,頁緩存就會申請一個新的 Page 來保存索引項,即 Page #13。現在,最新索引項保存在 Page #13 中。如果要查找最新索引項,原版二分查找算法將會依次訪問 Page #0、7、10、12 和 13。此時,問題來了:Page 7 和 10 已經很久沒有被訪問過了,它們大概率不在頁緩存中,因此,一旦索引開始征用 Page #13,就會發生 Page Fault,等待那些冷頁數據從磁盤中加載到頁緩存。根據國外用戶的測試,這種加載過程可能長達 1 秒。
顯然,這是一個普遍的問題,即每當索引文件占用 Page 數發生變化時,就會強行變更二分查找的搜索路徑,從而出現不在頁緩存的冷數據必須要加載到頁緩存的情形,而這種加載過程是非常耗時的。基於這個問題,社區提出了改進版的二分查找策略,也就是緩存友好的搜索算法。總體的思路是,代碼將所有索引項分成兩個部分:熱區(Warm Area)和冷區(Cold Area),然后分別在這兩個區域內執行二分查找算法。
乍一看,該算法並沒有什么高大上的改進,僅僅是把搜尋區域分成了冷、熱兩個區域,然后有條件地在不同區域執行普通的二分查找算法罷了。實際上,這個改進版算法提供了一個重要的保證:它能保證那些經常需要被訪問的 Page 組合是固定的。想想剛才的例子,同樣是查詢最熱的那部分數據,一旦索引占用了更多的 Page,要遍歷的 Page 組合就會發生變化。這是導致性能下降的主要原因。這個改進版算法的最大好處在於,查詢最熱那部分數據所遍歷的 Page 永遠是固定的,因此大概率在頁緩存中,從而避免無意義的 Page Fault。
位移索引和時間戳索引
Kafka 索引類型有三大類:位移索引、時間戳索引和已中止事務索引。相比於最后一類索引,前兩類索引的出鏡率更高一些。在 Kafka 的數據路徑下,你肯定看到過很多.index 和.timeindex 后綴的文件。不知你是否有過這樣的疑問:“這些文件是用來做什么的呢?” 現在我可以明確告訴你:.index 文件就是 Kafka 中的位移索引文件,而.timeindex 文件則是時間戳索引文件。
位移索引
位移索引也就是所謂的 OffsetIndex,它可是一個老資歷的組件了。如果我沒記錯的話,國內大面積使用 Kafka 應該是在 0.8 時代。從那個時候開始,OffsetIndex 就已經存在了。每當 Consumer 需要從主題分區的某個位置開始讀取消息時,Kafka 就會用到 OffsetIndex 直接定位物理文件位置,從而避免了因為從頭讀取消息而引入的昂貴的 I/O 操作。不同索引類型保存不同的 <key, value="">對。就 OffsetIndex 而言,Key 就是消息的相對位移,Value 是保存該消息的日志段文件中該消息第一個字節的物理文件位置。
為什么是 8 呢?相對位移是一個整型(Integer),占用 4 個字節,物理文件位置也是一個整型,同樣占用 4 個字節,因此總共是 8 個字節。那相對位移是什么值呢?我們知道,Kafka 中的消息位移值是一個長整型(Long),應該占用 8 個字節才對。在保存 OffsetIndex 的 <key, value="">對時,Kafka 做了一些優化。每個 OffsetIndex 對象在創建時,都已經保存了對應日志段對象的起始位移,因此,OffsetIndex 索引項沒必要保存完整的 8 字節位移值。相反地,它只需要保存與起始位移的差值(Delta)就夠了,而這個差值是可以被整型容納的。這種設計可以讓 OffsetIndex 每個索引項都節省 4 個字節。
當讀取 OffsetIndex 時,源碼還需要將相對位移值還原成之前的完整位移。這個是在 parseEntry 方法中實現的。
這個方法返回一個 OffsetPosition 類型。該類有兩個方法,分別返回索引項的 Key 和 Value。這里的 parseEntry 方法,就是要構造 OffsetPosition 所需的 Key 和 Value。Key 是索引項中的完整位移值,代碼使用 baseOffset + relativeOffset(buffer, n) 的方式將相對位移值還原成完整位移值;Value 是這個位移值上消息在日志段文件中的物理位置,代碼調用 physical 方法計算這個物理位置並把它作為 Value。最后,parseEntry 方法把 Key 和 Value 封裝到一個 OffsetPosition 實例中,然后將這個實例返回。
寫入索引項好了,有了這些基礎,下面的內容就很容易理解了。我們來看下 OffsetIndex 中最重要的操作——寫入索引項 append 方法的實現。
1 def append(offset: Long, position: Int): Unit = { 2 inLock(lock) { 3 // 索引文件如果已經寫滿,直接拋出異常 4 require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") 5 // 要保證待寫入的位移值offset比當前索引文件中所有現存的位移值都要大 6 // 這主要是為了維護索引的單調增加性 7 if (_entries == 0 || offset > _lastOffset) { 8 trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") 9 mmap.putInt(relativeOffset(offset)) // 向mmap寫入相對位移值 10 mmap.putInt(position) // 向mmap寫入物理文件位置 11 _entries += 1 // 更新索引項個數 12 _lastOffset = offset // 更新當前索引文件最大位移值 13 // 確保寫入索引項格式符合要求 14 require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") 15 } else { 16 throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + 17 s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") 18 } 19 } 20 }
append 方法接收兩個參數:Long 型的位移值和 Integer 型的物理文件位置。該方法最重要的兩步,就是分別向 mmap 寫入相對位移值和物理文件位置。除了 append 方法,索引還有一個常見的操作:截斷操作(Truncation)。截斷操作是指,將索引文件內容直接裁剪掉一部分。比如,OffsetIndex 索引文件中當前保存了 100 個索引項,我想只保留最開始的 40 個索引項。
這個方法接收 entries 參數,表示要截取到哪個槽,主要的邏輯實現是調用 mmap 的 position 方法。源碼中的 _entries * entrySize 就是 mmap 要截取到的字節處。下面,我來說說 OffsetIndex 的使用方式。既然 OffsetIndex 被用來快速定位消息所在的物理文件位置,那么必然需要定義一個方法執行對應的查詢邏輯。這個方法就是 lookup。
方法返回的,是不大於給定位移值 targetOffset 的最大位移值,以及對應的物理文件位置。你大致可以把這個方法,理解為位移值的 FLOOR 函數。
時間戳索引
與 OffsetIndex 不同的是,TimeIndex 保存的是 < 時間戳,相對位移值 > 對。時間戳需要一個長整型來保存,相對位移值使用 Integer 來保存。因此,TimeIndex 單個索引項需要占用 12 個字節。這也揭示了一個重要的事實:在保存同等數量索引項的基礎上,TimeIndex 會比 OffsetIndex 占用更多的磁盤空間。
寫入索引項TimeIndex 也有 append 方法,只不過它叫作 maybeAppend。我們來看下它的實現邏輯。
和 OffsetIndex 類似,向 TimeIndex 寫入索引項的主體邏輯,是向 mmap 分別寫入時間戳和相對位移值。只不過,除了校驗位移值的單調增加性之外,TimeIndex 還會確保順序寫入的時間戳也是單調增加的。
我帶你詳細分析了 OffsetIndex 和 TimeIndex,以及它們的不同之處。雖然 OffsetIndex 和 TimeIndex 是不同類型的索引,但 Kafka 內部是把二者結合使用的。通常的流程是,先使用 TimeIndex 尋找滿足時間戳要求的消息位移值,然后再利用 OffsetIndex 去定位該位移值所在的物理文件位置。因此,它們其實是合作的關系。最后,我還想提醒你一點:不要對索引文件做任何修改!我碰到過因用戶擅自重命名索引文件,從而導致 Broker 崩潰無法啟動的場景。另外,雖然 Kafka 能夠重建索引,但是隨意地刪除索引文件依然是一個很危險的操作。在生產環境中,我建議你盡量不要執行這樣的操作。
總結
以后關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜索。