<!-- Add the Kafka dependencies -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.2.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>
Because our Spring Boot version is fairly old (1.5.9), we use spring-kafka 1.1.2. The customer's Kafka broker, however, runs 2.0.0, so we exclude the kafka-clients bundled with spring-kafka and pull in a matching 2.0.x client (2.0.1 above).
Note: each Spring Boot version requires a matching spring-kafka version; if the two do not line up with the documented compatibility matrix, the application fails at startup. Look up and test the version mapping for your own setup.
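To confirm the exclusion took effect and only the intended kafka-clients version is on the classpath, you can inspect the resolved dependency tree (a standard Maven command, shown here as a sanity check):

mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients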
Consumer configuration file consumer.properties:
kafka.bootstrap-servers=192.168.8.15:9092
kafka.consumer.auto-offset-reset=latest
kafka.consumer.enable-auto-commit=true
kafka.consumer.auto-commit-interval=100
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.group-id=testGroup
kafka.consumer.session.timeout=6000
kafka.consumer.concurrency=10
kafka.consumer.max-poll-records=500
#kafka.consumer.topic=kafka-test-topic
kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
Create the consumer configuration class, option 1 (inline sasl.jaas.config):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer configuration.
 *
 * @Author huang createTime:2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Maximum number of records returned by a single poll(); the default is 500. */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    @Value("${kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent listener threads
        factory.setConcurrency(concurrency);
        // Deliver records to the listener in batches
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Consumer settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // batch consumption
        // SASL/SCRAM username/password authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }
}
Create the consumer configuration class, option 2 (external JAAS file):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer configuration.
 *
 * @Author huang createTime:2020/11/16 13:57
 */
@Configuration
@PropertySource("classpath:config/consumer.properties")
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Maximum number of records returned by a single poll(); the default is 500. */
    @Value("${kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    // Instead of an inline sasl.jaas.config property, point the JVM at an
    // external JAAS file on the classpath before any Kafka client is created.
    static {
        String path = Thread.currentThread().getContextClassLoader()
                .getResource("config/consumer_jaas.conf").getPath();
        System.setProperty("java.security.auth.login.config", path);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent listener threads
        factory.setConcurrency(concurrency);
        // Deliver records to the listener in batches
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Consumer settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // batch consumption
        // SASL/SCRAM username/password authentication; the credentials come
        // from the JAAS file loaded in the static block above.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        return props;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }
}
SASL credentials file consumer_jaas.conf (option 2):
This is the file loaded by the static block above.
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="test"
    password="test";
};
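If you prefer not to set the system property from a static block, the same file can be passed to the JVM at startup instead (the path and jar name here are illustrative):

java -Djava.security.auth.login.config=/path/to/consumer_jaas.conf -jar app.jar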
Listener:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Kafka listener.
 *
 * @Author huang createTime:2020/11/16 13:56
 */
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    // "topics" takes an array, so you can subscribe to several topics at once
    // by listing them comma-separated inside the braces.
    @KafkaListener(topics = {"testTopic"})
    public void receive(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logger.info("Consumed message---key: " + record.key());
            logger.info("Consumed message---value: " + record.value().toString());
        }
    }
}
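If the payloads are JSON documents, each value can be parsed inside the batch loop. A minimal sketch of an alternative listener method, assuming com.alibaba.fastjson is on the classpath and values are JSON objects; anything else is logged and skipped:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

@KafkaListener(topics = {"testTopic"})
public void receiveJson(List<ConsumerRecord<?, ?>> records) {
    for (ConsumerRecord<?, ?> record : records) {
        if (record.value() == null) {
            continue; // skip tombstone/empty messages
        }
        try {
            // fastjson parses the raw string value into a JSONObject
            JSONObject body = JSON.parseObject(record.value().toString());
            logger.info("topic={}, offset={}, body={}", record.topic(), record.offset(), body);
        } catch (Exception e) {
            logger.warn("Skipping non-JSON message at offset " + record.offset(), e);
        }
    }
}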
The Spring Boot + Kafka integration above targets Spring Boot 1.5.9, where you have to write the configuration classes yourself.
From Spring Boot 2.0.0 onward you can configure everything directly in application.properties/application.yml (I only verified this on 2.3.2 and have not dug deeper).
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.3.RELEASE</version>
</dependency>
spring:
  # Kafka
  kafka:
    # Kafka broker address; several can be listed, comma-separated
    bootstrap-servers: 192.168.8.15:9092
    # =============== consumer configuration =======================
    consumer:
      # Default consumer group id
      group-id: testGroup
      # earliest: with committed offsets, resume from them; without, consume from the beginning
      # latest:   with committed offsets, resume from them; without, consume only newly produced data
      # none:     resume from committed offsets when every partition has one; throw if any partition lacks one
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 1000
      # Deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000 # maximum records per batch, default 500
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-256
          jaas:
            config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";'
    listener:
      poll-timeout: 1000
      type: batch # listener mode: deliver records in batches
      concurrency: 1 # number of concurrent processing threads
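With this auto-configuration no hand-written KafkaConsumerConfig is needed; a @KafkaListener bean is enough. A minimal sketch (class and method names are illustrative). Because spring.kafka.listener.type is batch, the listener method must accept a List:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class BatchConsumer {

    private static final Logger logger = LoggerFactory.getLogger(BatchConsumer.class);

    // With listener type "batch" the container hands over whole batches,
    // up to max-poll-records records per call.
    @KafkaListener(topics = "testTopic")
    public void onMessages(List<ConsumerRecord<String, String>> records) {
        logger.info("Received a batch of {} records", records.size());
        for (ConsumerRecord<String, String> record : records) {
            logger.info("key={}, value={}", record.key(), record.value());
        }
    }
}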