spring boot 2.x 系列 —— spring boot 整合 kafka


文章目錄

一、kafka的相關概念:
1.主題和分區
2.分區復制
3. 生產者
4. 消費者
5.broker和集群
二、項目說明
1.1 項目結構說明
1.2 主要依賴
二、 整合 kafka
2.1 kafka基本配置
2.2 KafkaTemplate實現消息發送
2.3 @KafkaListener注解實現消息的監聽
2.4 測試整合結果
三、關於多消費者組的測試
3.1 創建多分區主題
3.2 多消費者組對同一主題的監聽
3.2 發送消息時候指定主題的具體分區
3.4 測試結果
四、序列化與反序列化

源碼Gitub地址:https://github.com/heibaiying/spring-samples-for-all
一、kafka的相關概念:

1.主題和分區

kafka 的消息通過主題進行分類。一個主題可以被分為若干個分區,一個分區就是一個提交日志。消息以追加的方式寫入分區,然后以先入先出的順序讀取。kafka通過分區來實現數據的冗余和伸縮性,分區可以分布在不同的服務器上,也就是說一個主題可以橫跨多個服務器,以此來提供比單個服務器更強大的性能(類比HDFS分布式文件系統)。

注意:由於一個主題包含多個分區,因此無法在整個主題范圍內保證消息的順序性,但可以保證消息在單個分區內的順序性。


2.分區復制

每個主題被分為若干個分區,每個分區有多個副本。那些副本被保存在 broker 上,每個 broker 可以保存成百上千個屬於不同主題和分區的副本。副本有以下兩種類型 :

首領副本 每個分區都有一個首領副本 。 為了保證一致性,所有生產者請求和消費者請求都會經過這個副本。
跟隨者副本 首領以外的副本都是跟隨者副本。跟隨者副本不處理來自客戶端的請求,它們唯一的任務就是從首領那里復制消息,保持與首領一致的狀態。如果首領發生崩漬,其中的一個跟隨者會被提升為新首領。
3. 生產者

默認情況下生產者在把消息均衡地分布到在主題的所有分區上,而並不關心特定消息會被寫到那個分區;
如果指定消息鍵,則通過對消息鍵的散列來實現分區;
也可以通過消息鍵和分區器來實現把消息直接寫到指定的分區,這個需要自定義分區器,需要實現Partitioner 接口,並重寫其中的partition方法。
4. 消費者

消費者是消費者群組的一部分。也就是說,會有一個或者多個消費者共同讀取一個主題,群組保證每個分區只能被一個消費者使用。

一個分區只能被同一個消費者群組里面的一個消費者讀取,但可以被不同消費者群組里面的多個消費者讀取。多個消費者群組可以共同讀取同一個主題,彼此之間互不影響。


5.broker和集群

一個獨立的kafka服務器被稱為broker。broker 接收來自生產者的消息,為消息設置偏移量,並提交消息到磁盤保存。broker為消費者提供服務,對讀取分區的請求做出響應,返回已經提交到磁盤的消息。

broker是集群的組成部分。每一個集群都有一個broker同時充當了集群控制器的角色(自動從集群的活躍成員中選舉出來)。控制器負責管理工作,包括將分區分配給broker和監控broker。在集群中,一個分區從屬一個broker,該broker被稱為分區的首領。一個分區可以分配給多個broker,這個時候會發生分區復制。這種復制機制為分區提供了消息冗余,如果有一個broker失效,其他broker可以接管領導權。


更多kafka 的說明可以參考我的個人筆記:《Kafka權威指南》讀書筆記

二、項目說明

1.1 項目結構說明

本項目提供kafka發送簡單消息、對象消息、和多消費者組消費消息三種情況下的sample。

kafkaSimpleConsumer 用於普通消息的監聽;
kafkaBeanConsumer 用於對象消息監聽;
kafkaGroupConsumer 用於多消費者組和多消費者對主題分區消息監聽的情況。

1.2 主要依賴

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

 


二、 整合 kafka

2.1 kafka基本配置

spring:
  kafka:
    # 以逗號分隔的地址列表,用於建立與Kafka集群的初始連接(kafka 默認的端口號為9092)
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 發生錯誤后,消息重發的次數。
      retries: 0
      #當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。
      batch-size: 16384
      # 設置生產者內存緩沖區的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。
      # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
      # acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
      # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量
      enable-auto-commit: true
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽器容器中運行的線程數。
      concurrency: 5

 

 

這里需要說明的是:

在spring boot 2.X 版本 auto-commit-interval(自動提交的時間間隔)采用的是值的類型為Duration ,Duration 是 jdk 1.8 版本之后引入的類,在其源碼中我們可以看到對於其字符串的表達需要符合一定的規范,即數字+單位,如下的寫法1s ,1.5s, 0s, 0.001S ,1h, 2d 在yaml 中都是有效的。如果傳入無效的字符串,則spring boot 在啟動階段解析配置文件的時候就會拋出異常。

public final class Duration
implements TemporalAmount, Comparable<Duration>, Serializable {

/**
* The pattern for parsing.
*/
private static final Pattern PATTERN =
Pattern.compile("([-+]?)P(?:([-+]?[0-9]+)D)?" +
"(T(?:([-+]?[0-9]+)H)?(?:([-+]?[0-9]+)M)?(?:([-+]?[0-9]+)(?:[.,]([0-9]{0,9}))?S)?)?", Pattern.CASE_INSENSITIVE);

........ 

}

  


2.2 KafkaTemplate實現消息發送

@Component
@Slf4j
public class KafKaCustomrProducer {

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage(String topic, Object object) {

/*
* 這里的ListenableFuture類是spring對java原生Future的擴展增強,是一個泛型接口,用於監聽異步方法的回調
* 而對於kafka send 方法返回值而言,這里的泛型所代表的實際類型就是 SendResult<K, V>,而這里K,V的泛型實際上
* 被用於ProducerRecord<K, V> producerRecord,即生產者發送消息的key,value 類型
*/
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("發送消息失敗:" + throwable.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> sendResult) {
System.out.println("發送結果:" + sendResult.toString());
}
});
}
}

  


2.3 @KafkaListener注解實現消息的監聽

@Component
@Slf4j
public class KafkaSimpleConsumer {

// 簡單消費者
@KafkaListener(groupId = "simpleGroup", topics = Topic.SIMPLE)
public void consumer1_1(ConsumerRecord<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer) {
System.out.println("消費者收到消息:" + record.value() + "; topic:" + topic);
/*
* 如果需要手工提交異步 consumer.commitSync();
* 手工同步提交 consumer.commitAsync()
*/
}
}

  


2.4 測試整合結果

@Slf4j
@RestController
public class SendMsgController {

@Autowired
private KafKaCustomrProducer producer;
@Autowired
private KafkaTemplate kafkaTemplate;

/***
* 發送消息體為基本類型的消息
*/
@GetMapping("sendSimple")
public void sendSimple() {
producer.sendMessage(Topic.SIMPLE, "hello spring boot kafka");
}
}

  


三、關於多消費者組的測試

3.1 創建多分區主題

/**
* @author : heibaiying
* @description : kafka配置類
*/
@Configuration
public class KafkaConfig {

@Bean
public NewTopic groupTopic() {
// 指定主題名稱,分區數量,和復制因子
return new NewTopic(Topic.GROUP, 10, (short) 2);
}

}

  


3.2 多消費者組對同一主題的監聽

消費者1-1 監聽主題的 0、1 分區
消費者1-2 監聽主題的 2、3 分區
消費者1-3 監聽主題的 0、1 分區
消費者2-1 監聽主題的所有分區

/**
* @author : heibaiying
* @description : kafka 消費者組
* <p>
* 多個消費者群組可以共同讀取同一個主題,彼此之間互不影響。
*/
@Component
@Slf4j
public class KafkaGroupConsumer {

// 分組1 中的消費者1
@KafkaListener(id = "consumer1-1", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
})
public void consumer1_1(ConsumerRecord<String, Object> record) {
System.out.println("consumer1-1 收到消息:" + record.value());
}

// 分組1 中的消費者2
@KafkaListener(id = "consumer1-2", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"2", "3"})
})
public void consumer1_2(ConsumerRecord<String, Object> record) {
System.out.println("consumer1-2 收到消息:" + record.value());
}

// 分組1 中的消費者3
@KafkaListener(id = "consumer1-3", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
})
public void consumer1_3(ConsumerRecord<String, Object> record) {
System.out.println("consumer1-3 收到消息:" + record.value());
}

// 分組2 中的消費者
@KafkaListener(id = "consumer2-1", groupId = "group2", topics = Topic.GROUP)
public void consumer2_1(ConsumerRecord<String, Object> record) {
System.err.println("consumer2-1 收到消息:" + record.value());
}
}

  


3.2 發送消息時候指定主題的具體分區

/***
* 多消費者組、組中多消費者對同一主題的消費情況
*/
@GetMapping("sendGroup")
public void sendGroup() {
for (int i = 0; i < 4; i++) {
// 第二個參數指定分區,第三個參數指定消息鍵 分區優先
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(Topic.GROUP, i % 4, "key", "hello group " + i);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("發送消息失敗:" + throwable.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> sendResult) {
System.out.println("發送結果:" + sendResult.toString());
}
});
}
}

  


測試結果:

# 主要看每次發送結果中的 partition 屬性,代表四次消息分別發送到了主題的0,1,2,3分區
發送結果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=1, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 1, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-1@13]
發送結果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 0, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-0@19]
發送結果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=3, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 3, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-3@13]
發送結果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=2, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 2, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-2@13]
# 消費者組2 接收情況
consumer2-1 收到消息:hello group 1
consumer2-1 收到消息:hello group 0
consumer2-1 收到消息:hello group 2
consumer2-1 收到消息:hello group 3
# 消費者1-1接收情況
consumer1-1 收到消息:hello group 1
consumer1-1 收到消息:hello group 0
# 消費者1-3接收情況
consumer1-3 收到消息:hello group 1
consumer1-3 收到消息:hello group 0
# 消費者1-2接收情況
consumer1-2 收到消息:hello group 3
consumer1-2 收到消息:hello group 2

  



3.4 測試結果

和kafka 原本的機制一樣,多消費者組之間對於同一個主題的消費彼此之間互不影響;
和kafka原本機制不一樣的是,這里我們消費者1-1和消費1-3共同屬於同一個消費者組,並且監聽同樣的分區,按照原本kafka的機制,群組保證每個分區只能被同一個消費者組的一個消費者使用,但是按照spring的聲明方式實現的消息監聽,這里被兩個消費者都監聽到了。
四、序列化與反序列化

用例采用的是第三方fastjson將實體類序列化為json后發送。實現如下:

/***
* 發送消息體為bean的消息
*/
@GetMapping("sendBean")
public void sendBean() {
Programmer programmer = new Programmer("xiaoming", 12, 21212.33f, new Date());
producer.sendMessage(Topic.BEAN, JSON.toJSON(programmer).toString());
}

  

@Component
@Slf4j
public class KafkaBeanConsumer {

@KafkaListener(groupId = "beanGroup",topics = Topic.BEAN)
public void consumer(ConsumerRecord<String, Object> record) {
System.out.println("消費者收到消息:" + JSON.parseObject(record.value().toString(), Programmer.class));
}
}

  


附:源碼Gitub地址:https://github.com/heibaiying/spring-samples-for-all
---------------------
作者:m0_37809146
來源:CSDN
原文:https://blog.csdn.net/m0_37809146/article/details/86680328
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM