Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7700600.html
The earlier post 《flink-connector-kafka consumer的topic分區分配源碼》 mentioned that when the flink-connector-kafka consumer is initialized, there are three offset commit modes: KAFKA_PERIODIC, DISABLED, and ON_CHECKPOINTS.
Among them, ON_CHECKPOINTS means that offsets are actively committed to Kafka after Flink completes a checkpoint. This article analyzes how flink-connector-kafka uses the checkpoint mechanism in its source code to implement offset recovery and offset committing.
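Before diving into the source, it may help to see how a job ends up in ON_CHECKPOINTS mode. Below is a minimal sketch (the topic name, bootstrap servers, and group id are placeholders, and FlinkKafkaConsumer010 stands in for whichever version-specific consumer applies): the mode requires checkpointing to be enabled on the environment, and setCommitOffsetsOnCheckpoints(true), which is also the default, selects commit-on-checkpoint rather than the Kafka client's periodic auto-commit.

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class OnCheckpointsCommitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled, otherwise ON_CHECKPOINTS cannot take effect.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "test-group");              // placeholder

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), props);
        // With checkpointing enabled, this selects the ON_CHECKPOINTS commit mode
        // (true is the default; shown here for clarity).
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("on-checkpoints-offset-commit");
    }
}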
The base class of the Flink consumer implementation, FlinkKafkaConsumerBase, is defined as follows. It implements three checkpoint-related interfaces: CheckpointedFunction, CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>, and CheckpointListener. According to the official documentation, CheckpointedRestoring's restoreState() method has been superseded by CheckpointedFunction's initializeState(), so we focus on three method implementations:
1. initializeState(): called when the instance is initialized or recovered
2. snapshotState(): called every time a checkpoint is created
3. notifyCheckpointComplete(): called every time a checkpoint completes
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
initializeState
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
    // we might have been restored via restoreState() which restores from legacy operator state
    if (!restored) {
        restored = context.isRestored();
    }

    OperatorStateStore stateStore = context.getOperatorStateStore();
    offsetsStateForCheckpoint = stateStore.getSerializableListState(
            DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

    if (context.isRestored()) {
        if (restoredState == null) {
            restoredState = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Setting restore state in the FlinkKafkaConsumer.");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Using the following offsets: {}", restoredState);
            }
        }
    } else {
        LOG.info("No restore state for FlinkKafkaConsumer.");
    }
}
The logic of this method is straightforward: when a task is recovered, the previously stored ListState<Tuple2<KafkaTopicPartition, Long>> is deserialized from the stateStore and its entries are put into the restoredState variable, which the open() method below then uses to directly restore the corresponding partitions and their starting offsets.
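As a rough illustration of that hand-off, the standalone sketch below (not the verbatim source; plain strings stand in for KafkaTopicPartition, and the sentinel constant is hypothetical) shows how open() can resolve each assigned partition's start position from restoredState:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the restore hand-off between initializeState() and open().
public class RestoreHandOffSketch {

    // hypothetical marker meaning "no restored offset, use the configured startup mode"
    private static final long USE_STARTUP_MODE = Long.MIN_VALUE;

    static Map<String, Long> resolveStartOffsets(List<String> assignedPartitions,
                                                 Map<String, Long> restoredState) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (String partition : assignedPartitions) {
            if (restoredState != null && restoredState.containsKey(partition)) {
                // recovered run: resume from the offset stored in the checkpoint
                startOffsets.put(partition, restoredState.get(partition));
            } else {
                // fresh run (or newly discovered partition): defer to the startup position
                startOffsets.put(partition, USE_STARTUP_MODE);
            }
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("test-topic-0", 42L);
        // test-topic-0 resumes at 42; test-topic-1 falls back to the startup mode
        System.out.println(resolveStartOffsets(
                Arrays.asList("test-topic-0", "test-topic-1"), restored));
    }
}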
snapshotState
@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        offsetsStateForCheckpoint.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }

            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                offsetsStateForCheckpoint.add(
                        Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
            }
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
snapshotState() creates a checkpoint by writing each KafkaTopicPartition and its currently consumed offset into the offsetsStateForCheckpoint state object, and by storing the current checkpoint id together with those offsets in the pendingOffsetsToCommit LinkedMap. The current offsets are obtained in two different ways: before the fetcher has been initialized (if (fetcher == null) {...}) they come from restoredState, while during normal operation they come from fetcher.snapshotCurrentState().
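As an aside, pendingOffsetsToCommit is an org.apache.commons.collections.map.LinkedMap, which is why the code can address entries by insertion index (indexOf, remove(0)). The standalone sketch below illustrates the truncation loop from snapshotState(); MAX_NUM_PENDING_CHECKPOINTS is 100 in the source, and the other values here are placeholders:

import java.util.HashMap;
import org.apache.commons.collections.map.LinkedMap;

// Illustration (not Flink source) of the pendingOffsetsToCommit bookkeeping:
// snapshotState() appends one entry per checkpoint, and the truncation loop
// bounds the map's size if checkpoint notifications never arrive.
public class PendingOffsetsSketch {
    private static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // same constant as in the source

    public static void main(String[] args) {
        LinkedMap pendingOffsetsToCommit = new LinkedMap();

        // simulate 105 checkpoints whose completion notifications are never received
        for (long checkpointId = 1; checkpointId <= 105; checkpointId++) {
            pendingOffsetsToCommit.put(checkpointId, new HashMap<>()); // offsets placeholder

            // truncate the map of pending offsets, exactly like snapshotState() does
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0); // evict the oldest checkpoint id
            }
        }

        // the oldest five entries have been dropped; only ids 6..105 remain
        System.out.println(pendingOffsetsToCommit.size());     // 100
        System.out.println(pendingOffsetsToCommit.firstKey()); // 6
    }
}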
notifyCheckpointComplete
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    if (!running) {
        LOG.debug("notifyCheckpointComplete() called on closed source");
        return;
    }

    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    if (fetcher == null) {
        LOG.debug("notifyCheckpointComplete() called on uninitialized source");
        return;
    }

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
        }

        try {
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                return;
            }

            @SuppressWarnings("unchecked")
            HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

            // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) {
                pendingOffsetsToCommit.remove(0);
            }

            if (offsets == null || offsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }

            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
        } catch (Exception e) {
            if (running) {
                throw e;
            }
            // else ignore exception if we are no longer running
        }
    }
}
notifyCheckpointComplete() is where, in the ON_CHECKPOINTS case, offsets are committed to the Kafka cluster after a checkpoint completes. The method receives the id of the completed checkpoint and looks up the corresponding offsets in the pendingOffsetsToCommit map described above. If the id is not found, it simply returns. Otherwise it removes the snapshot entry at that index, and also removes every entry before it (the older ones), which is safe because checkpoint ids are inserted in increasing time order. Finally it commits the offsets belonging to the completed checkpoint via commitInternalOffsetsToKafka. That method is declared abstract because the Kafka client APIs differ between versions, so each version-specific consumer provides its own implementation; at present both the Kafka 0.9 and 0.10 connectors implement commitInternalOffsetsToKafka in Kafka09Fetcher.
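For a feel of what a version-specific implementation does, here is a simplified sketch of the commit path against the Kafka 0.9+ consumer API. It is not the verbatim Kafka09Fetcher code (which hands the work to its consumer thread, and maps Flink's KafkaTopicPartition to Kafka's TopicPartition, elided here), but it shows the essential conversion, including the +1 adjustment: Flink's state stores the offset of the last processed record, while the offset committed to Kafka must point to the next record to read.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Simplified sketch of committing checkpointed offsets with the Kafka 0.9+
// consumer API; not the verbatim Kafka09Fetcher implementation.
public class CommitOffsetsSketch {

    static Map<TopicPartition, OffsetAndMetadata> toKafkaOffsets(Map<TopicPartition, Long> checkpointed) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : checkpointed.entrySet()) {
            // Flink stores the offset of the last processed record; the offset
            // committed to Kafka must point to the next record, hence +1.
            offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
        }
        return offsetsToCommit;
    }

    static void commit(KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> checkpointed) {
        // commitAsync keeps the fetch loop non-blocking; the callback plays
        // the role of Flink's offsetCommitCallback.
        consumer.commitAsync(toKafkaOffsets(checkpointed), (offsets, exception) -> {
            if (exception != null) {
                System.err.println("Offset commit failed: " + exception.getMessage());
            }
        });
    }
}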
References:
http://blog.csdn.net/yanghua_kobe/article/details/51503885
