[Source Code] How FlinkKafkaConsumer Consumes from Kafka


While looking into the metrics of a Flink source a while back, I came across the code FlinkKafkaConsumer uses to consume data from Kafka. It looked interesting, so I took a closer look.
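
For context, this is the usual way the consumer is wired into a job (the broker address, group id and topic name below are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "my-group");                // placeholder consumer group

// the source whose internals this post walks through
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

DataStream<String> stream = env.addSource(consumer);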

The overall flow is roughly as follows:

I won't go into FlinkKafkaConsumer's startup sequence in detail; let's jump straight to FlinkKafkaConsumerBase.run, which creates the KafkaFetcher and starts the loop that pulls data from Kafka:

@Override
public void run(SourceContext<T> sourceContext) throws Exception {

  // create a KafkaFetcher (the class that consumes from Kafka)
  // from this point forward:
  //   - 'snapshotState' will draw offsets from the fetcher,
  //     instead of being built from `subscribedPartitionsToStartOffsets`
  //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
  //     Kafka through the fetcher, if configured to do so)
  this.kafkaFetcher = createFetcher(
      sourceContext,
      subscribedPartitionsToStartOffsets,
      watermarkStrategy,
      (StreamingRuntimeContext) getRuntimeContext(),
      offsetCommitMode,
      getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
      useMetrics);


  // depending on whether we were restored with the current state version (1.3),
  // remaining logic branches off into 2 paths:
  //  1) New state - partition discovery loop executed as separate thread, with this
  //                 thread running the main fetcher loop
  //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
  if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    // run the fetch loop
    kafkaFetcher.runFetchLoop();
  } else {
    runWithPartitionDiscovery();
  }
}
private void runWithPartitionDiscovery() throws Exception {
  final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
  createAndStartDiscoveryLoop(discoveryLoopErrorRef);

  kafkaFetcher.runFetchLoop();

  ...
}
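
The elided part waits for the fetch loop to finish and joins the discovery thread on exit. The discovery loop itself runs in a separate thread and periodically asks Kafka for newly appeared partitions. A simplified sketch of its core, abridged from the source with logging, wakeup handling and shutdown details elided:

discoveryLoopThread = new Thread(() -> {
  try {
    while (running) {
      // ask the partition discoverer which partitions appeared since the last check
      final List<KafkaTopicPartition> discoveredPartitions =
          partitionDiscoverer.discoverPartitions();

      // no need to add the discovered partitions if we were closed in the meantime
      if (running && !discoveredPartitions.isEmpty()) {
        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
      }

      // sleep until the next discovery round
      if (running && discoveryIntervalMillis != 0) {
        Thread.sleep(discoveryIntervalMillis);
      }
    }
  } catch (Exception e) {
    discoveryLoopErrorRef.set(e);
  }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
discoveryLoopThread.start();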

Next, look at KafkaFetcher's runFetchLoop method: it starts the thread that consumes the Kafka topic and then pulls the data out (the synchronized Handover shares the data between the two threads through a single shared field):

@Override
public void runFetchLoop() throws Exception {
  try {
    // kick off the actual Kafka consumer
    // start the thread that consumes the Kafka topic
    consumerThread.start();
    // loop forever, taking the consumed Kafka records out of the handover
    while (running) {
      // this blocks until we get the next records
      // it automatically re-throws exceptions encountered in the consumer thread
      // take the records the consumer thread handed over
      final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

      // get the records for each subscribed topic partition
      for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
          records.records(partition.getKafkaPartitionHandle());
        // process this partition's records
        partitionConsumerRecordsHandler(partitionRecords, partition);
      }
    }
  }
  finally {
    // this signals the consumer thread that no more work is to be done
    consumerThread.shutdown();
  }

  // on a clean exit, wait for the runner thread
  try {
    consumerThread.join();
  }
  catch (InterruptedException e) {
    // may be the result of a wake-up interruption after an exception.
    // we ignore this here and only restore the interruption state
    Thread.currentThread().interrupt();
  }
}


protected void partitionConsumerRecordsHandler(
    List<ConsumerRecord<byte[], byte[]>> partitionRecords,
    KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

  for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    // deserialize the record
    deserializer.deserialize(record, kafkaCollector);

    // emit the actual records. this also updates offset state atomically and emits
    // watermarks
    // emit the records with their timestamps (you can also see here that every record
    // consumed from Kafka carries its partition, offset and timestamp; used the right
    // way, all of these attributes are accessible in downstream operators)
    emitRecordsWithTimestamps(
      kafkaCollector.getRecords(),
      partition,
      record.offset(),
      record.timestamp());

    if (kafkaCollector.isEndOfStreamSignalled()) {
      // end of stream signaled
      running = false;
      break;
    }
  }
}
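
As the comment in partitionConsumerRecordsHandler points out, each record consumed from Kafka carries its partition, offset and timestamp. One way to surface that metadata downstream is a custom KafkaDeserializationSchema that copies it into the emitted element. A minimal sketch (the ConsumedRecord wrapper type is hypothetical, introduced only for this illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// hypothetical POJO carrying the payload together with its Kafka metadata
class ConsumedRecord {
  public String payload;
  public int partition;
  public long offset;
  public long timestamp;
}

public class MetadataSchema implements KafkaDeserializationSchema<ConsumedRecord> {

  @Override
  public ConsumedRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
    ConsumedRecord out = new ConsumedRecord();
    out.payload = record.value() == null ? null : new String(record.value());
    // copy partition / offset / timestamp straight out of the raw Kafka record
    out.partition = record.partition();
    out.offset = record.offset();
    out.timestamp = record.timestamp();
    return out;
  }

  @Override
  public boolean isEndOfStream(ConsumedRecord nextElement) {
    return false; // unbounded stream, never signal end of stream
  }

  @Override
  public TypeInformation<ConsumedRecord> getProducedType() {
    return TypeInformation.of(ConsumedRecord.class);
  }
}

Since emitRecordsWithTimestamps attaches record.timestamp() as the element's event timestamp, the timestamp is also available downstream without any wrapper, e.g. via ctx.timestamp() in a ProcessFunction.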

That is all the fetcher thread does.

Now look at consumerThread: when the KafkaFetcher is initialized, it also creates a KafkaConsumerThread object. KafkaConsumerThread extends Thread, so the Kafka consumer runs in its own thread:

public class KafkaConsumerThread<T> extends Thread
@Override
public void run() {

  // this is the means to talk to FlinkKafkaConsumer's main thread
  // the relay object between the two threads
  final Handover handover = this.handover;

  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    // the KafkaConsumer is created when the consumer thread starts
    this.consumer = getConsumer(kafkaProperties);
  }
  catch (Throwable t) {
    handover.reportError(t);
    return;
  }

  // from here on, the consumer is guaranteed to be closed properly
  try {
    // register Kafka's very own metrics in Flink's metric reporters
    if (useMetrics) {
      // register Kafka metrics to Flink
      // register Kafka's metrics with Flink; they can be found among the metrics in the web UI
      Map<MetricName, ? extends Metric> metrics = consumer.metrics();
      if (metrics == null) {
        // MapR's Kafka implementation returns null here.
        log.info("Consumer implementation does not support metrics");
      } else {
        // we have Kafka metrics, register them
        for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
          consumerMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));

          // TODO this metric is kept for compatibility purposes; should remove in the future
          subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
        }
      }
    }
    // main fetch loop (loops until 'running' becomes false)
    while (running) {

      // check if there is something to commit
      // commit the Kafka offsets
      if (!commitInProgress) {
        // get and reset the work-to-be committed, so we don't repeatedly commit the same
        final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
            nextOffsetsToCommit.getAndSet(null);

        if (commitOffsetsAndCallback != null) {
          log.debug("Sending async offset commit request to Kafka broker");

          // also record that a commit is already in progress
          // the order here matters! first set the flag, then send the commit command.
          commitInProgress = true;
          consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
        }
      }

      // get the next batch of records, unless we did not manage to hand the old batch over
      // only poll new data when 'records' is null, otherwise the batch that has not been
      // handed over yet would be overwritten (two threads are involved here: if the fetcher
      // thread has not taken the previous batch, produce() below is woken up instead and
      // the same batch must be handed over again, not replaced)
      if (records == null) {
        try {
          records = consumer.poll(pollTimeout);
        }
        catch (WakeupException we) {
          continue;
        }
      }

      try {
        // hand the consumed records over to the fetcher thread
        handover.produce(records);
        records = null;
      }
      catch (Handover.WakeupException e) {
        // fall through the loop
      }
    }
  }
  finally {
    // make sure the handover and the KafkaConsumer are closed properly (details elided)
    ...
  }
}
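
One detail worth calling out in the loop above is the offset-commit handoff: the checkpoint thread publishes offsets into nextOffsetsToCommit (an AtomicReference), and the consumer thread claims them with getAndSet(null), so each published set of offsets is committed at most once. The same pattern in isolation, as a minimal self-contained sketch (not Flink code; the names are made up):

import java.util.concurrent.atomic.AtomicReference;

public class CommitHandoffSketch {
  // the "checkpoint" thread publishes work here; the "consumer" thread claims it
  private final AtomicReference<String> nextWork = new AtomicReference<>();

  // publisher side: overwrite any work that has not been claimed yet
  void publish(String work) {
    nextWork.set(work);
  }

  // consumer side: claim and reset atomically, so nothing is processed twice
  void drainOnce() {
    String work = nextWork.getAndSet(null);
    if (work != null) {
      System.out.println("committing " + work);
    }
  }

  public static void main(String[] args) {
    CommitHandoffSketch s = new CommitHandoffSketch();
    s.publish("offsets@checkpoint-1");
    s.drainOnce(); // prints: committing offsets@checkpoint-1
    s.drainOnce(); // no-op: the work was already claimed
  }
}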

So all KafkaConsumerThread does is consume data from Kafka and transfer it via handover.produce.

That brings us to the Handover class, which relays the data between the threads.
Here is the Javadoc of the Handover class:

/**
 * The Handover is a utility to hand over data (a buffer of records) and exception from a
 * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
 * "size one blocking queue", with some extras around exception reporting, closing, and
 * waking up thread without {@link Thread#interrupt() interrupting} threads.
 *
 * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
 * the thread that runs the KafkaConsumer class and the main thread.
 *
 * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
 * rather than a thread interrupt.
 *
 * <p>The Handover can also be "closed", signalling from one thread to the other that the
 * thread has terminated.
 */


Now the corresponding code: two synchronized methods, one producing and one consuming, exchange data through the shared field next, with synchronized blocks keeping the exchange consistent.

// the lock object
private final Object lock = new Object();
// the shared field used to hand the records across
private ConsumerRecords<byte[], byte[]> next;

// called by the consumer (fetcher) thread
public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
  // enter the synchronized block (take the lock)
  synchronized (lock) {
    // while next is null (and there is no error), wait: this releases the lock and
    // blocks until the producer thread wakes us up
    while (next == null && error == null) {
      lock.wait();
    }
    // copy next into the local variable n
    ConsumerRecords<byte[], byte[]> n = next;
    // if n is not null, we have data
    if (n != null) {
      // clear the shared field
      next = null;
      // wake up the producer thread
      lock.notifyAll();
      // return n to the KafkaFetcher
      return n;
    }
    else {
      ExceptionUtils.rethrowException(error, error.getMessage());

      // this statement cannot be reached since the above method always throws an exception
      // this is only here to silence the compiler and any warnings
      return ConsumerRecords.empty();
    }
  }
}
// called by the producer thread (KafkaConsumerThread)
public void produce(final ConsumerRecords<byte[], byte[]> element)
    throws InterruptedException, WakeupException, ClosedException {
  // enter the synchronized block (take the lock)
  synchronized (lock) {
    // while the shared field 'next' is still occupied, wait: this releases the lock
    // and blocks until the consumer thread (or a wakeup) wakes us up
    while (next != null && !wakeupProducer) {
      lock.wait();
    }

    wakeupProducer = false;

    // if there is still an element, we must have been woken up
    // reaching this point with 'next' still set means we were woken up rather than
    // notified by the consumer, so signal it by throwing
    if (next != null) {
      throw new WakeupException();
    }
    // if there is no error, then this is open and can accept this element
    else if (error == null) {
      // 'next' is free, so put the records consumed from Kafka into the shared field
      next = element;
      // call notifyAll to wake up the consumer thread
      lock.notifyAll();
    }
    // an error marks this as closed for the producer
    else {
      throw new ClosedException();
    }
  }
  // leaving the synchronized block releases the lock
}
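
Taken together, the two methods behave like a blocking queue of size one: produce blocks while the slot is occupied, and pollNext blocks while it is empty. Below is a stripped-down, runnable sketch of the same pattern, without the wakeup, close and error handling of the real Handover:

// minimal size-one handover between two threads (wakeup/close/error handling omitted)
public class MiniHandover<E> {
  private final Object lock = new Object();
  private E next;

  public void produce(E element) throws InterruptedException {
    synchronized (lock) {
      while (next != null) {
        lock.wait();      // slot occupied: wait until the consumer empties it
      }
      next = element;     // fill the slot
      lock.notifyAll();   // wake up the consumer
    }
  }

  public E pollNext() throws InterruptedException {
    synchronized (lock) {
      while (next == null) {
        lock.wait();      // slot empty: wait until the producer fills it
      }
      E n = next;
      next = null;        // empty the slot
      lock.notifyAll();   // wake up the producer
      return n;
    }
  }

  public static void main(String[] args) throws Exception {
    MiniHandover<Integer> handover = new MiniHandover<>();
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          handover.produce(i); // blocks until the previous element was taken
        }
      } catch (InterruptedException ignored) {
      }
    });
    producer.start();
    for (int i = 0; i < 3; i++) {
      System.out.println("got " + handover.pollNext());
    }
    producer.join();
  }
}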

The whole flow, then: KafkaFetcher starts the KafkaConsumerThread; KafkaConsumerThread consumes data from Kafka in a loop and transfers it through Handover's synchronized produce method; KafkaFetcher, also in a loop, calls Handover's synchronized pollNext to pick up the records the KafkaConsumerThread consumed.

Feel free to follow the "Flink菜鳥" WeChat official account for occasional posts on Flink development.

