spring boot項目12:Kafka-基礎使用


JAVA 8

Spring Boot 2.5.3

kafka_2.13-2.8.0

apache-zookeeper-3.7.0

---

 

授人以漁:

1、Spring Boot Reference Documentation

This document is also available as Multi-page HTML, Single page HTML and PDF.

有PDF版本哦,下載下來!

2、Spring for Apache Kafka

有PDF版本哦,下載下來!

 

目錄

Kafka簡介

Spring Boot操作Kafka

1個主題1個分區1個消費者組2個消費者

1個主題2個分區

消息發送手動確認結果

消息消費手動確認

參考文檔

 

本文介紹Kafka的基本使用,包括 新建主題、發送消息、接收&處理消息、消息發送確認、消息消費確認 等。

本文基於 單機版的ZooKeeper、Kafka 進行測試。

 

Kafka簡介

kafka是一個分布式消息隊列(MQ)中間件,支持 生產者/消費者、發布者/訂閱者 模式。

Apache軟件基金會開發的一個開源流處理平台。

依賴 Apache軟件基金會 的另一個開源軟件:ZooKeeper(致力於開發和維護開源服務器,實現高度可靠的分布式協調)。

 

關鍵概念:

broker、producer、consumer-group、consumer,

topic、partition、replica、offset。

---

還有更多關鍵概念,這些一起構建了 分布式的、高可靠性的、流式處理 的 Kafka。

 

概念說明:

broker:運行kafka服務器的進程;

producer:生產者,發送消息到topic;

consumer-group:消息消費者的組名,用於將消費者分組;

consumer:消息消費者,歸屬於某個consumer-group,真正用於接收&處理消息的實體;

一個consumer-group下只有一個 consumer,這個consumer將依次消費 主題下各個partition的消息——要是partition有多個,這個consumer會“比較累”吧;

一個consumer-group下有多個 consumer,系統(?)會協調各個 consumer 匹配 主題下各個partition,要是數量相等——都是N,則可以實現並發處理主題下的消息——一個consumer對應一個partition;

consumer的數量 需要和 主題下的partition數量 協調——前者大於后者是沒有必要的,會存在consumer浪費的情況;

topic:消息主題,匹配RabbitMQ中的 交換機,消息發送到主題,再由主題發送到具體的分區(partition);

partition:主題下的分區,可以1~N個,匹配RabbitMQ中的隊列,但不需要和主題綁定,創建Topic的時候就建立好,在kafka的數據文件夾下,每個partition都對應一個文件夾;consumer就是從 partition中消費消息的;

replica:partition的副本,可以1~N個,但是,N的數量必須小於 集群中 broker的數量,replica會被均勻地分布到不同的broker中;

offset:partition下的概念,記錄partition中消息被消費到那里了,由消費者控制——消費后不更新offset、任意指定offset開始消費(比如,從頭開始);和RabbitMQ不同的時,消息被消費后,不會立即被清理,故,可以消費已經被消費過的消息;(疑問:消息會一直保留嗎?消息會被清理嗎?什么時候清理?策略是什么?TODO

 

ZooKeeper、Kafka單機啟動&停止:

單機部署
ZooKeeper

配置文件(Config file):
conf/zoo.cfg

啟動(start):
./zkServer.sh start
停止(stop):
./zkServer.sh stop

Kafka

配置文件:
server.properties

啟動(start):
bin/kafka-server-start.sh config/server.properties
停止(stop):
bin/kafka-server-stop.sh

注,啟動前需要修改好配置文件。

注,上面的Kafka啟動后,會占用一個Shell,可以使用 nohup CMD & 實現后台啟動。

 

Kafka啟動后,可以使用 bin 目錄下的 kafka-topics.sh 管理主題。

kafka-topics.sh操作
# help命令
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --help
...省略...

# 展示所有Topic
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$

# 創建主題 1個分區、1個副本——成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --partitions 1 --replication-factor 1 --create
Created topic topic082901.
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901

# 創建主題 2個分區、1個副本——成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --partitions 2 --replication-factor 1 --create
Created topic topic082902.
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901
topic082902

# 創建主題 1個分區、2個副本——失敗!原因是 只有一個 broker——單機版,2個副本沒有意義,都在一個服務器上
# 多副本需要 集群環境才可以演示
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082903 --partitions 1 --replication-factor 2 --create
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2021-08-29 14:26:18,293] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)
# 2個副本的主題沒有創建成功
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic082901
topic082902

