在Spring中應用Kafka及配置
1、需要的jar包
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
兼容:
• Apache Kafka Clients 2.0.0
• Spring Framework 5.1.x
• Minimum Java version: 8
2、Kafka配置類
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ContainerProperties; import java.util.HashMap; import java.util.Map; /** * Kafka的配置類 * * @author yangyongjie * @date 2019/10/11 * @desc */ @Configuration @EnableKafka public class KafkaConfig { @Value("${kafka.bootstrap.servers}") private String bootstrapServers; @Value("${kafka.group.id}") private String groupId; @Value("${kafka.retries}") private String retries; /** * kafka消息監聽器的工廠類 * * @return */ @Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); ContainerProperties containerProperties = factory.getContainerProperties(); // 當Acknowledgment.acknowledge()方法被調用即提交offset containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 調用commitAsync()異步提交 containerProperties.setSyncCommits(false); return factory; } /** * 消費者工廠 * * @return */ @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** * 消費者拉取消息配置 * * @return */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(16); // kafka集群地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // groupId props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 開啟自動提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 自動提交offset到zk的時間間隔,時間單位是毫秒 // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // session超時設置,14秒,超過這個時間會認為此消費者掛掉,將其從消費組中移除 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "14000"); //鍵的反序列化方式,key表示分區 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); //值的反序列化方式 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 生產者工廠 * * @return */ @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * 生產者發送消息配置 * * @return */ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(8); // kafka集群地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 消息發送確認方式 props.put(ProducerConfig.ACKS_CONFIG, "1"); // 消息發送重試次數 props.put(ProducerConfig.RETRIES_CONFIG, retries); // 重試間隔時間 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000"); //鍵的反序列化方式,key表示分區 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); //值的反序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } /** * Kafka模版類,用來發送消息 * * @return */ @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); } }
3、發送消息
KafkaTemplate包裝了producer而且提供了便捷的方法發送消息到kafka topic。
API:
ListenableFuture<SendResult<K, V>> sendDefault(V data); //sendDefault方法需要提供給一個默認的topic ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); //topic,partition,key等信息在message頭中定義 Map<MetricName, ? extends Metric> metrics(); List<PartitionInfo> partitionsFor(String topic); <T> T execute(ProducerCallback<K, V, T> callback); // Flush the producer. void flush(); interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); }
如:
@Autowired private KafkaTemplate kafkaTemplate; kafkaTemplate.send("test", "this is my first demo");
4、消費消息
消費消息可以通過配置一個MessageListenerContainer然后提供一個Message Listener去接收消息;或者使用@KafkaListener注解
1、提供MessageListenerContainer和Message Listener的方式
MessageListenerContainer的兩個實現類:
• KafkaMessageListenerContainer 使用單線程接收所有的消息
• ConcurrentMessageListenerContainer 相當於多個KafkaMessageListenerContainer使用多個線程消費
另外需要提供一個Message Listener,目前提供的8個messageListener接口:
public interface MessageListener<K, V> { ❶ // 調用poll()來輪詢Kafka集群的消息,並自動提交offset void onMessage(ConsumerRecord<K, V> data); } public interface AcknowledgingMessageListener<K, V> { ❷ // 調用poll()來輪詢Kafka集群的消息,手動提交 void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); }
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或使用提供的Consumer對象手動提交 public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❸ void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); }
// 調用poll()來輪詢Kafka集群的消息,使用提供的Consumer對象手動提交 public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❹ void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或手動提交,AckMode.RECORD不支持 public interface BatchMessageListener<K, V> { ❺ void onMessage(List<ConsumerRecord<K, V>> data); }
// 調用poll()來輪詢Kafka集群的消息,手動提交 public interface BatchAcknowledgingMessageListener<K, V> { ❻ void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); }
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或使用提供的Consumer對象手動提交,AckMode.RECORD不支持 public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❼ void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❽ // 調用poll()來輪詢Kafka集群的消息,使用提供的Consumer對象手動提交 void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
需要注意的是:Consumer不是線程安全的,只能在調用此Listener的線程上調用此方法。
ConcurrentMessageListenerContainer,唯一的構造器和KafkaMessageListenerContainer的第一個構造器相似:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
有個屬性:private int concurrency = 1; 其值的多少表示將會創建多少個KafkaMessageListenerContainer。
KafkaMessageListenerContainer的第一個構造器中,當監聽多個topic時,如,需要監聽3個topic,每個topic有5個分區,那么設置concurrency=15的話,將會發現只有5個激活的Consumers,每個Consumer從每個topic中分配了一個分區,其他10個則處於空閑狀態。這是因為Kafka默認的PartitionAssignor(分區分配器)是RangeAssignor,上面的情況應該使用RoundRobinAssignor來代替。RoundRobinAssignor將會將分區分配給所有的Consumers,這樣每個Consumer會分得一個topic的一個分區。
想要切換PartitionAssignor,使用ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
KafkaMessageListenerContainer的第二個構造器中,ConcurrentMessageListenerContainer分發多個TopicPartition給KafkaMessageListenerContainer,也就是說,當有6個分區的時候,需要提供的concurrency 為3,每個ConcurrentMessageListenerContainer獲得兩個分區;當有5個分區的時候,前兩個ConcurrentMessageListenerContainer每個獲得2個分區,最后一個獲得一個分區。
Committing Offsets
consumer poll()方法將會返回一個或多個ConsumerRecords,每個ConsumerRecord都將會調用一次MessageListener。對於每個ConsumerRecord都提供了多個選擇去提交offsets,當enable.auto.commit設置為true的時候,Kafka將自動提交offsets,如果為false的話,有以下幾種AckMode(消息確認)方式:
RECORD | 提交offset當監聽器處理完每個ConsumerRecord后 |
BATCH | 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord |
TIME | 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord並且距離上次提交的時間超過了ackTime |
COUNT | 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord並且距離上次提交已經接收了超過ackCount個ConsumerRecord |
COUNT_TIME | 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord,當ackTime或者ackCount成立時 |
MANUAL | 在Acknowledgment.acknowledge()中使用,和BATCH相似 |
MANUAL_IMMEDIATE | 當Acknowledgment.acknowledge()方法被調用即提交offset |
默認的提交方式為BATCH,默認使用commitSync()同步提交。在ContainerProperties中修改
調用commitSync()同步提交或者commitAsync()異步提交,取決於syncCommits的配置,默認為true。
使用這種方式在上面的KafkaConfig配置類中加上如下配置:
使用監聽容器工廠,創建監聽容器:
/** * MessageListenerContainer * @param myKafkaMessageListener 自定義的消息監聽器,也可使用ConcurrentMessageListenerContainer * @return */ @Bean public KafkaMessageListenerContainer<Integer, String> getContainer(MyKafkaMessageListener myKafkaMessageListener) { ContainerProperties containerProperties = new ContainerProperties("test"); containerProperties.setAckMode(ContainerProperties.AckMode.RECORD); //使用自定義的MessageListener containerProperties.setMessageListener(myKafkaMessageListener); containerProperties.setGroupId("bssout"); containerProperties.setClientId("myListener"); KafkaMessageListenerContainer<Integer, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties); return kafkaMessageListenerContainer; }
MyKafkaMessageListener :
@Component public class MyKafkaMessageListener extends MyAbstractKafkaMessageListener { private static final Logger LOGGER= LoggerFactory.getLogger(MyKafkaMessageListener.class); @Override public void onMessage(ConsumerRecord consumerRecord, Consumer consumer) { String topic=consumerRecord.topic(); String value= (String) consumerRecord.value(); LOGGER.info("my_value={}",value); consumer.commitAsync(); } }
MyAbstractKafkaMessageListener :
public abstract class MyAbstractKafkaMessageListener implements ConsumerAwareMessageListener { @Override public void onMessage(Object o) { return; } }
2、使用@KafkaListener注解的方式:
@Component public class KafkaListenerTest { private static final Logger LOGGER= LoggerFactory.getLogger(KafkaListenerTest.class); /** * id為consumerId * @param record */ @KafkaListener(topics = "test") public void listen(ConsumerRecord<?, ?> record,Acknowledgment acknowledgment) { LOGGER.info("test-value={}",record.value()); LOGGER.info("test-topic={}",record.topic());
// 手動提交offset
acknowledgment.acknowledge();
}
}
@KafkaListener注解的屬性(成員變量)詳解:
(1)、【String id() default ""】:唯一的標識,如果沒有提供,則會自動生成,如果提供了,id的值將會覆蓋系統配置的groupId的值,除非此注解的idIsGroup屬性設置為false(默認true)。
(2)、【String containerFactory() default ""】:KafkaListenerContainerFactory 監聽容器工廠的bean的name,若沒有指定,則使用系統默認的,如果系統配置的有多個需要指定name值
(3)、【String[] topics() default {}】:監聽的topic,可監聽多個,值可以為具體的topic name,property-placeholder keys或者 expressions(SPEL)
(4)、【String topicPattern() default ""】:上面topic參數的模式,可配置更加詳細的監聽信息,如監聽某個Topic的指定分區,或者從offset的某個位置開始監聽
(5)、【TopicPartition[] topicPartitions() default {}】:監聽topic的分區信息
(6)、【String groupId() default ""】:覆蓋系統配置的消費組groupId
(7)、【boolean idIsGroup() default true】:如果groupId屬性沒有指定,則用id屬性的值覆蓋系統指定的groupId
(8)、【String errorHandler() default ""】:設置一個KafkaListenerErrorHandler監聽異常處理器去處理方法拋出的異常
(9)、【String clientIdPrefix() default ""】:如果提供了,重寫在consumerFactory中配置的client id的前綴
(10)、【String concurrency() default ""】:重寫container factory的concurrency配置
(11)、【String autoStartup() default ""】:是否自動啟動
(12)、【String beanRef() default "__listener"】:真實監聽容器的BeanName,需要在 BeanName前加 "__"
另外這種方式要求一個配置類,在其當中有一個name為kafkaListenerContainerFactory的bean用來生成ConcurrentMessageListenerContainer,並且類上有@EnableKafka和@Configuration注解。如:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""); ... return props; } }
@EnableKafka注解向容器中導入了 KafkaBootstrapConfiguration 配置類,此配置類中注入了KafkaListenerAnnotationBeanPostProcessor @KafkaListener注解的后置處理器
/** * Enable Kafka listener annotated endpoints that are created under the covers by a * {@link org.springframework.kafka.config.AbstractKafkaListenerContainerFactory * @see KafkaListener * @see KafkaListenerAnnotationBeanPostProcessor * @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(KafkaBootstrapConfiguration.class) public @interface EnableKafka { }
@KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor類解析,其實現了BeanPostProcessor接口,並在postProcessAfterInitialization方法內解析@KafkaListener注解。
@Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { if (!this.nonAnnotatedClasses.contains(bean.getClass())) { Class<?> targetClass = AopUtils.getTargetClass(bean); Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List<Method> multiMethods = new ArrayList<Method>(); Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() { @Override public Set<KafkaListener> inspect(Method method) { Set<KafkaListener> listenerMethods = findListenerAnnotations(method); return (!listenerMethods.isEmpty() ? listenerMethods : null); } }); if (hasClassLevelListeners) { Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); multiMethods.addAll(methodsWithHandler); } if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); if (this.logger.isTraceEnabled()) { this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass()); } } else { // Non-empty set of methods for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); for (KafkaListener listener : entry.getValue()) { processKafkaListener(listener, method, bean, beanName); } } if (this.logger.isDebugEnabled()) { this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '" + beanName + "': " + annotatedMethods); } } if (hasClassLevelListeners) { processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); } } return bean; }
更多詳細信息可參考:https://www.jianshu.com/c/0c9d83802b0c
官方文檔:https://docs.spring.io/autorepo/docs/spring-kafka-dist/2.2.0.RELEASE/reference/html/_reference.html