轉載 https://blog.csdn.net/asd5629626/article/details/82776450 https://blog.csdn.net/asd5629626/article/details/82746771
spring-boot 版本 1.5.12
依賴使用spring-kafka1.3.3(對應kafka-clients版本0.11.0.0,請使用於kafka版本對應版本的依賴)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.12.RELEASE</version> <relativePath/> </parent> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.3.RELEASE</version> </dependency>
1、自定義監聽工廠 (resources目錄下面kafka.properties文件中定義對應參數)
##============== kafka =====================
kafka.consumer.bootstrap.servers = 192.168.11.133:9092
kafka.consumer.session.timout.ms = 15000
kafka.consumer.max.poll.interval.ms = 300000
kafka.consumer.max.poll.records = 500
kafka.consumer.heartbeat.interval.ms = 3000
kafka.consumer.group.id = person-file-manage
#消費者並發啟動個數(對應分區個數)每個listener方法
kafka.concurrency=10
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.bootstrap.servers}")
private String servers;
@Value("${kafka.consumer.session.timout.ms}")
private String sessionTimeout;
@Value("${kafka.consumer.max.poll.interval.ms}")
private String pollInterval;
@Value("${kafka.consumer.max.poll.records}")
private String pollRecords;
@Value("${kafka.consumer.heartbeat.interval.ms}")
private String heartbeatInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
/**
* 消費者基礎配置
*
* @return Map
*/
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>(9);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pollRecords);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 自定義 ConcurrentKafkaListenerContainerFactory 初始化消費者
*
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
* 將監聽者注入到IOC中,也可以采用注解方式,此方式只是為了便於確定監聽者的分布
*
* @return MqSinkReceiver
*/
@Bean
public MqSinkReceiver listener() {
return new MqSinkReceiver();
}
}
2、監聽器
public class MqSinkReceiver {
@Autowired
private MqListener mqListener;
private final LoggerUtilI logger = LoggerUtilI.getLogger(this.getClass().getName());
/**
* 歸檔統計
*
* @param consumerRecord 消息體
* @param ack Acknowledgment
*/
@KafkaListener(id = "clusterPersonfileConsumer", topics = {"personfile-new-clustering"}, containerFactory = "ackContainerFactory")
public void inputPersonfileNewCluster(ConsumerRecord consumerRecord, Acknowledgment ack) {
if (consumerRecord != null) {
JSONObject jsonParam = JSONObject.parseObject(consumerRecord.value().toString());
logger.info("接收到數據平台的歸檔kafka消息" + jsonParam.toString());
try {
mqListener.clusterStatistic(jsonParam);
if (ack != null) {
ack.acknowledge();
}
} catch (BusinessException | ParseException e) {
logger.error("歸檔統計異常:" + e);
}
}
}
}
3、spring-boot容器即可
#消費者並發啟動個數(對應分區個數)每個listener方法
kafka.concurrency=10
將啟動器的並發提高到和分區數一致
kafka 消費能力的提高
1、自動提交的實現
2、autoCommitIntervalMs 設置每次隔多久自動提交offset
3、kafka.max.poll.interval.ms 和 sessionTimeout
max.poll.interval.ms ,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group
之后就會觸發導致reblance
·heartbeat.interval.ms
心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。
這個值必須設置的小於session.timeout.ms,因為:
當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。
通常設置的值要低於session.timeout.ms的1/3。
默認值是:3000 (3s)
·session.timeout.ms
Consumer session 過期時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。
其默認值是:10000 (10 s)