# 展示 主題的信息
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --describe
Topic: topic082901      TopicId: xKGttQHbSy-Ywc9Mo4e_2w PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: topic082901      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --describe
Topic: topic082902      TopicId: cEEZFAgCSN2PDkiAArGUTQ PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: topic082902      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic082902      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$

# 檢查kafka服務器的數據目錄
# 多了 3個文件夾,分別對應着各個主題的 分區
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
drwxrwxr-x 2 ben ben 4096 8月  29 11:45 topic01-0
drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-0
drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-1
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082901-0
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-0
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-1
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$

# 刪除主題
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
# 刪除后查看數據目錄:多了3個 末尾是 delete的目錄,等待若干分鍾,這些文件夾會被清理掉
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
drwxrwxr-x 2 ben ben 4096 8月  29 11:45 topic01-0
drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-0
drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-1
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082901-0.be9a1e49b81a4b9a8421e52e45281f77-delete
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-0.70d6391753e24886a9dcb048db4d5811-delete
drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-1.b335d0068eba494d8576937ecb7f9bb6-delete
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$

 

Kafka基於ZooKeeper,因此,在Kafka使用過程中,ZooKeeper中的一些節點也會發生變化。

下面是 使用 ZooKeeper 的 zkCli.sh 查看到的一些信息。

zkCli.sh部分操作
ben@ben-VirtualBox:~/apache-zookeeper-3.7.0-bin/bin$ ./zkCli.sh
/usr/bin/java
Connecting to localhost:2181
...省略...

# 使用 ZooKeeper的ls命令!
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/seqid
[]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, topic01, topic02]
[zk: localhost:2181(CONNECTED) 5]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 3, 
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 4, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 5, 6, 7, 
8, 9]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/__consumer_offsets/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 9]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/topics/topic01
Node does not exist: /brokers/topics/topics/topic01
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/topic01
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/topic01/partitions
[0]
[zk: localhost:2181(CONNECTED) 12] ls /brokers/topics/topic01/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 13] ls /brokers/topics/topic01/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 14]

# help可以看到所有命令
[zk: localhost:2181(CONNECTED) 14] help
ZooKeeper -server host:port -client-configuration properties-file cmd args
...生路...

關於Kafka在ZooKeeper上建立了哪些節點?各個節點的意義,需要另文介紹。

 

Spring Boot操作Kafka

在自己7月份的一篇文章中有過介紹,本文再深入一些。

 

依賴包:

Kafka的依賴包
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

spring-kafka包:

Spring容器中和 Kafka有關的一些Bean:

其中的 kafkaTemplate 的 多個 send函數 用來發送消息到主題 或 其下的分區:

# 部分send函數簽名
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
			@Nullable V data)

public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record)

public ListenableFuture<SendResult<K, V>> send(Message<?> message)

除了send,還有sendDefault函數,發送消息到默認分區,默認分區是什么?可以用 KafkaTemplate 的 getDefaultTopic() 獲取。

另外,還有一些 setXXX函數,用來設置 kafkaTemplate 對象。

 

配置application.properties:

# Kafka
# mylinux 虛擬機地址:配置hosts文件
spring.kafka.bootstrap-servers=mylinux:9092
# consumer:全局
# 也可以在 @KafkaListener 中指定 單獨的 groupId
#spring.kafka.consumer.group-id=myGroup

Java文件:

接口 /try3/send 發送消息,1個@KafkaListener——消費者(注意,指定 groupId=tp03,所以,上面的配置文件中的  spring.kafka.consumer.group-id 可以不配置,否則啟動不了應用)。

try3-源碼1
# Try3Config.java
@Component
@Slf4j
public class Try3Config {

	public static final String TOPIC_03 = "topic03";
	
	public static final String TOPIC_03_KEY = "topic03_key";

	/**
	 * 監聽器1
	 * @author ben
	 * @date 2021-08-29 16:30:46 CST
	 * @param record
	 */
	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
	public void listen01(ConsumerRecord<?, ?> record) {
//		log.info("try3-消費-a:record={}", record);
		log.info("try3-消費-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.value());
	}
	
}

# Try3Controller.java
@RestController
@RequestMapping(value="/try3")
@Slf4j
public class Try3Controller {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;
	
	/**
	 * 調用接口發送消息
	 * @author ben
	 * @date 2021-08-29 16:12:07 CST
	 * @return
	 */
	@GetMapping(value="/send")
	public Boolean sendMsg() {
		IntStream.rangeClosed(0, 9).forEach(i->{
			log.info("sendMsg-{}", i);
			String msg = String.format("Try3Controller send msg-%d @%s", i, new Date());
			
			kafkaTemplate.send(Try3Config.TOPIC_03, Try3Config.TOPIC_03_KEY, "key-" + msg);
		});
		
		return true;
	}
	
}

 

啟動應用,Kafka上 自動出現了 topic03:

檢查K服務器
ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
__consumer_offsets
topic01
topic02
topic03
$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic03 --describe
Topic: topic03  TopicId: km2zL1MeTKiOWlhCPMx6gw PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: topic03  Partition: 0    Leader: 1       Replicas: 1     Isr: 1

應用啟動信息(部分):

調用/try3/send 接口,發送消息:發送、接收成功。

注意,offset的變化。

注意,發送第一條消息時,producer還有一個初始化工作,會輸出更多producer相關日志。再次發送時,就沒有了。(是否需要 預熱?)

 

1個主題1個分區1個消費者組2個消費者

上一章試驗了 1個主題1個分區1個消費者組1個消費者 的消息發送接收,本章增加1個消費者——同一個消費者組下。

Java源碼:2個@KafkaListener,都在 tp03 消費者組 下

	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
	public void listen01(ConsumerRecord<?, ?> record) {
		log.info("try3-消費-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.value());
	}

	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
	public void listen02(ConsumerRecord<?, ?> record) {
		log.info("try3-消費-b:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.value());
	}

啟動時,輸出了更多的日志,並提示發生:

# INFO 日志,不是錯誤
org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.

Consumer clientId 出現了兩個:consumer-tp03-1、consumer-tp03-2。

並顯示了 相關日志:

最終,consumer-tp03-1 生效了,用來消費 唯一分區的消息。

這就是 消費者數量 大於 主題的分區數量時的情況。(注意,這兩個 監聽器 都在同一個應用,要是在不同應用呢?稍后試驗)

調用接口發送消息:

 

進一步試驗1:

使用 另一個端口 啟動應用,實現 主題的 消費者 再更加2個,看看會發生什么情況。

執行結果:

新啟動應用的消費者 有加入消費者組,但是,沒有分區分配,故,也不會收到消息處理(調用 發送消息 接口后,新應用沒有收到)。

 

檢查kafka上 消費者組tp03 的信息:

$ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
tp03            topic03         0          40              40              0               consumer-tp03-1-0705883f-92ab-47f0-82c3-d7334ad81b4f /192.168.0.112  consumer-tp03-1

 

進一步試驗2:

關閉處於消費狀態的應用,檢查 新啟動的應用是否能 作為備用消費者 正確進行消費。

消費者組 下發生了 rebalancing

此時,調用新應用的接口發送消息(舊端口的應用 被關閉了哦),發送成功,,新端口的應用也收到並正確處理了消息。

消費者組tp03 的信息 也發生了變化:CONSUMER-ID 不同了!

$ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
tp03            topic03         0          50              50              0               consumer-tp03-1-a682c694-8018-43a9-8a1e-02a7d5ece6a5 /192.168.0.112  consumer-tp03-1

 

小結:

同一個組中,消費者數量 要小於 主題的分區數,多了的消費者也不會得到分區並發生消費。

但是,屬於不同進程的 消費者 可以實現備用。

 

1個主題2個分區

前面應用啟動后,生成了主題,但這個主題只有1個分區。

下面使用 Kafka命令生成 2個分區 的主題:

 ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic02 --partitions 2 --replication-factor 1 --create

疑問:使用Java程序是否可以生成分區呢?發現一個名為 org.apache.kafka.clients.admin.NewTopic 的類,或許可以實現,后面試驗下。

 

1組1消(1個消費者組、1個消費者)

1、發送時 無key

kafkaTemplate.send(Try2Config.TOPIC_02, "no key-" + msg);

啟動應用:一個監聽器 處理 2個分區的數據

調用接口 /try2/send,結果:

第一次發送,全部到了 分區0,收到並處理:

還以為 這種模式下 只會處理一個分區的數據:無key,只轉發到某個分區。

那么,再次測試幾次發送

兩個分區都有數據來了!

疑問

主題的消息 在 無key的情況下,是按照什么 策略 轉發到不同分區的呢?TODO

 

2、發送時有1個固定key

kafkaTemplate.send(Try2Config.TOPIC_02, Try2Config.TOPIC_02_KEY, "fixed key-" + msg);

試驗結果:

和上面的不同!監聽器 只處理了來自 分區0 的消息,連續發送了多次都是如此。

 

3、key為 i%4,發送20條消息(前面是10條)

// 3.發送消息條數改為 20條,key=i%4
// 注意,第二個參數是 String類型!!
kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);

試驗結果:

發送了20條消息,分別均勻地轉發到了 兩個分區,監聽器也處理了所有20消息。

不過,20條消息 沒有按照 發送順序處理,但是,同一個分區內 是 按照發送順序處理了,0、2、4、..……,1、3、5……

 

4、指定分區編號(從0開始,本試驗只有0、1)發送

// 4、指定分區發送
// 第三個參數為 key = "" + i%3
kafkaTemplate.send(Try2Config.TOPIC_02, i%Try2Config.TOPIC_02_P_NUMBER, "" + i%3, "fixed partition key-" + msg);

試驗結果:

消息按照 指定的分區號 發送到了 不同的分區,消費者也對不同分區的消息進行了消費。

確定性地分配到不同分區的方式,而不像 按照key 轉發時,存在不確定性。

 

發送時指定分區號,這需要提前知道 主題有多少分區,否則,指定的分區號大於分區數會怎樣呢?程序卡住,最后超時發生異常

// 錯誤:指定的 分區號 大於 分區數!
kafkaTemplate.send(Try2Config.TOPIC_02, i%3, "" + i%3, "fixed partition key-" + msg);

如下所示:

Exception thrown when sending a message with key='2' and payload='fixed partition key-...' to topic topic02 and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic topic02 not present in metadata after 60000 ms.

疑問

這個發送超時時間怎么 更改 呢?Spring boot、kafka的文檔中、KafkaTemplate、DefaultKafkaProducerFactory 中都沒有找到 發送超時 的設置方法。TODO

 

注,還有一個普通的 send函數——指定了 timestamp,就不做試驗了。

 

小結:

通過上面的試驗,知道了 不同方式發送的消息 會怎么分配到不同的分區了。

send(String topic, @Nullable V data) key=null 不均勻地發送到 各個分區
send(String topic, K key, @Nullable V data) key=固定值 只會發送到一個分區
send(String topic, K key, @Nullable V data) key=隨機值

會均勻地發送到各個分區,

當然,隨機值的數量要多於分區數量

send(String topic, Integer partition, K key, @Nullable V data) 指定分區數

當然是發送到指定分區。

異常:

指定分區號 大於 分區數,會超時、異常,發送消息失敗。

send(String topic, Integer partition, Long timestamp, K key,
            @Nullable V data)
指定分區數

同上一個(本文未驗證)。

這里的 timestamp 應該有其它用處,比如,消息去重等

 

1組2消

發送方式采用:不指定分區號,隨機key——大於等於分區數2,,確保兩個分區都有消息(均衡)。來自博客園

每個消費者都分配給了一個分區。

發送消息:

kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);
1組2消代碼
@Component
@Slf4j
public class Try2Config {

