Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7200599.html
Flink officially provides connector implementations for Kafka. While debugging, I noticed some consumption behavior that did not match my expectations, so I dug into the source code.
flink-connector-kafka currently has implementations for three Kafka versions: 0.8, 0.9, and 0.10. This article uses the FlinkKafkaConsumer010 code as the example.
The inheritance chain of FlinkKafkaConsumer010 is shown below; FlinkKafkaConsumerBase contains most of the implementation.
FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
Each consumer version also has a corresponding AbstractFetcher implementation used internally to pull data from Kafka; the hierarchy is:
Kafka010Fetcher<T> extends Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition>
FlinkKafkaConsumerBase is declared as follows; it extends RichParallelSourceFunction and implements interfaces such as CheckpointedFunction:
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
Execution details of the FlinkKafkaConsumer methods
initializeState
public void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    offsetsStateForCheckpoint = stateStore.getSerializableListState(
            DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

    if (context.isRestored()) {
        if (restoredState == null) {
            restoredState = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Setting restore state in the FlinkKafkaConsumer.");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Using the following offsets: {}", restoredState);
            }
        }
        if (restoredState != null && restoredState.isEmpty()) {
            restoredState = null;
        }
    } else {
        LOG.info("No restore state for FlinkKafkaConsumer.");
    }
}
According to the runtime logs, initializeState is the first method called when the FlinkKafkaConsumer is initialized. Through the runtime context (FunctionInitializationContext) it calls getOperatorStateStore and getSerializableListState to obtain the state object stored in the checkpoint. If this task is recovering from a failure, context.isRestored() evaluates to true and the consumer attempts to recover, from the Flink checkpoint, the Kafka partitions it was previously assigned together with the last successfully committed offsets.
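The restore step essentially folds the checkpointed list-state entries back into a map. Below is a minimal standalone sketch of that conversion, using plain String keys in place of KafkaTopicPartition and Map.Entry in place of Flink's Tuple2 (these substitutions are mine, not Flink's types):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreSketch {
    // Rebuild the partition -> offset map from the checkpointed list entries,
    // mirroring the loop in initializeState. An empty list yields null,
    // matching the "restoredState != null && restoredState.isEmpty()" reset.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> checkpointed) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : checkpointed) {
            restored.put(e.getKey(), e.getValue());
        }
        return restored.isEmpty() ? null : restored;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> state = List.of(
                Map.entry("topic-0", 42L),
                Map.entry("topic-1", 7L));
        System.out.println(restore(state));
        System.out.println(restore(List.of()));
    }
}
```

The null-vs-empty distinction matters later: open() branches on restoredState != null to decide between resuming from state and computing a fresh assignment.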
open
public void open(Configuration configuration) {
    // determine the offset commit mode
    offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    switch (offsetCommitMode) {
        case ON_CHECKPOINTS:
            LOG.info("Consumer subtask {} will commit offsets back to Kafka on completed checkpoints.",
                    getRuntimeContext().getIndexOfThisSubtask());
            break;
        case KAFKA_PERIODIC:
            LOG.info("Consumer subtask {} will commit offsets back to Kafka periodically using the Kafka client's auto commit.",
                    getRuntimeContext().getIndexOfThisSubtask());
            break;
        default:
        case DISABLED:
            LOG.info("Consumer subtask {} has disabled offset committing back to Kafka."
                            + " This does not compromise Flink's checkpoint integrity.",
                    getRuntimeContext().getIndexOfThisSubtask());
    }

    // initialize subscribed partitions
    List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
    Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");

    subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());

    if (restoredState != null) {
        for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
            if (restoredState.containsKey(kafkaTopicPartition)) {
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
            }
        }

        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(),
                subscribedPartitionsToStartOffsets.size(),
                subscribedPartitionsToStartOffsets);
    } else {
        initializeSubscribedPartitionsToStartOffsets(
                subscribedPartitionsToStartOffsets,
                kafkaTopicPartitions,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                startupMode,
                specificStartupOffsets);

        if (subscribedPartitionsToStartOffsets.size() != 0) {
            switch (startupMode) {
                case EARLIEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case LATEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case SPECIFIC_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                    List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                        if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                    }

                    if (partitionsDefaultedToGroupOffsets.size() > 0) {
                        LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"
                                        + "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                    }
                    break;
                default:
                case GROUP_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
            }
        }
    }
}
open is called after initializeState finishes. Its main logic breaks down into the following steps:
1 Determine the offsetCommitMode. Based on the combination of three parameters, namely Kafka's auto-commit setting, the value of setCommitOffsetsOnCheckpoints() (default true), and whether checkpointing is enabled in the Flink runtime,
offsetCommitMode takes one of three values: ON_CHECKPOINTS (commit offsets after each completed checkpoint), KAFKA_PERIODIC (rely on the Kafka consumer's built-in periodic auto-commit), or DISABLED (never commit).
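The three-flag decision can be sketched as a small pure function. This is a reconstruction of the behavior described above, not a verbatim copy of Flink's OffsetCommitModes.fromConfiguration: checkpointing takes precedence, and Kafka's auto-commit only applies when checkpointing is off.

```java
public class CommitModeSketch {
    enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    // Combine the three configuration flags into a commit mode.
    // With checkpointing on, only the ON_CHECKPOINTS/DISABLED choice remains;
    // without it, the Kafka client's own auto-commit flag decides.
    static OffsetCommitMode fromConfiguration(
            boolean autoCommitEnabled,
            boolean commitOnCheckpoints,
            boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS
                                       : OffsetCommitMode.DISABLED;
        }
        return autoCommitEnabled ? OffsetCommitMode.KAFKA_PERIODIC
                                 : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true));    // ON_CHECKPOINTS
        System.out.println(fromConfiguration(true, true, false));   // KAFKA_PERIODIC
        System.out.println(fromConfiguration(false, false, false)); // DISABLED
    }
}
```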
2 Assign Kafka partitions. If the initializeState phase recovered the previously stored partitions from state, the consumer simply keeps reading those partitions. On a first-time initialization it instead calls initializeSubscribedPartitionsToStartOffsets
to compute the partition list for the current task:
protected static void initializeSubscribedPartitionsToStartOffsets(
        Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
        List<KafkaTopicPartition> kafkaTopicPartitions,
        int indexOfThisSubtask,
        int numParallelSubtasks,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

    for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
        if (i % numParallelSubtasks == indexOfThisSubtask) {
            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
            } else {
                if (specificStartupOffsets == null) {
                    throw new IllegalArgumentException(
                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                    ", but no specific offsets were specified");
                }

                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);

                Long specificOffset = specificStartupOffsets.get(partition);
                if (specificOffset != null) {
                    // since the specified offsets represent the next record to read, we subtract
                    // it by one so that the initial state of the consumer will be correct
                    subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
                } else {
                    subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                }
            }
        }
    }
}
As the code shows, Flink assigns partitions by taking each partition index modulo the number of parallel subtasks: if i % numParallelSubtasks == indexOfThisSubtask, partition i belongs to the current task.
The assignment result is recorded in the private field Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets, which is later used to initialize the consumer.
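The modulo scheme is easy to verify in isolation. The sketch below reduces it to partition indices (the class and method names are mine, for illustration): with 5 partitions and 2 subtasks, subtask 0 gets partitions 0, 2, 4 and subtask 1 gets 1, 3.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignSketch {
    // Round-robin assignment by partition index modulo the subtask count,
    // the same test used in initializeSubscribedPartitionsToStartOffsets.
    static List<Integer> partitionsForSubtask(int numPartitions, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            if (i % numSubtasks == subtaskIndex) {
                assigned.add(i);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        System.out.println(partitionsForSubtask(5, 2, 0)); // [0, 2, 4]
        System.out.println(partitionsForSubtask(5, 2, 1)); // [1, 3]
    }
}
```

One consequence worth noting: when there are fewer partitions than subtasks, some subtasks receive no partitions at all, which is exactly the empty-assignment branch handled in run() below.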
run
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }

    // we need only do work, if we actually have partitions assigned
    if (!subscribedPartitionsToStartOffsets.isEmpty()) {

        // create the fetcher that will communicate with the Kafka brokers
        final AbstractFetcher<T, ?> fetcher = createFetcher(
                sourceContext,
                subscribedPartitionsToStartOffsets,
                periodicWatermarkAssigner,
                punctuatedWatermarkAssigner,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode);

        // publish the reference, for snapshot-, commit-, and cancel calls
        // IMPORTANT: We can only do that now, because only now will calls to
        //            the fetchers 'snapshotCurrentState()' method return at least
        //            the restored offsets
        this.kafkaFetcher = fetcher;
        if (!running) {
            return;
        }

        // (3) run the fetcher' main work method
        fetcher.runFetchLoop();
    } else {
        // this source never completes, so emit a Long.MAX_VALUE watermark
        // to not block watermark forwarding
        sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

        // wait until this is canceled
        final Object waitLock = new Object();
        while (running) {
            try {
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (waitLock) {
                    waitLock.wait();
                }
            } catch (InterruptedException e) {
                if (!running) {
                    // restore the interrupted state, and fall through the loop
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
As shown above, the computed subscribedPartitionsToStartOffsets map is passed into the AbstractFetcher instance that owns the consumer thread. KafkaConsumerThread then calls consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates)), which ultimately invokes consumer.assign(topicPartitions), manually assigning the topic partitions to the consumer instance instead of going through Kafka's group-based subscription.
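To make the assign-then-seek flow concrete without a running Kafka cluster, here is a rough sketch of one plausible way the start offsets could be applied. The ConsumerBridge interface, the startFetcher method, and the -1 sentinel are all stand-ins of my own (Flink uses KafkaConsumerCallBridge, KafkaTopicPartitionStateSentinel, and the real KafkaConsumer); the "+1" reflects the convention noted in the code above that a stored offset is the last record already read, so this is not a verbatim copy of KafkaConsumerThread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class FetcherStartupSketch {
    // Hypothetical stand-in for the subset of the consumer API the fetcher uses.
    interface ConsumerBridge {
        void assign(Collection<String> partitions);
        void seek(String partition, long offset);
    }

    // Stand-in for KafkaTopicPartitionStateSentinel.GROUP_OFFSET.
    static final long GROUP_OFFSET_SENTINEL = -1L;

    // Manually assign the computed partitions, then seek each partition that
    // carries a concrete restored/specified offset; sentinel values are left
    // to the client's committed-offset handling.
    static void startFetcher(Map<String, Long> startOffsets, ConsumerBridge consumer) {
        consumer.assign(startOffsets.keySet());
        for (Map.Entry<String, Long> e : startOffsets.entrySet()) {
            if (e.getValue() != GROUP_OFFSET_SENTINEL) {
                // stored offset is the last record read, so resume at the next one
                consumer.seek(e.getKey(), e.getValue() + 1);
            }
        }
    }

    public static void main(String[] args) {
        List<String> actions = new ArrayList<>();
        ConsumerBridge recording = new ConsumerBridge() {
            public void assign(Collection<String> partitions) { actions.add("assign:" + new TreeSet<>(partitions)); }
            public void seek(String partition, long offset) { actions.add("seek:" + partition + ":" + offset); }
        };
        Map<String, Long> startOffsets = new TreeMap<>();
        startOffsets.put("topic-0", 41L);                   // restored: last record read was 41
        startOffsets.put("topic-1", GROUP_OFFSET_SENTINEL); // no restored offset: defer to group offset
        startFetcher(startOffsets, recording);
        System.out.println(actions); // [assign:[topic-0, topic-1], seek:topic-0:42]
    }
}
```

Because assign() bypasses consumer-group rebalancing entirely, Flink stays in full control of which subtask reads which partition, matching the modulo assignment computed in open().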