為什么使用消息隊列?
以用戶下單購買商品的行為舉例,在使用微服務架構時,我們需要調用多個服務。傳統的調用方式是同步調用,這會存在一定的性能問題
使用消息隊列可以實現異步的通信方式,相比於同步的通信⽅式,異步的⽅式可以讓上游快速成功,極大提高系統的吞吐量。在分布式系統中,通過下游多個服務的分布式事務的保障,也能保障業務執行之后的最終⼀致性
Kafka 概述
1. 介紹
Kafka 是⼀個分布式的、⽀持分區的(partition)、多副本的 (replica),基於 zookeeper 協調的分布式消息系統,它最大的特性就是可以實時處理大量數據以滿足各類需求場景:
- 日志收集:使用 Kafka 收集各種服務的日志,並通過 kafka 以統一接口服務的方式開放給各種 consumer,例如 hadoop、Hbase、Solr 等
- 消息系統:解耦和生產者和消費者、緩存消息等
- 用戶活動跟蹤:Kafka 經常被用來記錄 web 用戶或者 app 用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到 kafka 的 topic 中,然后訂閱者通過訂閱這些 topic 來做實時的監控分析,或者裝載到 hadoop、數據倉庫中做離線分析和挖掘
- 運營指標:Kafka 也經常用來記錄運營監控數據,包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告
2. 相關術語
名稱 | 解釋 |
---|---|
Broker | 消息中間件處理節點,⼀個 Kafka 節點就是⼀個 broker,⼀個或者多個 Broker 可以組成⼀個 Kafka 集群 |
Topic | Kafka 根據 topic 對消息進行歸類,發布到 Kafka 集群的每條消息都需要指定⼀個 topic |
Producer | 消息生產者,向 Broker 發送消息的客戶端 |
Consumer | 消息消費者,從 Broker 讀取消息的客戶端 |
ConsumerGroup | 每個 Consumer 屬於⼀個特定的 Consumer Group,⼀條消息可以被多個不同的 Consumer Group 消費,但是⼀個 Consumer Group 中只能有⼀個 Consumer 能夠消費該消息 |
Partition | 物理上的概念,⼀個 topic 可以分為多個 partition,每個 partition 內部消息是有序的 |
3. 安裝
安裝 Kafka 之前需要先安裝 JDK 和 Zookeeper,在官網下載 Kafka 安裝包:http://kafka.apache.org/downloads,直接解壓即可
需要修改配置文件,進⼊到 config 目錄內,修改 server.properties
# broker.id 屬性在 kafka 集群中必須唯一
broker.id= 0
# kafka 部署的機器 ip 和提供服務的端口號
listeners=PLAINTEXT://192.168.65.60:9092
# kafka 的消息存儲文件
log.dir=/usr/local/data/kafka-logs
# kafka 連接 zookeeper 的地址
zookeeper.connect= 192.168.65.60:2181
server.properties 核心配置詳解:
Property | Default | Description |
---|---|---|
broker.id | 0 | 每個 broker 都可以用⼀個唯⼀的非負整數 id 進行標識,作為 broker 的 名字 |
log.dirs | /tmp/kafka-logs | kafka 存放數據的路徑,這個路徑並不是唯⼀的,可以是多個,路徑之間只需要使⽤逗號分隔即可;每當創建新 partition 時,都會選擇在包含最少 partitions 的路徑下進行 |
listeners | PLAINTEXT://192.168.65.60:9092 | server 接受客戶端連接的端⼝,ip 配置 kafka 本機 ip 即可 |
zookeeper.connect | localhost:2181 | zooKeeper 連接字符串的格式為:hostname:port,此處 hostname 和 port 分別是 ZooKeeper 集群中某個節點的 host 和 port;zookeeper 如果是集群,連接⽅式為 hostname1:port1,hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每個日志文件刪除之前保存的時間,默認數據保存時間對所有 topic 都⼀樣 |
num.partitions | 1 | 創建 topic 的默認分區數 |
default.replication.factor | 1 | ⾃動創建 topic 的默認副本數量,建議設置為⼤於等於 2 |
min.insync.replicas | 1 | 當 producer 設置 acks 為 -1 時,min.insync.replicas 指定 replicas 的最小數目(必須確認每⼀個 repica 的寫數據都是成功的),如果這個數目沒有達到,producer 發送消息會產生異常 |
delete.topic.enable | false | 是否允許刪除主題 |
進入到 bin 目錄下,使用命令來啟動
./kafka-server-start.sh -daemon../config/server.properties
驗證是否啟動成功:進入到 zk 中的節點看 id 是 0 的 broker 有沒有存在(上線)
ls /brokers/ids/
實現消息的生產和消費
1. 主題 Topic
topic 可以實現消息的分類,不同消費者訂閱不同的 topic
執行以下命令創建名為 test
的 topic,這個 topic 只有一個 partition,並且備份因子也設置為 1
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test
查看當前 kafka 內有哪些 topic
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181
2. 發送消息
把消息發送給 broker 中的某個 topic,打開⼀個 kafka 發送消息的客戶端,然后開始⽤客戶端向 kafka 服務器發送消息
kafka 自帶了一個 producer 命令客戶端,可以從本地文件中讀取內容,或者我們也可以以命令行中直接輸入內容,並將這些內容以消息的形式發送到 kafka 集群中。在默認情況下,每一個行會被當做成一個獨立的消息
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test
3. 消費消息
對於 consumer,kafka 同樣也攜帶了一個命令行客戶端,會將獲取到內容在命令中進行輸出,默認是消費最新的消息。使用 kafka 的消費者客戶端,從指定 kafka 服務器的指定 topic 中消費消息
方式一:從最后一條消息的 偏移量+1 開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
方式二:從頭開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test
消息的發送方會把消息發送到 broker 中,broker 會存儲消息,消息是按照發送的順序進行存儲。因此消費者在消費消息時可以指明主題中消息的偏移量。默認情況下,是從最后一個消息的下一個偏移量開始消費
4. 單播消息
一個消費組里只有一個消費者能消費到某一個 topic 中的消息,可以創建多個消費者,這些消費者在同一個消費組中
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test
5. 多播消息
在一些業務場景中需要讓一條消息被多個消費者消費,那么就可以使用多播模式。kafka 實現多播,只需要讓不同的消費者處於不同的消費組即可
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test
6. 查看消費組及信息
# 查看當前主題下有哪些消費組
./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list
# 查看消費組中的具體信息:比如當前偏移量、最后一條消息的偏移量、堆積的消息數量
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
- Currennt-offset:當前消費組的已消費偏移量
- Log-end-offset:主題對應分區消息的結束偏移量(HW)
- Lag:當前消費組未消費的消息數
7. 其他細節
-
生產者將消息發送給 broker,broker 會將消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主題-分區/00000000.log
-
消息的保存是有序的,通過 offset 偏移量來描述消息的有序性
-
消費者消費消息時也是通過 offset 來描述當前要消費的那條消息的位置
主題與分區
主題 Topic 在 kafka 中是⼀個邏輯概念,kafka 通過 topic 將消息進行分類。不同的 topic 會被訂閱該 topic 的消費者消費。但是有⼀個問題,如果說這個 topic 的消息非常多,消息是會被保存到 log 日志文件中的,這會出現文件過大的問題,因此,kafka 提出了 Partition 分區的概念
通過 partition 將⼀個 topic 中的消息分區來存儲,這樣的好處有多個:
- 分區存儲,可以解決存儲文件過大的問題
- 提供了讀寫的吞吐量:讀和寫可以同時在多個分區進⾏
為⼀個主題創建多個分區
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1
通以下命令查看 topic 的分區信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
分區的作用:
- 可以分布式存儲
- 可以並行寫
了解了 Partition,再補充一個 Kafka 細節:在消息日志文件中,kafka 內部創建了 __consumer_offsets 主題包含了 50 個分區。這個主題用來存放消費者某個主題的偏移量,每個消費者會把消費的主題的偏移量自主上報給 kafka 中的默認主題:consumer_offsets。因此 kafka 為了提升這個主題的並發性,默認設置了 50 個分區
- 提交到哪個分區:通過 hash 函數:
hash(consumerGroupId) % __consumer_offsets 主題的分區數
- 提交到該主題中的內容是:key 是 consumerGroupId + topic + 分區號,value 就是當前 offset 的值
- 文件中保存的消息,默認保存七天,七天到后消息會被刪除
Kafka 集群
1. 搭建
創建三個 server.properties 文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
通過命令啟動三台 broker
./kafka-server-start.sh -daemon../config/server0.properties
./kafka-server-start.sh -daemon../config/server1.properties
./kafka-server-start.sh -daemon../config/server2.properties
搭建完后通過查看 zk 中的 /brokers/ids 看是否啟動成功
2. 副本
下面的命令,在創建主題時,除了指明了主題的分區數以外,還指明了副本數,分別是:一個主題,兩個分區、三個副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通過查看主題信息,其中的關鍵數據:
-
replicas:當前副本所存在的 broker 節點
-
leader:副本里的概念
- 每個 partition 都有一個 broker 作為 leader
- 消息發送方要把消息發給哪個 broker,就看副本的 leader 是在哪個 broker 上面,副本里的 leader 專門用來接收消息
- 接收到消息,其他 follower 通過 poll 的方式來同步數據
-
follower:leader 處理所有針對這個 partition 的讀寫請求,而 follower 被動復制 leader,不提供讀寫(主要是為了保證多副本數據與消費的一致性),如果 leader 所在的 broker 掛掉,那么就會進行新 leader 的選舉
-
isr:可以同步的 broker 節點和已同步的 broker 節點,存放在 isr 集合中
3. broker、主題、分區、副本
Kafka 集群中由多個 broker 組成,⼀個 broker 存放⼀個 topic 的不同 partition 以及它們的副本
4. 集群消息的發送
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic
5. 集群消息的消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
6. 分區消費組消費者的細節
- ⼀個 partition 只能被⼀個消費組中的⼀個消費者消費,目的是為了保證消費的順序性,但是多個 partion 的多個消費者的消費順序性是得不到保證的
- 一個消費者可以消費多個 partition,如果消費者掛了,那么會觸發rebalance機制,由其他消費者來消費該分區
- 消費組中消費者的數量不能比一個 topic 中的 partition 數量多,否則多出來的消費者消費不到消息
Java 中使用 Kafka
1. 生產者
1.1 引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
1.2 生產者發送消息
/**
* 消息的發送方
*/
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.設置參數
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 把發送的 key 從字符串序列化為字節數組
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把發送消息 value 從字符串序列化為字節數組
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2.創建⽣產消息的客戶端,傳⼊參數
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 3.創建消息
// key: 作⽤是決定了往哪個分區上發
// value: 具體要發送的消息內容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
// 4.發送消息,得到消息發送的元數據並輸出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式發送消息結果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
1.3 發送消息到指定分區
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));
如果未指定分區,則會通過業務 Key 的 hash 運算,得出要發送的分區,公式為:hash(key)%partitionNum
1.4 同步發送消息
⽣產者同步發消息,在收到 kafka 的 ack 告知發送成功之前將⼀直處於阻塞狀態
// 等待消息發送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
1.5 異步發送消息
異步發送,⽣產者發送完消息后就可以執⾏之后的業務,broker 在收到消息后異步調用生產者提供的 callback 回調方法
// 指定發送分區
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));
// 異步回調方式發送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發送消息失敗:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步方式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
1.6 生產者中的 ack 的配置
在同步發送的前提下,生產者在獲得集群返回的 ack 之前會⼀直阻塞,那么集群什么時候返回 ack 呢?此時 ack 有三個配置:
- acks = 0:表示 producer 不需要等待任何 broker 確認收到消息的回復,就可以繼續發送下一條消息,性能最高,但最容易丟消息
- acks = 1:至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一條消息。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失
- acks = -1 或 all:需要等待
min.insync.replicas
(默認為 1 ,推薦配置大於等於2)這個參數配置的副本個數都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的數據保證,一般是金融級別,或跟錢打交道的場景才會使用這種配置
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 發送失敗,默認會重試三次,每次間隔 100ms
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100)
1.7 消息發送的緩沖區
-
kafka 默認會創建⼀個消息緩沖區,用來存放要發送的消息,緩沖區是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
-
kafka 本地線程會在緩沖區中⼀次拉 16k 的數據,發送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
-
如果線程拉不到 16k 的數據,間隔 10ms 也會將已拉到的數據發到 broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
2. 消費者
2.1 消費消息
public class MySimpleConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
// 消費分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 1.創建⼀個消費者的客戶端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 2.消費者訂閱主題列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3. poll() API 是拉取消息的⻓輪詢
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
2.2 自動提交和手動提交 offset
無論是自動提交還是手動提交,都需要把所屬的 消費組 + 消費的某個主題 + 消費的某個分區 + 消費的偏移量
提交到集群的 _consumer_offsets 主題里面
-
自動提交:消費者 poll 消息下來以后自動提交 offset
// 是否自動提交 offset,默認就是 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自動提交 offset 的間隔時間 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意:如果消費者還沒消費完 poll 下來的消息就自動提交了偏移量,此時消費者掛了,於是下⼀個消費者會從已提交的 offset 的下⼀個位置開始消費消息,之前未被消費的消息就丟失掉了
-
手動提交:需要把自動提交的配置改成 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手動提交又分成了兩種:
-
手動同步提交
在消費完消息后調用同步提交的方法,當集群返回 ack 前⼀直阻塞,返回 ack 后表示提交成功,執行之后的邏輯
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 所有的消息已消費完 if (records.count() > 0) { // 有消息 // ⼿動同步提交 offset, 當前線程會阻塞直到 offset 提交成功 // ⼀般使⽤同步提交, 因為提交之后⼀般也沒有什么邏輯代碼了 consumer.commitSync(); // ====阻塞=== 提交成功 } }
-
手動異步提交
在消息消費完后提交,不需要等到集群 ack,直接執行之后的邏輯,可以設置⼀個回調方法,供集群調用
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } // 所有的消息已消費完 if (records.count() > 0) { // 手動異步提交 offset,當前線程提交 offset 不會阻塞,可以繼續處理后⾯的程序邏輯 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + exception.getStackTrace()); } } }); } }
-
2.3 長輪詢 poll 消息
消費者建立與 broker 之間的長連接,開始 poll 消息,默認⼀次 poll 五百條消息
// ⼀次 poll 最⼤拉取消息的條數,可以根據消費速度的快慢來設置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)
可以根據消費速度的快慢來設置,如果兩次 poll 的時間超出了 30s 的時間間隔,kafka 會認為其消費能力過弱,將其踢出消費組,將分區分配給其他消費者
代碼中設置了長輪詢的時間是 1000 毫秒
while (true) {
/*
* poll() API 是拉取消息的⻓輪詢
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
- 如果⼀次 poll 到 500 條,就直接執行 for 循環
- 如果這⼀次沒有 poll 到 500 條,且時間在1秒內,那么長輪詢繼續 poll,要么到 500 條,要么到 1s
- 如果多次 poll 都沒達到 500 條,且 1 秒時間到了,那么直接執行 for 循環
2.4 健康狀態檢查
消費者每隔 1s 向 Kafka 集群發送心跳,集群發現如果有超過 10s 沒有續約的消費者,將被踢出消費組,觸發該消費組的 rebalance 機制,將該分區交給消費組里的其他消費者進行消費
// consumer 給 broker 發送⼼跳的間隔時間
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka 如果超過 10 秒沒有收到消費者的⼼跳,則會把消費者踢出消費組,進行rebalance,把分區分配給其他消費者
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
2.5 指定分區消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6 消息回溯消費
也即從頭開始消費消息
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,
0)));
2.7 指定偏移量消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
2.8 從指定時間點消費
根據時間,去所有的 partition 中確定該時間對應的 offset,然后去所有的 partition 中找到該 offset 之后的消息開始消費
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 從一小時前開始消費
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) {
continue;
}
// 根據消費⾥的 timestamp 確定 offset
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
2.9 新消費組的消費 offset 規則
新消費組中的消費者在啟動以后,默認會從當前分區的最后⼀條消息的 offset+1 開始消費(消費新消息),可以通過以下的設置,讓新的消費者第⼀次從頭開始消費,之后開始消費新消息(最后消費的位置的偏移量 +1)
- Latest:默認的,消費新消息
- earliest:第⼀次從頭開始消費,之后開始消費新消息(最后消費的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
SpringBoot 中使用 Kafka
1. 引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 編寫配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
producer: # ⽣產者
retries: 3 # 設置⼤於0的值,則客戶端會將發送失敗的記錄重新發送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼⽅式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 當每⼀條記錄被消費者監聽器(ListenerConsumer)處理之后提交
# RECORD
# 當每⼀批 poll() 的數據被消費者監聽器(ListenerConsumer)處理之后提交
# BATCH
# 當每⼀批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間⼤於TIME時提交
# TIME
# 當每⼀批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量⼤於等於COUNT時提交
# COUNT
# TIME | COUNT 有⼀個條件滿⾜時提交
# COUNT_TIME
# 當每⼀批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, ⼿動調⽤Acknowledgment.acknowledge()后提交
# MANUAL
# 手動調⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤這種
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 172.16.253.21
3. 編寫生產者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
return "send success!";
}
}
4. 編寫消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手動提交offset
ack.acknowledge();
}
}
配置消費主題、分區和偏移量
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同組下的消費者個數,就是並發消費數,建議⼩於等於分區總數
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手動提交offset
ack.acknowledge();
}
Kafka 集群 Controller、Rebalance、HW
1. Controller
Kafka 集群中的 broker 在 zookeeper 中創建臨時序號節點,序號最小的節點(最先創建的節點)將作為集群的 controller,負責管理整個集群中的所有分區和副本的狀態:
- 當某個分區的 leader 副本出現故障時,由控制器負責為該分區選舉新的 leader 副本,選舉的規則是從 isr 集合中最左邊獲取
- 當集群中有 broker 新增或減少,controller 會同步信息給其他 broker
- 當集群中有分區新增或減少,controller 會同步信息給其他 broker
2. Rebalance
如果消費者沒有指明分區消費,那么當消費組里消費者和分區的關系發生變化,就會觸發 rebalance 機制,重新調整消費者該消費哪個分區
在觸發 rebalance 機制之前,消費者消費哪個分區有三種分配策略:
- range:通過公式來計算某個消費者消費哪個分區,公式為:前面的消費者是
(分區總數/消費者數量)+1
,之后的消費者是分區總數/消費者數量
- 輪詢:大家輪着來
- sticky:粘合策略,如果需要 rebalance,會在之前已分配的基礎上調整,不會改變之前的分配情況。如果這個策略沒有開,那么就要全部重新分配,所以建議開啟
3. HW 和 LEO
LEO 是某個副本最后消息的消息位置(log-end-offset),HW 是已完成同步的位置。消息在寫入 broker 時,且每個 broker 完成這條消息的同步后,HW 才會變化。在這之前,消費者是消費不到這條消息的,在同步完成之后,HW 更新之后,消費者才能消費到這條消息,這樣的目的是防止消息的丟失
Kafka 線上問題優化
1. 防止消息丟失
生產者:
- 使用同步發送
- 把 ack 設成 1 或者 all,並且設置同步的分區數 >= 2
消費者:
- 把自動提交改成手動提交
2. 防止重復消費
如果生產者發送完消息后,卻因為網絡抖動,沒有收到 ack,但實際上 broker 已經收到了。此時生產者會進行重試,於是 broker 就會收到多條相同的消息,而造成消費者的重復消費
解決方案:
- 生產者關閉重試,但會造成丟消息,不建議
- 消費者解決非冪等性消費問題,所謂非冪等性,就是用戶對於同一操作發起的一次請求或者多次請求的結果是一致的,可以用唯一主鍵或分布式鎖來實現
3. 保證消息的順序消費
生產者:使用同步發送,ack 設置成非 0 的值
消費者:主題只能設置⼀個分區,消費組中只能有⼀個消費者
4. 解決消息積壓
所謂消息積壓,就是消息的消費者的消費速度遠趕不上生產者的生產消息的速度,導致 kafka 中有大量的數據沒有被消費。隨着沒有被消費的數據堆積越多,消費者尋址的性能會越來越差,最后導致整個 kafka 對外提供的服務的性能很差,從而造成其他服務也訪問速度變慢,造成服務雪崩
解決方案:
- 在這個消費者中,使用多線程,充分利用機器的性能消費消息
- 通過業務的架構設計,提升業務層面消費的性能
- 創建多個消費組,多個消費者,部署到其他機器上,⼀起消費,提高消費者的消費速度
- 創建⼀個消費者,該消費者在 kafka 另建⼀個主題,配上多個分區,多個分區再配上多個消費者。該消費者將 poll 下來的消息,不進行消費,直接轉發到新建的主題上。此時,新的主題的多個分區的多個消費者就開始⼀起消費了
5. 實現延時隊列
假設一個應用場景:訂單創建后,超過 30 分鍾沒有支付,則需要取消訂單,這種場景可以通過延時隊列來實現,實現方案如下:
- 在 Kafka 創建相應的主題,比如該主題的超時時間定為 30 分鍾
- 消費者消費該主題的消息(輪詢)
- 消費者消費消息時,判斷消息的創建時間和當前時間是否超過 30 分鍾(前提是訂單沒有完成支付)
- 超過:數據庫修改訂單狀態為已取消
- 沒有超過:記錄當前消息的 offset,並不再繼續消費之后的消息。等待 1 分鍾后,再次從 Kafka 拉取該 offset 及之后的消息,繼續判斷,以此反復