Spring boot 自定義kafkaTemplate的bean實例進行生產消息和發送消息


本文為博主原創,未經允許不得轉載:

目錄:

  1.  自定義生產消息 kafkaTemplate 實例

  2.  封裝 kafka 發送消息的service 方法

  3.  測試 kafka 發送消息service 的方法

     4.  自定義 kafka 消費消息的工廠 bean 

     5.  kafka 監聽消費消息

      

  1.  自定義 kafkaTemplate 實例

    a : 使用 @ConditionalOnProperty 注解屬性控制是否加載 kafka 相關初始化配置,因為在項目開發過程中,如kafka 或redis 等工具容易封裝為

    工具類,被各微服務引用並進行加載。使用 @ConditionalOnProperty 注解的 havingValue 屬性可以控制服務中是否進行加載對應的配置。

    該屬性的值,可在 yaml 配置文件中指定: kafka.used = true 。如果為true 則加載,false則不加載

    b.  使用工廠實例生成指定的 kafkaTemplate 實例

  

package com.example.demo.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@ConditionalOnProperty(prefix="kafka",name = "isClose",havingValue = "true")
public class KafkaTemplateConfig {

    /**
     * Producer Template 配置
     */
    @Bean(name="kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer 工廠配置
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer 參數配置
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 指定多個kafka集群多個地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");

        // 重試次數,0為不啟用重試機制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默認為1
        // acks=0 把消息發送到kafka就認為發送成功
        // acks=1 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功
        // acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功
        props.put(ProducerConfig.ACKS_CONFIG, 1);

        // 生產者空間不足時,send()被阻塞的時間,默認60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批處理大小,單位為字節
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,從而提高並發量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生產者可以使用的總內存字節來緩沖等待發送到服務器的記錄
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是說send的消息大小不能超過這個限制, 默認1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        // 鍵的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 壓縮消息,支持四種類型,分別為:none、lz4、gzip、snappy,默認為none。
        // 消費者默認支持解壓,所以壓縮設置在生產者,消費者無需設置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        return props;
    }


}

 

  2.  封裝 kafka 發送消息的service 方法:

    

package com.example.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaProduceService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * producer 同步方式發送數據
     *
     * @param topic   topic名稱
     * @param message producer發送的數據
     */
    public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * producer 異步方式發送數據
     *
     * @param topic   topic名稱
     * @param message producer發送的數據
     */
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("success");
            }
            @Override
            public void onSuccess(Object o) {
                System.out.println("failure");
            }
        });
    }

}

  

  3. 測試 kafka 發送消息service 的方法:

    

package com.example.demo;

import com.example.demo.service.KafkaProduceService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProduceServiceTest {

    @Autowired
    private KafkaProduceService producerService;

    @Test
    public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
        producerService.sendMessageSync("test","同步發送消息測試");
    }

    @Test
    public void sendMessageAsync() {
        producerService.sendMessageAsync("test","異步發送消息測試");
    }

}

   

  4. 自定義 kafka 消費消息的工廠 bean :

  

package com.example.demo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 設置消費者工廠
        factory.setConsumerFactory(consumerFactory());
        // 消費者組中線程數量
        factory.setConcurrency(3);
        // 拉取超時時間
        factory.getContainerProperties().setPollTimeout(3000);

        // 當使用批量監聽器時需要設置為true
        factory.setBatchListener(true);

        return factory;
    }

//    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

//    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");
        //配置默認分組,這里沒有配置+在監聽的地方沒有設置groupId,多個服務會出現收到相同消息情況
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
        // 是否自動提交offset偏移量(默認true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自動提交的頻率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超時設置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 鍵的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量規則設置:
        // (1)、earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
        // (2)、latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
        // (3)、none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }
}

  

  5. kafka 監聽消費消息:

    

package com.example.demo.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = {"test"},groupId = "group1",
            containerFactory="kafkaListenerContainerFactory")
    public void kafkaListener(String message){
        System.out.println(message);
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM