JAVA 8
Spring Boot 2.5.3
kafka_2.13-2.8.0
apache-zookeeper-3.7.0
---
授人以漁:
1、Spring Boot Reference Documentation
This document is also available as Multi-page HTML, Single page HTML and PDF.
有PDF版本哦,下載下來!
有PDF版本哦,下載下來!
目錄
本文介紹Kafka的基本使用,包括 新建主題、發送消息、接收&處理消息、消息發送確認、消息消費確認 等。
本文基於 單機版的ZooKeeper、Kafka 進行測試。
kafka是一個分布式消息隊列(MQ)中間件,支持 生產者/消費者、發布者/訂閱者 模式。
由Apache軟件基金會開發的一個開源流處理平台。
依賴 Apache軟件基金會 的另一個開源軟件:ZooKeeper(致力於開發和維護開源服務器,實現高度可靠的分布式協調)。
關鍵概念:
broker、producer、consumer-group、consumer,
topic、partition、replica、offset。
---
還有更多關鍵概念,這些一起構建了 分布式的、高可靠性的、流式處理 的 Kafka。
概念說明:
broker:運行kafka服務器的進程;
producer:生產者,發送消息到topic;
consumer-group:消息消費者的組名,用於將消費者分組;
consumer:消息消費者,歸屬於某個consumer-group,真正用於接收&處理消息的實體;
一個consumer-group下只有一個 consumer,這個consumer將依次消費 主題下各個partition的消息——要是partition有多個,這個consumer會“比較累”吧;
一個consumer-group下有多個 consumer,系統(?)會協調各個 consumer 匹配 主題下各個partition,要是數量相等——都是N,則可以實現並發處理主題下的消息——一個consumer對應一個partition;
consumer的數量 需要和 主題下的partition數量 協調——前者大於后者是沒有必要的,會存在consumer浪費的情況;
topic:消息主題,匹配RabbitMQ中的 交換機,消息發送到主題,再由主題發送到具體的分區(partition);
partition:主題下的分區,可以1~N個,匹配RabbitMQ中的隊列,但不需要和主題綁定,創建Topic的時候就建立好,在kafka的數據文件夾下,每個partition都對應一個文件夾;consumer就是從 partition中消費消息的;
replica:partition的副本,可以1~N個,但是,N的數量必須小於 集群中 broker的數量,replica會被均勻地分布到不同的broker中;
offset:partition下的概念,記錄partition中消息被消費到那里了,由消費者控制——消費后不更新offset、任意指定offset開始消費(比如,從頭開始);和RabbitMQ不同的時,消息被消費后,不會立即被清理,故,可以消費已經被消費過的消息;(疑問:消息會一直保留嗎?消息會被清理嗎?什么時候清理?策略是什么?TODO)
ZooKeeper、Kafka單機啟動&停止:
單機部署
ZooKeeper
配置文件(Config file):
conf/zoo.cfg
啟動(start):
./zkServer.sh start
停止(stop):
./zkServer.sh stop
Kafka
配置文件:
server.properties
啟動(start):
bin/kafka-server-start.sh config/server.properties
停止(stop):
bin/kafka-server-stop.sh
注,啟動前需要修改好配置文件。
注,上面的Kafka啟動后,會占用一個Shell,可以使用 nohup CMD & 實現后台啟動。
Kafka啟動后,可以使用 bin 目錄下的 kafka-topics.sh 管理主題。
kafka-topics.sh操作
# help命令
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --help
...省略...
# 展示所有Topic
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
# 創建主題 1個分區、1個副本——成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --partitions 1 --replication-factor 1 --create
Created topic topic082901.
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901
# 創建主題 2個分區、1個副本——成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --partitions 2 --replication-factor 1 --create
Created topic topic082902.
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901
topic082902
# 創建主題 1個分區、2個副本——失敗!原因是 只有一個 broker——單機版,2個副本沒有意義,都在一個服務器上
# 多副本需要 集群環境才可以演示
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082903 --partitions 1 --replication-factor 2 --create
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2021-08-29 14:26:18,293] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
# 2個副本的主題沒有創建成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901
topic082902
# 展示 主題的信息
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --describe
Topic: topic082901 TopicId: xKGttQHbSy-Ywc9Mo4e_2w PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: topic082901 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --describe
Topic: topic082902 TopicId: cEEZFAgCSN2PDkiAArGUTQ PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: topic082902 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic082902 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
# 檢查kafka服務器的數據目錄
# 多了 3個文件夾,分別對應着各個主題的 分區
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
drwxrwxr-x 2 ben ben 4096 8月 29 11:45 topic01-0
drwxrwxr-x 2 ben ben 4096 8月 29 11:38 topic02-0
drwxrwxr-x 2 ben ben 4096 8月 29 11:38 topic02-1
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082901-0
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082902-0
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082902-1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
# 刪除主題
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
# 刪除后查看數據目錄:多了3個 末尾是 delete的目錄,等待若干分鍾,這些文件夾會被清理掉
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
drwxrwxr-x 2 ben ben 4096 8月 29 11:45 topic01-0
drwxrwxr-x 2 ben ben 4096 8月 29 11:38 topic02-0
drwxrwxr-x 2 ben ben 4096 8月 29 11:38 topic02-1
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082901-0.be9a1e49b81a4b9a8421e52e45281f77-delete
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082902-0.70d6391753e24886a9dcb048db4d5811-delete
drwxrwxr-x 2 ben ben 4096 8月 29 14:25 topic082902-1.b335d0068eba494d8576937ecb7f9bb6-delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
Kafka基於ZooKeeper,因此,在Kafka使用過程中,ZooKeeper中的一些節點也會發生變化。
下面是 使用 ZooKeeper 的 zkCli.sh 查看到的一些信息。
zkCli.sh部分操作
ben@ben-VirtualBox:~/apache-zookeeper-3.7.0-bin/bin$ ./zkCli.sh
/usr/bin/java
Connecting to localhost:2181
...省略...
# 使用 ZooKeeper的ls命令!
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/seqid
[]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, topic01, topic02]
[zk: localhost:2181(CONNECTED) 5]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 3,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 4, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 5, 6, 7,
8, 9]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/__consumer_offsets/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 9]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/topics/topic01
Node does not exist: /brokers/topics/topics/topic01
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/topic01
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/topic01/partitions
[0]
[zk: localhost:2181(CONNECTED) 12] ls /brokers/topics/topic01/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 13] ls /brokers/topics/topic01/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 14]
# help可以看到所有命令
[zk: localhost:2181(CONNECTED) 14] help
ZooKeeper -server host:port -client-configuration properties-file cmd args
...生路...
關於Kafka在ZooKeeper上建立了哪些節點?各個節點的意義,需要另文介紹。
在自己7月份的一篇文章中有過介紹,本文再深入一些。
依賴包:
Kafka的依賴包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
spring-kafka包:
Spring容器中和 Kafka有關的一些Bean:
其中的 kafkaTemplate 的 多個 send函數 用來發送消息到主題 或 其下的分區:
# 部分send函數簽名
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data)
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record)
public ListenableFuture<SendResult<K, V>> send(Message<?> message)
除了send,還有sendDefault函數,發送消息到默認分區,默認分區是什么?可以用 KafkaTemplate 的 getDefaultTopic() 獲取。
另外,還有一些 setXXX函數,用來設置 kafkaTemplate 對象。
配置application.properties:
# Kafka
# mylinux 虛擬機地址:配置hosts文件
spring.kafka.bootstrap-servers=mylinux:9092
# consumer:全局
# 也可以在 @KafkaListener 中指定 單獨的 groupId
#spring.kafka.consumer.group-id=myGroup
Java文件:
接口 /try3/send 發送消息,1個@KafkaListener——消費者(注意,指定 groupId=tp03,所以,上面的配置文件中的 spring.kafka.consumer.group-id 可以不配置,否則啟動不了應用)。
try3-源碼1
# Try3Config.java
@Component
@Slf4j
public class Try3Config {
public static final String TOPIC_03 = "topic03";
public static final String TOPIC_03_KEY = "topic03_key";
/**
* 監聽器1
* @author ben
* @date 2021-08-29 16:30:46 CST
* @param record
*/
@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
public void listen01(ConsumerRecord<?, ?> record) {
// log.info("try3-消費-a:record={}", record);
log.info("try3-消費-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
# Try3Controller.java
@RestController
@RequestMapping(value="/try3")
@Slf4j
public class Try3Controller {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 調用接口發送消息
* @author ben
* @date 2021-08-29 16:12:07 CST
* @return
*/
@GetMapping(value="/send")
public Boolean sendMsg() {
IntStream.rangeClosed(0, 9).forEach(i->{
log.info("sendMsg-{}", i);
String msg = String.format("Try3Controller send msg-%d @%s", i, new Date());
kafkaTemplate.send(Try3Config.TOPIC_03, Try3Config.TOPIC_03_KEY, "key-" + msg);
});
return true;
}
}
啟動應用,Kafka上 自動出現了 topic03:
檢查K服務器
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic03
$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic03 --describe
Topic: topic03 TopicId: km2zL1MeTKiOWlhCPMx6gw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: topic03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
應用啟動信息(部分):
調用/try3/send 接口,發送消息:發送、接收成功。
注意,offset的變化。
注意,發送第一條消息時,producer還有一個初始化工作,會輸出更多producer相關日志。再次發送時,就沒有了。(是否需要 預熱?)
上一章試驗了 1個主題1個分區1個消費者組1個消費者 的消息發送接收,本章增加1個消費者——同一個消費者組下。
Java源碼:2個@KafkaListener,都在 tp03 消費者組 下
@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
public void listen01(ConsumerRecord<?, ?> record) {
log.info("try3-消費-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
public void listen02(ConsumerRecord<?, ?> record) {
log.info("try3-消費-b:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
啟動時,輸出了更多的日志,並提示發生:
# INFO 日志,不是錯誤
org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
Consumer clientId 出現了兩個:consumer-tp03-1、consumer-tp03-2。
並顯示了 相關日志:
最終,consumer-tp03-1 生效了,用來消費 唯一分區的消息。
這就是 消費者數量 大於 主題的分區數量時的情況。(注意,這兩個 監聽器 都在同一個應用,要是在不同應用呢?稍后試驗)
調用接口發送消息:
進一步試驗1:
使用 另一個端口 啟動應用,實現 主題的 消費者 再更加2個,看看會發生什么情況。
執行結果:
新啟動應用的消費者 有加入消費者組,但是,沒有分區分配,故,也不會收到消息處理(調用 發送消息 接口后,新應用沒有收到)。
檢查kafka上 消費者組tp03 的信息:
$ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp03 topic03 0 40 40 0 consumer-tp03-1-0705883f-92ab-47f0-82c3-d7334ad81b4f /192.168.0.112 consumer-tp03-1
進一步試驗2:
關閉處於消費狀態的應用,檢查 新啟動的應用是否能 作為備用消費者 正確進行消費。
消費者組 下發生了 rebalancing!
此時,調用新應用的接口發送消息(舊端口的應用 被關閉了哦),發送成功,,新端口的應用也收到並正確處理了消息。
消費者組tp03 的信息 也發生了變化:CONSUMER-ID 不同了!
$ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp03 topic03 0 50 50 0 consumer-tp03-1-a682c694-8018-43a9-8a1e-02a7d5ece6a5 /192.168.0.112 consumer-tp03-1
小結:
同一個組中,消費者數量 要小於 主題的分區數,多了的消費者也不會得到分區並發生消費。
但是,屬於不同進程的 消費者 可以實現備用。
前面應用啟動后,生成了主題,但這個主題只有1個分區。
下面使用 Kafka命令生成 2個分區 的主題:
./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic02 --partitions 2 --replication-factor 1 --create
疑問:使用Java程序是否可以生成分區呢?發現一個名為 org.apache.kafka.clients.admin.NewTopic 的類,或許可以實現,后面試驗下。
1組1消(1個消費者組、1個消費者)
1、發送時 無key
kafkaTemplate.send(Try2Config.TOPIC_02, "no key-" + msg);
啟動應用:一個監聽器 處理 2個分區的數據
調用接口 /try2/send,結果:
第一次發送,全部到了 分區0,收到並處理:
還以為 這種模式下 只會處理一個分區的數據:無key,只轉發到某個分區。
那么,再次測試幾次發送:
兩個分區都有數據來了!
疑問:
主題的消息 在 無key的情況下,是按照什么 策略 轉發到不同分區的呢?TODO
2、發送時有1個固定key
kafkaTemplate.send(Try2Config.TOPIC_02, Try2Config.TOPIC_02_KEY, "fixed key-" + msg);
試驗結果:
和上面的不同!監聽器 只處理了來自 分區0 的消息,連續發送了多次都是如此。
3、key為 i%4,發送20條消息(前面是10條)
// 3.發送消息條數改為 20條,key=i%4
// 注意,第二個參數是 String類型!!
kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);
試驗結果:
發送了20條消息,分別均勻地轉發到了 兩個分區,監聽器也處理了所有20消息。
不過,20條消息 沒有按照 發送順序處理,但是,同一個分區內 是 按照發送順序處理了,0、2、4、..……,1、3、5……
4、指定分區編號(從0開始,本試驗只有0、1)發送
// 4、指定分區發送
// 第三個參數為 key = "" + i%3
kafkaTemplate.send(Try2Config.TOPIC_02, i%Try2Config.TOPIC_02_P_NUMBER, "" + i%3, "fixed partition key-" + msg);
試驗結果:
消息按照 指定的分區號 發送到了 不同的分區,消費者也對不同分區的消息進行了消費。
確定性地分配到不同分區的方式,而不像 按照key 轉發時,存在不確定性。
發送時指定分區號,這需要提前知道 主題有多少分區,否則,指定的分區號大於分區數會怎樣呢?程序卡住,最后超時發生異常!
// 錯誤:指定的 分區號 大於 分區數!
kafkaTemplate.send(Try2Config.TOPIC_02, i%3, "" + i%3, "fixed partition key-" + msg);
如下所示:
Exception thrown when sending a message with key='2' and payload='fixed partition key-...' to topic topic02 and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic topic02 not present in metadata after 60000 ms.
疑問:
這個發送超時時間怎么 更改 呢?Spring boot、kafka的文檔中、KafkaTemplate、DefaultKafkaProducerFactory 中都沒有找到 發送超時 的設置方法。TODO
注,還有一個普通的 send函數——指定了 timestamp,就不做試驗了。
小結:
通過上面的試驗,知道了 不同方式發送的消息 會怎么分配到不同的分區了。
send(String topic, @Nullable V data) | key=null | 不均勻地發送到 各個分區 |
send(String topic, K key, @Nullable V data) | key=固定值 | 只會發送到一個分區 |
send(String topic, K key, @Nullable V data) | key=隨機值 | 會均勻地發送到各個分區, 當然,隨機值的數量要多於分區數量 |
send(String topic, Integer partition, K key, @Nullable V data) | 指定分區數 | 當然是發送到指定分區。 異常: 指定分區號 大於 分區數,會超時、異常,發送消息失敗。 |
send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) |
指定分區數 | 同上一個(本文未驗證)。 這里的 timestamp 應該有其它用處,比如,消息去重等 |
1組2消
發送方式采用:不指定分區號,隨機key——大於等於分區數2,,確保兩個分區都有消息(均衡)。來自博客園
每個消費者都分配給了一個分區。
發送消息:
kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);
1組2消代碼
@Component
@Slf4j
public class Try2Config {
/**
* 主題02:topic02,擁有2個分區
*/
public static final String TOPIC_02 = "topic02";
/**
* 主題topic02分區數:2
*/
public static final int TOPIC_02_P_NUMBER = 2;
/**
* 主題02-KEY:topic02_key
*/
public static final String TOPIC_02_KEY = "topic02_key";
public static final String TRY2_GROUP_ID_01 = "try2group01";
public static final String TRY2_GROUP_ID_02 = "try2group02";
@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
public void listenTopic1(ConsumerRecord<?, ?> record) {
log.info("try2-消費-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
public void listenTopic2(ConsumerRecord<?, ?> record) {
log.info("try2-消費-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
}
試驗結果:兩個消費者,分別處理了 指定分區的消息。來自博客園
1組3消
增加一個監聽器:
// 消費者3:超過分區數量了
@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
public void listenTopic3(ConsumerRecord<?, ?> record) {
log.info("try2-消費-C:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
啟動應用:來自博客園
發送消息,試驗結果:
兩個分區的消息都得到了處理。
但是,try2group01-1、try2group01-2 指定了分區,為何 消費的是 try2-消費-C 呢?難道 這個 try2group01-N 和 日志里面的 A、B、C不匹配?
給 @KafkaListener 增加 id屬性:
@KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
@KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
@KafkaListener(id="listenerC", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
再次測試:
果然,listenerA 對應的是 consumer-try2group01-3!
2組2X2消
兩個消費者組,各有兩個消費者:來自博客園
2組2消監聽者
// groupId = TRY2_GROUP_ID_01
// 消費者1
@KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
public void listenTopic1(ConsumerRecord<?, ?> record) {
log.info("try2-消費-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
// groupId = TRY2_GROUP_ID_01
// 消費者2
@KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
public void listenTopic2(ConsumerRecord<?, ?> record) {
log.info("try2-消費-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
// ---------------------
// groupId = TRY2_GROUP_ID_02
// 消費者4
@KafkaListener(id="g2listenerH", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
public void listenTopic4(ConsumerRecord<?, ?> record) {
log.info("try2-消費-H:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
// groupId = TRY2_GROUP_ID_02
// 消費者5
@KafkaListener(id="g2listenerJ", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
public void listenTopic5(ConsumerRecord<?, ?> record) {
log.info("try2-消費-I:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
}
啟動應用:ListenerA、B、H、J 都分別指派給了 主題的兩個分區。
注意:TRY2_GROUP_ID_02 的兩個消費者的ID 是以 g2 開頭的,但是,日志里面的線程名中 卻沒有 g2!TODO
發送消息,消費結果:
小結:
兩個消費者組 都消費了 所有消息。來自博客園
注意:上面的 @KafkaListener 指定消費的 topic 使用的是 主題全名,其另一個屬性 topicPattern 應該是可以用來 根據 某種模式來配置 監聽的主題的。
源碼里面:the topic pattern or expression (SpEL)。TODO
SpELl:
SpEL(Spring Expression Language),即Spring表達式語言,是比JSP的EL更強大的一種表達式語言。來自博客園
不甚了解,后續再DIG。
注意:@KafkaListener 的 topicPartitions屬性 的使用!TODO
注意:同一個應用中,@KafkaListener 的 id不能重復,否則,啟動異常。
注意:@KafkaListener 除了上面的用在方法上,還可用在類上——@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }),未用過。
注意:除了 @KafkaListener ,還有一個包含它的 @KafkaListeners,未用過。
主題:topic01,1個分區
消費者:1個
調用send函數,發送了消息,但是,消息是否發送成功了呢?網絡、服務器故障等,都可能導致消息丟失,而 發送方卻沒有對 發送是否成功進行檢查。來自博客園
send函數時有返回值的:
ListenableFuture<SendResult<K, V>>
// 繼承了 Future接口
public interface ListenableFuture<T> extends Future<T> {
}
Future接口 的 函數 get() 可以獲取 返回消息 SendResult對象,然后,進行判斷(同步方式)。
ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
// 1、發送同步確認
// 發生異常時,不終止,繼續執行下一個發送
// 發送消息確認:發送失敗時怎么處理?停止繼續發送、延遲后再發送?
boolean sendErr = false;
try {
SendResult<String, Object> sendRet = sendRetFuture.get();
RecordMetadata rmd = sendRet.getRecordMetadata();
if (rmd != null) {
log.info("發送成功:rmd-topic={},partition={},offset={},ts={}",
rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
} else {
log.error("發送失敗:topic={}, msg={}", Try1Config.TOPIC_01, msg);
}
} catch (InterruptedException | ExecutionException e) {
log.error("發生異常:發送失敗,e={}", e);
sendErr = true;
}
if (sendErr) {
// 發生異常,休眠30秒
// 發送時,斷掉服務器的網絡,或停掉kafka,這里的30秒就有操作空間了
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
log.error("發生異常:sleep-30secs, e={}", e);
}
}
// 2、發送異步確認
// sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {
發送成功時,可以通過 get到的 SendResult對象 獲取 消息在服務器上的信息:
發送失敗時,get到的 SendResult對象 為null。
注,
應用啟動后,把kafka關掉,想測試 發送失敗 的,但是,應用一直打印服務器連接的錯誤日志,沒有測試出 發送失敗 的情況,並 捕獲異常。來自博客園
那么,怎么測試發送失敗的情況呢?TODO
除了上面的同步方式,還有一種異步確認方式:
send函數的返回值 ListenableFuture 可以添加回調函數。
ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
// 1、發送同步確認
// ...省略...
// 2、發送異步確認
sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {
@Override
public void onSuccess(SendResult<String,Object> result) {
RecordMetadata rmd = result.getRecordMetadata();
log.info("發送成功-回調:rmd-topic={},partition={},offset={},ts={}",
rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
}
@Override
public void onFailure(Throwable ex) {
log.info("發送失敗-回調:ex={}", ex);
}
});
結果:
主題:topic01,1個分區
消費者:1個
前面的試驗,消費消息后都自動確認了,offset也在逐個增加。來自博客園
怎么實現手動確認?
全局配置:
## 全局 手動確認
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL
確認消息,需要參數 Acknowledgment ack,消費者示例如下:來自博客園
@KafkaListener(id="listenerA",topics = {TOPIC_01}, groupId = TOPIC_01_G01)
public void listener01(ConsumerRecord<?, ?> record, Acknowledgment ack) {
log.info("try1-消費-A:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
// 消息確認
// 配置手動確認后,若不執行下面的語句,啟動后還會消費
ack.acknowledge();
}
發送消息,消費消息。執行了 ack.acknowledge() 后,消息被消費&確認了。
意外情況:
配置了全局消費者手動確認消息,但是,卻沒有執行 消息確認,此時,分區的offset是不會改變的,消費沒有確認被消費成功。
再次啟動應用,消息會被再次消費。
要是已消費了100萬條,但是,沒有執行確認,下次啟動應用時,這100萬條要被重復消費,屬於 異常!
注釋掉上面的 ack.acknowledge() 可以進行驗證。
org.springframework.kafka.support.Acknowledgment 是一個接口,除了 acknowledge() 函數,還有nack(...)——拒絕確認消費:來自博客園
上面是 全局消費手動確認 配置,那么,單獨配置一個消費者組、一個消費者手動確認要怎么做呢?
》》》全文完《《《
后記:
花費了太多精時了。
對kafka也了解的更深入了。
可是,還有更多細節需要曉得的。
要知道,本文還沒有涉及 KafkaTemplate的定制、Factory的定制,即便是 @KafkaListener 注解也並非全都清楚。
況且,只是單機版的kafka,集群版的會有什么特別的“坑”呢?
還有,Kafka架構、原理、動態擴容(增加分區、減少分區),還曾看過一篇大廠遷移kafka消息到新的系統的。
對了,發送消息時,可以 主動重復消費的。來自博客園
……
先這樣吧,技術畢竟是一點一滴積累起來的嘛,
一天肯定是不行的,搞技術需要日積月累的努力。
4、