首先要注意的是,Kafka 中的 Topic 和 ActiveMQ 中的 Topic 是不一樣的。
在 Kafka 中,Topic 是一個存儲消息的邏輯概念,可以認為是一個消息集合。每條消息發送到 Kafka 集群的消息都有一個類別。物理上來說,不同的 Topic 的消息是分開存儲的,每個 Topic 可以有多個生產者向它發送消息,也可以有多個消費者去消費其中的消息。
每個 Topic 可以划分多個分區(每個 Topic 至少有一個分區),同一 Topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset,它是消息在此分區中的唯一編號,Kafka 通過 offset 保證消息在分區內的順序,offset 的順序不跨分區,即 Kafka 只保證在同一個分區內的消息是有序的。
消息是每次追加到對應的 Partition 的后面:

Topic & Partition 的存儲
Topic 是一個邏輯上的概念,具體的存儲還是基於 Partition 來的。
創建一個 test2 Topic(注意這里的 partitions 參數為 3):
可以進入 /tmp/kafka-logs 目錄下進行查看(當前機器 IP 是 192.168.220.135):
在 135 機器:
在另外一台 136 機器上:
可以發現在 135 機器上有 test2-0 和 test2-2,在 136 機器上有 test2-1。接下來再結合 Kafka 的消息分發策略來看。
消息分發
Kafka 中最基本的數據單元就是消息,而一條消息其實是由 Key + Value 組成的(Key 是可選項,可傳空值,Value 也可以傳空值),這也是與 ActiveMQ 不同的一個地方。在發送一條消息時,我們可以指定這個 Key,那么 Producer 會根據 Key 和 partition 機制來判斷當前這條消息應該發送並存儲到哪個 partition 中(這個就跟分片機制類似)。我們可以根據需要進行擴展 Producer 的 partition 機制(默認算法是 hash 取 %)。
擴展自己的 partition:
-
package dongguabai.kafka.partition;
-
-
import org.apache.kafka.clients.producer.Partitioner;
-
import org.apache.kafka.common.Cluster;
-
import org.apache.kafka.common.PartitionInfo;
-
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Random;
-
-
/**
-
* 消息發送后會調用自定義的策略
-
*
-
* @author Dongguabai
-
* @date 2019/1/18 15:40
-
*/
-
public class MyPartitioner implements Partitioner {
-
-
-
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-
//獲取當前 topic 有多少個分區(分區列表)
-
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
-
int partitionNum = 0;
-
if (key == null) { //之前介紹過 Key 是可以傳空值的
-
partitionNum = new Random().nextInt(partitions.size()); //隨機
-
} else {
-
//取 %
-
partitionNum = Math.abs((key.hashCode()) % partitions.size());
-
}
-
System.out.println( "key:" + key + ",value:" + value + ",partitionNum:" + partitionNum);
-
//發送到指定分區
-
return partitionNum;
-
}
-
-
-
public void close() {
-
-
}
-
-
-
public void configure(Map<String, ?> configs) {
-
-
}
-
}
改造 Kafka Demo 中的 Producer 的代碼:
-
package dongguabai.kafka;
-
-
import org.apache.kafka.clients.producer.*;
-
-
import java.util.Properties;
-
import java.util.concurrent.ExecutionException;
-
-
/**
-
* @author Dongguabai
-
* @date 2019/1/17 11:26
-
*/
-
public class KafkaProducerDemo extends Thread {
-
/**
-
* 消息發送者
-
*/
-
private final KafkaProducer<Integer, String> producer;
-
-
/**
-
* topic
-
*/
-
private final String topic;
-
-
private final Boolean isAsync;
-
-
public KafkaProducerDemo(String topic, Boolean isAsync) {
-
this.isAsync = isAsync;
-
//構建相關屬性
-
//@see ProducerConfig
-
Properties properties = new Properties();
-
//Kafka 地址
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
-
//kafka 客戶端 Demo
-
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
-
//The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
-
/**發送端消息確認模式:
-
* 0:消息發送給broker后,不需要確認(性能較高,但是會出現數據丟失,而且風險最大,因為當 server 宕機時,數據將會丟失)
-
* 1:只需要獲得集群中的 leader節點的確認即可返回
-
* -1/all:需要 ISR 中的所有的 Replica進行確認(集群中的所有節點確認),最安全的,也有可能出現數據丟失(因為 ISR 可能會縮小到僅包含一個 Replica)
-
*/
-
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
-
-
/**【調優】
-
* batch.size 參數(默認 16kb)
-
* public static final String BATCH_SIZE_CONFIG = "batch.size";
-
*
-
* producer對於同一個 分區 來說,會按照 batch.size 的大小進行統一收集進行批量發送,相當於消息並不會立即發送,而是會收集整理大小至 16kb.若將該值設為0,則不會進行批處理
-
*/
-
-
/**【調優】
-
* linger.ms 參數
-
* public static final String LINGER_MS_CONFIG = "linger.ms";
-
* 一個毫秒值。Kafka 默認會把兩次請求的時間間隔之內的消息進行搜集。相當於會有一個 delay 操作。比如定義的是1000(1s),消息一秒鍾發送5條,那么這 5條消息不會立馬發送,而是會有一個 delay操作進行聚合,
-
* delay以后再次批量發送到 broker。默認是 0,就是不延遲(同 TCP Nagle算法),那么 batch.size 也就不生效了
-
*/
-
//linger.ms 參數和batch.size 參數只要滿足其中一個都會發送
-
-
/**【調優】
-
* max.request.size 參數(默認是1M) 設置請求最大字節數
-
* public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
-
* 如果設置的過大,發送的性能會受到影響,同時寫入接收的性能也會受到影響。
-
*/
-
-
//設置 key的序列化,key 是 Integer類型,使用 IntegerSerializer
-
//org.apache.kafka.common.serialization
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
-
//設置 value 的序列化
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
-
//指定分區策略
-
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "dongguabai.kafka.partition.MyPartitioner");
-
-
//構建 kafka Producer,這里 key 是 Integer 類型,Value 是 String 類型
-
producer = new KafkaProducer<Integer, String>(properties);
-
this.topic = topic;
-
}
-
-
public static void main(String[] args) {
-
new KafkaProducerDemo("test2",true).start();
-
}
-
-
-
public void run() {
-
int num = 0;
-
while (num < 100) {
-
String message = "message--->" + num;
-
System.out.println( "start to send message 【 " + message + " 】");
-
if (isAsync) { //如果是異步發送
-
producer.send( new ProducerRecord<Integer, String>(topic, message), new Callback() {
-
-
public void onCompletion(RecordMetadata metadata, Exception exception) {
-
if (metadata!=null){
-
System.out.println( "async-offset:"+metadata.offset()+"-> partition"+metadata.partition());
-
}
-
}
-
});
-
} else { //同步發送
-
try {
-
RecordMetadata metadata = producer.send( new ProducerRecord<Integer, String>(topic, message)).get();
-
System.out.println( "sync-offset:"+metadata.offset()+"-> partition"+metadata.partition());
-
} catch (InterruptedException | ExecutionException e) {
-
e.printStackTrace();
-
}
-
}
-
num++;
-
try {
-
Thread.sleep( 1000);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
在 Consumer 中接收消息的時候輸出分區:
-
package dongguabai.kafka;
-
-
import org.apache.kafka.clients.consumer.ConsumerConfig;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
-
import org.apache.kafka.clients.consumer.ConsumerRecords;
-
import org.apache.kafka.clients.consumer.KafkaConsumer;
-
-
import java.util.Collections;
-
import java.util.Properties;
-
-
/**
-
* @author Dongguabai
-
* @date 2019/1/17 11:55
-
*/
-
public class KafkaConsumerDemo extends Thread {
-
-
private final KafkaConsumer<Integer, String> kafkaConsumer;
-
-
public KafkaConsumerDemo(String topic) {
-
//構建相關屬性
-
//@see ConsumerConfig
-
Properties properties = new Properties();
-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
-
//消費組
-
/**
-
* consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是
-
一個組,那么組內必然可以有多個消費者或消費者實例((consumer instance),
-
它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂
-
閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一
-
個消費組內的一個consumer來消費.后面會進一步介紹。
-
*/
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
-
-
/** auto.offset.reset 參數 從什么時候開始消費
-
* public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
-
*
-
* 這個參數是針對新的groupid中的消費者而言的,當有新groupid的消費者來消費指定的topic時,對於該參數的配置,會有不同的語義
-
* auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的offset處開始消費topic下的消息
-
* auto.offset.reset= earliest情況下,新的消費者會從該topic最早的消息開始消費
-
auto.offset.reset=none情況下,新的消費組加入以后,由於之前不存在 offset,則會直接拋出異常。說白了,新的消費組不要設置這個值
-
*/
-
-
//enable.auto.commit
-
//消費者消費消息以后自動提交,只有當消息提交以后,該消息才不會被再次接收到(如果沒有 commit,消息可以重復消費,也沒有 offset),還可以配合auto.commit.interval.ms控制自動提交的頻率。
-
//當然,我們也可以通過consumer.commitSync()的方式實現手動提交
-
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-
-
/**max.poll.records
-
*此參數設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔
-
要處理的最大值。通過調整此值,可以減少poll間隔
-
*/
-
-
//間隔時間
-
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-
//反序列化 key
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-
//反序列化 value
-
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-
//構建 KafkaConsumer
-
kafkaConsumer = new KafkaConsumer<>(properties);
-
//設置 topic
-
kafkaConsumer.subscribe(Collections.singletonList(topic));
-
}
-
-
-
/**
-
* 接收消息
-
*/
-
-
public void run() {
-
while (true) {
-
//拉取消息
-
ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll( 100000000);
-
for (ConsumerRecord<Integer, String> record : consumerRecord) {
-
//record.partition() 獲取當前分區
-
System.out.println(record.partition()+ "】】 message receive 【" + record.value() + "】");
-
}
-
}
-
}
-
-
public static void main(String[] args) {
-
new KafkaConsumerDemo("test2").start();
-
}
-
-
}
首先啟動 Consumer,再啟動 Producer:
可以看到是能夠對的上的。
默認情況下,Kafka 采用的是 hash 取 % 的分區算法。如果 Key 為 null,則會隨機分配一個分區。這個隨機是在這個參數“metadata.max.age.ms“的時間范圍內隨機選擇一個。對於這個時間段內,如果 Key 為 null,則只會發送到唯一的分區。這個值默認情況下是 10 分鍾更新一次(因為 partition 狀態可能會發生變化)。
關於 Metadata
Metadata 包含 Topic 和 Partition 和 broker 的映射關系,每一個 Topic 的每一個 partition,需要知道對應的 broker 列表是什么,Leader 是誰,Follower 是誰。這些信息都是存儲在 Metadata 這個類中。
消費端如何消費指定分區
Consumer 可以指定具體消費的分區。
再重新啟動 Consumer 和 Producer:
可以看到 Consumer 只消費了分區為 1 的消息。
以上是單個 Consumer 消費(指定)分區的情況。一般每個 Topic 都會有多個 partition(主要是用於數據分片,減少消息的容量,從而提升 I/O 性能)。當然也可以使用多個 Consumer 從而提高消費能力,有一個消費組的概念(具體可參看 https://blog.csdn.net/Dongguabai/article/details/86520617)。
如果 Consumer1、Consumer2 和 Consumer3 都屬於 group.id 為 1 的消費組。那么 Consumer1 就會消費 p0,Consumer2 就會消費 p1,Consumer3 就會消費 p2。
可以先測試一下。創建三個 Consumer。要注意的是這里不能使用指定分區的方式:
‘
而且它們都是同一個消費組:
同時啟動三個 Consumer,和 Producer:
可以看到三個 Consumer 分別消費三個 Partition,很均勻。對同一個 Group 來說,其中的 Consumer 可以消費指定分區也可以消費自動分配的分區(這里是 Consumer 數量和 partition 數量一致,均勻分配)。那么如果 Consumer 數量大於 partition 數量呢,如果 Consumer 數量小於 partition 數量呢,測試也很簡單,這里就不多做測試了。
要注意的是如果 Consumer 數量比 partition 數量多,會有的 Consumer 閑置無法消費,這樣是一個浪費。如果 Consumer 數量小於 partition 數量會有一個 Consumer 消費多個 partition。Kafka 在 partition 上是不允許並發的。Consuemr 數量建議最好是 partition 的整數倍。 還有一點,如果 Consumer 從多個 partiton 上讀取數據,是不保證順序性的,Kafka 只保證一個 partition 的順序性,跨 partition 是不保證順序性的。增減 Consumer、broker、partition 會導致 Rebalance。
Kafka 分區分配策略
在 Kafka 中,同一個 Group 中的消費者對於一個 Topic 中的多個 partition 存在一定的分區分配策略。
在 Kafka 中,存在兩種分區分配策略,一種是 Range(默認)、另一種是 RoundRobin(輪詢)。通過partition.assignment.strategy 這個參數來設置。
Range strategy(范圍分區)
Range 策略是對每個主題而言的,首先對同一個主題里面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設我們有10個分區,3個消費者,排完序的分區將會是0,1,2,3,4,5,6,7,8,9;消費者線程排完序將會是C1-0, C2-0, C3-0。然后將 partitions 的個數除於消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不盡,那么前面幾個消費者線程將會多消費一個分區。
假如在 Topic1 中有 10 個分區,3 個消費者線程,10/3 = 3,而且除不盡,那么消費者線程 C1-0 將會多消費一個分區,所以最后分區分配的結果是這樣的:
C1-0 將消費 0,1,2,3 分區
C2-0 將消費 4,5,6 分區
C3-0 將消費 7,8,9 分區
假如在 Topic1 中有 11 個分區,那么最后分區分配的結果看起來是這樣的:
C1-0 將消費 0,1,2,3 分區
C2-0 將消費 4, 5, 6, 7 分區
C3-0 將消費 8,9,10 分區
假如有兩個 Topic:Topic1 和 Topic2,都有 10 個分區,那么最后分區分配的結果看起來是這樣的:
C1-0 將消費 Topic1 的 0,1,2,3 分區和 Topic1 的 0,1,2,3 分區
C2-0 將消費 Topic1 的 4,5,6 分區和Topic2 的 4,5,6 分區
C3-0 將消費 Topic1 的 7,8,9 分區和Topic2 的 7,8,9 分區
其實這樣就會有一個問題,C1-0 就會多消費兩個分區,這就是一個很明顯的弊端。
RoundRobin strategy(輪詢分區)
輪詢分區策略是把所有 partition 和所有 Consumer 線程都列出來,然后按照 hashcode 進行排序。最后通過輪詢算法分配partition 給消費線程。如果所有 Consumer 實例的訂閱是相同的,那么 partition 會均勻分布。
假如按照 hashCode 排序完的 Topic / partitions組依次為T1一5, T1一3, T1-0, T1-8, T1-2, T1-1, T1-4,T1-7,T1-6,T1-9,消費者線程排序為 C1-0,C1-1,C2-0,C2-1,最后的分區分配的結果為:
C1-0 將消費 T1-5, T1-2, T1-6分區
C1-1 將消費 T1-3, T1-1, T1-9分區
C2-0 將消費 T1-0, T1-4分區
C2-1 將消費 T1-8, T1-7分區
使用輪詢分區策略必須滿足兩個條件
1.每個主題的消費者實例具有相同數量的流
2.每個消費者訂閱的主題必須是相同的
什么時候會觸發這個策略呢?
當出現以下幾種情況時,Kafka 會進行一次分區分配操作,也就是 Kafka Consumer 的 Rebalance
1.同一個 Consumer group內新增了消費者
2.消費者離開當前所屬的 Consumer group,比如主動停機或者宕機
3.Topic 新增了分區(也就是分區數量發生了變化)
Kafka Consuemr 的 Rebalance 機制規定了一個 Consumer group 下的所有 Consumer 如何達成一致來分配訂閱 Topic 的每個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 Kafka 對於分配策略這塊,提供了可插拔的實現方式,也就是說,除了這兩種之外,我們還可以創建自己的分配機制。
誰來執行 Rebalance 以及管理 Consumer 的 group 呢?
Consumer group 如何確定自己的 coordinator 是誰呢,消費者向 Kafka 集群中的任意一個 broker 發送一個 GroupCoord inatorRequest 請求,服務端會返回一個負載最小的 broker 節點的 id,並將該 broker 設置為 coordinator。
JoinGroup 的過程
在 Rebalance 之前,需要保證 coordinator 是已經確定好了的,整個 Rebalance 的過程分為兩個步驟,Join和Syncjoin:表示加入到 Consumer group中,在這一步中,所有的成員都會向 coordinator 發送 joinGroup 的請求。一旦所有成員都發了 joinGroup請求,那么 coordinator 會選擇一個 Consumer 擔任 leader 角色,並把組成員信息和訂閱信息發送給消費者:
protocol-metadata:序列化后的消費者的訂閱信息
leader id:消費組中的消費者,coordinator 會選擇一個作為 leader,對應的就是 member id
member metadata:對應消費者的訂閱信息
members: consumer group 中全部的消費者的訂閱信息
generation_id:年代信息,類似於 ZooKeeper 中的 epoch,對於每一輪 Rebalance,generation_id 都會遞增。主要用來保護consumer group,隔離無效的 offset 提交。也就是上一輪的 consumer 成員無法提交 offset 到新的 Consumer group中。
Synchronizing Group State 階段
完成分區分配之后,就進入了 Synchronizing Group State 階段,主要邏輯是向 GroupCoordinator 發送 SyncGroupRequest 請求,並且處理 SyncGroupResponse 響應,簡單來說,就是 leader 將消費者對應的 partition 分配方案同步給 Consumer group中的所有 Consumer。
每個消費者都會向 coordinator 發送 syncgroup 請求,不過只有 leader 節點會發送分配方案,其他消費者只是打打醬油而已。當leader 把方案發給 coordinator 以后,coordinator 會把結果設置到 SyncGroupResponse 中。這樣所有成員都知道自己應該消費哪個分區。
Consumer group 的分區分配方案是在客戶端執行的!Kafka 將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性。
參考資料: