Let's first recall what Kafka's log structure looks like.
A Kafka log object is made up of multiple log segment objects, and each log segment object creates a set of files on disk: the message log file (.log), the offset index file (.index), the timestamp index file (.timeindex), and the index file for aborted transactions (.txnindex). Of course, if you don't use Kafka transactions, the aborted-transaction index file is never created. A quick sketch of the resulting file names appears below.
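To make that layout concrete, here is a minimal sketch (in Scala, not Kafka source; it only assumes the well-known 20-digit zero-padded file-naming convention) of how a segment's file names are derived from its base offset:
def segmentFileNames(baseOffset: Long): Seq[String] = {
  // Segment files are named after the zero-padded base offset, e.g. 368769 -> "00000000000000368769"
  val prefix = f"$baseOffset%020d"
  Seq(s"$prefix.log", s"$prefix.index", s"$prefix.timeindex", s"$prefix.txnindex")
}
// segmentFileNames(368769L) => List("00000000000000368769.log", "00000000000000368769.index", ...)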
Next, let's look at how LogSegment is implemented; the file lives at core/src/main/scala/kafka/log/LogSegment.scala.
LogSegment
The LogSegment.scala file defines three entities:
- the LogSegment class;
- the LogSegment object;
- the LogFlushStats object. The Stats suffix gives it away: it is used for statistics, mainly for timing log flushes to disk.
Here is the comment at the top of LogSegment.scala, which describes what a LogSegment is made of:
A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment
This comment spells out that each log segment is made up of two core components: a log and an index. Each segment also has a starting point, the base offset, which is less than or equal to the smallest offset of any message in this segment and greater than any offset in any previous segment.
LogSegment constructor parameters
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { … }
FileRecords is the object that actually stores the Kafka messages.
lazyOffsetIndex, lazyTimeIndex and txnIndex correspond to the offset index file, the timestamp index file and the aborted-transaction index file, respectively.
baseOffset is the starting offset of the log segment; once a LogSegment instance is created, its base offset is fixed and can never be changed.
indexIntervalBytes is simply the Broker-side parameter log.index.interval.bytes; it controls how frequently the segment adds index entries. By default, at least 4KB of new message data must be written before a new index entry is added.
time is the class used for timing and statistics.
append
@nonthreadsafe
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
// Skip the append if the incoming record set is empty
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
// Make sure the largest offset passed in is valid for this segment
ensureOffsetInRange(largestOffset)
// append the messages
// Perform the actual write
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
// Update the segment's max timestamp and the offset of the message that carries it
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
// Once more than indexIntervalBytes (4KB by default) has been written since the last index entry, append a new index entry and reset the byte counter
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
This method does a few things:
- Check that there is data to write; an empty record set is skipped.
- Call ensureOffsetInRange to make sure the largest offset passed in is valid.
- Call FileRecords' append method to perform the actual write.
- Update the segment's max timestamp and the offset of the message that carries it.
- Update the index entries and the written-bytes counter: the segment adds roughly one index entry per 4KB of data. Once the bytes written since the last index entry exceed indexIntervalBytes, append adds a new entry to the offset index and the time index and resets the counter (see the sketch below).
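That sparse-index bookkeeping can be boiled down to a small, self-contained sketch (not Kafka source; SparseIndexTracker and addIndexEntry are illustrative stand-ins for the real offsetIndex/timeIndex calls):
class SparseIndexTracker(indexIntervalBytes: Int = 4096) {
  private var bytesSinceLastIndexEntry = 0
  // Called once per appended batch; addIndexEntry stands in for offsetIndex.append / timeIndex.maybeAppend
  def onAppend(batchSizeInBytes: Int)(addIndexEntry: => Unit): Unit = {
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      addIndexEntry                 // only after more than indexIntervalBytes has accumulated
      bytesSinceLastIndexEntry = 0  // reset the counter after writing an index entry
    }
    bytesSinceLastIndexEntry += batchSizeInBytes
  }
}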
Next, let's see how ensureOffsetInRange validates the largest offset:
private def ensureOffsetInRange(offset: Long): Unit = {
if (!canConvertToRelativeOffset(offset))
throw new LogSegmentOffsetOverflowException(this, offset)
}
This method ultimately calls into AbstractIndex's toRelative method:
private def toRelative(offset: Long): Option[Int] = {
val relativeOffset = offset - baseOffset
if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
None
else
Some(relativeOffset.toInt)
}
As you can see, this method compares offset against baseOffset: if offset is smaller than baseOffset, or if offset minus baseOffset is larger than Int.MaxValue, the offset is invalid and a LogSegmentOffsetOverflowException is thrown (illustrated with concrete numbers below).
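Here is a quick illustration with assumed numbers (a standalone sketch, not Kafka source) of why the check matters: index entries only store a 4-byte offset relative to baseOffset, so anything outside the representable range has to be rejected:
val baseOffset = 100L
def toRelative(offset: Long): Option[Int] = {
  val relativeOffset = offset - baseOffset
  if (relativeOffset < 0 || relativeOffset > Int.MaxValue) None
  else Some(relativeOffset.toInt)
}
toRelative(150L)                           // Some(50): fits in an Int relative to baseOffset
toRelative(99L)                            // None: below baseOffset
toRelative(baseOffset + Int.MaxValue + 1L) // None: would overflow, hence LogSegmentOffsetOverflowException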
read
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
// Translate the logical start offset into a physical file position
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
// Work out the total number of bytes to read
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
// Read the messages via log.slice and wrap them in a FetchDataInfo
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
This code mainly does the following:
- Call translateOffset to locate the starting file position to read from (startPosition).
- Compute how many bytes to read. For example, if maxSize=100, maxPosition=300 and startPosition=250, then read can fetch at most 50 bytes, because maxPosition - startPosition = 50; comparing that with maxSize, the smaller of the two becomes the total number of bytes actually read (a worked example follows below).
- Call FileRecords' slice method to read a message set of the given size from the given position.
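Here is that calculation as a tiny worked example (assumed numbers, not Kafka source):
import scala.math.min
val maxSize = 100
val maxPosition = 300L
val startPosition = 250
val adjustedMaxSize = maxSize   // minOneMessage = false in this example, so no adjustment
val fetchSize = min((maxPosition - startPosition).toInt, adjustedMaxSize)  // = 50 bytes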
recover
This method recovers a log segment. When a Broker starts up, it loads all log segment information from disk into memory and creates the corresponding LogSegment instances; during that process it has to perform a series of operations.
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
// Clear the index files
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
// Iterate over all record batches in the segment
for (batch <- log.batches.asScala) {
// Validate the batch
batch.ensureValid()
// Make sure the last offset in the batch does not overflow the segment's offset range
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
// Track the max timestamp and the offset of the message that carries it
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
// Once more than indexIntervalBytes has accumulated since the last index entry, append new index entries and reset the counter
if (validBytes - lastIndexEntry > indexIntervalBytes) {
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
// Accumulate the number of valid message bytes
validBytes += batch.sizeInBytes()
// Update the producer state and the Leader Epoch cache
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
.format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
}
// After the loop, compare the accumulated valid bytes with the total size of the log file
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
// Truncate the log to the last valid byte
log.truncateTo(validBytes)
// Trim the index files to their valid size
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
This method mainly does the following:
- Clear the index files.
- Iterate over all record batches in the segment.
- Validate the messages in the segment.
- Track the max timestamp and the offset of the message that carries it.
- Update the index entries.
- Accumulate the total number of valid message bytes.
- Update the producer state and the Leader Epoch cache.
- Truncate the message log and index files to the last valid byte (a small sketch of this bookkeeping follows the list).
- Trim the index files to their valid size.
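The truncation bookkeeping can be summarized in a minimal sketch (not Kafka source; Batch and bytesToTruncate are illustrative): validBytes only grows past batches that validate, so whatever sits between validBytes and the end of the file is the corrupt tail that gets cut off.
final case class Batch(sizeInBytes: Int, valid: Boolean)
def bytesToTruncate(batches: Seq[Batch], fileSizeInBytes: Int): Int = {
  // Count bytes only up to the first batch that fails validation
  val validBytes = batches.takeWhile(_.valid).map(_.sizeInBytes).sum
  fileSizeInBytes - validBytes   // recover() truncates the log back to validBytes
}
// bytesToTruncate(Seq(Batch(4096, valid = true), Batch(512, valid = false)), 4608)  // = 512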
Next, let's step into FileRecords' truncateTo method to see how the truncation is actually done:
public int truncateTo(int targetSize) throws IOException {
int originalSize = sizeInBytes();
// The target size must not exceed the current file size (or be negative)
if (targetSize > originalSize || targetSize < 0)
throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
" size of this log segment is " + originalSize + " bytes.");
// If the target size is smaller than the current file size, truncate the channel
if (targetSize < (int) channel.size()) {
channel.truncate(targetSize);
size.set(targetSize);
}
return originalSize - targetSize;
}
Kafka compares the segment's current total byte count with the valid bytes just accumulated; if the former is larger, the segment contains some invalid trailing messages and a truncation is performed to bring the segment back down to a valid size.
truncateTo
This method forcibly truncates the segment's data to the given offset.
def truncateTo(offset: Long): Int = {
// Do offset translation before truncating the index to avoid needless scanning
// in case we truncate the full index
// Translate the offset into a physical file position
val mapping = translateOffset(offset)
// Truncate each index to the given offset
offsetIndex.truncateTo(offset)
timeIndex.truncateTo(offset)
txnIndex.truncateTo(offset)
// After truncation, reset and allocate more space for the (new currently active) index
// Resize the indexes so the (now active again) segment can take more entries
offsetIndex.resize(offsetIndex.maxIndexSize)
timeIndex.resize(timeIndex.maxIndexSize)
val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
// If the segment is now empty, reset its creation time
if (log.sizeInBytes == 0) {
created = time.milliseconds
rollingBasedTimestamp = None
}
// Reset the bytes-since-last-index-entry counter
bytesSinceLastIndexEntry = 0
// Reload the segment's largest timestamp
if (maxTimestampSoFar >= 0)
loadLargestTimestamp()
bytesTruncated
}
- Translate the offset into a physical file position.
- Truncate the indexes to the given offset: the offset index, the timestamp index and the aborted-transaction index.
- Resize the indexes to save memory.
- Truncate the log data to the corresponding position.
Let's take a look at OffsetIndex's truncateTo method:
override def truncateTo(offset: Long): Unit = {
inLock(lock) {
val idx = mmap.duplicate
// Find the slot of the largest index entry whose offset is <= the given offset
val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY)
/* There are 3 cases for choosing the new size
* 1) if there is no entry in the index <= the offset, delete everything
* 2) if there is an entry for this exact offset, delete it and everything larger than it
* 3) if there is no entry for this offset, delete everything larger than the next smallest
*/
val newEntries =
// No index entry is <= the given offset: delete everything
if(slot < 0)
0
// An entry exists for exactly this offset: keep only the entries before it
else if(relativeOffset(idx, slot) == offset - baseOffset)
slot
// No entry for exactly this offset: keep everything up to and including slot, drop everything larger
else
slot + 1
// Apply the truncation to the computed number of entries
truncateToEntries(newEntries)
}
}
- largestLowerBoundSlotFor returns the slot of the largest index entry whose offset is less than or equal to the given offset.
- If the returned slot is negative, no entry's offset is below the given offset, so newEntries is 0 and the whole index is dropped.
- If an entry exists for exactly the given offset, newEntries is slot: that entry and everything after it are dropped.
- If there is no entry for exactly this offset (the entry at slot is strictly smaller), newEntries is slot + 1: entries up to and including slot are kept and everything larger is dropped (this three-way decision is condensed in the sketch below).
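Condensed into a standalone sketch (not Kafka source; the parameter names are illustrative), the decision looks like this:
def newEntryCount(slot: Int, slotOffset: Long, targetOffset: Long): Int =
  if (slot < 0) 0                            // no entry <= target: drop every entry
  else if (slotOffset == targetOffset) slot  // exact match: drop that entry and everything after it
  else slot + 1                              // keep entries up to and including slot, drop the rest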
Having covered LogSegment, let's move on to Log.
Log
Log source structure
Log.scala defines 10 classes and objects; in the accompanying diagram a C in parentheses marks a Class and an O marks an Object.
What we mainly care about is the Log class.
Definition of the Log class
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
……
}
The two key properties are dir and logStartOffset: dir is the directory this log lives in, i.e. the topic-partition's directory, and logStartOffset is the log's current earliest offset.
In Kafka, the Log End Offset (LEO) is the offset of the next message to be appended, i.e. the end of the log.
The Log Start Offset is the offset of the earliest message in the log that is currently visible to the outside.
Now let's look at the other properties:
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
nextOffsetMetadata is essentially the LEO.
highWatermarkMetadata is the partition log's high-watermark value.
segments holds all of the log segments of this partition, keyed by base offset in a ConcurrentSkipListMap (a small sketch of why that structure is convenient follows below).
The Leader Epoch Cache object stores the mapping between the partition leader's epoch values and the corresponding offsets.
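Why a ConcurrentSkipListMap keyed by base offset? A minimal sketch of the assumed usage (not Kafka source; the string values stand in for LogSegment instances) shows the appeal: floorEntry finds the segment that should contain a given offset in O(log n), and the map stays sorted for range scans:
import java.util.concurrent.ConcurrentSkipListMap
val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
segments.put(0L, "segment@0")
segments.put(500L, "segment@500")
segments.put(1200L, "segment@1200")
val holder = segments.floorEntry(800L)  // segment@500: the largest base offset <= 800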
Log class initialization code
locally {
val startMs = time.milliseconds
// create the log directory if it doesn't exist
// Create the partition's log directory
Files.createDirectories(dir.toPath)
// Initialize the Leader Epoch Cache
initializeLeaderEpochCache()
// Load all log segment objects
val nextOffset = loadSegments()
/* Calculate the offset of the next message */
nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
// Update the Leader Epoch Cache, clearing invalid entries
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
This initialization code mainly does the following: create the partition's log directory, initialize the Leader Epoch Cache, load all log segment objects, compute the next offset metadata (the LEO), update logStartOffset, clear invalid entries from the Leader Epoch Cache, and rebuild the producer state.
Setting Leader Epoch aside for now, let's see how loadSegments loads the log segments.
loadSegments
private def loadSegments(): Long = {
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
// Remove temporary files left over from a previous failure (.cleaned, .swap, .deleted, etc.)
val swapFiles = removeTempFilesAndCollectSwapFiles()
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
// Clear all segment objects, then walk the partition directory again to rebuild the segments map, deleting orphaned index files that have no matching log file
retryOnOffsetOverflow {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
// First clear the existing segment information
logSegments.foreach(_.close())
segments.clear()
// Load the segments from the files on disk
loadSegmentFiles()
}
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
// Complete any interrupted swap operations
completeSwapOperations(swapFiles)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
recoverLog()
}
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = false))
}
0
}
}
This method first calls removeTempFilesAndCollectSwapFiles to remove the temporary files left over from a previous failure (including .cleaned, .swap and .deleted files).
Then it clears all log segment objects and walks the partition directory again, rebuilding the segments map and deleting orphaned index files that have no corresponding log segment file.
After those two passes, it completes any interrupted swap operations by calling completeSwapOperations. Once all of that is done, it calls recoverLog to recover the log segment objects and returns the recovered partition log's LEO.
removeTempFilesAndCollectSwapFiles
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
// A local helper, deleteIndicesIfExist, that deletes the index files belonging to a log file
def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
val offset = offsetFromFile(baseFile)
Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}
var swapFiles = Set[File]()
var cleanFiles = Set[File]()
var minCleanedFileOffset = Long.MaxValue
for (file <- dir.listFiles if file.isFile) {
if (!file.canRead)
throw new IOException(s"Could not read file $file")
val filename = file.getName
// Files ending in .deleted
if (filename.endsWith(DeletedFileSuffix)) {
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
// They were left over from a previous failure; delete them outright
Files.deleteIfExists(file.toPath)
// Files ending in .cleaned
} else if (filename.endsWith(CleanedFileSuffix)) {
minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
cleanFiles += file
// Files ending in .swap
} else if (filename.endsWith(SwapFileSuffix)) {
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the index files, complete the swap operation later
// if an index just delete the index files, they will be rebuilt
// Strip the .swap suffix to get the base file name
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
// If the .swap file was originally an index file
if (isIndexFile(baseFile)) {
// Delete the original index files
deleteIndicesIfExist(baseFile)
// If the .swap file was originally a log file
} else if (isLogFile(baseFile)) {
// Delete the original index files
deleteIndicesIfExist(baseFile)
// Add it to the set of .swap files to recover
swapFiles += file
}
}
}
// KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
// files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
// for more details about the split operation.
// From the swap files awaiting recovery, pick out those whose base offset is >= minCleanedFileOffset and delete these invalid .swap files outright
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
invalidSwapFiles.foreach { file =>
debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
deleteIndicesIfExist(baseFile, SwapFileSuffix)
Files.deleteIfExists(file.toPath)
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
// Delete all of the collected .cleaned files
cleanFiles.foreach { file =>
debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
}
// Finally, return the set of currently valid .swap files
validSwapFiles
}
- Define an internal helper, deleteIndicesIfExist, that deletes the index files corresponding to a log file.
- Walk the file list, deleting leftover .deleted files and collecting the files that end in .cleaned and .swap.
- Use minCleanedFileOffset to delete invalid .swap files (see the sketch below).
- Finally, return the set of currently valid .swap files.
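The KAFKA-6264 rule used in the third step can be shown with a small sketch (assumed file names and offsets, not Kafka source): any .swap file whose base offset is at least the smallest .cleaned offset belongs to an unfinished split and is discarded.
val minCleanedFileOffset = 300L
val swapFileOffsets = Map(
  "00000000000000000100.log.swap" -> 100L,
  "00000000000000000400.log.swap" -> 400L)
val (invalidSwapFiles, validSwapFiles) =
  swapFileOffsets.partition { case (_, baseOffset) => baseOffset >= minCleanedFileOffset }
// invalidSwapFiles: the 400 file (deleted outright); validSwapFiles: the 100 file (kept for later)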
With removeTempFilesAndCollectSwapFiles covered, let's move into loadSegmentFiles.
loadSegmentFiles
private def loadSegmentFiles(): Unit = {
// load segments in ascending order because transactional data from one segment may depend on the
// segments that come before it
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
// Files that are not .log files, e.g. .index, .timeindex, .txnindex
if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
val offset = offsetFromFile(file)
val logFile = Log.logFile(dir, offset)
// Make sure a corresponding log file exists; otherwise log a warning and delete the index file
if (!logFile.exists) {
warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
// Files ending in .log
} else if (isLogFile(file)) {
// if it's a log file, load the corresponding log segment
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
// Create the corresponding LogSegment instance and add it to segments
val segment = LogSegment.open(dir = dir,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
"recovering segment and rebuilding index files...")
recoverSegment(segment)
case e: CorruptIndexException =>
warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
recoverSegment(segment)
}
addSegment(segment)
}
}
}
- Walk the files in the directory.
- If a file is an index file, check whether a corresponding log file exists (the base-offset parsing this relies on is sketched below); if not, delete the orphaned index file.
- If it is a log file, create the corresponding LogSegment instance and add it to segments.
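Matching an index file against its .log file rests on parsing the base offset out of the file name; here is a tiny sketch of that parsing (assumed, it simply mirrors the naming convention shown at the start):
def offsetFromFileName(filename: String): Long =
  // The digits before the first '.' are the segment's base offset
  filename.substring(0, filename.indexOf('.')).toLong
// offsetFromFileName("00000000000000368769.index")  // = 368769L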
Next, completeSwapOperations is called to handle the set of valid .swap files.
completeSwapOperations
private def completeSwapOperations(swapFiles: Set[File]): Unit = {
// Iterate over all valid .swap files
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
val baseOffset = offsetFromFile(logFile) // Get the log file's base offset
// Create a LogSegment instance for the .swap file
val swapSegment = LogSegment.open(swapFile.getParentFile,
baseOffset = baseOffset,
config,
time = time,
fileSuffix = SwapFileSuffix)
info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
// Recover the swap segment
recoverSegment(swapSegment)
// We create swap files for two cases:
// (1) Log cleaning where multiple segments are merged into one, and
// (2) Log splitting where one segment is split into multiple.
//
// Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
// must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
// of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
// do a replace with an existing segment.
// Check whether the old segments this swap segment replaces were already deleted, or whether some still exist
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
segment.readNextOffset > swapSegment.baseOffset
}
// Replace the old segments (if any) with the recovered swap segment; the .swap file ends up renamed to .log
replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
}
}
- Iterate over all valid .swap files;
- create the corresponding LogSegment instance for each;
- run log segment recovery on the swap segment (the recovery logic was already covered in the LogSegment section);
- replace the old segments and rename the .swap file to .log (the suffix handling is sketched below).
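The suffix handling in that last step can be pictured with a small sketch (not Kafka source; replaceSuffix here is a stand-in for the CoreUtils helper used above):
def replaceSuffix(path: String, oldSuffix: String, newSuffix: String): String = {
  require(path.endsWith(oldSuffix), s"$path does not end with $oldSuffix")
  path.stripSuffix(oldSuffix) + newSuffix
}
// replaceSuffix("00000000000000000100.log.swap", ".swap", "")  // "00000000000000000100.log"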
Finally comes the recoverLog part of the code.
recoverLog
private def recoverLog(): Long = {
// if we have the clean shutdown marker, skip recovery
// If there is no file ending in .kafka_cleanshutdown (usually there isn't), the log actually needs recovery
if (!hasCleanShutdownFile) {
// okay we need to actually recover this log
// Get all unflushed segments beyond the last recovery point
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
var truncated = false
// Iterate over these unflushed segments
while (unflushed.hasNext && !truncated) {
val segment = unflushed.next
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
// Recover this segment
recoverSegment(segment, leaderEpochCache)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
s"creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
if (truncatedBytes > 0) { // Invalid messages caused a non-zero truncation: delete all remaining segments
// we had an invalid message, delete all remaining log
warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
truncated = true
}
}
}
// After all of the above, if the segment collection is not empty
if (logSegments.nonEmpty) {
val logEndOffset = activeSegment.readNextOffset
if (logEndOffset < logStartOffset) {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
"This could happen if segment files were deleted from the file system.")
removeAndDeleteSegments(logSegments, asyncDelete = true)
}
}
// After all of the above, if the segment collection is empty
if (logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at logStartOffset
// Create at least one new segment starting at logStartOffset and add it to the collection
addSegment(LogSegment.open(dir = dir,
baseOffset = logStartOffset,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = config.preallocate))
}
// Update the recovery point and return it
recoveryPoint = activeSegment.readNextOffset
recoveryPoint
}