When Flink consumes data from Kafka, there are three modes for committing the consumer group offset (a configuration sketch follows the list):
- 1. Checkpointing enabled: offsets are committed when a checkpoint completes.
- 2. Checkpointing enabled, but commit-on-checkpoint disabled: consumer group offsets are never committed.
- 3. Checkpointing disabled: rely on the Kafka client's auto-commit.
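In user code, the three modes correspond roughly to the following switches. This is only a sketch, not standalone code: it reuses the `env`, `kafkaSource`, and `prop` from the example program below.

```scala
// 1. checkpointing enabled (default commit mode ON_CHECKPOINTS):
//    offsets go to Kafka each time a checkpoint completes
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)

// 2. checkpointing enabled but commit-on-checkpoint disabled:
//    the consumer group offset is never committed
kafkaSource.setCommitOffsetsOnCheckpoints(false)

// 3. no checkpointing: the Kafka client's own periodic auto-commit takes over
prop.setProperty("enable.auto.commit", "true")
prop.setProperty("auto.commit.interval.ms", "15000")
```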
The focus, of course, is on how the consumer group offset gets committed when checkpointing is enabled.
A simple Flink program: read data from one Kafka topic and write it to another:
```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// enable checkpoint
val stateBackend = new FsStateBackend("file:///out/checkpoint")
env.setStateBackend(stateBackend)
env.enableCheckpointing(1 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

val prop = Common.getProp
// prop.setProperty("enable.auto.commit", "true")
// prop.setProperty("auto.commit.interval.ms", "15000")
val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
// kafkaSource.setCommitOffsetsOnCheckpoints(false)
val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)
// kafkaProducer.setWriteTimestampToKafka(true)

env.addSource(kafkaSource)
  .setParallelism(1)
  .map(node => {
    node.toString + ",flinkx"
  })
  .addSink(kafkaProducer)

// execute job
env.execute("KafkaToKafka")
```
## 1 Checkpointing enabled
With checkpointing enabled, the default commit mode for the consumer group offset is ON_CHECKPOINTS.
The offset commit behavior (offsetCommitMode) is determined when FlinkKafkaConsumerBase is opened:
```java
@Override
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints, // defaults to true
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    ...
```
The corresponding fromConfiguration code:
```java
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```
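As a quick sanity check, the decision logic above collapses to four cases. The calls below are illustrative only and assume the connector's internal OffsetCommitModes class (package `org.apache.flink.streaming.connectors.kafka.config`) is on the classpath:

```scala
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes

// arguments: (enableAutoCommit, enableCommitOnCheckpoint, enableCheckpointing)
val m1 = OffsetCommitModes.fromConfiguration(false, true,  true)  // ON_CHECKPOINTS
val m2 = OffsetCommitModes.fromConfiguration(false, false, true)  // DISABLED
val m3 = OffsetCommitModes.fromConfiguration(true,  true,  false) // KAFKA_PERIODIC
val m4 = OffsetCommitModes.fromConfiguration(false, true,  false) // DISABLED
```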
When Flink completes a checkpoint, it calls notifyCheckpointComplete on each operator in turn; for the Kafka source this lands in FlinkKafkaConsumerBase.notifyCheckpointComplete. (The offsets for each checkpoint were stashed earlier, in snapshotState, into pendingOffsetsToCommit, keyed by checkpoint id.)
Note: FlinkKafkaConsumerBase is the parent class of FlinkKafkaConsumer.
```java
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    ....
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        ...
        try {
            // find the index of the pending offsets that belong to this checkpoint id
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
                        getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
                return;
            }

            // fetch the offsets by index and remove them from the pending map,
            // since they are about to be committed
            @SuppressWarnings("unchecked")
            Map<KafkaTopicPartition, Long> offsets =
                    (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
            ....

            // commit the offsets via KafkaFetcher's commitInternalOffsetsToKafka
            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            ....
```
In the end this calls AbstractFetcher.commitInternalOffsetsToKafka:
```java
public final void commitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    // Ignore sentinels. They might appear here if snapshot has started before actual offsets values
    // replaced sentinels
    doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
}

protected abstract void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception;
```
AbstractFetcher.doCommitInternalOffsetsToKafka is implemented by KafkaFetcher.doCommitInternalOffsetsToKafka.

It uses the Map<KafkaTopicPartition, Long> offsets to build the Map<TopicPartition, OffsetAndMetadata> offsetsToCommit that the Kafka client's commit API expects.

Note: offset + 1 is the position of the next record to be consumed.
```java
@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {

    @SuppressWarnings("unchecked")
    List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();

    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

    for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
        Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
        if (lastProcessedOffset != null) {
            checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

            // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
            // This does not affect Flink's checkpoints/saved state.
            long offsetToCommit = lastProcessedOffset + 1;
            offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
            partition.setCommittedOffset(offsetToCommit);
        }
    }

    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
```
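To make the `+ 1` concrete, here is a toy illustration with hypothetical numbers: if the last record processed from a partition had offset 4171, the value committed back to Kafka is 4172, i.e. the next record to read (this is also what shows up as CURRENT-OFFSET later):

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata

// hypothetical: the last processed offset on this partition was 4171
val lastProcessedOffset = 4171L
// Kafka's committed-offset convention is "next record to read", hence + 1
val offsetAndMetadata = new OffsetAndMetadata(lastProcessedOffset + 1) // commits 4172
```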
Next comes KafkaConsumerThread.setOffsetsToCommit: it stores the offsets to be committed in the consumer thread's nextOffsetsToCommit field, where the next iteration of the fetch loop picks them up:
```java
void setOffsetsToCommit(
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
        @Nonnull KafkaCommitCallback commitCallback) {

    // hand offsetsToCommit over via nextOffsetsToCommit, for the Kafka consumer thread to pick up;
    // a non-null return value means the previous batch was never committed
    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
        log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
                "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
                "This does not compromise Flink's checkpoint integrity.");
    }

    // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
    handover.wakeupProducer();

    synchronized (consumerReassignmentLock) {
        if (consumer != null) {
            consumer.wakeup();
        } else {
            // the consumer is currently isolated for partition reassignment;
            // set this flag so that the wakeup state is restored once the reassignment is complete
            hasBufferedWakeup = true;
        }
    }
}
```
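The handoff between the checkpoint thread and the consumer thread is essentially an AtomicReference swap. A minimal standalone sketch of the pattern (all names here are illustrative, not Flink's):

```scala
import java.util.concurrent.atomic.AtomicReference

object HandoffSketch {
  // the slot the checkpoint thread writes to and the consumer thread drains
  val nextOffsets = new AtomicReference[Map[String, Long]]()

  // checkpoint thread: publish a new batch; a non-null previous value means the
  // consumer thread never drained the last batch, so that commit is skipped
  def publish(offsets: Map[String, Long]): Unit =
    if (nextOffsets.getAndSet(offsets) != null)
      println("previous offsets were never committed; skipping them")

  // consumer thread, once per poll loop: take the batch and reset the slot to null
  def drain(): Unit = {
    val toCommit = nextOffsets.getAndSet(null)
    if (toCommit != null) {
      // here Flink would call consumer.commitAsync(...)
      println(s"committing $toCommit")
    }
  }
}
```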
That brings us to the Kafka consumer thread, in the KafkaConsumerThread.run method: this is where records are polled from Kafka, and also where the consumer group offsets are actually committed:
```java
@Override
public void run() {
    ...
    this.consumer = getConsumer(kafkaProperties);
    ....

    // main fetch loop: keep polling records from Kafka
    while (running) {

        // this is where the offsets get committed
        // check if there is something to commit
        if (!commitInProgress) {
            // nextOffsetsToCommit is the object the checkpoint thread put the offsets into
            // get and reset the work-to-be committed, so we don't repeatedly commit the same
            final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                    nextOffsetsToCommit.getAndSet(null);

            // if commitOffsetsAndCallback is non-null, commit the offsets to Kafka asynchronously
            if (commitOffsetsAndCallback != null) {
                log.debug("Sending async offset commit request to Kafka broker");

                // also record that a commit is already in progress
                // the order here matters! first set the flag, then send the commit command.
                commitInProgress = true;
                consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
            }
        }
        ...
        // get the next batch of records, unless we did not manage to hand the old batch over
        if (records == null) {
            try {
                records = consumer.poll(pollTimeout);
            } catch (WakeupException we) {
                continue;
            }
        }
        ...
    }
```
At this point we have traced how Flink's offsets get committed to Kafka.
## 2 Checkpointing enabled, commit on checkpoint disabled
That was the default offset commit behavior with checkpointing enabled. Now let's look at turning off the commit that happens on checkpoints.
First, disable commitOnCheckpoints:
```scala
val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), Common.getProp)
kafkaSource.setCommitOffsetsOnCheckpoints(false)
```
The corresponding method:
```java
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
    // the default value of enableCommitOnCheckpoints is true
    this.enableCommitOnCheckpoints = commitOnCheckpoints;
    return this;
}
```
Warning: if checkpointing is enabled but CommitOffsetsOnCheckpoints is disabled, the consumer group offset is never committed to Kafka; in other words, the group's offset never changes.
As the kafka-consumer-groups.sh output below shows, the CURRENT-OFFSET does not change:
```
TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_offset  0          4172            4691            519  -            -     -
```
Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
## 3 Checkpointing disabled
With checkpointing disabled, the Flink Kafka Consumer relies on the periodic auto-commit feature of the internally used Kafka client. To enable or disable it, set the enable.auto.commit and auto.commit.interval.ms keys to appropriate values in the provided Properties configuration. Note that the offsets committed this way are only a progress indicator for monitoring; Flink does not rely on them for its fault tolerance guarantees.
prop.setProperty("enable.auto.commit", "true")
prop.setProperty("auto.commit.interval.ms", "15000")
Then I realized this question goes beyond the scope of this post, so let's skip it.
O(∩_∩)O haha~
Feel free to follow the Flink菜鳥 WeChat official account, which posts Flink development related articles from time to time.