<!-- Add the Kafka dependencies -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.2.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>
Because our Spring Boot version is a fairly old 1.5.9, the matching spring-kafka dependency is 1.1.2. The customer's Kafka broker, however, runs version 2.0.0, so we exclude the kafka-clients bundled with spring-kafka and pull in a 2.0.x client instead.
Note: different Spring Boot versions require different spring-kafka versions; there is a compatibility mapping between them, and a mismatch will cause startup errors. Look up the compatibility table and test it yourself.
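To confirm which kafka-clients version actually lands on the classpath after the exclusion, Maven's dependency tree is a quick check (standard Maven command; the filter flag is optional):

mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients

The output should list only the explicitly declared 2.0.x artifact; if a different, transitively resolved version still appears, the exclusion is not taking effect.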
Consumer configuration file consumer.properties:
kafka.bootstrap-servers=192.168.8.15:9092
kafka.consumer.auto-offset-reset=latest
kafka.consumer.enable-auto-commit=true
kafka.consumer.auto-commit-interval=100
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.group-id=testGroup
kafka.consumer.session.timeout=6000
kafka.consumer.concurrency=10
kafka.consumer.max-poll-records=500
#kafka.consumer.topic=kafka-test-topic
kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
Create the consumer configuration class, Option 1 (JAAS credentials passed via the sasl.jaas.config property):
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer configuration.
 *
 * @author huang createTime: 2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {
//@Autowired
//private BaseSysParamService baseSysParamService;
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
/** Maximum number of records returned by a single poll() call; the default is 500 */
@Value("${kafka.consumer.max-poll-records}")
private int maxPollRecords;
@Value("${kafka.consumer.sasl-jaas-config}")
private String kafkaConsumerSASLJaasConfig;
// static {
// String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
//// System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
// System.setProperty("java.security.auth.login.config", path);
// }
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // number of concurrent listener threads
    factory.setConcurrency(concurrency);
    // receive records in batches
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
    //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
    //if (StringUtils.isNotBlank(kafkaIp)){
    //    servers = kafkaIp;
    //}
    //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
    //if (StringUtils.isNotBlank(kafkaSASL)){
    //    kafkaConsumerSASLJaasConfig = kafkaSASL;
    //}
    Map<String, Object> props = new HashMap<>();
    // consumer settings
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // batch consumption
    // SASL username/password authentication
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
    return props;
}
@Bean
public KafkaProperties.Listener listener() {
return new KafkaProperties.Listener();
}
}
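Before wiring things into Spring, it can be worth verifying the SASL/SCRAM connection with the plain kafka-clients API. Below is a minimal sketch, assuming the same broker address, topic, and test credentials used above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SaslSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.8.15:9092"); // broker from consumer.properties
        props.put("group.id", "testGroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // same SASL settings as in the configuration class
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"test\" password=\"test\";");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testTopic"));
            // poll(Duration) is available from kafka-clients 2.0 onward
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("key=%s value=%s%n", record.key(), record.value());
            }
        }
    }
}

If authentication fails here, it will fail in the Spring container as well, so this isolates broker/credential problems from Spring configuration problems.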
Create the consumer configuration class, Option 2 (JAAS credentials loaded from a file via a system property):
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer configuration.
 *
 * @author huang createTime: 2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {
@Autowired
private BaseSysParamService baseSysParamService;
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
/** Maximum number of records returned by a single poll() call; the default is 500 */
@Value("${kafka.consumer.max-poll-records}")
private int maxPollRecords;
// @Value("${kafka.consumer.sasl-jaas-config}")
//private String kafkaConsumerSASLJaasConfig;
static {
    // resolve the JAAS file on the classpath and register it JVM-wide
    String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
    // System.setProperty("java.security.auth.login.config", "F:\\workspace\\springbootkafka\\src\\main\\resources\\config\\consumer_jaas.conf");
    System.setProperty("java.security.auth.login.config", path);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // number of concurrent listener threads
    factory.setConcurrency(concurrency);
    // receive records in batches
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
    //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
    //if (StringUtils.isNotBlank(kafkaIp)){
    //    servers = kafkaIp;
    //}
    //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
    //if (StringUtils.isNotBlank(kafkaSASL)){
    //    kafkaConsumerSASLJaasConfig = kafkaSASL;
    //}
    Map<String, Object> props = new HashMap<>();
    // consumer settings
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // batch consumption
    // SASL username/password authentication; the JAAS config comes from the
    // java.security.auth.login.config system property set in the static block
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    //props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
    return props;
}
@Bean
public KafkaProperties.Listener listener() {
return new KafkaProperties.Listener();
}
}
Password authentication file consumer_jaas.conf (Option 2), the file loaded by the static block above:
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="test"
    password="test";
};
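As an alternative to resolving the path in a static block, the same file can be registered when the JVM starts, via the standard JAAS system property (the path below is illustrative):

java -Djava.security.auth.login.config=/etc/kafka/consumer_jaas.conf -jar app.jar

Passing it as a launch argument also sidesteps a pitfall of the static block: inside a packaged jar, getResource(...).getPath() does not return a usable filesystem path.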
The listener:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Kafka message listener.
 *
 * @author huang createTime: 2020/11/16 13:56
 */
@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    // "topics" takes an array, so you can subscribe to several topics at once
    // simply by listing them comma-separated, e.g. {"topicA", "topicB"}
    @KafkaListener(topics = {"testTopic"})
    public void receive(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logger.info("consumed message key: " + record.key());
            logger.info("consumed message value: " + record.value().toString());
        }
    }
}
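Note that the method takes a List because the container factory above enables batch listening (setBatchListener(true)). If batch mode is turned off, a per-record method is used instead; a minimal sketch under that assumption:

// valid only when the container factory is NOT in batch mode
@KafkaListener(topics = {"testTopic"})
public void receiveOne(ConsumerRecord<?, ?> record) {
    logger.info("key: {}, value: {}", record.key(), record.value());
}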
The Spring Boot/Kafka integration above targets Spring Boot 1.5.9, which is why a configuration class has to be written.
From Spring Boot 2.0.0 onward, the same settings can go directly into application.properties/application.yml (personally verified only on 2.3.2; not investigated in depth):
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.3.RELEASE</version>
</dependency>
spring:
  # KAFKA
  kafka:
    # Kafka broker address; several can be listed, comma-separated
    bootstrap-servers: 192.168.8.15:9092
    #=============== consumer configuration =======================
    consumer:
      # default consumer group id
      group-id: testGroup
      # earliest: with a committed offset per partition, resume from it; without one, consume from the beginning
      # latest:   with a committed offset per partition, resume from it; without one, consume only newly produced data
      # none:     resume from committed offsets when every partition has one; throw if any partition lacks one
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 1000
      # deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000 # maximum records per batch, default 500
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-256
          jaas:
            config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";'
    listener:
      poll-timeout: 1000
      type: batch # listener mode
      concurrency: 1 # number of concurrent processing threads
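With this auto-configuration in place, no @Configuration class is needed; the @KafkaListener component from earlier works unchanged. Since listener.type is batch here as well, the method still receives a whole batch. A minimal standalone sketch (class name illustrative):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchListener {
    @KafkaListener(topics = "testTopic")
    public void onMessages(List<ConsumerRecord<String, String>> records) {
        // with spring.kafka.listener.type=batch, the whole poll result arrives at once
        records.forEach(r -> System.out.printf("key=%s value=%s%n", r.key(), r.value()));
    }
}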
