本文代碼使用的是Spring Boot 2.1.8.RELEASE 版本
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<!-- parent.relativePath用法:設定一個空值將始終從倉庫中獲取,不從本地路徑獲取 查找順序:relativePath元素中的地址 –> 本地倉庫 –> 遠程倉庫 -->
<relativePath /> <!-- lookup parent from repository -->
</parent>
1、 pom.xml文件,引入依賴
<!-- kafka依賴 begin -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- kafka依賴 end -->
采用Kafka提供的StringSerializer和StringDeserializer進行序列化和反序列化
2、在application-dev.properties配置生產者
#### kafka配置生產者 begin ####
#============== kafka ===================
# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=106.12.241.89:9092
#=============== provider =======================
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
spring.kafka.producer.retries=0
# 每次批量發送消息的數量,produce積累到一定數據,一次發送
spring.kafka.producer.batch-size=16384
# produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
#acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生產者 end ####
3、生產者向kafka發送消息
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; /** * * * @author Lynch */ @Controller @RequestMapping("/api/kafka/") public class KafkaController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; @GetMapping("send") @ResponseBody public boolean send(@RequestParam String message) { try { kafkaTemplate.send("testTopic", message); System.out.println("消息發送成功..."); } catch (Exception e) { e.printStackTrace(); } return true; } @GetMapping("test") @ResponseBody public String test(){ System.out.println("hello world!"); return "ok"; } }
4、在application-dev.properties配置消費者
#### kafka配置消費者 start #### # 指定默認消費者group id --> 由於在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名 spring.kafka.consumer.group-id=test # smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設置smallest spring.kafka.consumer.auto-offset-reset=earliest # enable.auto.commit:true --> 設置自動提交offset spring.kafka.consumer.enable-auto-commit=true #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。 spring.kafka.consumer.auto-commit-interval=1000 # 指定消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #### kafka配置消費者 end ####
5、消費者監聽topic=testTopic的消息
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * 消費者監聽topic=testTopic的消息 * * @author Lynch */ @Component public class ConsumerListener { @KafkaListener(topics = "testTopic") public void onMessage(String message){ //insertIntoDb(buffer);//這里為插入數據庫代碼
System.out.println("message: " + message); } }
6、控制台打印消息
http://localhost:8091/api/kafka/send?message=aaabbbccc
消息發送成功... message: aaabbbccc http://localhost:8091/api/kafka/send?message='1111' ##編解碼方式是字符串,用單引號括起來表示字符串
消息發送成功... message: '1111'
到此,采用Kafka提供的StringSerializer和StringDeserializer進行序列化和反序列化,因為此種序列化方式無法序列化實體類。
如果是實體類的消息傳遞,可以采用自定義序列化和反序列化器進行實體類的序列化和反序列化,由於Serializer和Deserializer影響到上下游系統,導致牽一發而動全身。自定義序列化&反序列化實現不是能力的體現,而是逗比的體現。所以強烈不建議自定義實現序列化&反序列化,推薦直接使用StringSerializer和StringDeserializer,然后使用json作為標准的數據傳輸格式。