spring kafka consumer原理解析二


前言:

 在 spring kafka consumer原理解析一 里談到了spring kafka 容器的加載過程,即每個 @KafkaListenner 會對應加載一個 CurrentMessageListennerContainer(一個多線程 kafka容器),而在 CurrentMessageListennerContainer 里面其實是創建了多個KafkaMessageListennerContainer (一個單線程 kafka 容器),到底創建幾個是有 currency 這個參數指定。最終是在 kafkaMessageListennerContainer 創建 ListenerConsumer (一個線程類)調用 KafkaConsumer API 進行 kafka 的操作。然而通過調用 Kafka API 是如何進行操作的呢? see ↓

解析:

(1)ListenerConsumer (這個線程類是 Spring kafka 最終實現消費者代碼的核心所在)

 

     private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
     @Override
public void run() { if (this.theListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.theListener).registerSeekCallback(this); } this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) {
          // 初始化分區:也就是通過 seekToEnd(kafka consumer API)獲取最新的偏移量,然后將其保存起來 initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to // trigger it, but only if the container is not set to autocommit // otherwise we will process records on a separate thread
          // 判斷是否是自動提交,如果不是自動提交則新開一個線程將數據(records)作為參數推至帶有 @KafkaListener 方法中,然后手動提交為什么要新開一個線程呢?繼續往下探索 ↓ if (!this.autoCommit) { startInvoker(); } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { try {
            // 如果是手動提交則判斷現在是否要進行 commit
if (!this.autoCommit) { processCommits(); }
            // 獲取最新的偏移量 processSeeks();
if (this.logger.isTraceEnabled()) { this.logger.trace("Polling (paused=" + this.paused + ")..."); }
            // 拉取數據 ConsumerRecords
<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue
               // 如果是自動提交則直接在這里將數據推送至帶有 @KafkaListener 的方法中,相比手動提交不同的是:手動提交時新開了一個線程處理,而自動提交是在主線程執行這段邏輯。
               // 為什么呢?這里說得很明白,因為設置為手動提交的時候,spring kafka 會把拉取下來的數據線放入一個隊列中緩存起來,而推送線程是每次從那個緩存隊列中獲取數據推送過去,
               // 而自動提交則是每次獲取一點數據就直接推送過去 if (this.autoCommit) { invokeListener(records); } else {
                 // 這里是將數據存至緩存隊列,如果緩存隊列滿了,則會調用 pause 暫停分區,即會停止拉取數據,直至緩存隊列達到未滿狀態
if (sendToListener(records)) { if (this.assignedPartitions != null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused = true; this.unsent = records; } } } } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now - lastReceive); lastAlertAt = now; if (this.theListener instanceof ConsumerSeekAware) { seekPartitions(getAssignedPartitions(), true); } } } } this.unsent = checkPause(this.unsent); } catch (WakeupException e) { this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getGenericErrorHandler() != null) { this.containerProperties.getGenericErrorHandler().handle(e, null); } else { this.logger.error("Container exception", e); } } } if (this.listenerInvokerFuture != null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info("Consumer stopped"); } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM