spring boot整合kafka服务创建消费者消费消息


<!--新增kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

  由于我们的springboot版本比较低,是1.5.9的,所以使用的kafka依赖是1.1.2的,但由于客户安装的kafka服务是2.0.0的版本,所以要排除spring-kafka自带的kafka-clients,导入2.0.0的版本。

注意:不同的springboot版本要导入不同的spring-kafka依赖,有版本对应的关系,否则会启动报错。版本对应关系自行百度测试

 

消费者配置文件consumer.properties:

kafka.bootstrap-servers=192.168.8.15:9092

kafka.consumer.auto-offset-reset=latest
kafka.consumer.enable-auto-commit=true
kafka.consumer.auto-commit-interval=100
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.group-id=testGroup
kafka.consumer.session.timeout=6000
kafka.consumer.concurrency=10
kafka.consumer.max-poll-records=500

#kafka.consumer.topic=kafka-test-topic

kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";

 

创建消费者配置类:方法一

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description  消费者配置
 *
 * @Author huang  createTime:2020/11/16 13:57
 * @return
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    //@Autowired
    //private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    /** 一次调用poll()操作时返回的最大记录数,默认值为500 */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    @Value("${kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;

//    static {
//        String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
////        System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
//        System.setProperty("java.security.auth.login.config", path);
//    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
          //  servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
          //  kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map props =new HashMap<>();
        //消费者参数设置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消费
        //用户密码认证参数
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }

}

  

创建消费者配置类:方法二

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description  消费者配置
 *
 * @Author huang  createTime:2020/11/16 13:57
 * @return
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    @Autowired
    private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    /** 一次调用poll()操作时返回的最大记录数,默认值为500 */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

   // @Value("${kafka.consumer.sasl-jaas-config}")
    //private String kafkaConsumerSASLJaasConfig;

    static {
        String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
////        System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
        System.setProperty("java.security.auth.login.config", path);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
          //  servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
          //  kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map props =new HashMap<>();
        //消费者参数设置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消费
        //用户密码认证参数
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        //props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }

}

密码认证consumer_jaas.conf:方法二

上面static静态代码块需要加载的文件

KafkaClient{
	org.apache.kafka.common.security.scram.ScramLoginModule required
	username="test"
	password="test";
};

  

 

监听器

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zkteco.zkbiosecurity.auth.service.AuthDepartmentService;
import com.zkteco.zkbiosecurity.auth.service.AuthRoleService;
import com.zkteco.zkbiosecurity.auth.service.AuthUserService;
import com.zkteco.zkbiosecurity.auth.vo.AuthRoleItem;
import com.zkteco.zkbiosecurity.auth.vo.AuthUserItem;
import com.zkteco.zkbiosecurity.auth.vo.ThirdDepartmentDto;
import com.zkteco.zkbiosecurity.pers.service.PersPersonService;
import com.zkteco.zkbiosecurity.pers.service.impl.ThirdDataServiceImpl;
import com.zkteco.zkbiosecurity.pers.vo.ThirdPersonDto;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * @Description
 *
 * @Author huang  createTime:2020/11/16 13:56
 * @return
 */
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
    @KafkaListener(topics = {"testTopic"})
    public void receive(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logger.info("消费得到的消息---key: " + record.key());
            logger.info("消费得到的消息---value: " + record.value().toString());

        }
    }
}

  

上面springboot整合kafka是基于springboot1.5.9版本的,需要写配置类来加载

springboot2.0.0以上可以直接在application.properties配置(只是个人测试2.3.2可行,没有深入研究)

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.5.3.RELEASE</version>
</dependency>

  

spring:
  # KAFKA
  kafka:
    # ָkafka服务器地址,可以指定多个
    bootstrap-servers: 192.168.8.15:9092
    #=============== consumer消费者配置  =======================
    consumer:
      #指定默认消费者的group id
      group-id: testGroup
      #earliest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      #latest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      #none
      #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 1000
      #指定消费key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000    #每批最大条数,默认500
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: SCRAM-SHA-256
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";'
    listener:
      poll-timeout: 1000
      type: batch           #指定监听的模式
      concurrency: 1        #同时处理线程数

  

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM