記錄下和kafka相關的Message、日志文件、索引文件、consumer記錄消費的offset相關內容,文中很多理解參考文末博文、書籍還有前輩。
kafka中的消息
kafka中的消息Message,在V1版本中是如下部分組成,主要關系key和value。
(1)key:當需要將消息寫入到某個topic下的指定partition分區時,需要給定key的值。
(2)value:實際消息內容保存在這里。
(3)其他均是消息的元數據,一般不用關心,對用戶來說是透明的。
為了保存這些消息數據,kafka使用了ByteBuffer來存儲,它是緊湊型字節數組,相比使用java對象來保存消息數據到堆內存,它更加的節省空間,提高內存使用率。
log和index文件
基本介紹
查看一個topic分區目錄下的內容,發現有log、index和timeindex三個文件,它有以下幾個特點。
(1)log文件名是以文件中第一條message的offset來命名的,實際offset長度是64位,但是這里只使用了20位,應付生產是足夠的。可以看出第一個log文件名是以0開頭,而第二個log文件是4161281,說明第一log文件保存了offset從0到4161280的消息。
(2)一組index+log+timeindex文件的名字是一樣的,並且log文件默認寫滿1G后,會進行log rolling形成一個新的組合來記錄消息,這個是通過broker端log.segment.bytes=1073741824指定的,可以修改這個值進行調整。
(3)index和timeindex在剛使用時會分配10M的大小,當進行log rolling后,它會修剪為實際的大小,所以看到前幾個索引文件的大小,只有幾百K。
# 一個分區目錄下文件內容,參考文末書籍杜撰,主要為了說明概念
[root@hadoop01 /home/software/kafka-2/kafka-logs/football-0]# ll -h
-rw-r--r--. 1 root root 514K Mar 20 16:04 00000000000000000000.index
-rw-r--r--. 1 root root 1.0G Mar 17 03:36 00000000000000000000.log
-rw-r--r--. 1 root root 240K Mar 20 16:04 00000000000000000000.timeindex
-rw-r--r--. 1 root root 512K Mar 20 16:04 00000000000004161281.index
-rw-r--r--. 1 root root 1.0G Mar 17 03:36 00000000000004161281.log
-rw-r--r--. 1 root root 177K Mar 20 16:04 00000000000004161281.timeindex
-rw-r--r--. 1 root root 10M Mar 20 16:04 00000000000008749921.index
-rw-r--r--. 1 root root 390M Mar 17 03:36 00000000000008749921.log
-rw-r--r--. 1 root root 10M Mar 20 16:04 00000000000008749921.timeindex
如果想查看這些文件,可以使用kafka提供的shell來完成,幾個關鍵信息如下:
(1)offset是逐漸增加的整數。
(2)position是相對外層batch的位置增量,可以理解為消息的字節偏移量。
(3)CreateTime:時間戳。
(4)magic:2代表這個消息類型是V2,如果是0則代表是V0類型,1代表V1類型。本機是V2類型的,不過也可以暫時按照上面的V1來參考理解,具體需要看文末書籍里的詳細介紹。
(5)compresscodec:None說明沒有指定壓縮類型,kafka目前提供了4種可選擇,0-None、1-GZIP、2-snappy、3-lz4。
(6)crc:對所有字段進行校驗后的crc值。
# 查看並打印log文件內容
[root@hadoop01 /home/software/kafka-2/kafka-logs/football-0]# ../../bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000004.log --print-data-log
Dumping 00000000000000000004.log
Starting offset: 4
baseOffset: 4 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false position: 0 CreateTime: 1584368524633 isvalid: true size: 85 magic: 2 compresscodec: NONE crc: 3049289418
baseOffset: 5 lastOffset: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false position: 85 CreateTime: 1584368668414 isvalid: true size: 73 magic: 2 compresscodec: NONE crc: 2267711305
baseOffset: 6 lastOffset: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false position: 158 CreateTime: 1584368679882 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 789213838
baseOffset: 7 lastOffset: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false position: 236 CreateTime: 1584368695371 isvalid: true size: 95 magic: 2 compresscodec: NONE crc: 703634716
結構原理
(1)消息內容,保存在log日志文件中,它是記錄message的載體。消息會封裝成Record的形式,append到log日志文件末尾,采用的是順序寫模式,參考官網圖片,一個topic的不同分區,可以想成queue,都會順序寫入發送到它的消息。圖中partition0中的0、1、2、3等數字就是一個分區中消息的offset,它是遞增的數字。
注意消費者也是有offset的,剛開始學的時候兩者混淆了,消費者的offset指的是消費的位置,它是不斷更新的數字,主要是為了下次繼續消費定位用的。如官網中圖片所示,消費者A消費的offset是9,消費者B消費的offset是11,不同的消費者offset是交給一個內部公共topic來記錄的。
(2)位置索引,保存在index文件中,log日志默認每寫入4K(log.index.interval.bytes設定的),會寫入一條索引信息到index文件中,因此索引文件是稀疏索引,它不會為每條日志都建立索引信息。
下圖是網上拿來的直接用了,log文件中的日志,是順序寫入的,由message+實際offset+position組成,索引文件的數據結構則是由相對offset(4byte)+position(4byte)組成,由於保存的是相對第一個消息的相對offset,只需要4byte就可以了,可以節省空間,在實際查找后還需要計算回實際的offset,這對用戶是透明的。如下圖由於log文件名是從0開始的,因此相對offset為3的實際offset是3+0,依然是3。
對於稀疏索引,盡管它的索引密度不高,但是offset是有序的,kafka查找一條offset對應的實際的消息時,可以通過index二分查找,獲取到最近的低位offset,然后從低位offset對應的position開始,從實際的log文件中開始往后查找對應的消息。如要查找offset=5的消息,先去索引文件中找到低位的3 4597這條數據,然后通過4597這個字節偏移量,從log文件中從4597個字節開始讀取,直到讀取到offset=5的這條數據,這比直接從log文件開始讀取要節省時間。二分查找的時間復雜度為O(lgN),如果從頭遍歷時間復雜度是O(N)。
注意下圖的index中逗號是不存在的,這個圖片加的逗號是為了方便理解。
(3)時間戳索引文件,它的作用是可以讓用戶查詢某個時間段內的消息,它一條數據的結構是時間戳(8byte)+相對offset(4byte),如果要使用這個索引文件,首先需要通過時間范圍,找到對應的相對offset,然后再去對應的index文件找到position信息,然后才能遍歷log文件,它也是需要使用上面說的index文件的。
但是由於producer生產消息可以指定消息的時間戳,這可能將導致消息的時間戳不一定有先后順序,因此盡量不要生產消息時指定時間戳。
消費組和coordinator
消費者消費消息時,會記錄消費者offset(注意不是分區的offset,不同的上下文環境一定要區分),這個消費者的offset,也是保存在一個特殊的內部分區,叫做__consumer_offsets,它就一個作用,那就是保存消費組里消費者的offset。默認創建時會生成50個分區(offsets.topic.num.partitions設置),一個副本,如果50個分區分布在50台服務器上,將大大緩解消費者提交offset的壓力。可以在創建消費者的時候產生這個特殊消費組。
# 如果只啟動了hadoop03一個broker,則所有的50個分區都會在這上面生成
[root@hadoop03 /home/software/kafka-2/bin]# sh kafka-console-consumer.sh --bootstrap-server hadoop03:9092 --topic football --from-beginning --new-consumer
那么問題來了,消費者的offset到底保存到哪個分區呢,kafka中是按照消費組group.id來確定的,使用Math.abs(groupId.hashCode())%50,來計算分區號,這樣就可以確定一個消費組下的所有的消費者的offset,都會保存到哪個分區了.
那么問題又來了,既然一個消費組內的所有消費者都把offset提交到了__consumer_offsets下的同一個分區,如何區分不同消費者的offset呢?原來提交到這個分區下的消息,key是groupId+topic+分區號,value是消費者offset。這個key里有分區號,注意這個分區號是消費組里消費者消費topic的分區號。由於實際情況下一個topic下的一個分區,只能被一個消費組里的一個消費者消費,這就不擔心offset混亂的問題了。
實際上,topic下多個分區均勻分布給一個消費組下的消費者消費,是由coordinator來完成的,它會監聽消費者,如果有消費者宕機或添加新的消費者,就會rebalance,使用一定的策略讓分區重新分配給消費者。如下圖所示,消費組會通過offset保存的位置在哪個broker,就選舉它作為這個消費組的coordinator,負責監聽各個消費者心跳了解其健康狀況,並且將topic對應的leader分區,盡可能平均的分給消費組里的消費者,根據消費者的變動,如新增一個消費者,會觸發coordinator進行rebalance。
還有一個細節,消費者組和coordinator之間還進行了什么通信,各個消費者之間是如何做到默契不搶別人的資源?參考前輩整理如下。
(1)消費組會對選出的coordinator發送join group請求。
(2)coordinator會在消費組中選一個leader消費者,並且隨后把要消費的topic信息返回給這個leader。
(3)leader消費者會根據topic信息,制定出一套符合自己消費組的消費方案,通過sync group請求返回給coordinator。
(4)coordinator收到分配方案后會分發給各個消費者。
(5)最后每個消費者身上都會有一套消費方案,都遵守它進行消費。
coordinator主要是充當管理者的角色,它不負責消費方案的制定。
rebalance
rebalance是消費組內達成一致如何消費topic分區的協議,文末書籍里提到有三個觸發條件,這里只記錄第一個因為它最常出現,那就是消費組里消費者或增加、或離去、或奔潰(它像極了人生)。其他兩個,一個是topic分區數使用kafka shell增加了分區,還有一個就是消費的topic是按照正則去匹配的,當有了符合這個規則的新的topic出現,也會觸發rebalance。
它有三種策略,為range、round robin、sticky。
假設topicA分區有p0~p6 一共6個分區,某個消費組有三個消費者,以此為基礎來直觀感受三個策略。
(1)range
它就是一個范圍(有點類似python的range,跟這個沒關系就是名字像),會按照分區號來划分,結果就是:
消費者1 p0 p1,消費者2 p2 p3,消費者3 p4 p5
(2)round robin
就是隨機均勻分配,結果略。
(3)sticky
上面兩種分配存在一個小問題,就是有消費者宕機后,重新分配后,原本屬於一個消費者消費得好好的的分區會被分到新的消費者。如range策略下消費者3掛掉,重新分配后會變成消費者1 p0 p1 p2 消費者2 p3 p4 p5,這樣p2就被重分配了。考慮到管理消費者offset的復雜性,盡量希望維持原來的習慣,如果是sticky策略會變成消費者1 p0 p1 p4 消費者2 p2 p3 p5。
以上,理解不一定正確,寫的也比較啰嗦,但學習就是一個不斷了解和糾錯的過程。
參考博文:
(1)https://blog.csdn.net/xiaoyu_bd/article/details/52398265
(2)《Apache Kafka實戰》