Kafka 入門實戰(3)--SpringBoot 整合 Kafka


spring-kafka 使得在 Spring 環境中使用 Kafka 變的很簡單,只需少量的配置和少量的代碼就可以發送和接受消息了。本文主要介紹在 SpringBoot 中用 spring-kafka 操作 Kafka,文中使用到的軟件版本:Kafka 2.8.0、SpringBoot 2.4.6、Java 1.8.0_191。

1、參數說明

spring-kafka 中參數是以 spring.kafka 開頭的,后面的參數名稱和 Kafka 的原始參數很類似,只不過 spring-kafka 會把一些參數中的 "." 改為 "-",如 auto.offset.reset 改為 spring.kafka.consumer.auto-offset-reset。

前綴 描述
spring.kafka Spring 中 Kafka 相關配置總的前綴
spring.kafka.consumer 消費者相關參數
spring.kafka.producer  生產者相關參數
spring.kafka.admin Kafka 管理相關參數

kafka 的原始參數說明可參考:Kafka入門實戰(1)-概念、安裝及簡單使用;或參考官方文檔

2、SpringBoot 整合 Kafka

2.1、引入依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!--流處理需要用到-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

2.2、增加 Kafka 配置

spring:
  kafka:
    bootstrap-servers: 10.40.100.69:9092
    producer:
      acks: all
      transaction-id-prefix: tx. #開啟事務,發送消息的方法需增加@Transactional注解
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: groupA
      auto-offset-reset:
    streams:
      application-id: streams-test
      properties:
        "[default.key.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde
        "[default.value.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde

2.3、發送消息

package com.abc.demo.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class Producer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Transactional
    @Scheduled(cron = "0/10 * * * * ?")
    public void sendMessage() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("test", "消息" + i);
        }
    }

//    @Scheduled(cron = "0/10 * * * * ?")
//    public void sendMessage2() {
//        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
//            @Override
//            public Object doInOperations(KafkaOperations kafkaOperations) {
//                for (int i = 0; i < 10; i++) {
//                    kafkaTemplate.send("test", "消息" + i);
//                }
//                return null;
//            }
//        });
//    }
}

2.4、接受消息

package com.abc.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private static Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "test")
    public void recevieMessage(ConsumerRecord<String, String> record) {
        logger.info("offset={}, key={}, value={}", record.offset(), record.key(), record.value());
    }
}

2.5、流處理

package com.abc.demo.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.HashMap;
import java.util.Map;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class StreamConfig {
    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("stream-in");
        //從 stream-in 隊列中讀取數據,處理后發送給 stream-out 隊列
        //發送數據 key,value 分別使用的序列化類為Serdes.String(),JsonSerde
        stream.map(this::uppercaseValue).to("stream-out", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }

    /**
     * 消息轉換,新的消息:key-原來的value值,value-一個map
     */
    private KeyValue<String, Map> uppercaseValue(String key, String value) {
        Map<String, String> map = new HashMap<>();
        map.put("message", value.toUpperCase());
        map.put("timestamp", System.currentTimeMillis() + "");
        return new KeyValue(value, map);
    }
}

程序從 stream-in 中讀取消息,對消息加工后再發送給 stream-out;打開兩個終端,一個往 stream-in 發送消息,一個接受 stream-out 的消息。

./kafka-console-producer.sh --broker-list 10.40.100.69:9092 --topic stream-in #發送消息

./kafka-console-consumer.sh --bootstrap-server 10.40.100.69:9092 --topic stream-out --property print.key=true #接受消息

stream-in 的輸入:

stream-out 的輸出:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM