Spring Kafka Consumer Internals (Part 1)


Preface: Spring Kafka is Spring's wrapper around the Kafka API. It saves you from writing producer and consumer boilerplate; a single annotation is enough to send and receive Kafka messages. But how does Spring pull this off, and how does it differ from a hand-written client? See below ↓

The following analyzes only the consumer-side source code:

1. Creating the Consumer

The usual routine: reading source code starts with finding the entry point. For Spring Kafka that entry point is the @KafkaListener annotation, because when using Spring Kafka, configuring a single @KafkaListener is all it takes to consume data from Kafka.
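As a reminder of the API whose internals we are about to dissect, a minimal listener looks roughly like this (the listener id, topic name, and class name are placeholders of my own choosing, not taken from the source):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer {

    // Spring discovers this method at startup and wires up a consumer for it;
    // "demo" and "demo-topic" are placeholder values
    @KafkaListener(id = "demo", topics = "demo-topic")
    public void listen(String message) {
        System.out.println("received: " + message);
    }
}
```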

(1) Entry point: KafkaListenerAnnotationBeanPostProcessor, a class implementing BeanPostProcessor (BPP). Only the core methods are shown below ↓

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass); // find all classes and methods annotated with @KafkaListener
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<Method>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                        @Override
                        public Set<KafkaListener> inspect(Method method) {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        }

                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        new ReflectionUtils.MethodFilter() {

                            @Override
                            public boolean matches(Method method) {
                                return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                            }

                        });
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); // follow this call into the method below to see what happens to the annotated classes and methods
            }
        }
        return bean;
    }

    private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
            Object bean, String beanName) {
        List<Method> checkedMethods = new ArrayList<Method>();
        for (Method method : multiMethods) {
            checkedMethods.add(checkProxy(method, bean));
        }
        // iterate over the classLevelListeners collection, i.e. each @KafkaListener, and create an
        // endpoint object for it; then look at the processListener method
        for (KafkaListener classLevelListener : classLevelListeners) {
            MultiMethodKafkaListenerEndpoint<K, V> endpoint =
                    new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods, bean);
            endpoint.setBeanFactory(this.beanFactory);
            processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
        }
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        // this method shows that Spring copies the @KafkaListener metadata (topics, partitions, etc.)
        // into the endpoint object
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        String group = kafkaListener.group();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        KafkaListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }
        // once the metadata is wrapped into an endpoint, register it
        this.registrar.registerEndpoint(endpoint, factory);
    }
}

What this BPP tells us: when the IoC container initializes, Spring collects every method and class annotated with @KafkaListener, wraps their metadata into endpoint objects, and finally registers those endpoints. Next, let's analyze the registration ↓
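To make the scanning step above concrete, here is a simplified plain-Java sketch (no Spring) of what the BPP's annotation lookup boils down to. The @MyListener annotation and the OrderHandler class are hypothetical stand-ins for @KafkaListener and a listener bean; Spring's MethodIntrospector does the real work, but the core idea is runtime reflection like this:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ListenerScanDemo {

    // hypothetical stand-in for @KafkaListener
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface MyListener {
        String topic();
    }

    // a bean with one annotated method and one plain method
    static class OrderHandler {
        @MyListener(topic = "orders")
        public void onOrder(String payload) { }

        public void helper() { }
    }

    // mimics the selectMethods step: collect every method carrying the annotation
    static List<Method> findListenerMethods(Class<?> targetClass) {
        List<Method> result = new ArrayList<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            if (method.isAnnotationPresent(MyListener.class)) {
                result.add(method);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Method m : findListenerMethods(OrderHandler.class)) {
            System.out.println(m.getName() + " -> topic " + m.getAnnotation(MyListener.class).topic());
        }
    }
}
```

Running this prints the single annotated method; in Spring, each such method would then be wrapped into an endpoint as described above.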

(2) KafkaListenerEndpointRegistrar, a class that implements InitializingBean. Why implement that interface? Let's leave the question open for now.

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();

    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // here the endpoint is wrapped once more, into a KafkaListenerEndpointDescriptor, which is then
        // registered (saved) into endpointDescriptors (a List). That completes the path from reading the
        // @KafkaListener annotation to registration. Is consumer creation finished? Not quite; the real
        // core comes next: creating the KafkaListener containers.
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

    // container creation starts here; this is why the class implements InitializingBean: after the
    // @KafkaListener metadata has been collected and registered, the listener containers still need
    // to be created
    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            // note that Spring registers one ListenerContainer per @KafkaListener: as many listener
            // containers as there are annotations
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }
}

From the above we know: Spring registers the metadata from each @KafkaListener annotation into a List, and once registration is complete it creates the containers, one listener container per @KafkaListener. But what kind of container is it? How is it registered, and what is it for?

(3) KafkaListenerEndpointRegistry: in this class we see how Spring registers the containers.

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

  private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<String, MessageListenerContainer>();

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        registerListenerContainer(endpoint, factory, false);
    }

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");
        // the id from @KafkaListener becomes the key of the Map
        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            // create the container; which container, and how it is created, is answered below
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            // register the container into listenerContainers (a Map)
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            // start the container
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }
}

This class alone cannot satisfy our curiosity about container registration. All we learn here is: a container is created, registered into a Map, and finally started. What the container actually is and how it is created remain unknown, so we keep digging ↓
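A practical side note: because every container ends up in that Map keyed by the endpoint id, application code can later look containers up through the KafkaListenerEndpointRegistry bean, for example to stop or restart a listener at runtime. A minimal sketch, assuming a listener declared elsewhere with id "demo" (a placeholder):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerControl {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // pause consumption for the listener declared with @KafkaListener(id = "demo", ...)
    public void stopDemoListener() {
        MessageListenerContainer container = registry.getListenerContainer("demo");
        if (container != null) {
            container.stop();
        }
    }
}
```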

(4) ConcurrentKafkaListenerContainerFactory. After some persistent tracing we finally find the 'culprit': this is the class that creates the listener container.

public class ConcurrentKafkaListenerContainerFactory<K, V>
        extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {

    @Override
    protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
        Collection<TopicPartitionInitialOffset> topicPartitions = endpoint.getTopicPartitions();
        if (!topicPartitions.isEmpty()) {
            ContainerProperties properties = new ContainerProperties(
                    topicPartitions.toArray(new TopicPartitionInitialOffset[topicPartitions.size()]));
            return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
        }
        else {
            Collection<String> topics = endpoint.getTopics();
            if (!topics.isEmpty()) {
                ContainerProperties properties = new ContainerProperties(topics.toArray(new String[topics.size()]));
                return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
            }
            else {
                ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
                return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties);
            }
        }
    }
}

So it creates a ConcurrentMessageListenerContainer. Now we finally know which listener container Spring builds. But what exactly is this container? Come on ↓

(5) ConcurrentMessageListenerContainer. Having understood the registration process (create a ConcurrentMessageListenerContainer, register it into listenerContainers (a Map), then start it), the next step is to study what the container actually does.

// keep an eye on the AbstractMessageListenerContainer base class; there is a surprise coming
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

   private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    private int concurrency = 1;

    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            // note: the partition count must not be smaller than concurrency. What is concurrency?
            // For now we know it is an int field with a default value of 1
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
            // then, driven by concurrency, create a KafkaMessageListenerContainer per iteration,
            // start it, and add it to containers (a List)
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start();
                this.containers.add(container);
            }
        }
    }
}

Questions arise here: why does a container create more containers inside itself, with the count configurable via concurrency? What is this KafkaMessageListenerContainer, and how is it related to ConcurrentMessageListenerContainer? Let's keep following with these questions in mind.
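In application code, the concurrency field seen above is normally set on the container factory. A sketch of such a configuration, assuming placeholder broker address, group id, and bean names of my own choosing:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ConsumerConfigDemo {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        // three KafkaMessageListenerContainers (threads) will be created in doStart();
        // as shown above, this should not exceed the partition count
        factory.setConcurrency(3);
        return factory;
    }
}
```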

(6)KafkaMessageListenerContainer 

// notice that this class also extends AbstractMessageListenerContainer, so it and ConcurrentMessageListenerContainer are siblings (children of the same parent)
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

  private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        // at last: this is where Spring actually creates the kafka consumer; look inside
        // ListenerConsumer if you don't believe it
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        // the Future here hints at something: yes, multithreading. Each listenerConsumer is
        // submitted to its own thread
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }
}

Analyzing this class clarifies the relationship between KafkaMessageListenerContainer and ConcurrentMessageListenerContainer, reveals that the actual kafka consumer is created inside this container, and hints at multithreading. Further reading confirms it:

// interface wrapping the native KafkaConsumer; one container wraps one consumer
interface MessageListenerContainer;
// single-threaded container implementation: starts only one consumer
class KafkaMessageListenerContainer implements MessageListenerContainer;
// multi-threaded container implementation: creates multiple KafkaMessageListenerContainers
class ConcurrentMessageListenerContainer implements MessageListenerContainer;

// interface, factory pattern: the container factory that creates containers, required when using @KafkaListener
interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;
// the only factory implementation, parameterized with the multi-threaded container; for single-threaded
// behavior, simply leave concurrency at its default
class ConcurrentKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>

Everything falls into place: Spring offers both single-threaded and multi-threaded ways to use the kafka consumer, and the choice is driven by that int field concurrency. From the ConcurrentMessageListenerContainer analysis we know its default value is 1, i.e. single-threaded by default, and the thread count cannot exceed the partition count. As an aside: Spring Kafka supports two commit modes, manual and automatic; how does Spring detect manual commits? That is the contribution of the acknowledgingMessageListener we saw in KafkaMessageListenerContainer.
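To illustrate that acknowledging path: when the container is configured for manual acknowledgment, the listener method can receive an Acknowledgment argument and commit explicitly. A hedged sketch (listener id, topic, and class names are placeholders; it assumes a container factory whose ContainerProperties use AckMode.MANUAL and a consumer with enable.auto.commit=false):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

    // requires a container factory configured with AckMode.MANUAL;
    // "manual-demo" and "demo-topic" are placeholder values
    @KafkaListener(id = "manual-demo", topics = "demo-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record.value());
        ack.acknowledge(); // commit the offset only after successful processing
    }

    private void process(String value) {
        // business logic goes here
    }
}
```

Internally, this is the GenericAcknowledgingMessageListener branch seen in KafkaMessageListenerContainer.doStart() above.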

 

For the remaining consumer-side details, see: Spring Kafka Consumer Internals (Part 2)

