Spring for Apache Kafka @KafkaListener使用及注意事項


 

 

官方文檔:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

 

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

 

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo類:

public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }


配置類及注解:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM