flink-connector-kafka consumer checkpoint source code analysis


Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7700600.html

 

The earlier post on topic partition assignment in the flink-connector-kafka consumer mentioned that when the consumer is initialized there are three offset commit modes: KAFKA_PERIODIC, DISABLED, and ON_CHECKPOINTS.

ON_CHECKPOINTS means the consumer actively commits offsets to Kafka after Flink completes a checkpoint. This post analyzes how the flink-connector-kafka source code uses the checkpoint mechanism to restore and commit offsets.
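As a rough sketch of how the three modes relate to the consumer settings (the names loosely mirror Flink's OffsetCommitModes helper, but the method signature and parameter order here are illustrative stand-ins, not Flink's actual API): checkpointing takes precedence, and Kafka's `enable.auto.commit` only matters when checkpointing is disabled.

```java
// Hypothetical sketch: resolving the offset commit mode from the
// consumer configuration. Names are illustrative, not Flink's API.
public class OffsetCommitModeSketch {
    enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    static OffsetCommitMode fromConfiguration(boolean commitOnCheckpoints,
                                              boolean checkpointingEnabled,
                                              boolean autoCommitEnabled) {
        if (checkpointingEnabled) {
            // with checkpointing on, offsets are committed on checkpoints or not at all
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS
                                       : OffsetCommitMode.DISABLED;
        }
        // without checkpointing, fall back to Kafka's periodic auto-commit
        return autoCommitEnabled ? OffsetCommitMode.KAFKA_PERIODIC
                                 : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true)); // ON_CHECKPOINTS
    }
}
```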

The base class of the Flink consumer implementation, FlinkKafkaConsumerBase, is defined as follows. It implements three checkpoint-related interfaces: CheckpointedFunction, CheckpointedRestoring&lt;HashMap&lt;KafkaTopicPartition, Long&gt;&gt;, and CheckpointListener. According to the official documentation, CheckpointedRestoring's restoreState() method has been superseded by CheckpointedFunction's initializeState(), so we focus on three methods:

1. initializeState(): called when the instance is initialized or recovered

2. snapshotState(): called each time a checkpoint is created

3. notifyCheckpointComplete(): called each time a checkpoint completes

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {

 

initializeState

    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {

        // we might have been restored via restoreState() which restores from legacy operator state
        if (!restored) {
            restored = context.isRestored();
        }

        OperatorStateStore stateStore = context.getOperatorStateStore();
        offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        if (context.isRestored()) {
            if (restoredState == null) {
                restoredState = new HashMap<>();
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                    restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
                }

                LOG.info("Setting restore state in the FlinkKafkaConsumer.");
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using the following offsets: {}", restoredState);
                }
            }
        } else {
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
    }

The logic of this method is straightforward: when a task recovers, the previously stored ListState&lt;Tuple2&lt;KafkaTopicPartition, Long&gt;&gt; is deserialized from the stateStore and placed into the restoredState variable, which the subsequent open() method uses to directly restore the corresponding partitions and their starting offsets.
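The restore step above can be sketched in isolation. This is a minimal stand-in, not Flink's code: KafkaTopicPartition is replaced by a plain String key and the ListState by an ordinary List, to show how the checkpointed pairs are folded back into a map.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the restore path in initializeState(): each checkpointed
// (partition, offset) pair is put back into the restoredState map.
public class RestoreStateSketch {
    static Map<String, Long> restore(List<Map.Entry<String, Long>> checkpointedOffsets) {
        Map<String, Long> restoredState = new HashMap<>();
        for (Map.Entry<String, Long> kafkaOffset : checkpointedOffsets) {
            restoredState.put(kafkaOffset.getKey(), kafkaOffset.getValue());
        }
        return restoredState;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = restore(List.of(
                new SimpleEntry<>("topic-0", 42L),
                new SimpleEntry<>("topic-1", 7L)));
        System.out.println(restored.get("topic-0")); // 42
    }
}
```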

 

snapshotState

    @Override
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {

            offsetsStateForCheckpoint.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    offsetsStateForCheckpoint.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

snapshotState() creates a checkpoint by writing each current KafkaTopicPartition and the offset consumed so far into the offsetsStateForCheckpoint state object, and then storing the current checkpoint id together with those offsets in the pendingOffsetsToCommit LinkedMap. The current offsets come from two places: during initialization (the if (fetcher == null) {...} branch) they are taken from restoredState, and during normal running from fetcher.snapshotCurrentState().
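The pendingOffsetsToCommit bookkeeping, including the truncation that caps the map at MAX_NUM_PENDING_CHECKPOINTS, can be sketched as follows. Flink uses Apache Commons' LinkedMap there; an insertion-ordered LinkedHashMap behaves equivalently for this purpose, and the value types are simplified stand-ins.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the pending-offsets bookkeeping done in snapshotState():
// record checkpointId -> offsets, then drop the oldest entries so the
// map cannot grow without bound.
public class PendingOffsetsSketch {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    static final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit =
            new LinkedHashMap<>();

    static void onSnapshot(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, currentOffsets);
        // truncate the map of pending offsets to prevent infinite growth,
        // removing the oldest (first-inserted) checkpoints first
        Iterator<Long> oldest = pendingOffsetsToCommit.keySet().iterator();
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            oldest.next();
            oldest.remove();
        }
    }

    public static void main(String[] args) {
        for (long id = 0; id < 150; id++) {
            onSnapshot(id, Map.of("topic-0", id));
        }
        System.out.println(pendingOffsetsToCommit.size()); // 100
    }
}
```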

 

notifyCheckpointComplete

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // only one commit operation must be in progress
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
            }

            try {
                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                @SuppressWarnings("unchecked")
                HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingOffsetsToCommit.remove(0);
                }

                if (offsets == null || offsets.size() == 0) {
                    LOG.debug("Checkpoint state was empty.");
                    return;
                }

                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            } catch (Exception e) {
                if (running) {
                    throw e;
                }
                // else ignore exception if we are no longer running
            }
        }
    }

notifyCheckpointComplete() is where, in ON_CHECKPOINTS mode, offsets are committed to the Kafka cluster after a checkpoint finishes. The method receives the id of the completed checkpoint and looks up the corresponding offsets in the pendingOffsetsToCommit map described above. If the id is not found, it returns immediately. Otherwise it removes the snapshot at that index, and also removes all snapshots before it (the older ones), which is safe because checkpoint ids increase monotonically over time. Finally it commits the offsets belonging to the completed checkpoint via commitInternalOffsetsToKafka. That method is declared abstract because the Kafka APIs differ between versions, so each version-specific consumer provides its own implementation; currently both the kafka09 and kafka010 connectors use the implementation in Kafka09Fetcher.

 

References:

http://blog.csdn.net/yanghua_kobe/article/details/51503885

