<!--新增kafka依赖--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.2.RELEASE</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.1</version> </dependency>
由于我们的springboot版本比较低,是1.5.9的,所以使用的spring-kafka依赖是1.1.2的;但由于客户安装的kafka服务是2.0.0的版本,所以要排除spring-kafka自带的kafka-clients,单独导入2.0.x版本的kafka-clients(上面的依赖中使用的是2.0.1)。
注意:不同的springboot版本要导入不同的spring-kafka依赖,二者有版本对应关系,否则启动时会报错。具体的版本对应关系请参考Spring for Apache Kafka官方文档中的兼容性列表,并自行测试确认。
消费者配置文件consumer.properties:
kafka.bootstrap-servers=192.168.8.15:9092 kafka.consumer.auto-offset-reset=latest kafka.consumer.enable-auto-commit=true kafka.consumer.auto-commit-interval=100 kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.group-id=testGroup kafka.consumer.session.timeout=6000 kafka.consumer.concurrency=10 kafka.consumer.max-poll-records=500 #kafka.consumer.topic=kafka-test-topic kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
创建消费者配置类:方法一
import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; /** * @Description 消费者配置 * * @Author huang createTime:2020/11/16 13:57 * @return */ @Configuration @PropertySource("classpath:config/consumer.properties") public class KafkaConsumerConfig { //@Autowired //private BaseSysParamService baseSysParamService; @Value("${kafka.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; /** 一次调用poll()操作时返回的最大记录数,默认值为500 */ @Value("${kafka.consumer.max-poll-records}") private int maxPollRecords; @Value("${kafka.consumer.sasl-jaas-config}") private String kafkaConsumerSASLJaasConfig; // static { // String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath(); //// System.setProperty("java.security.auth.login.config", 
"F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf"); // System.setProperty("java.security.auth.login.config", path); // } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //并发数量 factory.setConcurrency(concurrency); //批量获取 factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(1500); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip"); //if (StringUtils.isNotBlank(kafkaIp)){ // servers = kafkaIp; //} //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl"); //if (StringUtils.isNotBlank(kafkaSASL)){ // kafkaConsumerSASLJaasConfig = kafkaSASL; //} Map props =new HashMap<>(); //消费者参数设置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers); props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消费 //用户密码认证参数 props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-256"); props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig); return props; } @Bean public KafkaProperties.Listener listener() { return new KafkaProperties.Listener(); } }
创建消费者配置类:方法二
import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; /** * @Description 消费者配置 * * @Author huang createTime:2020/11/16 13:57 * @return */ @Configuration @PropertySource("classpath:config/consumer.properties") public class KafkaConsumerConfig { @Autowired private BaseSysParamService baseSysParamService; @Value("${kafka.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; /** 一次调用poll()操作时返回的最大记录数,默认值为500 */ @Value("${kafka.consumer.max-poll-records}") private int maxPollRecords; // @Value("${kafka.consumer.sasl-jaas-config}") //private String kafkaConsumerSASLJaasConfig; static { String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath(); //// System.setProperty("java.security.auth.login.config", 
"F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf"); System.setProperty("java.security.auth.login.config", path); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //并发数量 factory.setConcurrency(concurrency); //批量获取 factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(1500); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip"); //if (StringUtils.isNotBlank(kafkaIp)){ // servers = kafkaIp; //} //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl"); //if (StringUtils.isNotBlank(kafkaSASL)){ // kafkaConsumerSASLJaasConfig = kafkaSASL; //} Map props =new HashMap<>(); //消费者参数设置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers); props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//批量消费 //用户密码认证参数 props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-256"); //props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig); return props; } @Bean public KafkaProperties.Listener listener() { return new KafkaProperties.Listener(); } }
密码认证consumer_jaas.conf:方法二
上面static静态代码块需要加载的文件
KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test"; };
监听器
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zkteco.zkbiosecurity.auth.service.AuthDepartmentService; import com.zkteco.zkbiosecurity.auth.service.AuthRoleService; import com.zkteco.zkbiosecurity.auth.service.AuthUserService; import com.zkteco.zkbiosecurity.auth.vo.AuthRoleItem; import com.zkteco.zkbiosecurity.auth.vo.AuthUserItem; import com.zkteco.zkbiosecurity.auth.vo.ThirdDepartmentDto; import com.zkteco.zkbiosecurity.pers.service.PersPersonService; import com.zkteco.zkbiosecurity.pers.service.impl.ThirdDataServiceImpl; import com.zkteco.zkbiosecurity.pers.vo.ThirdPersonDto; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; /** * @Description * * @Author huang createTime:2020/11/16 13:56 * @return */ @Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开 @KafkaListener(topics = {"testTopic"}) public void receive(List<ConsumerRecord<?, ?>> records) { for (ConsumerRecord<?, ?> record : records) { logger.info("消费得到的消息---key: " + record.key()); logger.info("消费得到的消息---value: " + record.value().toString()); } } }
上面springboot整合kafka是基于springboot1.5.9版本的,需要写配置类来加载
springboot2.0.0以上可以直接在application.properties或application.yml中配置(下面的示例为yml格式;只是个人测试2.3.2可行,没有深入研究)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.3.RELEASE</version> </dependency>
spring: # KAFKA kafka: # 指定kafka服务器地址,可以指定多个 bootstrap-servers: 192.168.8.15:9092 #=============== consumer消费者配置 ======================= consumer: #指定默认消费者的group id group-id: testGroup #earliest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 #latest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 #none #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: latest enable-auto-commit: true auto-commit-interval: 1000 #指定消费key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 1000 #每批最大条数,默认500 properties: security: protocol: SASL_PLAINTEXT sasl: mechanism: SCRAM-SHA-256 jaas: config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";' listener: poll-timeout: 1000 type: batch #指定监听的模式 concurrency: 1 #同时处理线程数