springboot + @KafkaListener 手動提交及消費能力優化


轉載 https://blog.csdn.net/asd5629626/article/details/82776450  https://blog.csdn.net/asd5629626/article/details/82746771

spring-boot 版本 1.5.12

依賴使用spring-kafka1.3.3(對應kafka-clients版本0.11.0.0,請使用於kafka版本對應版本的依賴)

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/>
</parent> 

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.3.RELEASE</version>
</dependency>

1、自定義監聽工廠  (resources目錄下面kafka.properties文件中定義對應參數)

 
         

##============== kafka =====================
kafka.consumer.bootstrap.servers = 192.168.11.133:9092
kafka.consumer.session.timout.ms = 15000
kafka.consumer.max.poll.interval.ms = 300000
kafka.consumer.max.poll.records = 500
kafka.consumer.heartbeat.interval.ms = 3000
kafka.consumer.group.id = person-file-manage

#消費者並發啟動個數(對應分區個數)每個listener方法

kafka.concurrency=10

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

@Value("${kafka.consumer.bootstrap.servers}")
private String servers;

@Value("${kafka.consumer.session.timout.ms}")
private String sessionTimeout;

@Value("${kafka.consumer.max.poll.interval.ms}")
private String pollInterval;

@Value("${kafka.consumer.max.poll.records}")
private String pollRecords;

@Value("${kafka.consumer.heartbeat.interval.ms}")
private String heartbeatInterval;

@Value("${kafka.consumer.group.id}")
private String groupId;

/**
* 消費者基礎配置
*
* @return Map
*/
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>(9);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pollRecords);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

/**
* 自定義 ConcurrentKafkaListenerContainerFactory 初始化消費者
*
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}

/**
* 將監聽者注入到IOC中,也可以采用注解方式,此方式只是為了便於確定監聽者的分布
*
* @return MqSinkReceiver
*/
@Bean
public MqSinkReceiver listener() {
return new MqSinkReceiver();
}

}

 


2、監聽器

 
         
public class MqSinkReceiver {

@Autowired
private MqListener mqListener;

private final LoggerUtilI logger = LoggerUtilI.getLogger(this.getClass().getName());

/**
* 歸檔統計
*
* @param consumerRecord 消息體
* @param ack Acknowledgment
*/
@KafkaListener(id = "clusterPersonfileConsumer", topics = {"personfile-new-clustering"}, containerFactory = "ackContainerFactory")
public void inputPersonfileNewCluster(ConsumerRecord consumerRecord, Acknowledgment ack) {
if (consumerRecord != null) {
JSONObject jsonParam = JSONObject.parseObject(consumerRecord.value().toString());
logger.info("接收到數據平台的歸檔kafka消息" + jsonParam.toString());
try {
mqListener.clusterStatistic(jsonParam);
if (ack != null) {
ack.acknowledge();
}
} catch (BusinessException | ParseException e) {
logger.error("歸檔統計異常:" + e);
}
}
}
}
 

3、spring-boot容器即可

#消費者並發啟動個數(對應分區個數)每個listener方法
kafka.concurrency=10
將啟動器的並發提高到和分區數一致

 

kafka 消費能力的提高

1、自動提交的實現

2、autoCommitIntervalMs 設置每次隔多久自動提交offset

3、kafka.max.poll.interval.ms 和 sessionTimeout

max.poll.interval.ms ,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group

之后就會觸發導致reblance

·heartbeat.interval.ms

心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。

    這個值必須設置的小於session.timeout.ms,因為:

當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。

    通常設置的值要低於session.timeout.ms的1/3。

    默認值是:3000 (3s)

·session.timeout.ms

Consumer session 過期時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。

其默認值是:10000 (10 s)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM