spring boot整合kafka服務創建消費者消費消息


<!--新增kafka依賴-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

  由於我們的springboot版本比較低,是1.5.9的,所以使用的kafka依賴是1.1.2的,但由於客戶安裝的kafka服務是2.0.0的版本,所以要排除spring-kafka自帶的kafka-clients,導入2.0.0的版本。

注意:不同的springboot版本要導入不同的spring-kafka依賴,有版本對應的關系,否則會啟動報錯。版本對應關系自行百度測試

 

消費者配置文件consumer.properties:

kafka.bootstrap-servers=192.168.8.15:9092

kafka.consumer.auto-offset-reset=latest
kafka.consumer.enable-auto-commit=true
kafka.consumer.auto-commit-interval=100
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.group-id=testGroup
kafka.consumer.session.timeout=6000
kafka.consumer.concurrency=10
kafka.consumer.max-poll-records=500

#kafka.consumer.topic=kafka-test-topic

kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";

 

創建消費者配置類:方法一

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description  消費者配置
 *
 * @Author huang  createTime:2020/11/16 13:57
 * @return
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    //@Autowired
    //private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    /** 一次調用poll()操作時返回的最大記錄數,默認值為500 */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    @Value("${kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;

//    static {
//        String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
////        System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
//        System.setProperty("java.security.auth.login.config", path);
//    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //並發數量
        factory.setConcurrency(concurrency);
        //批量獲取
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
          //  servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
          //  kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map props =new HashMap<>();
        //消費者參數設置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消費
        //用戶密碼認證參數
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }

}

  

創建消費者配置類:方法二

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description  消費者配置
 *
 * @Author huang  createTime:2020/11/16 13:57
 * @return
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    @Autowired
    private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    /** 一次調用poll()操作時返回的最大記錄數,默認值為500 */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

   // @Value("${kafka.consumer.sasl-jaas-config}")
    //private String kafkaConsumerSASLJaasConfig;

    static {
        String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
////        System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
        System.setProperty("java.security.auth.login.config", path);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //並發數量
        factory.setConcurrency(concurrency);
        //批量獲取
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
          //  servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
          //  kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map props =new HashMap<>();
        //消費者參數設置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消費
        //用戶密碼認證參數
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        //props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }

}

密碼認證consumer_jaas.conf:方法二

上面static靜態代碼塊需要加載的文件

KafkaClient{
	org.apache.kafka.common.security.scram.ScramLoginModule required
	username="test"
	password="test";
};

  

 

監聽器

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zkteco.zkbiosecurity.auth.service.AuthDepartmentService;
import com.zkteco.zkbiosecurity.auth.service.AuthRoleService;
import com.zkteco.zkbiosecurity.auth.service.AuthUserService;
import com.zkteco.zkbiosecurity.auth.vo.AuthRoleItem;
import com.zkteco.zkbiosecurity.auth.vo.AuthUserItem;
import com.zkteco.zkbiosecurity.auth.vo.ThirdDepartmentDto;
import com.zkteco.zkbiosecurity.pers.service.PersPersonService;
import com.zkteco.zkbiosecurity.pers.service.impl.ThirdDataServiceImpl;
import com.zkteco.zkbiosecurity.pers.vo.ThirdPersonDto;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * @Description
 *
 * @Author huang  createTime:2020/11/16 13:56
 * @return
 */
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //下面的主題是一個數組,可以同時訂閱多主題,只需按數組格式即可,也就是用“,”隔開
    @KafkaListener(topics = {"testTopic"})
    public void receive(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logger.info("消費得到的消息---key: " + record.key());
            logger.info("消費得到的消息---value: " + record.value().toString());

        }
    }
}

  

上面springboot整合kafka是基於springboot1.5.9版本的,需要寫配置類來加載

springboot2.0.0以上可以直接在application.properties配置(只是個人測試2.3.2可行,沒有深入研究)

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.5.3.RELEASE</version>
</dependency>

  

spring:
  # KAFKA
  kafka:
    # ָkafka服務器地址,可以指定多個
    bootstrap-servers: 192.168.8.15:9092
    #=============== consumer消費者配置  =======================
    consumer:
      #指定默認消費者的group id
      group-id: testGroup
      #earliest
      #當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      #latest
      #當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
      #none
      #topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 1000
      #指定消費key和消息體的編解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000    #每批最大條數,默認500
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: SCRAM-SHA-256
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";'
    listener:
      poll-timeout: 1000
      type: batch           #指定監聽的模式
      concurrency: 1        #同時處理線程數

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM