A while back, while looking at the metrics of the Flink Source, I came across the code that FlinkKafkaConsumer uses to consume data from Kafka. It looked interesting, so I took a closer look.
The overall flow is roughly as follows:
I won't go into FlinkKafkaConsumer's startup process in detail here; let's jump straight into FlinkKafkaConsumerBase.run, which creates the KafkaFetcher and starts the loop that pulls data from Kafka:
```java
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // create a KafkaFetcher (the class that consumes from Kafka)
    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            watermarkStrategy,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    // depending on whether we were restored with the current state version (1.3),
    // remaining logic branches off into 2 paths:
    //   1) New state - partition discovery loop executed as separate thread, with this
    //      thread running the main fetcher loop
    //   2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        // run the fetch loop
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}

private void runWithPartitionDiscovery() throws Exception {
    final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
    createAndStartDiscoveryLoop(discoveryLoopErrorRef);

    kafkaFetcher.runFetchLoop();
    ...
}
```
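For reference, the `discoveryIntervalMillis` branch above is driven by a consumer property set when the job is built. Below is a minimal sketch, assuming a local broker, a `demo-topic` topic and a `demo-group` group (all made-up values), of enabling partition discovery so that the `runWithPartitionDiscovery()` path is taken:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PartitionDiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "demo-group");              // assumed consumer group
        // any non-negative interval enables the discovery loop; leaving the property unset
        // keeps discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED, so only
        // kafkaFetcher.runFetchLoop() runs
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("partition discovery sketch");
    }
}
```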
Next, look at KafkaFetcher's runFetchLoop method: it starts the thread that consumes the Kafka topic and then fetches the data (the synchronized Handover shares data between the two threads through a shared member variable):
```java
@Override
public void runFetchLoop() throws Exception {
    try {
        // kick off the actual Kafka consumer
        // start the thread that consumes the Kafka topic
        consumerThread.start();

        // endless loop: take Kafka records out of the handover
        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            // take the consumed Kafka records out of the handover
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            // iterate over the subscribed partitions
            for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                // process the records of this partition
                partitionConsumerRecordsHandler(partitionRecords, partition);
            }
        }
    } finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    } catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}

protected void partitionConsumerRecordsHandler(
        List<ConsumerRecord<byte[], byte[]>> partitionRecords,
        KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        // deserialize the record
        deserializer.deserialize(record, kafkaCollector);

        // emit the actual records. this also updates offset state atomically and emits
        // watermarks
        // emit the records together with their timestamps (note that every record consumed
        // from Kafka carries its partition, offset and timestamp; used correctly, these
        // attributes are still accessible in downstream operators)
        emitRecordsWithTimestamps(
                kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());

        if (kafkaCollector.isEndOfStreamSignalled()) {
            // end of stream signaled
            running = false;
            break;
        }
    }
}
```
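The comment above points out that each record arrives with its partition, offset, and timestamp. As a hedged illustration (the class name and the Tuple3 payload are my own, not from the source), a custom `KafkaDeserializationSchema` can expose that metadata to downstream operators:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// emits (value, offset, timestamp) so the Kafka metadata survives into downstream operators
public class RecordWithMetaSchema implements KafkaDeserializationSchema<Tuple3<String, Long, Long>> {

    @Override
    public boolean isEndOfStream(Tuple3<String, Long, Long> nextElement) {
        return false; // never signal end of stream; the fetch loop keeps running
    }

    @Override
    public Tuple3<String, Long, Long> deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        // partition(), offset() and timestamp() are all available on the ConsumerRecord here
        return Tuple3.of(value, record.offset(), record.timestamp());
    }

    @Override
    public TypeInformation<Tuple3<String, Long, Long>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {});
    }
}
```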
That, in essence, is what the fetcher thread does.
Now look at consumerThread: when the KafkaFetcher is initialized, it also creates a KafkaConsumerThread object. KafkaConsumerThread extends the Thread class, so the consumption runs in its own thread:
```java
public class KafkaConsumerThread<T> extends Thread
```
```java
@Override
public void run() {
    // this is the means to talk to FlinkKafkaConsumer's main thread
    // the handover object used to relay data
    final Handover handover = this.handover;

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        // when the consumer thread starts, it creates the actual Kafka consumer
        this.consumer = getConsumer(kafkaProperties);
    } catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    try {
        // register Kafka's very own metrics in Flink's metric reporters
        if (useMetrics) {
            // register Kafka metrics to Flink
            // once registered, the Kafka metrics can be found among the metrics in the web UI
            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
            if (metrics == null) {
                // MapR's Kafka implementation returns null here.
                log.info("Consumer implementation does not support metrics");
            } else {
                // we have Kafka metrics, register them
                for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                    consumerMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));

                    // TODO this metric is kept for compatibility purposes; should remove in the future
                    subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
                }
            }
        }

        // main fetch loop
        while (running) {

            // check if there is something to commit
            // commit Kafka offsets if a commit request is pending
            if (!commitInProgress) {
                // get and reset the work-to-be committed, so we don't repeatedly commit the same
                final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                        nextOffsetsToCommit.getAndSet(null);

                if (commitOffsetsAndCallback != null) {
                    log.debug("Sending async offset commit request to Kafka broker");

                    // also record that a commit is already in progress
                    // the order here matters! first set the flag, then send the commit command.
                    commitInProgress = true;
                    consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                }
            }

            // get the next batch of records, unless we did not manage to hand the old batch over
            // only poll new data when records is null; otherwise the previous, not-yet-handed-over
            // batch would be overwritten (this happens when the fetcher thread has not taken the
            // batch out of the handover yet)
            if (records == null) {
                try {
                    records = consumer.poll(pollTimeout);
                } catch (WakeupException we) {
                    continue;
                }
            }

            try {
                // hand the consumed records over to the fetcher thread
                handover.produce(records);
                records = null;
            } catch (Handover.WakeupException e) {
                // fall through the loop
            }
        }
```
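The commit branch above only fires once `nextOffsetsToCommit` has been filled, which under the ON_CHECKPOINTS commit mode happens after a checkpoint completes. A minimal sketch of the user-facing configuration that leads to this behaviour (the checkpoint interval, topic, and connection properties are assumptions):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CommitOnCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with checkpointing enabled, offsets are committed back to Kafka when a checkpoint completes
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "demo-group");              // assumed consumer group

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        // ON_CHECKPOINTS mode: notifyCheckpointComplete fills nextOffsetsToCommit, and the
        // consumer thread then calls commitAsync as shown above; without checkpointing,
        // the consumer falls back to Kafka's enable.auto.commit setting
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("commit-on-checkpoint sketch");
    }
}
```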
What KafkaConsumerThread does is consume data from Kafka and hand it over through handover.produce.
That brings us to the Handover class, which relays the data.
Here is the Javadoc of the Handover class:
```java
/**
 * The Handover is a utility to hand over data (a buffer of records) and exception from a
 * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
 * "size one blocking queue", with some extras around exception reporting, closing, and
 * waking up thread without {@link Thread#interrupt() interrupting} threads.
 *
 * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
 * the thread that runs the KafkaConsumer class and the main thread.
 *
 * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
 * rather than a thread interrupt.
 *
 * <p>The Handover can also be "closed", signalling from one thread to the other that it
 * the thread has terminated.
 */
```

In short: the Handover behaves like a size-one blocking queue between the thread that runs the KafkaConsumer and the main (fetcher) thread, with extras for exception reporting, closing, and waking up threads without interrupting them.
Now look at the corresponding code: two synchronized methods produce and consume the data, relaying it through the shared member variable next, with synchronized blocks guaranteeing consistency.
```java
// the lock object used for synchronization
private final Object lock = new Object();

// the shared member variable used to exchange data
private ConsumerRecords<byte[], byte[]> next;

// called by the consumer (fetcher) thread
public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
    // enter the synchronized block (acquire the lock)
    synchronized (lock) {
        // if next is null, wait() puts the consumer thread to sleep
        // (it releases the lock and waits for the producer thread to wake it up)
        while (next == null && error == null) {
            lock.wait();
        }

        // copy next into the local variable n
        ConsumerRecords<byte[], byte[]> n = next;

        // if n is not null
        if (n != null) {
            // reset next to null
            next = null;
            // wake up the producer thread
            lock.notifyAll();
            // return n to the KafkaFetcher
            return n;
        } else {
            ExceptionUtils.rethrowException(error, error.getMessage());

            // this statement cannot be reached since the above method always throws an exception
            // this is only here to silence the compiler and any warnings
            return ConsumerRecords.empty();
        }
    }
}

// called by the producer (KafkaConsumer) thread
public void produce(final ConsumerRecords<byte[], byte[]> element)
        throws InterruptedException, WakeupException, ClosedException {

    // enter the synchronized block (acquire the lock)
    synchronized (lock) {
        // if the shared variable next is not null, wait() puts the producer thread to sleep
        // (it releases the lock and waits for the consumer thread to wake it up)
        while (next != null && !wakeupProducer) {
            lock.wait();
        }

        wakeupProducer = false;

        // if there is still an element, we must have been woken up
        // reaching this point with a non-null next means the threads are out of step,
        // so throw an exception
        if (next != null) {
            throw new WakeupException();
        }
        // if there is no error, then this is open and can accept this element
        else if (error == null) {
            // next is null: put the records consumed from Kafka into the shared variable next
            next = element;
            // call notifyAll to wake up the consumer thread
            lock.notifyAll();
        }
        // an error marks this as closed for the producer
        else {
            throw new ClosedException();
        }
    }
    // leaving the synchronized block releases the lock
}
```
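To make the pattern stand on its own, here is a minimal, self-contained sketch (my own simplification, without the error, wake-up, and close handling of the real class) of the same "size-one blocking queue" idea built from `wait`/`notifyAll`:

```java
// A stripped-down imitation of Handover: one shared slot guarded by a lock.
public class MiniHandover<T> {

    private final Object lock = new Object();
    private T next; // the single shared slot

    // called by the consumer thread (the fetch-loop side)
    public T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();        // slot empty: wait for the producer to fill it
            }
            T n = next;
            next = null;            // empty the slot
            lock.notifyAll();       // wake the producer so it can hand over the next batch
            return n;
        }
    }

    // called by the producer thread (the KafkaConsumer side)
    public void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();        // slot still full: wait for the consumer to empty it
            }
            next = element;         // fill the slot
            lock.notifyAll();       // wake the consumer
        }
    }
}
```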
The whole process matches the flow diagram above: KafkaFetcher starts the KafkaConsumerThread; KafkaConsumerThread consumes data from Kafka in a loop and hands it over through one of Handover's synchronized methods; KafkaFetcher, in its own loop, calls the other synchronized method of Handover to pick up the data that KafkaConsumerThread consumed.
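As a quick usage example of the MiniHandover sketch above, the two loops below mirror the KafkaConsumerThread / KafkaFetcher interplay (the string "batches" are purely illustrative):

```java
public class MiniHandoverDemo {
    public static void main(String[] args) throws Exception {
        MiniHandover<String> handover = new MiniHandover<>();

        // plays the role of KafkaConsumerThread: produce batches into the handover
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    handover.produce("batch-" + i); // blocks while the previous batch is still in the slot
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // plays the role of the fetch loop in KafkaFetcher: poll batches out of the handover
        for (int i = 0; i < 5; i++) {
            System.out.println(handover.pollNext()); // blocks until the producer fills the slot
        }
        producer.join();
    }
}
```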
Feel free to follow the Flink菜鳥 WeChat public account, which publishes posts about Flink development from time to time.