<!-- Kafka dependencies -->
<!-- spring-kafka, with its transitive kafka-clients excluded so that the client version can be pinned separately below -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.2.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Explicit kafka-clients version that replaces the one excluded above -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
由於我們的springboot版本比較低,是1.5.9的,所以使用的spring-kafka依賴是1.1.2的;但由於客戶安裝的kafka服務是2.0.0的版本,所以要排除spring-kafka自帶的kafka-clients,改為導入2.0.x版本的kafka-clients(本例為2.0.1)。
注意:不同的springboot版本要導入對應版本的spring-kafka依賴,兩者有版本對應的關係,否則啟動會報錯。具體的版本對應關係請參考spring-kafka官方的兼容性說明並自行測試。
消費者配置文件consumer.properties:
kafka.bootstrap-servers=192.168.8.15:9092
kafka.consumer.auto-offset-reset=latest
kafka.consumer.enable-auto-commit=true
kafka.consumer.auto-commit-interval=100
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.group-id=testGroup
kafka.consumer.session.timeout=6000
kafka.consumer.concurrency=10
kafka.consumer.max-poll-records=500
#kafka.consumer.topic=kafka-test-topic
kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
創建消費者配置類:方法一
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka consumer configuration (approach 1): the SASL/SCRAM credentials are supplied inline
 * through the {@code sasl.jaas.config} client property.
 *
 * <p>All values are read from {@code classpath:config/consumer.properties}. Key and value
 * deserializers are fixed to {@link StringDeserializer}.
 *
 * @author huang createTime:2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    // Optional: inject a parameter service to override values at runtime (see consumerConfigs()).
    //@Autowired
    //private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Maximum number of records returned by a single poll() call; the default is 500. */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    @Value("${kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;

    // Alternative (approach 2): point the JVM at a JAAS file instead of using sasl.jaas.config.
    // static {
    //     String path = Thread.currentThread().getContextClassLoader().getResource("config/consumer_jaas.conf").getPath();
    //     System.setProperty("java.security.auth.login.config", path);
    // }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * @return a batch-listener factory with the configured concurrency and a 1500 ms poll timeout
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumers.
        factory.setConcurrency(concurrency);
        // Deliver records to the listener in batches.
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Assembles the Kafka consumer properties from the injected configuration values.
     *
     * @return the consumer configuration map, including the SASL/SCRAM authentication settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        // Optional runtime overrides from the parameter service:
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
        //    servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
        //    kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map<String, Object> props = new HashMap<>();
        // Consumer settings.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // Previously injected but never applied, so the configured reset policy was ignored.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Upper bound on the batch size per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Username/password authentication settings.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    /**
     * Exposes a default-constructed {@link KafkaProperties.Listener} as a bean.
     *
     * @return a new listener-properties instance with default values
     */
    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }
}
創建消費者配置類:方法二
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka consumer configuration (approach 2): the SASL/SCRAM credentials come from a JAAS file,
 * {@code config/consumer_jaas.conf} on the classpath, registered through the
 * {@code java.security.auth.login.config} system property.
 *
 * <p>All other values are read from {@code classpath:config/consumer.properties}. Key and value
 * deserializers are fixed to {@link StringDeserializer}.
 *
 * @author huang createTime:2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    private static final String JAAS_CONFIG_RESOURCE = "config/consumer_jaas.conf";

    @Autowired
    private BaseSysParamService baseSysParamService;

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Maximum number of records returned by a single poll() call; the default is 500. */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    // Approach 1 alternative: pass the JAAS entry inline instead of using a JAAS file.
    // @Value("${kafka.consumer.sasl-jaas-config}")
    //private String kafkaConsumerSASLJaasConfig;

    static {
        java.net.URL jaasConfig =
                Thread.currentThread().getContextClassLoader().getResource(JAAS_CONFIG_RESOURCE);
        if (jaasConfig == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "JAAS configuration not found on classpath: " + JAAS_CONFIG_RESOURCE);
        }
        // The property accepts a URL. URL.getPath() would keep percent-encoding (e.g. %20 for
        // spaces) and cannot address a resource packaged inside a jar.
        System.setProperty("java.security.auth.login.config", jaasConfig.toExternalForm());
    }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * @return a batch-listener factory with the configured concurrency and a 1500 ms poll timeout
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumers.
        factory.setConcurrency(concurrency);
        // Deliver records to the listener in batches.
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Assembles the Kafka consumer properties from the injected configuration values.
     *
     * @return the consumer configuration map; credentials are not included here because they
     *     are taken from the JAAS file registered in the static initializer
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        // Optional runtime overrides from the parameter service:
        //String kafkaIp = baseSysParamService.getValByName("third.kafka.ip");
        //if (StringUtils.isNotBlank(kafkaIp)){
        //    servers = kafkaIp;
        //}
        //String kafkaSASL = baseSysParamService.getValByName("third.kafka.sasl");
        //if (StringUtils.isNotBlank(kafkaSASL)){
        //    kafkaConsumerSASLJaasConfig = kafkaSASL;
        //}
        Map<String, Object> props = new HashMap<>();
        // Consumer settings.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // Previously injected but never applied, so the configured reset policy was ignored.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Upper bound on the batch size per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Username/password authentication settings.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        //props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    /**
     * Exposes a default-constructed {@link KafkaProperties.Listener} as a bean.
     *
     * @return a new listener-properties instance with default values
     */
    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }
}
密碼認證consumer_jaas.conf:方法二
上面static靜態代碼塊需要加載的文件
// JAAS login configuration; the consumer configuration class registers this file through the
// java.security.auth.login.config system property.
// The "KafkaClient" entry selects the SCRAM login module with the username/password below.
KafkaClient{
org.apache.kafka.common.security.scram.ScramLoginModule required
username="test"
password="test";
};
監聽器
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zkteco.zkbiosecurity.auth.service.AuthDepartmentService;
import com.zkteco.zkbiosecurity.auth.service.AuthRoleService;
import com.zkteco.zkbiosecurity.auth.service.AuthUserService;
import com.zkteco.zkbiosecurity.auth.vo.AuthRoleItem;
import com.zkteco.zkbiosecurity.auth.vo.AuthUserItem;
import com.zkteco.zkbiosecurity.auth.vo.ThirdDepartmentDto;
import com.zkteco.zkbiosecurity.pers.service.PersPersonService;
import com.zkteco.zkbiosecurity.pers.service.impl.ThirdDataServiceImpl;
import com.zkteco.zkbiosecurity.pers.vo.ThirdPersonDto;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Kafka listener that logs the key and value of every record it receives.
 *
 * @author huang createTime:2020/11/16 13:56
 */
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs each record in the received batch.
     *
     * <p>{@code topics} is an array, so several topics can be subscribed to at once by listing
     * them separated with commas.
     *
     * @param records the records handed to this listener in one call
     */
    @KafkaListener(topics = {"testTopic"})
    public void receive(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logger.info("消費得到的消息---key: {}", record.key());
            // Parameterized logging is null-safe: a record with a null value (e.g. a tombstone)
            // no longer throws NullPointerException from value().toString().
            logger.info("消費得到的消息---value: {}", record.value());
        }
    }
}
上面springboot整合kafka是基於springboot1.5.9版本的,需要自己寫配置類來加載消費者配置。
springboot2.0.0以上可以直接在application.yml(或application.properties)中配置(只是個人測試2.3.2可行,沒有深入研究):
<!-- Spring Boot 2.3.2 parent POM -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- spring-kafka with an explicitly pinned version -->
<!-- NOTE(review): the Boot parent normally manages the spring-kafka version, so the explicit <version> may be unnecessary; confirm before removing -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.3.RELEASE</version>
</dependency>
spring:
  # KAFKA
  kafka:
    # Kafka broker address; more than one can be specified
    bootstrap-servers: 192.168.8.15:9092
    #=============== consumer configuration =======================
    consumer:
      # Default consumer group id
      group-id: testGroup
      # earliest
      #   If a partition has a committed offset, consume from it; otherwise consume from the beginning
      # latest
      #   If a partition has a committed offset, consume from it; otherwise consume only newly produced records
      # none
      #   If every partition has a committed offset, consume after it; if any partition lacks one, throw an exception
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 1000
      # Deserializers for the record key and the record value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000 # maximum number of records per batch, default 500
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: SCRAM-SHA-256
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";'
    listener:
      poll-timeout: 1000
      type: batch # listener mode
      concurrency: 1 # number of threads processing concurrently
