SpringBoot整合操作Kafka


環境

kafka 2.6.0(安裝步驟查看這里

引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>

配置KafkaAdminClient

# 指定kafka server的地址,集群配多個,中間逗號隔開
spring.kafka.bootstrap-servers=192.168.25.132:9092

定義Bean:

@Configuration
public class KafkaConf {

    @Value("${spring.kafka.bootstrap-servers}")
    private String server;

    @Bean
    public KafkaAdminClient kafkaAdminClient(){
        Properties props = new Properties();
        props.put("bootstrap.servers", server);
        return (KafkaAdminClient) KafkaAdminClient.create(props);
    }
}

創建Topic:

@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private KafkaAdminClient kafkaAdminClient;
    
    @GetMapping("/createTopic")
    public CreateTopicsResult createTopic(){
        NewTopic newTopic = new NewTopic("spring-topic", 3, (short) 1);
        CreateTopicsResult result = kafkaAdminClient.createTopics(Arrays.asList(newTopic));
        return result;
    }
}

執行后,通過kafka-manager查看,名稱為spring-topic的主題已經創建成功:
在這里插入圖片描述


配置producer

# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
spring.kafka.producer.retries=5
# 每次批量發送消息的數量,produce積累到一定數據,一次發送
spring.kafka.producer.batch-size=16384
# produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

定義發送消息接口:

@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message){
        kafkaTemplate.send("spring-topic", message);
        return "success";
    }
}

配置consumer

# 指定默認消費者group id --> 由於在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名
spring.kafka.consumer.group-id=group-1
# enable.auto.commit:true --> 設置自動提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

定義消息接收監聽器:

@Component
public class ConsumerListener {

    @KafkaListener(topics = "test")
    public void onListener(String message){
        System.out.println(String.format("接收到消息:%s", message));
    }
}

測試

調用發送消息接口:http://localhost:99/kafka/send?message=abc,然后查看IDE窗口,consumer輸出了接收到的消息
在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM