kafka學習(四)-Topic & Partition


topic中partition存儲分布

Topic在邏輯上可以被認為是一個queue。每條消費都必須指定它的topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得 Kafka的吞吐率可以水平擴展,物理上把topic分成一個或多個partition,每個partition在物理上對應一個文件夾,該文件夾下存儲 這個partition的所有消息和索引文件。partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。

partition中文件存儲方式

下面示意圖形象說明了partition中文件存儲方式:

  • 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中,但每個段segment file消息數量不一定相等,下面會提到一消息數量的算法,因為每個segment的大小是一定的,但是每條消息的大小可能不相同,因此數量不同。
  • 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定,這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。

partition中segment文件存儲結構

前面提到了每個topic被分成了多個partition分布到各個broker上,而每個partition的文件夾中又又多個小文件組成。

  • segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件.
  • segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個全局partion的最大offset(偏移message數)。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。

一對segment file文件為例,說明segment中index<—->data file對應關系物理結構

索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。其中以索引文件中元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移址為497。

從上述了解到segment data file由許多message組成,下面詳細說明message物理結構如下:

關鍵字 解釋說明
8 byte offset 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校驗message
1 byte “magic" 表示本次發布Kafka服務程序協議版本號
1 byte “attributes" 表示為獨立版本、或標識壓縮類型、或編碼類型。
4 byte key length 表示key的長度,當key為-1時,K byte key字段不填
K byte key 可選
value bytes payload 表示實際消息數據。

在partition中如何通過offset查找message

例如讀取offset=368776的message,需要通過下面2個步驟查找。

  • 第一步查找segment file

    其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。

    當offset=368776時定位到00000000000000368769.index|log

  • 第二步通過segment file查找message通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,然后再通過00000000000000368769.log順序查找直到 offset=368776為止。

從上述圖3可知這樣做的優點,segment index file采取稀疏索引存儲方式,它減少索引文件大小,通過mmap可以直接內存操作,稀疏索引為數據文件的每個對應message設置一個元數據指針,它 比稠密索引節省了更多的存儲空間,但查找起來需要消耗更多的時間。

高效性的保證

每條消息都被append到該partition中,是順序寫入磁盤的,在機械盤中隨機寫入的效率是很低的,但是如果是順序寫入效率非常高這是kafka高吞吐率的一個很重要的保證。

每一條消息被發送到broker時,會根據paritition規則選擇被存儲到哪一個partition。如果partition規則設置的合理,所有 消息可以均勻分布到不同的partition里,這樣就實現了水平擴展。(如果一個topic對應一個文件,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題)。在創建topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數量(如下所示),當然也可以在topic創建之后去修改parition數量

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

在發送一條消息時,可以指定這條消息的key,producer根據這個key和partition機制來判斷將這條消息發送到哪個parition。 paritition機制可以通過指定producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則將對應的整數與partition總數取余,該消息會被發送到該數對應的partition。(每個parition都會有個序號)

對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有數據(實際 上也沒必要),因此Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數據,如下所示。

  ############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to 
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs 
#can then be marked for log compaction.
log.cleaner.enable=false

 這里要注意,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤 以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata信息–當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下 consumer會在消費完一條消息后線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。因為 offet由consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM