Kafka之SpringBoot集成Kafka實戰


  在spring應用中如果需要訂閱kafka消息,通常情況下我們不會直接使用kafka-client, 而是使用更方便的一層封裝spring-kafka。
  在spring-kafka在運行時會啟動兩類線程,一類是Consumer線程,另一類是Listener線程。前者用來直接調用kafka-client的poll()方法獲取消息,后者才是調用我們代碼中標有@KafkaListener注解方法的線程。如果直接使用kafka-client的話,那么正常的寫法是一個while循環,在循環里面調用poll(),然后處理消息,這在kafka broker看來就是一個Consumer。如果想用多個Consumer,除了多啟動幾個進程以外,也可以在一個進程使用多個線程執行此while()循環。spring-kafka就是這么干的。

1.添加依賴

<dependencies>
    <dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.5.5.RELEASE</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    	<version>2.3.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.74</version>
    </dependency>
</dependencies>

2.kafka配置

  springBoot集成kafka,kafka的原生配置可以參考以下源碼:

org.apache.kafka.clients.CommonClientConfigs.class
org.apache.kafka.clients.consumer.ConsumerConfig.class
org.apache.kafka.clients.producer.ProducerConfig.class

   application.properties配置如下:

#============== KAFKA START===================
spring.kafka.listener.concurrency=5

spring.kafka.producer.bootstrap.servers=192.168.15.218:9093
spring.kafka.producer.retries= 3
spring.kafka.producer.buffer.memory=33554432
spring.kafka.producer.acks=0
#自定義配置,控制生產者是否發送消息
spring.kafka.producer.enable=false

spring.kafka.consumer.bootstrap.servers=192.168.15.218:9093
spring.kafka.consumer.group.id=kafka-group-ryj
spring.kafka.consumer.enable.auto.commit=true
spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.max.poll.records=10
#自定義配置,控制消費者是否監聽
spring.kafka.consumer.enable=true
#============== KAFKA END======================

#============== TOPIC START======================
topic.testRecord=topic.testRecord
#============== TOPIC END======================

3.修改啟動類,支持kafka注解

@SpringBootApplication()
@EnableKafka
public class KafkaTest {
    public static void main(String[] args) {
        System.out.println("Hello World!");
        SpringApplication.run(KafkaTest.class, args);
    }
}

4.增加kafka配置類,生成生產者、消費者相關信息

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.producer.bootstrap.servers}")
    private String producerServer;

    @Value("${spring.kafka.producer.retries}")
    private Integer producerRetries;

    @Value("${spring.kafka.producer.buffer.memory}")
    private String producerBufferMemory;

    @Value("${spring.kafka.producer.acks}")
    private String producerAcks;

    @Value("${spring.kafka.consumer.bootstrap.servers}")
    private String consumerServer;

    @Value("${spring.kafka.consumer.enable.auto.commit}")
    private Boolean consumerAutoCommit;

    @Value("${spring.kafka.consumer.group.id}")
    private String consumerGroupId;

    @Value("${spring.kafka.consumer.auto.offset.reset}")
    private String consumerOffsetReset;

    @Value("${spring.kafka.consumer.max.poll.records}")
    private String consumerPollNum;

    /**
     * 生產者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.ACKS_CONFIG, producerAcks);// 為0時,生產者不會等待返回消息發送結果
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServer);
        props.put(ProducerConfig.RETRIES_CONFIG, producerRetries);// 發送失敗時,重新發送消息次數
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 批量發送消息的間隔時間
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);// 生產者緩存消息的內存字節數
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 生產者工廠
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生產者模板
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * 消費者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);// 消費者組ID
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOffsetReset);// offser沒有初始化或者不存在時默認的配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerServer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerPollNum);// 每次拉取記錄的數量
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);// 用於檢測客戶端故障的超時時間
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);// 請求響應的超時時間
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(consumerFactory());
        container.setBatchListener(true);//批量拉取消息,與消費者的接收參數有關         return container;
    }

    /**
     * KafkaListener 延遲啟動監聽工廠
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs()));
        // 禁止自動啟動
        container.setAutoStartup(false);
        container.setBatchListener(true);
        return container;
    }
}

5.生產者消息發送測試類

@Component
public class KafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    KafkaTemplate<String, Object> kafkaTemplate;

    public void sendJsonMessageToKafka(String jsonMessage, String topicName) {
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, jsonMessage);
        listenableFuture
                .addCallback(
                        o -> logger.info("send message to kafka success !!! topicName={}, partition={}, offset={},msg={}", topicName,
                                o.getRecordMetadata().partition(), o.getRecordMetadata().offset(), o.getProducerRecord().value()),
                        throwable -> this.sendMsgFail(throwable, topicName));
    }

    private void sendMsgFail(Throwable throwable, String topicName) {
        logger.error("send message to kafka fail !!! topicName=" + topicName + " error " + throwable.getMessage());
    }
}
@Component
@Order(100)
public class KafkaTestRunner implements ApplicationRunner {

    @Value("${topic.testRecord}")
    private String topicName;
    
    @Value("${spring.kafka.producer.enable}")
    private Boolean producerEnable;

    @Autowired
    KafkaProducer kafkaProducer;

    @Override
    public void run(ApplicationArguments args) {
        if(producerEnable) {
            new Thread() {
                @Override
                public void run() {
                    sendMessage();
                }
                
            }.start();
        }
    }

    private void sendMessage() {
        for (Integer i = 0; i < 1000; i++) {
            kafkaProducer.sendJsonMessageToKafka(JSON.toJSONString(new Digit(i)), topicName);
        }
    }
}

@Data
class Digit {

    Integer i;

    public Digit(Integer i) {
        super();
        this.i = i;
    }
    
}

6.消費者批量消費測試類

@Component
@SpringBootConfiguration
public class KafkaConsumerTest {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    //如果想立即消費,可以更換containerFactory
    @KafkaListener(id = "delayConsumer",topics = "${topic.testRecord}", containerFactory = "delayContainerFactory", groupId = "${spring.kafka.consumer.group.id}")
    //批量時不能用Object作為參數,否則會報錯
    public void delayConsumer(List<ConsumerRecord<String, String>> message) {
        try {
            message.forEach(record -> {
                logger.info("delayConsumer consumer success.partition={}, offset={},msg={}",record.partition(),record.offset(),record.value());
            });
        } catch (Exception e) {
            logger.error("delayConsumer error.",e);
        }
    }
}
@Component
@Order(10)
public class KafkaDelayConsumerRunner implements ApplicationRunner {

    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    @Value("${spring.kafka.consumer.enable}")
    private Boolean consumerEnable;

    @Override
    public void run(ApplicationArguments args) {
        if (consumerEnable) {
            //喚醒延遲啟動的kafka消費者
            registry.getListenerContainer("delayConsumer").start();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM