Kafka under Spring Boot: Multithreaded Requests and Multithreaded Consumption

1. Requirement

The task: module A pushes data collected from certain channels to Kafka, and module B consumes it from Kafka. The topic is r2p5, i.e. 5 partitions (the name suggests replication factor 2 and partition count 5; a creation command is sketched below), and to maximize consumption speed the consumer code runs five threads.
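For reference, a topic in that shape can be created with the stock Kafka CLI. A sketch with an assumed broker address (older brokers take --zookeeper instead of --bootstrap-server):

bin/kafka-topics.sh --create --topic r2p5 \
    --bootstrap-server localhost:9092 \
    --partitions 5 --replication-factor 2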

The finished producer code:

<!-- pom.xml: pull in the Spring Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- the version is inherited from the parent POM and omitted here -->
</dependency>

// Java code
@RestController
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/test")
    public void test() {
        kafkaTemplate.send("topicName", "msg");
    }
}
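Section 2 below rules out loss on the producer side; one quick way to verify that yourself is to hang a callback on the send. A sketch assuming the spring-kafka 2.x API, where send() returns a ListenableFuture (in 3.x it returns a CompletableFuture instead); it drops in for the send() line in test() above:

// Sketch: confirm each send actually reached the broker (spring-kafka 2.x)
kafkaTemplate.send("topicName", "msg").addCallback(
        result -> System.out.println("sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));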

The consumer code is below (the configuration class is adapted from an example found online, because the real code lives on an internal network and retyping it by hand is tedious):

1) Configuration class



import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    final static String list = "10.28.18.103:6667";

    /**
     * Description: build the consumer configuration
     * Date:        2017-07-11
     * @author      shaqf
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        // auto-commit is off, so the listener must acknowledge offsets manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // has no effect while auto-commit is off
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs " + JSON.toJSONString(props));
        return props;
    }

    /** Build the consumer factory */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Expose the listener container factory */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory<>();
        factory1.setConsumerFactory(consumerFactory());
        // one consumer thread per partition (the topic has 5 partitions)
        factory1.setConcurrency(5);
        factory1.getContainerProperties().setPollTimeout(3000);
        // the listener below receives a List<ConsumerRecord> plus an Acknowledgment,
        // which requires batch mode and manual acks
        factory1.setBatchListener(true);
        factory1.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory created");
        return factory1;
    }
}

2) Consumer Java code
@Controller
public class ConsumerController {

    // containerFactory references the kafkaListenerContainerFactory bean defined above
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            id = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.topic}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // process the fetched records here
        // with AckMode.MANUAL the offset is only committed once acknowledge() is called
        ack.acknowledge();
    }
}
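The two placeholders above need matching entries in the configuration; spring.kafka.consumer.group-id is a standard Spring Boot key, while spring.kafka.topic is a custom one. A minimal application.properties sketch (the values are illustrative):

spring.kafka.consumer.group-id=group1
spring.kafka.topic=topicName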

2. Problem

Sending batches of data from Postman on a single thread, everything arrives intact; sending the same batches from JMeter on five threads, some of the consumed data goes missing. Testing ruled out loss on the producer side, and ZooKeeper runs on a long-standing company test server that should not be the problem. Further digging showed that KafkaConsumer itself is not thread-safe, so the consumer needs special handling. I copied some multithreaded consumer code from the web and ran into a few problems, recorded here:

Reference: springboot集成kafka多線程定時消費 (weixin_40510917's CSDN blog)
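On the thread-safety point: KafkaConsumer checks for concurrent access internally and fails fast, throwing ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access") as soon as a second thread enters. A minimal sketch that reproduces this (broker address and topic name are assumptions):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NotThreadSafeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption
        props.put("group.id", "demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("test001"));

        // whichever thread enters poll() second gets ConcurrentModificationException
        Runnable pollLoop = () -> {
            while (true) {
                consumer.poll(Duration.ofMillis(100));
            }
        };
        new Thread(pollLoop).start();
        new Thread(pollLoop).start();
    }
}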

Second version of the consumer handler class:

package com.example.demo_kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ConsumerHandler {
    // the Kafka consumer; only the single poll loop below ever touches it
    private KafkaConsumer<Object, Object> consumer;
    // worker pool that processes the records
    private ExecutorService executors;

    // Kafka consumer configuration
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        // offsets are committed manually in commitOffset(), so auto-commit stays off
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the two parameters below are my addition; without them the error in section 3 appears
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        return props;
    }

    // initialize the Kafka connection
    @PostConstruct
    public void initKafkaConfig() {
        Properties props = initConfig();
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("test001"));
    }

    /**
     * Consume Kafka records on multiple threads.
     * @param workerNum intended pool size (unused; the pool is hard-coded below)
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            if (consumer != null) {
                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord<Object, Object> record : consumerRecords) {
                        // hand each record to a worker on the pool
                        executors.submit(new OneWork(record));
                    }
                    // commit once per polled batch rather than once per record
                    commitOffset();
                }
            } else {
                Properties props = initConfig();
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton("test001"));
            }
        }
    }

    private void commitOffset() {
        try {
            // note: commitAsync() reports most failures through its callback,
            // not as exceptions, so this catch is a best-effort fallback only
            consumer.commitAsync();
        } catch (Exception e) {
            consumer.commitSync();
        }
    }
}


package com.netintech.kafka.impl;
import com.alibaba.fastjson.JSONObject;
import com.netintech.kafka.bean.Test;
import com.netintech.kafka.service.TestService;
import com.netintech.kafka.task.SendVehicleInfo;
import com.netintech.kafka.utils.SpringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;
 
/**
 * 多線程kafka消費類
 */
public class OneWork implements Runnable {
 
      //日志類
      private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);
 
      private ConsumerRecord<String, String> consumerRecord;
 
      public OneWork(ConsumerRecord record) {
         this.consumerRecord = record;
      }
 
      @Override
      public void run() {
            try{
                  //執行自己的邏輯
                  //todo
            }catch (Exception e){
                  LOG.info("異常錯誤信息:"+e.getMessage());
            }
      }
}
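One caveat in ConsumerHandler above: commitOffset() fires as soon as the records have been handed to the pool, so an offset can be committed before its record is actually processed, and a crash at that moment surfaces as exactly the kind of data loss described in section 2. A safer variant, sketched here rather than taken from the original code (it needs java.util.ArrayList, java.util.List and java.util.concurrent.Callable as extra imports), waits for the whole batch before committing:

// Sketch: commit only after every worker in the batch has finished.
// invokeAll blocks until all tasks complete, so the committed offset can
// never run ahead of processing; a failure then means redelivery
// (at-least-once) instead of silent loss.
public void executeBatchThenCommit() throws InterruptedException {
    while (true) {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            continue;
        }
        List<Callable<Void>> tasks = new ArrayList<>();
        for (ConsumerRecord<Object, Object> record : records) {
            tasks.add(() -> {
                new OneWork(record).run();
                return null;
            });
        }
        executors.invokeAll(tasks);  // wait for the batch to finish
        consumer.commitSync();       // now the commit cannot overtake processing
    }
}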

The actual invocation is driven by a scheduled task:

@Controller
@EnableScheduling
public class ConController {
    @Autowired
    private ConsumerHandler consumers;

    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        // the 5 here is not actually used; the pool size is hard-coded in
        // ConsumerHandler, but it could be made configurable if needed
        consumers.execute(5);
    }
}
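One wrinkle with this setup: the cron fires every second, but execute() never returns, so any later tick would try to start a second poll loop against the same consumer, which is exactly the multithreaded access KafkaConsumer forbids. With Spring's default single-threaded scheduler the first call simply occupies the scheduler thread forever, so later ticks never actually run; still, an explicit guard makes the intent clear and survives a larger scheduler pool. A sketch (not the original code; it adds java.util.concurrent.atomic.AtomicBoolean):

    // guard so that only the first tick starts the infinite poll loop
    private final AtomicBoolean running = new AtomicBoolean(false);

    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        if (running.compareAndSet(false, true)) {
            consumers.execute(5);  // blocks forever; later ticks become no-ops
        }
    }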

3. The error (before the two parameters were added)

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with
a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null

4. Summary

With the two parameters added, the logs show no errors and no data goes missing, though whether everything really makes it into the database still needs verification. Beyond that, I honestly do not understand the root cause of the error, or why adding those two parameters fixes it; both points remain to be investigated.