	/**
	 * 主題02:topic02,擁有2個分區
	 */
	public static final String TOPIC_02 = "topic02";
	
	/**
	 * 主題topic02分區數:2
	 */
	public static final int TOPIC_02_P_NUMBER = 2;
	
	/**
	 * 主題02-KEY:topic02_key
	 */
	public static final String TOPIC_02_KEY = "topic02_key";
	
	public static final String TRY2_GROUP_ID_01 = "try2group01";
	public static final String TRY2_GROUP_ID_02 = "try2group02";

	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
	public void listenTopic1(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}
	
	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
	public void listenTopic2(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}
	
}

試驗結果:兩個消費者,分別處理了 指定分區的消息。來自博客園

 

1組3消

增加一個監聽器:

	// 消費者3:超過分區數量了
	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
	public void listenTopic3(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-C:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}

啟動應用:來自博客園

發送消息,試驗結果:

兩個分區的消息都得到了處理。

但是,try2group01-1、try2group01-2 指定了分區,為何 消費的是 try2-消費-C 呢?難道 這個 try2group01-N 和 日志里面的 A、B、C不匹配

給 @KafkaListener 增加 id屬性:

@KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
@KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
@KafkaListener(id="listenerC", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)

再次測試:

果然,listenerA 對應的是 consumer-try2group01-3!

 

2組2X2消

兩個消費者組,各有兩個消費者:來自博客園

2組2消監聽者
	// groupId = TRY2_GROUP_ID_01
	// 消費者1
	@KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
	public void listenTopic1(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}
	
	// groupId = TRY2_GROUP_ID_01
	// 消費者2
	@KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
	public void listenTopic2(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}
	
	// ---------------------
	
	// groupId = TRY2_GROUP_ID_02
	// 消費者4
	@KafkaListener(id="g2listenerH", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
	public void listenTopic4(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-H:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}
	
	// groupId = TRY2_GROUP_ID_02
	// 消費者5
	@KafkaListener(id="g2listenerJ", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
	public void listenTopic5(ConsumerRecord<?, ?> record) {
		log.info("try2-消費-I:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
	}

啟動應用:ListenerA、B、H、J 都分別指派給了 主題的兩個分區。

注意:TRY2_GROUP_ID_02 的兩個消費者的ID 是以 g2 開頭的,但是,日志里面的線程名中 卻沒有 g2!TODO

發送消息,消費結果:

小結:

兩個消費者組 都消費了 所有消息。來自博客園

 

注意:上面的 @KafkaListener 指定消費的 topic 使用的是 主題全名,其另一個屬性 topicPattern 應該是可以用來 根據 某種模式來配置 監聽的主題的

源碼里面:the topic pattern or expression (SpEL)TODO

 

SpELl:

SpEL(Spring Expression Language),即Spring表達式語言,是比JSP的EL更強大的一種表達式語言。來自博客園

不甚了解,后續再DIG。

 

注意:@KafkaListener 的 topicPartitions屬性 的使用!TODO

注意:同一個應用中,@KafkaListener 的 id不能重復,否則,啟動異常。

注意:@KafkaListener 除了上面的用在方法上,還可用在類上——@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }),未用過

注意:除了 @KafkaListener ,還有一個包含它的 @KafkaListeners,未用過

 

消息發送手動確認結果

主題:topic01,1個分區

消費者:1個

 

調用send函數,發送了消息,但是,消息是否發送成功了呢?網絡、服務器故障等,都可能導致消息丟失,而 發送方卻沒有對 發送是否成功進行檢查。來自博客園

send函數時有返回值的:

ListenableFuture<SendResult<K, V>>

// 繼承了 Future接口
public interface ListenableFuture<T> extends Future<T> {
}

Future接口 的 函數 get() 可以獲取 返回消息 SendResult對象,然后,進行判斷(同步方式)。

			ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
			
			// 1、發送同步確認
			// 發生異常時,不終止,繼續執行下一個發送
			// 發送消息確認:發送失敗時怎么處理?停止繼續發送、延遲后再發送?
			boolean sendErr = false;
			try {
				SendResult<String, Object> sendRet  = sendRetFuture.get();
				RecordMetadata rmd = sendRet.getRecordMetadata();
				if (rmd != null) {
					log.info("發送成功:rmd-topic={},partition={},offset={},ts={}", 
							rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
				} else {
					log.error("發送失敗:topic={}, msg={}", Try1Config.TOPIC_01, msg);
				}
			} catch (InterruptedException | ExecutionException e) {
				log.error("發生異常:發送失敗,e={}", e);
				sendErr = true;
			}
			if (sendErr) {
				// 發生異常,休眠30秒
				// 發送時,斷掉服務器的網絡,或停掉kafka,這里的30秒就有操作空間了
				try {
					TimeUnit.SECONDS.sleep(30);
				} catch (InterruptedException e) {
					log.error("發生異常:sleep-30secs, e={}", e);
				}
			}
			
			// 2、發送異步確認
//			sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {

發送成功時,可以通過 get到的 SendResult對象 獲取 消息在服務器上的信息:

發送失敗時,get到的 SendResult對象 為null。

注,

應用啟動后,把kafka關掉,想測試 發送失敗 的,但是,應用一直打印服務器連接的錯誤日志,沒有測試出 發送失敗 的情況,並 捕獲異常。來自博客園

那么,怎么測試發送失敗的情況呢?TODO

 

除了上面的同步方式,還有一種異步確認方式

send函數的返回值 ListenableFuture 可以添加回調函數。

			ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
			
			// 1、發送同步確認
			// ...省略...
			
			// 2、發送異步確認
			sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {

				@Override
				public void onSuccess(SendResult<String,Object> result) {
					RecordMetadata rmd = result.getRecordMetadata();
					log.info("發送成功-回調:rmd-topic={},partition={},offset={},ts={}", 
							rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
				}

				@Override
				public void onFailure(Throwable ex) {
					log.info("發送失敗-回調:ex={}", ex);
				}
			});

結果:

 

消息消費手動確認

主題:topic01,1個分區

消費者:1個

 

前面的試驗,消費消息后都自動確認了,offset也在逐個增加。來自博客園

怎么實現手動確認?

 

全局配置:

## 全局 手動確認
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL

確認消息,需要參數 Acknowledgment ack,消費者示例如下:來自博客園

	@KafkaListener(id="listenerA",topics = {TOPIC_01}, groupId = TOPIC_01_G01)
	public void listener01(ConsumerRecord<?, ?> record, Acknowledgment ack) {
		log.info("try1-消費-A:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
				record.topic(), record.partition(), record.offset(), record.key(), record.value());
		
		// 消息確認
		// 配置手動確認后,若不執行下面的語句,啟動后還會消費
		ack.acknowledge();
	}

發送消息,消費消息。執行了 ack.acknowledge() 后,消息被消費&確認了。

 

意外情況:

配置了全局消費者手動確認消息,但是,卻沒有執行 消息確認,此時,分區的offset是不會改變的,消費沒有確認被消費成功。

再次啟動應用,消息會被再次消費

要是已消費了100萬條,但是,沒有執行確認,下次啟動應用時,這100萬條要被重復消費,屬於 異常

注釋掉上面的 ack.acknowledge() 可以進行驗證。

 

org.springframework.kafka.support.Acknowledgment 是一個接口,除了 acknowledge() 函數,還有nack(...)——拒絕確認消費:來自博客園

 

上面是 全局消費手動確認 配置,那么,單獨配置一個消費者組、一個消費者手動確認要怎么做呢?

 

》》》全文完《《《 

 

后記:

花費了太多精時了。

對kafka也了解的更深入了。

可是,還有更多細節需要曉得的。

要知道,本文還沒有涉及 KafkaTemplate的定制、Factory的定制,即便是 @KafkaListener 注解也並非全都清楚。

況且,只是單機版的kafka,集群版的會有什么特別的“坑”呢?

還有,Kafka架構、原理、動態擴容(增加分區、減少分區),還曾看過一篇大廠遷移kafka消息到新的系統的。

對了,發送消息時,可以 主動重復消費的。來自博客園

……

先這樣吧,技術畢竟是一點一滴積累起來的嘛,

一天肯定是不行的,搞技術需要日積月累的努力。

 

參考文檔

1、Kafka分布式消息系統

2、kafka生產者消息確認機制和發送方式

3、Kafka消息保留-清理策略

4、

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM