關於 Topic 和 Partition
Topic
在 kafka 中,topic 是一個存儲消息的邏輯概念,可以認為是一個消息集合。每條消息發送到 kafka 集群的消息都有一個類別。物理上來說,不同的 topic 的消息是分開存儲的,每個 topic 可以有多個生產者向它發送消息,也可以有多個消費者去消費其中的消息。
Partition
每個 topic 可以划分多個分區(每個 Topic 至少有一個分區),同一 topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset
保證消息在分區內的順序,offset 的順序不跨分區,即 kafka只保證在同一個分區內的消息是有序的。
下圖中,對於名字為 test 的 topic,做了 3 個分區,分別是p0、p1、p2
每一條消息發送到 broker 時,會根據 partition 的規則選擇存儲到哪一個 partition。如果 partition 規則設置合理,那么所有的消息會均勻的分布在不同的partition中,這樣就有點類似數據庫的分庫分表的概念,把數據做了分片處理。
Topic&Partition 的存儲
Partition 是以文件的形式存儲在文件系統中,比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那么在kafka 的數據目錄(/tmp/kafka-log)中就有 3 個目錄,
firstTopic-0~3,命名規則是<topic_name>-<partition_id>./kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic
關於消息分發
kafka 消息分發策略
消息是 kafka 中最基本的數據單元,在 kafka 中,一條消息由 key、value 兩部分構成,在發送一條消息時,我們可以指定這個 key,那么 producer 會根據 key 和 partition 機制來判斷當前這條消息應該發送並存儲到哪個 partition 中。
我們可以根據需要進行擴展 producer 的 partition 機制。
package com.lf; import java.util.List; import java.util.Map; import java.util.Random; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; public class MyPartition implements Partitioner { private Random random = new Random(); @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 獲取分區列表 List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int patitionNum = 0; if (key == null) { patitionNum = random.nextInt(partitionInfos.size());//隨機分區 } else { patitionNum = Math.abs((key.hashCode()%partitionInfos.size())); } System.out.println("key->"+key+"value->"+value+"->"+patitionNum); return patitionNum;//返回分區值 } @Override public void configure(Map<String, ?> configs) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } }
分發消息指定分區
package com.lf;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerDemo extends Thread{
private final KafkaProducer<Integer, String> producer;
private final String topic ;
public KafkaProducerDemo(String topic) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092,192.168.25.130:9092,192.168.25.131:9092");
// 客戶端ID標識
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
//確認記錄,保證記錄不丟失 總是設置成-1
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
// 鍵序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
//值序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//指定分區
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lf.MyPartition");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
}
@Override
public void run() {
int num = 0;
while(num<50){
try {
num++;
String message = "message_" + num;
System.out.println("begin send message" + message);
producer.send(new ProducerRecord<Integer, String>(topic, message));
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducerDemo("lftest").start();
}
}
消息默認的分發機制
默認情況下,kafka 采用的是 hash 取模的分區算法。如果Key 為 null,則會隨機分配一個分區。這個隨機是在這個參數”metadata.max.age.ms”的時間范圍內隨機選擇一個。對於這個時間段內,如果 key 為 null,則只會發送到唯一的
分區。這個值值哦默認情況下是 10 分鍾更新一次。關 於 Metadata ,這個之前沒講過,簡單理解就是Topic/Partition 和 broker 的映射關系,每一個 topic 的每一個 partition,需要知道對應的 broker 列表是什么,leader
是誰、follower 是誰。這些信息都是存儲在 Metadata 這個類里面。
消費端如何消費指定的分區
package com.lf;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaConsumerDemo extends Thread{
private final KafkaConsumer consumer;
public KafkaConsumerDemo(String topic) {
Properties properties = new Properties();
// 連接的 kafka 集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092,192.168.25.130:9092,192.168.25.131:9092");
// 消費者分組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
//確認自動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交間隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 鍵序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
// 值序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//對於不同的groupid保證能消費到之前的消息,充值offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
//consumer.subscribe(Collections.singletonList(topic));
//指定消費分區
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
}
@Override
public void run() {
while(true){
ConsumerRecords<Integer,String> consumerRecords = consumer.poll(1000);
for(ConsumerRecord consumerRecord: consumerRecords){
System.out.println(consumerRecord.partition()+"message_"+consumerRecord.value());
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("lftest").start();
}
}
消息的消費原理
kafka 消息消費原理演示
在實際生產過程中,每個 topic 都會有多個 partitions,多個 partitions 的好處在於,一方面能夠對 broker 上的數據進行分片有效減少了消息的容量從而提升 io 性能。另外一方面,為了提高消費端的消費能力,一般會通過多個
consumer 去消費同一個 topic ,也就是消費端的負載均衡機制,也就是我們接下來要了解的,在多個 partition 以及多個 consumer 的情況下,消費者是如何消費消息的同時,在上一節課,我們講了,kafka 存在 consumer group
的 概 念 , 也 就是 group.id 一 樣 的 consumer ,這些consumer 屬於一個 consumer group,組內的所有消費者協調在一起來消費訂閱主題的所有分區。當然每一個分區只能由同一個消費組內的 consumer 來消費,那么同一個
consumer group 里面的 consumer 是怎么去分配該消費哪個分區里的數據的呢?如下圖所示,3 個分區,3 個消費者,那么哪個消費者消分哪個分區?(代碼開三個消費者,觀察)
對於上面這個圖來說,這 3 個消費者會分別消費 test 這個topic 的 3 個分區,也就是每個 consumer 消費一個partition。
什么是分區分配策略
通過前面的案例演示,我們應該能猜到,同一個 group 中的消費者對於一個 topic 中的多個 partition,存在一定的分區分配策略。
在 kafka 中,存在兩種分區分配策略,一種是 Range(默認)、另 一 種 另 一 種 還 是 RoundRobin ( 輪 詢 )。 通 過partition.assignment.strategy 這個參數來設置。
Range strategy(范圍分區)
Range 策略是對每個主題而言的,首先對同一個主題里面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設我們有 10 個分區,3 個消費者,排完序的分區
將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C3-0。然后將 partitions 的個數除於消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不
盡,那么前面幾個消費者線程將會多消費一個分區。在我們的例子里面,我們有 10 個分區,3 個消費者線程, 10 / 3 = 3,而且除不盡,那么消費者線程 C1-0 將會多消費一
個分區,所以最后分區分配的結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6 分區
C3-0 將消費 7, 8, 9 分區
假如我們有 11 個分區,那么最后分區分配的結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6, 7 分區
C3-0 將消費 8, 9, 10 分區
假如我們有 2 個主題(T1 和 T2),分別有 10 個分區,那么最后分區分配的結果看起來是這樣的:
C1-0 將消費 T1 主題的 0, 1, 2, 3 分區以及 T2 主題的 0,
1, 2, 3 分區
C2-0 將消費 T1 主題的 4, 5, 6 分區以及 T2 主題的 4, 5,
6 分區
C3-0 將消費 T1 主題的 7, 8, 9 分區以及 T2 主題的 7, 8,
9 分區
可以看出,C1-0 消費者線程比其他消費者線程多消費了 2 個分區,這就是 Range strategy 的一個很明顯的弊端
RoundRobin strategy(輪詢分區)
輪詢分區策略是把所有 partition 和所有 consumer 線程都列出來,然后按照 hashcode 進行排序。最后通過輪詢算法分配 partition 給消費線程。如果所有 consumer 實例的
訂閱是相同的,那么 partition 會均勻分布。
在我們的例子里面,假如按照 hashCode 排序完的 topicpartitions 組依次為 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為 C1-0, C1-1, C2-0, C2-1,最后分區分配的結果為:
C1-0 將消費 T1-5, T1-2, T1-6 分區;
C1-1 將消費 T1-3, T1-1, T1-9 分區;
C2-0 將消費 T1-0, T1-4 分區;
C2-1 將消費 T1-8, T1-7 分區;
使用輪詢分區策略必須滿足兩個條件
1. 每個主題的消費者實例具有相同數量的流
2. 每個消費者訂閱的主題必須是相同的
什么時候會觸發這個策略呢?
當出現以下幾種情況時,kafka 會進行一次分區分配操作,也就是 kafka consumer 的 rebalance
1. 同一個 consumer group 內新增了消費者
2. 消費者離開當前所屬的 consumer group,比如主動停機或者宕機
3. topic 新增了分區(也就是分區數量發生了變化)
kafka consuemr 的 rebalance 機制規定了一個 consumergroup 下的所有 consumer 如何達成一致來分配訂閱 topic的每個分區。而具體如何執行分區策略,就是前面提到過
的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式, 也就是說,除了這兩種之外,我們還可以創建自己的分配機制。
誰來執行 Rebalance 以及管理 consumer 的 group 呢?
Kafka 提供了一個角色:coordinator 來執行對於 consumer group 的管理,Kafka 提供了一個角色:coordinator 來執行對於 consumer group 的管理,當 consumer group 的第一個 consumer 啟動的時候,它會去和 kafka server 確
定誰是它們組的 coordinator。之后該 group 內的所有成員都會和該 coordinator 進行協調通信
如何確定 coordinator
consumer group 如何確定自己的 coordinator 是誰呢, 消費 者 向 kafka 集 群 中 的 任 意 一 個 broker 發 送 一 個GroupCoordinatorRequest 請求,服務端會返回一個負載最 小 的 broker 節 點 的 id , 並 將 該 broker 設 置 為coordinator
JoinGroup 的過程
在 rebalance 之前,需要保證 coordinator 是已經確定好了的,整個 rebalance 的過程分為兩個步驟,Join 和 Sync
join: 表示加入到 consumer group 中,在這一步中,所有的成員都會向 coordinator 發送 joinGroup 的請求。一旦所有成員都發送了 joinGroup 請求,那么 coordinator 會選擇一個 consumer 擔任 leader 角色,並把組成員信息和訂閱信息發送消費者
protocol_metadata: 序列化后的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator 會選擇一個座位 leader,對應的就是 member_id
member_metadata 對應消費者的訂閱信息
members:consumer group 中全部的消費者的訂閱信息
generation_id:年代信息,類似於之前講解 zookeeper 的時候的 epoch 是一樣的,對於每一輪 rebalance ,generation_id 都會遞增。
主要用來保護 consumer group。隔離無效的 offset 提交。也就是上一輪的 consumer 成員無法提交 offset 到新的 consumer group 中。
Synchronizing Group State 階段
完成分區分配之后,就進入了 Synchronizing Group State階段,主要邏輯是向 GroupCoordinator 發 送SyncGroupRequest 請求,並且處理 SyncGroupResponse響應,簡單來說,就是 leader 將消費者對應的 partition 分
配方案同步給 consumer group 中的所有 consumer
每個消費者都會向 coordinator 發送 syncgroup 請求,不過只有 leader 節點會發送分配方案,其他消費者只是打打醬油而已。當 leader 把方案發給 coordinator 以后,coordinator 會把結果設置到 SyncGroupResponse 中。這
樣所有成員都知道自己應該消費哪個分區。
consumer group 的分區分配方案是在客戶端執行的!Kafka 將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性
如何保存消費端的消費位置
什么是 offset
前面在講解 partition 的時候,提到過 offset, 每個 topic可以划分多個分區(每個 Topic 至少有一個分區),同一topic 下的不同分區包含的消息是不同的。每個消息在被添
加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset 保證消息
在分區內的順序,offset 的順序不跨分區,即 kafka 只保證在同一個分區內的消息是有序的; 對於應用層的消費來說,每次消費一個消息並且提交以后,會保存當前消費到的最近的一個 offset。那么 offset 保存在哪里?
offset 在哪里維護?
在 kafka 中,提供了一個__consumer_offsets_* 的一個topic , 把 offset 信 息 寫 入 到 這 個 topic 中 。__consumer_offsets——按保存了每個 consumer group某一時刻提交的 offset 信息。__consumer_offsets 默認有
50 個分區。根 據 前 面 我 們 演 示 的 案 例 , 我 們 設 置 了 一 個KafkaConsumerDemo 的 groupid。首先我們需要找到這個 consumer_group 保存在哪個分區中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
計算公式
➢ Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 於 默 認 情 況 下groupMetadataTopicPartitionCount 有 50 個分區,計算得到的結果為:35, 意味着當前的 consumer_group 的位移信息保存在__consumer_offsets 的第 35 個分區
➢ 執行如下命令,可以查看當前 consumer_goup 中的offset 位移信息
sh kafka-simple-consumer-shell.sh --topic__consumer_offsets --partition 35 --broker-list 192.168.11.153:9092,192.168.11.154:9092,192.168.11.157:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager\$ OffsetsMessageFormatter"
從輸出結果中,我們就可以看到 test 這個 topic 的 offset的位移日志
消息的存儲
消息的保存路徑
消息發送端發送消息到 broker 上以后,消息是如何持久化的呢?那么接下來去分析下消息的存儲首先我們需要了解的是,kafka 是使用日志文件的方式來保存生產者和發送者的消息,每條消息都有一個 offset 值來
表示它在分區中的偏移量。Kafka 中存儲的一般都是海量的消息數據,為了避免日志文件過大,Log 並不是直接對應在一個磁盤上的日志文件,而是對應磁盤上的一個目錄,
這個目錄的明明規則是<topic_name>_<partition_id>比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那么在 kafka 的數據目錄(/tmp/kafka-log)中就有 3 個目錄,firstTopic-0~3
多個分區在集群中的分配
如果我們對於一個 topic,在集群中創建多個 partition,那么 partition 是如何分布的呢?
1.將所有 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上
了解到這里的時候,大家再結合前面講的消息分發策略,就應該能明白消息發送到 broker 上,消息會保存到哪個分區中,並且消費端應該消費哪些分區的數據了
消息寫入的性能
我們現在大部分企業仍然用的是機械結構的磁盤,如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數據所在的物理地址,在磁盤上就要
找到對應的柱面、磁頭以及對應的扇區;這個過程相對內存來說會消耗大量時間,為了規避隨機讀寫帶來的時間消耗,kafka 采用順序寫的方式存儲數據。即使是這樣,但是頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka
還有一個性能策略
零拷貝
消息從發送到落地保存,broker 維護的消息日志本身就是文件目錄,每個文件都是二進制保存,生產者和消費者使用相同的格式來處理。在消費者獲取消息時,服務器先從硬盤讀取數據到內存,然后把內存中的數據原封不動的通
過 socket 發送給消費者。雖然這個操作描述起來很簡單,但實際上經歷了很多步驟。
操作系統將數據從磁盤讀入到內核空間的頁緩存
▪ 應用程序將數據從內核空間讀入到用戶空間緩存中
▪ 應用程序將數據寫回到內核空間到 socket 緩存中
▪ 操作系統將數據從 socket 緩沖區復制到網卡緩沖區,以便將
數據經網絡發出
這個過程涉及到 4 次上下文切換以及 4 次數據復制,並且有兩次復制操作是由 CPU 完成。但是這個過程中,數據完全沒有進行變化,僅僅是從磁盤復制到網卡緩沖區。
通過“零拷貝”技術,可以去掉這些沒必要的數據復制操作,同時也會減少上下文切換次數。現代的 unix 操作系統提供一個優化的代碼路徑,用於將數據從頁緩存傳輸到 socket;在 Linux 中,是通過 sendfile 系統調用來完成的。Java 提
供了訪問這個系統調用的方法:FileChannel.transferTo API
使用 sendfile,只需要一次拷貝就行,允許操作系統將數據直接從頁緩存發送到網絡上。所以在這個優化的路徑中,只有最后一步將數據拷貝到網卡緩存中是需要的