系列文章目錄
https://zhuanlan.zhihu.com/p/367683572
一. 業務模型
在上一篇文章中,我們分析了生產者的原理。下一步我們來分析下提交上來的消息在Server端時如何存儲的。
1.1 概念梳理
Kafka用Topic將數據划分成內聚性較強的子集,Topic內部又划分成多個Partition。不過這兩個都是邏輯概念,真正存儲文件的是Partition所對應的一個或多個Replica,即副本。在存儲層有個概念和副本一一對應——Log。為了防止Log過大,增加消息過期和數據檢索的成本,Log又會按一定大小划分成"段",即LogSegment。用一張圖匯總這些概念間的關系:

1.2 文件分析
1.2.1 數據目錄
Kafkap配置文件(server.properties)中有一個配置項——log.dir,其指定了kafka數據文件存放位置。為了研究數據目錄的結構,我們先創建一個Topic(lao-zhang-tou-topic)
kafka-console-producer.sh --topic lao-zhang-tou-topic --bootstrap-server localhost:9092
然后向其中寫幾條消息
kafka-console-producer.sh --topic lao-zhang-tou-topic --bootstrap-server localhost:9092
{"message":"This is the first message"}
{"message":"This is the sencond message"}
接下來我們來看看log.dir指定目錄下存放了那些文件

該目錄下文件分3類:
-
數據文件夾
如截圖中的lao-zhang-tou-topic-0
-
checkpoint文件
- cleaner-offset-checkpoint
- log-start-offset-checkpoint
- recovery-point-offset-checkpoint
- replication-offset-checkpoint
-
配置文件
meta.properties
第2、3類文件后續文章會詳細分析,本文主要關注截圖中lao-zhang-tou-topic-0目錄。

實際上,該目錄對應上文提到的Log概念,命名規則為 ${Topic}-${PartitionIndex}。該目錄下,名稱相同的.log文件、.index文件、.timeindex文件構成了一個LogSegment。例如圖中的 00000000000000000000.log、00000000000000000000.index、00000000000000000000.timeindex 三個文件。其中.log是數據文件,用於存儲消息數據;.index和.timeindex是在.log基礎上建立起來的索引文件。
1.2.2 .log文件
log文件將消息數據依次排開進行存儲

每個Message內部分為"數據頭"(LOG_OVERHEAD)和"數據體"(Record)兩部分

其中,LOG_OVERHEAD包含兩個字段:
- offset:每條數據的邏輯偏移量,按插入順序分別為0、1、2... ... N;每個消息的offset在Partition內部是唯一的;
- size:數據體(RECORD)部分的長度;
RECORD內部格式如下:

其中,
-
crc32:校驗碼,用於驗證數據完整性;
-
magic:消息格式的版本號;v0=0,v1=1;本文講v1格式;
-
timestamp:時間戳,具體業務含義依attributes的值而定;
-
attributes:屬性值;其 8bits 的含義如下

-
keyLength:key值的長度;
-
key:消息數據對應的key;
-
valueLength:value值的長度;
-
value:消息體,承載業務信息;
1.2.3 .index和.timeindex文件
.index文件是依offset建立其的稀疏索引,可減少通過offset查找消息時的遍歷數據量。.index文件的每個索引條目占8 bytes,有兩個字段:relativeOffset 和 position(各占4 bytes)。也就是消息offset到其在文件中偏移量的一個映射。那有人要問了,索引項中保存的明明是一個叫relativeOffset的東西,為什么說是offset到偏移量的映射呢?其實,准確的來講,relativeOffset指的的相對偏移量,是對LogSegment基准offset而言的。我們注意到,一個LogSegment內的.log文件、.index文件、和.index文件除后綴外的名稱都是相同的。其實這個名稱就是該LogSegment的基准offset,即LogSegment內保存的第一條消息對應的offset。baseOffset + relativeOffset即可得到offset,所以稱索引項是offset到物理偏移量的映射。
不是所有的消息都對應.index文件內的一個條目。Kafka會每隔一定量的消息才會在.index建立索引條目,間隔大小由"log.index.interval.bytes"配置指定。.index文件布局示意圖如下:

.timeindex文件和.index原理相同,只不過其IndexEntry的兩個字段分別為timestamp(8 bytes)和relativeOffset(4 bytes)。用於減少以時間戳查找消息時遍歷元素數量。
1.3 順序IO
對於我們常用的機械硬盤,其讀取數據分3步:
- 尋道;
- 尋找扇區;
- 讀取數據;
前兩個,即尋找數據位置的過程為機械運動。我們常說硬盤比內存慢,主要原因是這兩個過程在拖后腿。不過,硬盤比內存慢是絕對的嗎?其實不然,如果我們能通過順序讀寫減少尋找數據位置時讀寫磁頭的移動距離,硬盤的速度還是相當可觀的。一般來講,IO速度層面,內存順序IO > 磁盤順序IO > 內存隨機IO > 磁盤隨機IO。
Kafka在順序IO上的設計分兩方面看:
- LogSegment創建時,一口氣申請LogSegment最大size的磁盤空間,這樣一個文件內部盡可能分布在一個連續的磁盤空間內;
- .log文件也好,.index和.timeindex也罷,在設計上都是只追加寫入,不做更新操作,這樣避免了隨機IO的場景;
Kafka是公認的高性能消息中間件,順序IO在這里占了很大一部分因素。
不知道大家有沒有聽過這樣一個說法:Kafka集群能承載的Partition數量有上限。很大一部分原因是Partition數量太多會抹殺掉Kafka順序IO設計帶來的優勢,相當於自廢武功。Why?因為不同Partition在磁盤上的存儲位置可不保證連續,當以不同Partition為讀寫目標並發地向Kafka發送請求時,Server端近似於隨機IO。
1.4 端到端壓縮
一條壓縮消息從生產者處發出后,其在消費者處才會被解壓。Kafka Server端不會嘗試解析消息體,直接原樣存儲,省掉了Server段壓縮&解壓縮的成本,這也是Kafka性能喜人的原因之一。
二. 源碼結構
2.1 核心類
2.1.1 核心類之間的關系
Kafka消息存儲涉及的核心類有:
- ReplicaManager
- Partition
- Replica
- Log
- LogSegment
- OffsetIndex
- TimeIndex
- MemoryRecords
- FileRecords
它們之間的關系如下圖:

2.1.1 數據傳遞對象
Kafka消息存儲的基本單位不是"一條消息",而是"一批消息"。在生產者文章中提到過,Producer針對每個Partition會攢一批消息,經過壓縮后發到Server端。Server端會將對應Partition下的這一"批"消息作為一個整體進行管理。所以在Server端,一個"Record"表示"一批消息",而數據傳遞對象"XXXRecords"則可以表示一批或多批消息。
MemoryRecords所表示的消息數據存儲於內存。比如Server端從接到生產者消息到將消息存入磁盤的過程就用MemoryRecords來傳遞數據,因為這期間消息需要暫存於內存,且沒有磁盤數據與之對應。MemoryRecords核心屬性有兩個:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| buffer | ByteBuffer | 存儲消息數據 |
| batches | Iterable<MutableRecordBatch> | 迭代器;用於以批為單位遍歷buffer所存儲的數據 |
FileRecords所表示的消息數據存儲於磁盤文件。比如從磁盤讀出消息返回給消費者的過程就用FileRecords來傳遞數據。其核心屬性如下:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| file | File | 消息數據所存儲的文件 |
| channel | FileChannel | 文件所對應的FileChannel |
| start | int | 本FileRecords所表示的數據在文件中的起始偏移量 |
| end | int | 本FileRecords所表示的數據在文件中的結束偏移量 |
| size | AtomicInteger | 本FileRecords所表示的數據的字節數 |
2.1.2 ReplicaManager
ReplicaManager負責管理本節點存儲的所有副本。這個類的屬性真的巨多。不過不要慌,對於消息存儲原理這塊,我們只需要關注下面這一個屬性就可以,其他和請求處理以及副本復制相關的屬性我們放到后邊對應章節慢慢分析。
| 屬性名 | 類型 | 說明 |
|---|---|---|
| allPartitions | Pool[TopicPartition, Partition] | 存儲Partition對象,可根據TopicPartition類將其檢索出來 |
2.1.3 Partition
Partition對象負責維護本分區下的所有副本,其核心屬性如下:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| allReplicasMap | Pool[Int, Replica] | 本分區下的所有副本。其中,key為BrokerId,value為Replica對象 |
| leaderReplicaIdOpt | Option[Int] | Leader副本所在節點的BrokerId |
| localBrokerId | Int | 本節點對應的BrokerId |
2.1.4 Replica
Replica負責維護Log對象。Replica是業務模型層面"副本"的表示,Log是數據存儲層面的"副本"。Replica核心屬性如下:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| log | Option[Log] | Replica對應的Log對象 |
| topicPartition | TopicPartition | 標識該副本所屬"分區" |
| brokerId | Int | 該副本所在的BrokerId |
| highWatermarkMetadata | LogOffsetMetadata | 高水位(后續章節會詳細分析) |
| logEndOffsetMetadata | LogOffsetMetadata | 該副本中現存最大的Offset(后續章節會詳細分析) |
2.1.5 Log
Log負責維護副本下的LogSegment,其核心屬性如下:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| dir | File | Log對應的目錄,即存儲LogSegment的文件夾 |
| segments | ConcurrentSkipListMap[java.lang.Long, LogSegment] | LogSegment集合,其中key為對應LogSegment的起始offset |
2.1.6 LogSegment
LogSegment則實實在在維護消息數據,其核心屬性如下:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| log | FileRecords | 本日志段的消息數據 |
| baseOffset | Long | 本日志段的起始offset |
| maxSegmentBytes | Int | 本日志段的最大字節數; 超過后就需要新建一個LogSegment; |
| maxSegmentMs | Long | 日志段也可以根據時間來滾動; 比如待插入消息和日志段第一個消息間隔超過一定時間后,需要開個新的日志段; maxSegmentMs便是所指定的間隔大小(segment.ms 配置項); |
| rollJitterMs | Long | 為避免當前節點上所有LogSegment同時滾動的情況,需要在maxSegmentMs基礎上減去一個隨機數值; rollJitterMs便是這個隨機擾動(segment.jitter.ms 配置項指定該隨機數的最大值) |
| offsetIndex | OffsetIndex | 偏移量索引,下文分析 |
| timeIndex | TimeIndex | 時間索引,下文分析 |
2.1.7 OffsetIndex和TimeIndex
首先兩個索引都繼承於AbstractIndex,那么他們就有一批共同的核心屬性:
| 屬性名 | 類型 | 說明 |
|---|---|---|
| file | File | 對應的索引文件 |
| mmap | MappedByteBuffer | 索引文件的內存映射 |
| maxIndexSize | Int | 索引文件的最大字節數, 由 segment.index.bytes 配置項指定 |
| baseOffset | Long | 所在日志段的起始offset |
實際上,這些屬性已足夠表達當前的索引邏輯,OffsetIndex和TimeIndex均未再額外自定義屬性。
2.2 消息寫入流程
消息寫入流程時序圖如下:

需要提一點,這里不是為了讓諸君將這一串流程視為整體記入腦海。面向對象的代碼仍然要從面向對象的角度去理解。所以這里重要的是各個類各自內部的邏輯,這有助於進一步明確類所扮演的角色。
2.2.1 ReplicaManager.appendRecords
def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],// 各Partition上待插入的消息數據
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
... ...
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
isFromClient = isFromClient, entriesPerPartition, requiredAcks)
... ...
}
private def appendToLocalLog(internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
... ...
// step1 reject appending to internal topics if it is not allowed
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
//step2 若本Broker節點不承載對應partition的主副本, 這步會拋異常
val (partition, _) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
//step3 將消息寫入對應Partition主副本, 並喚醒相關的等待操作(比如, 消費等待)
val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
... ...
}
}
}
}
appendRecords直接調用appendToLocalLog,后者才是真正實行邏輯的方法。ReplicaManager的邏輯基本分三步走:
- 檢查目標Topic是否為Kafka內部Topic,若是的話根據配置決定是否允許寫入;
- 獲取對應的Partition對象;
- 調用Partition.appendRecordsToLeader寫入消息數據;
2.2.2 Partition.appendRecordsToLeader
接下來看看Partition內部的邏輯
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
//step1 判斷Leader副本是否在當前節點
case Some(leaderReplica) =>
//step2 獲取Log對象
val log = leaderReplica.log.get
... ...
//step3 調用Log對象方法寫入數據
val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
... ...
// 若本節點不是目標Partition的Leader副本, 拋異常
case None =>
throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}
... ...
}
這里的邏輯也分3步走:
- 判斷Leader副本是否在當前節點;
- 獲取Log對象;
- 調用Log對象的appendAsLeader方法寫入數據;
這里我們額外看下第1步的原理。leaderReplicaIfLocal是個方法
def leaderReplicaIfLocal: Option[Replica] =
leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))
其核心思想是那本節點BrokerId和Leader副本所在節點的BrokerId作比較,若相等,則返回對應的Replica對象。
2.2.3 Log.appendAsLeader
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
append(records, isFromClient, assignOffsets = true, leaderEpoch)
}
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
... ...
// maybe roll the log if this segment is full
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
... ...
// 將消息插入segment
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
... ...
}
appendAsLeader方法直接調用append方法,后者兩步走:
- 判斷是否需要創建一個新的LogSegment,並返回最新的LogSegment;
- 調用LogSegment.append方法寫入數據;
這里我們再額外關注下第1步的判斷標准。主要還是根據LogSegment.shouldRoll方法的返回值來作決策:
def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
size > maxSegmentBytes - messagesSize ||
(size > 0 && reachedRollMs) ||
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
}
Kafka的源碼很清晰的,這方面值得點贊和學習。從shouldRoll的結果表達式我們可以看到,以下4類場景中,LogSegment需要向前滾動:
- 若接受新消息的寫入,當前LogSegment將超過最大字節數限制;
- 若接受新消息的寫入,當前LogSegment將超過最大時間跨度限制;
- 當前LogSegment對應的索引已無法寫入新數據;
- 輸入的offset不在當前LogSegment表示范圍;
2.2.4 LogSegment.append
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
// step1.1 判斷輸入消息大小
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
// step1.2 校驗offset
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
ensureOffsetInRange(largestOffset)
// step2 append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// step3 Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
// step4 append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
LogSegment.append大體可以分為4步:
- 數據校驗
- 校驗輸入消息大小;
- 校驗offset;
- 寫入數據(注意: 此步的log對象不是Log類的實例,而是FileRecords的實例);
- 更新統計數據;
- 處理索引;
三. 總結
本文從業務模型&源碼角度分析了Kafka消息存儲原理。才疏學淺,不一定很全面。
另外也可以在目錄中找到同系列的其他文章:Kafka源碼分析系列-目錄(收藏關注不迷路)。
歡迎諸君隨時來交流。
微信搜索“村口老張頭”,不定期推送技術文章哦~

也可以知乎搜索“村口老張頭”哦~
