每一個分區都是一個順序的、不可變的消息隊列,並且可以持續的添加。分區中的消息都被分配了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。
一個分區在文件系統里存儲為一個文件夾。文件夾里包含日志文件和索引文件。其文件名是其包含的offset的最小的條目的offset。
每個文件是一個segment。
在broker的log存儲文件下,除了存儲這各個topic的文件夾,還存在這幾個checkpoint文件。分別是
recovery-point-offset-checkpoint 負責記錄topic已經被寫入磁盤的offset。
replication-offset-checkpoint 用來存儲每一個replica的HighWatermark。由ReplicaManager負責寫。參考下面關於HW定義,也就是那些已經成功被復制給其他broker消息的offset。
__consumer_offsets存儲各個topic的消費者offset。但是,他的只有一份。
一些常見的offset
HighWatermark 最后committed消息的起始偏移。它后面的消息在目前還是uncommited的狀態。
logStartOffset 日志段集合中第一個日志段(segment)的基礎位移,也就是這個日志對象的基礎位移
LogEndOffset 下一條將要被加入到日志的消息的位移。注意,這個offset未必在硬盤中,可能目前只在內存中還沒有被flush到硬盤。
recovery-point-offset-checkpoint 已經被確認寫入磁盤的offset
replication-offset-checkpoint 已經確認復制給其他replica的offset。也就是HW。
失敗的follower開始恢復時,會首先將自己的日志截斷到上次的checkpointed時刻的HW。然后,向leader拉去消息。
同時,kafka有日志清理機制,日志清理主要是用於縮減日志的大小,如清理重復的key等等。min.compaction.lag.ms配置不滿足的最近segment和activesegment顯然是不能清理的。
FAQ
Resetting first dirty offset of __consumer_offsets
例如,重復報錯信息如下,這顯然是清理線程在一直遇到麻煩。
[2018-06-01 13:46:27,156] WARN Resetting first dirty offset of __consumer_offsets-18 to log start offset 44 since the checkpointed offset 42 is invalid. (kafka.log.LogCleanerManager$)
報錯代碼段為
val lastCleanOffset: Option[Long] = lastClean.get(topicPartition) // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid; // reset to the log starting offset and log the error val logStartOffset = log.logSegments.head.baseOffset val firstDirtyOffset = { val offset = lastCleanOffset.getOrElse(logStartOffset) if (offset < logStartOffset) { // don't bother with the warning if compact and delete are enabled. if (!isCompactAndDelete(log)) warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset since the checkpointed offset $offset is invalid.") logStartOffset } else { offset } }
我們可以看見,清理線程試圖獲取一個partition的最后清理的位移(lastCleanOffset),並同時獲取了該partition中現存的所有segment中最小的頭部offset(logStartOffset)。但是,卻發現lastCleanOffset比logStartOffset還要小。清理線程自然會反應,那些我沒有清理的數據跑哪里去了呢?抱怨完后,其將firstDirtyOffset置為logStartOffset,准備下一次從這里開始清理。報錯中令人迷惑的checkpointed offset是指lastCleanOffset。
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
kafka本來應該是在完成清理后將lastCleanOffset提高,但是問題在於,如果此時沒有可清理的segment,lastCleanOffset也就將保持不變。則線程下一次循環時仍然會遇到這個問題。
解決方案中最快捷的是清空kafka的data目錄。或者忽略這個問題,等待大量數據灌入。一旦產生可以清理的segment,這個問題就會解決。