Spring Kafka


在Spring中應用Kafka及配置

1、需要的jar包

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

兼容:

  • Apache Kafka Clients 2.0.0
  • Spring Framework 5.1.x
  • Minimum Java version: 8

 

2、Kafka配置類

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka的配置類
 *
 * @author yangyongjie
 * @date 2019/10/11
 * @desc
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.retries}")
    private String retries;

    /**
     * kafka消息監聽器的工廠類
     *
     * @return
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties containerProperties = factory.getContainerProperties();
        // 當Acknowledgment.acknowledge()方法被調用即提交offset
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 調用commitAsync()異步提交
        containerProperties.setSyncCommits(false);
        return factory;
    }

    /**
     * 消費者工廠
     *
     * @return
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消費者拉取消息配置
     *
     * @return
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        // kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // groupId
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 開啟自動提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 自動提交offset到zk的時間間隔,時間單位是毫秒
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // session超時設置,14秒,超過這個時間會認為此消費者掛掉,將其從消費組中移除
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "14000");
        //鍵的反序列化方式,key表示分區
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        //值的反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * 生產者工廠
     *
     * @return
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生產者發送消息配置
     *
     * @return
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(8);
        // kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 消息發送確認方式
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 消息發送重試次數
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        // 重試間隔時間
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        //鍵的反序列化方式,key表示分區
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        //值的反序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Kafka模版類,用來發送消息
     *
     * @return
     */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

}

 

3、發送消息

KafkaTemplate包裝了producer而且提供了便捷的方法發送消息到kafka topic。

API:

ListenableFuture<SendResult<K, V>> sendDefault(V data); //sendDefault方法需要提供給一個默認的topic
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message); //topic,partition,key等信息在message頭中定義
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
 T doInKafka(Producer<K, V> producer);
}

如:

    @Autowired
    private KafkaTemplate kafkaTemplate;

    kafkaTemplate.send("test", "this is my first demo");

 

4、消費消息

  消費消息可以通過配置一個MessageListenerContainer然后提供一個Message Listener去接收消息;或者使用@KafkaListener注解

  1、提供MessageListenerContainer和Message Listener的方式

MessageListenerContainer的兩個實現類:

  • KafkaMessageListenerContainer   使用單線程接收所有的消息

  • ConcurrentMessageListenerContainer  相當於多個KafkaMessageListenerContainer使用多個線程消費

  另外需要提供一個Message Listener,目前提供的8個messageListener接口:

public interface MessageListener<K, V> { ❶ // 調用poll()來輪詢Kafka集群的消息,並自動提交offset
 void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { ❷ // 調用poll()來輪詢Kafka集群的消息,手動提交
 void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或使用提供的Consumer對象手動提交
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❸ void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); }
// 調用poll()來輪詢Kafka集群的消息,使用提供的Consumer對象手動提交
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❹ void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或手動提交,AckMode.RECORD不支持
public interface BatchMessageListener<K, V> { ❺ void onMessage(List<ConsumerRecord<K, V>> data); }
// 調用poll()來輪詢Kafka集群的消息,手動提交
public interface BatchAcknowledgingMessageListener<K, V> { ❻ void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); }
// 調用poll()來輪詢Kafka集群的消息,自動提交offset或使用提供的Consumer對象手動提交,AckMode.RECORD不支持
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❼ void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❽ // 調用poll()來輪詢Kafka集群的消息,使用提供的Consumer對象手動提交 void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }

需要注意的是:Consumer不是線程安全的,只能在調用此Listener的線程上調用此方法。

 ConcurrentMessageListenerContainer,唯一的構造器和KafkaMessageListenerContainer的第一個構造器相似:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)

有個屬性:private int concurrency = 1; 其值的多少表示將會創建多少個KafkaMessageListenerContainer。

  KafkaMessageListenerContainer的第一個構造器中,當監聽多個topic時,如,需要監聽3個topic,每個topic有5個分區,那么設置concurrency=15的話,將會發現只有5個激活的Consumers,每個Consumer從每個topic中分配了一個分區,其他10個則處於空閑狀態。這是因為Kafka默認的PartitionAssignor(分區分配器)是RangeAssignor,上面的情況應該使用RoundRobinAssignor來代替。RoundRobinAssignor將會將分區分配給所有的Consumers,這樣每個Consumer會分得一個topic的一個分區。

想要切換PartitionAssignor,使用ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

  KafkaMessageListenerContainer的第二個構造器中,ConcurrentMessageListenerContainer分發多個TopicPartition給KafkaMessageListenerContainer,也就是說,當有6個分區的時候,需要提供的concurrency 為3,每個ConcurrentMessageListenerContainer獲得兩個分區;當有5個分區的時候,前兩個ConcurrentMessageListenerContainer每個獲得2個分區,最后一個獲得一個分區。

 

  Committing Offsets

  consumer poll()方法將會返回一個或多個ConsumerRecords,每個ConsumerRecord都將會調用一次MessageListener。對於每個ConsumerRecord都提供了多個選擇去提交offsets,當enable.auto.commit設置為true的時候,Kafka將自動提交offsets,如果為false的話,有以下幾種AckMode(消息確認)方式:

RECORD 提交offset當監聽器處理完每個ConsumerRecord后
BATCH 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord
TIME 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord並且距離上次提交的時間超過了ackTime
COUNT 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord並且距離上次提交已經接收了超過ackCount個ConsumerRecord
COUNT_TIME 提交offset當監聽器處理完poll()返回的所有的ConsumerRecord,當ackTime或者ackCount成立時
MANUAL 在Acknowledgment.acknowledge()中使用,和BATCH相似
MANUAL_IMMEDIATE 當Acknowledgment.acknowledge()方法被調用即提交offset

  默認的提交方式為BATCH,默認使用commitSync()同步提交。在ContainerProperties中修改

  調用commitSync()同步提交或者commitAsync()異步提交,取決於syncCommits的配置,默認為true。

 

  使用這種方式在上面的KafkaConfig配置類中加上如下配置:

使用監聽容器工廠,創建監聽容器:

 /**
     * MessageListenerContainer
     * @param myKafkaMessageListener 自定義的消息監聽器,也可使用ConcurrentMessageListenerContainer
     * @return
     */
    @Bean
    public KafkaMessageListenerContainer<Integer, String> getContainer(MyKafkaMessageListener myKafkaMessageListener) {
        ContainerProperties containerProperties = new ContainerProperties("test");
        containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);

        //使用自定義的MessageListener
        containerProperties.setMessageListener(myKafkaMessageListener);
        containerProperties.setGroupId("bssout");
        containerProperties.setClientId("myListener");

        KafkaMessageListenerContainer<Integer, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);

        return kafkaMessageListenerContainer;
    }

MyKafkaMessageListener :

@Component
public class MyKafkaMessageListener extends MyAbstractKafkaMessageListener  {

    private static final Logger LOGGER= LoggerFactory.getLogger(MyKafkaMessageListener.class);

    @Override
    public void onMessage(ConsumerRecord consumerRecord, Consumer consumer) {
        String topic=consumerRecord.topic();
        String value= (String) consumerRecord.value();
        LOGGER.info("my_value={}",value);
        consumer.commitAsync();
    }
}

MyAbstractKafkaMessageListener :

public abstract class MyAbstractKafkaMessageListener implements ConsumerAwareMessageListener {

    @Override
    public void onMessage(Object o) {
        return;
    }
}

 

  2、使用@KafkaListener注解的方式:

@Component
public class KafkaListenerTest {

    private static final Logger LOGGER= LoggerFactory.getLogger(KafkaListenerTest.class);

    /**
     * id為consumerId
     * @param record
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record,Acknowledgment acknowledgment) {
        LOGGER.info("test-value={}",record.value());
        LOGGER.info("test-topic={}",record.topic());

      // 手動提交offset
      acknowledgment.acknowledge();

    }
}

@KafkaListener注解的屬性(成員變量)詳解:

  (1)、【String id() default ""】:唯一的標識,如果沒有提供,則會自動生成,如果提供了,id的值將會覆蓋系統配置的groupId的值,除非此注解的idIsGroup屬性設置為false(默認true)。

  (2)、【String containerFactory() default ""】:KafkaListenerContainerFactory 監聽容器工廠的bean的name,若沒有指定,則使用系統默認的,如果系統配置的有多個需要指定name值

  (3)、【String[] topics() default {}】:監聽的topic,可監聽多個,值可以為具體的topic name,property-placeholder keys或者 expressions(SPEL)

  (4)、【String topicPattern() default ""】:上面topic參數的模式,可配置更加詳細的監聽信息,如監聽某個Topic的指定分區,或者從offset的某個位置開始監聽

  (5)、【TopicPartition[] topicPartitions() default {}】:監聽topic的分區信息

  (6)、【String groupId() default ""】:覆蓋系統配置的消費組groupId

  (7)、【boolean idIsGroup() default true】:如果groupId屬性沒有指定,則用id屬性的值覆蓋系統指定的groupId

  (8)、【String errorHandler() default ""】:設置一個KafkaListenerErrorHandler監聽異常處理器去處理方法拋出的異常

  (9)、【String clientIdPrefix() default ""】:如果提供了,重寫在consumerFactory中配置的client id的前綴

  (10)、【String concurrency() default ""】:重寫container factory的concurrency配置

  (11)、【String autoStartup() default ""】:是否自動啟動

  (12)、【String beanRef() default "__listener"】:真實監聽容器的BeanName,需要在 BeanName前加 "__"

 

另外這種方式要求一個配置類,在其當中有一個name為kafkaListenerContainerFactory的bean用來生成ConcurrentMessageListenerContainer,並且類上有@EnableKafka和@Configuration注解。如:

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        ...
        return props;
    }
}

 @EnableKafka注解向容器中導入了 KafkaBootstrapConfiguration 配置類,此配置類中注入了KafkaListenerAnnotationBeanPostProcessor @KafkaListener注解的后置處理器

/**
 * Enable Kafka listener annotated endpoints that are created under the covers by a
 * {@link org.springframework.kafka.config.AbstractKafkaListenerContainerFactory
 * @see KafkaListener
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}

  @KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor類解析,其實現了BeanPostProcessor接口,並在postProcessAfterInitialization方法內解析@KafkaListener注解。

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<Method>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                        @Override
                        public Set<KafkaListener> inspect(Method method) {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        }

                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }

 

  

  更多詳細信息可參考:https://www.jianshu.com/c/0c9d83802b0c

  官方文檔:https://docs.spring.io/autorepo/docs/spring-kafka-dist/2.2.0.RELEASE/reference/html/_reference.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM