Multi-threaded requests and multi-threaded consumption with Kafka under Spring Boot
1. Requirement description:
The task: module A pushes data collected from certain channels to Kafka, and module B consumes it. The topic was created as r2p5, i.e. replication factor 2 and 5 partitions; to maximize consumption throughput, the consumer code runs five threads (a sketch of creating such a topic programmatically follows below).
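For reference, a minimal sketch of creating an r2p5-style topic with the Kafka AdminClient; the topic name and broker address here are placeholders reused from the code further down, not the real ones:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder broker address, reused from the consumer config below
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.28.18.103:6667");
        try (AdminClient admin = AdminClient.create(props)) {
            // "r2p5": 5 partitions, replication factor 2
            NewTopic topic = new NewTopic("topicName", 5, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}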
The producer code came out as follows:
//pom.xml: pull in the spring-kafka dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- the version is inherited from the parent POM and omitted here -->
</dependency>
//Java code
@RestController
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/test")
    public void test() {
        kafkaTemplate.send("topicName", "msg");
    }
}
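Worth noting: to rule out producer-side loss explicitly rather than by inference, a callback can be attached to the send. A minimal sketch, assuming the spring-kafka 2.x API where send() returns a ListenableFuture:
kafkaTemplate.send("topicName", "msg").addCallback(
        result -> System.out.println("sent to partition "
                + result.getRecordMetadata().partition()
                + ", offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));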
The consumer code is below (the configuration class was found via Baidu; the real code sits on an internal network and retyping it by hand is tedious):
1) Configuration class
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    final static String list = "10.28.18.103:6667";
    /**
     * Description: build the consumer configuration
     * Date: 2017-07-11
     * @author shaqf
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs " + JSON.toJSONString(props));
        return props;
    }
    /** Build the consumer factory */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    /** Build the listener container factory */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory<>();
        factory1.setConsumerFactory(consumerFactory());
        // one container thread per partition of the r2p5 topic
        factory1.setConcurrency(5);
        // the listener below receives a List of records, so batch mode is required
        factory1.setBatchListener(true);
        // enable.auto.commit is false and the listener takes an Acknowledgment,
        // so the container must be put into manual-ack mode
        factory1.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory1.getContainerProperties().setPollTimeout(3000);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory factory" + JSON.toJSONString(factory1));
        return factory1;
    }
}
2) Consumer Java code
@Controller
public class ConsumerController {
    // containerFactory must name the container-factory bean defined above,
    // not the consumer factory
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            id = "#{'${spring.kafka.consumer.group-id}'}",
            topics = "#{'${spring.kafka.topic}'}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // process the fetched records here
        ack.acknowledge(); // manual commit; without this the offsets are never committed
    }
}
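The two SpEL placeholders above resolve against the application configuration. A hypothetical application.properties backing them (the values are placeholders, not the real ones):
spring.kafka.consumer.group-id=group1
spring.kafka.topic=topicName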
2. Problem description
With Postman sending batches of data from a single thread everything arrived intact, but with JMeter sending batches from five threads, some of the consumed data went missing. Testing ruled out loss on the producer side, and ZooKeeper runs on a long-standing company test server that in theory should not be the problem. Digging further, it turns out the Kafka consumer itself is not thread-safe, so the consumer side needs extra handling. Copying multi-threaded code from the internet introduced a few problems of its own, recorded here:
Reference: springboot集成kafka多線程定時消費, by weixin_40510917 on CSDN
Second version of the consumer handler:
package com.example.demo_kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class ConsumerHandler {
    // the Kafka consumer instance
    private KafkaConsumer<String, String> consumer;
    // the worker thread pool
    private ExecutorService executors;
    // Kafka consumer properties
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        // note: auto-commit is still enabled here, so offsets are additionally
        // committed in the background alongside the manual commits below
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the two settings below are my additions; without them the error in section 3 appears
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); // overrides the 30000 set above
        return props;
    }
    // initialize the Kafka connection
    @PostConstruct
    public void initKafkaConfig() {
        Properties props = initConfig();
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("test001"));
    }
    /**
     * Consume Kafka data with multiple threads
     * @param workerNum number of worker threads (currently unused, see below)
     */
    public void execute(int workerNum) {
        // pool sizes are hard-coded here; the workerNum parameter is ignored
        executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            if (consumer != null) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord<String, String> record : consumerRecords) {
                        executors.submit(new OneWork(record));
                        // commits after every submit; see the note after this class
                        commitOffset();
                    }
                }
            } else {
                Properties props = initConfig();
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton("test001"));
            }
        }
    }
    private void commitOffset() {
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            // fall back to a blocking commit if the async commit throws
            consumer.commitSync();
        }
    }
}
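One caveat with the loop above: the offset is committed right after each record is handed to the pool, i.e. possibly before the record is actually processed, so a crash can still lose data. A minimal alternative sketch, not from the referenced post: wait for the whole poll batch to finish, then commit once. This method assumes the same consumer, executors and OneWork as the class above, plus imports for java.util.List, java.util.ArrayList and java.util.concurrent.Future:
private void processBatch(ConsumerRecords<String, String> consumerRecords) throws Exception {
    List<Future<?>> futures = new ArrayList<>();
    for (ConsumerRecord<String, String> record : consumerRecords) {
        futures.add(executors.submit(new OneWork(record)));
    }
    for (Future<?> f : futures) {
        f.get(); // block until every record of this batch has been processed
    }
    consumer.commitSync(); // commit the batch exactly once, after processing
}
The worker class submitted to the pool: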
package com.example.demo_kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Worker class for multi-threaded Kafka consumption
 */
public class OneWork implements Runnable {
    // logger
    private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);
    private ConsumerRecord<String, String> consumerRecord;
    public OneWork(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }
    @Override
    public void run() {
        try {
            // business logic for the record goes here
            //todo
        } catch (Exception e) {
            LOG.error("error while processing record: " + e.getMessage(), e);
        }
    }
}
The actual invocation is driven by a scheduled task:
@Controller
@EnableScheduling
public class ConController {
    @Autowired
    private ConsumerHandler consumers;
    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        // the argument 5 is not actually used; the pool sizes are hard-coded in
        // execute(), but it could be made configurable if needed
        consumers.execute(5);
    }
}
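One thing to watch: execute() contains a while (true) loop and never returns, so the cron trigger effectively starts the loop once and then occupies the scheduling thread forever. A sketch of an alternative, not from the referenced post, that starts the consume loop once at application startup instead:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class ConsumerStarter implements ApplicationRunner {
    @Autowired
    private ConsumerHandler consumers;
    @Override
    public void run(ApplicationArguments args) {
        // run the endless poll loop on its own thread so startup is not blocked
        new Thread(() -> consumers.execute(5), "kafka-consume-loop").start();
    }
}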
3. The error (before adding the two parameters)
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with
a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
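A side note on this exception: failures of commitAsync() are generally reported through its callback rather than thrown to the caller, so the try/catch in commitOffset() above never actually sees them. A small sketch of callback-based handling (standard Kafka client API, not the blogger's fix), which could replace the body of commitOffset():
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // e.g. RetriableCommitFailedException caused by a DisconnectException;
        // the callback runs on the polling thread, so a blocking retry is safe here
        consumer.commitSync(offsets);
    }
});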
4. Summary
With the two parameters added, the logs show no more errors and no data is lost, though whether the records truly make it into the database is still to be verified. As for the root cause of the error, and why adding the two parameters fixed it, I honestly do not understand it yet. A plausible guess is that with the worker pool saturated, the offset-commit requests were exceeding the default request timeout, so the broker connection was dropped (hence the DisconnectException) and the commit failed; the larger request.timeout.ms and session.timeout.ms give the commits and heartbeats more headroom. This remains to be verified.