Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
Usage is shown above; at its core, the connector is an implementation of SourceFunction.
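For context, the SourceFunction contract the consumer fulfills is deliberately small: a run method that emits records through a SourceContext until cancel is called from another thread. A minimal sketch (CountingSource is an illustrative name, not part of the connector):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Minimal custom source: the same contract FlinkKafkaConsumerBase fulfills.
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true; // flipped by cancel() from another thread
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // emit under the checkpoint lock so emission and state updates
            // appear atomic to the checkpointing mechanism
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Everything the Kafka consumer adds, partition assignment, offset tracking, checkpoint integration, sits on top of this basic loop.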
Apart from overriding createFetcher, FlinkKafkaConsumer010 inherits most of its behavior from FlinkKafkaConsumerBase.
FlinkKafkaConsumerBase
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>
FlinkKafkaConsumerBase extends RichParallelSourceFunction and implements four interfaces.
RichFunction.open
First, look at how FlinkKafkaConsumerBase is initialized:
@Override
public void open(Configuration configuration) {
    // determine the offset commit mode
    // offsetCommitMode is one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED; with checkpointing enabled,
    // offsets are recorded in the snapshot, otherwise they are periodically committed back to Kafka
    offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    // initialize subscribed partitions
    List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); // fetch the topic's partition metadata

    subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size()); // Map<KafkaTopicPartition, Long>, records the start offset of each partition

    if (restoredState != null) { // there is state to restore from
        for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
            if (restoredState.containsKey(kafkaTopicPartition)) { // the state contains this partition
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition)); // restore the offset from the snapshot
            }
        }
    } else { // no restored state, so initialize subscribedPartitionsToStartOffsets from scratch
        initializeSubscribedPartitionsToStartOffsets(
                subscribedPartitionsToStartOffsets,
                kafkaTopicPartitions,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                startupMode,
                specificStartupOffsets); // startupMode has the variants below; the default is GROUP_OFFSETS

        if (subscribedPartitionsToStartOffsets.size() != 0) {
            switch (startupMode) {
                case EARLIEST: // read from the earliest offsets
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case LATEST: // read from the latest offsets
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case SPECIFIC_OFFSETS: // read from user-specified offsets
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                    List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                        if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                    }

                    if (partitionsDefaultedToGroupOffsets.size() > 0) { // some partitions had no specified offset and fall back to GROUP_OFFSET
                        LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                        "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                    }
                    break;
                default:
                case GROUP_OFFSETS: // read from the offsets committed for the Kafka consumer group
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
            }
        }
    }
}

protected static void initializeSubscribedPartitionsToStartOffsets(
        Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
        List<KafkaTopicPartition> kafkaTopicPartitions,
        int indexOfThisSubtask,
        int numParallelSubtasks,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

    for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
        if (i % numParallelSubtasks == indexOfThisSubtask) { // this partition is assigned to this subtask; only assigned partitions get an offset entry, so this is where partition assignment actually happens
            if (startupMode != StartupMode.SPECIFIC_OFFSETS) { // for anything other than SPECIFIC_OFFSETS, set the offset to the mode's sentinel constant
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
            } else {
                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);

                Long specificOffset = specificStartupOffsets.get(partition);
                if (specificOffset != null) {
                    // since the specified offsets represent the next record to read, we subtract
                    // it by one so that the initial state of the consumer will be correct
                    subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1); // store the configured offset for this partition; note the minus one
                } else { // no configured offset for this partition, fall back to GROUP_OFFSET
                    subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                }
            }
        }
    }
}
The initialization work mainly restores or initializes the start offset of each topic partition.
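The assignment rule buried in initializeSubscribedPartitionsToStartOffsets is a plain round-robin over the partition index. A standalone sketch of that distribution, with partitions reduced to ints for illustration:

import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentDemo {

    // Mirrors the "i % numParallelSubtasks == indexOfThisSubtask" rule from above.
    static List<Integer> assignedTo(int subtask, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtask) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 5 partitions over 2 subtasks: subtask 0 gets {0, 2, 4}, subtask 1 gets {1, 3}
        System.out.println(assignedTo(0, 2, 5)); // [0, 2, 4]
        System.out.println(assignedTo(1, 2, 5)); // [1, 3]
    }
}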
RichParallelSourceFunction
The core is the run method:
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // we need only do work, if we actually have partitions assigned
    if (!subscribedPartitionsToStartOffsets.isEmpty()) {

        // create the fetcher that will communicate with the Kafka brokers
        final AbstractFetcher<T, ?> fetcher = createFetcher(
                sourceContext, // used to emit records and watermarks
                subscribedPartitionsToStartOffsets, // the partition -> start offset mapping
                periodicWatermarkAssigner,
                punctuatedWatermarkAssigner,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode); // one of ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED

        // publish the reference, for snapshot-, commit-, and cancel calls
        // IMPORTANT: We can only do that now, because only now will calls to
        //            the fetchers 'snapshotCurrentState()' method return at least
        //            the restored offsets
        this.kafkaFetcher = fetcher;
        if (!running) {
            return;
        }

        // (3) run the fetcher' main work method
        fetcher.runFetchLoop();
    }
}
run() mainly creates the fetcher and starts it; the fetcher does the actual work.
Most of the arguments passed to createFetcher are easy to understand, except for:
periodicWatermarkAssigner
punctuatedWatermarkAssigner
These two are used to generate watermarks; see Flink - watermark.
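As a reminder of the shape of such an assigner, here is a minimal periodic assigner that tolerates a fixed amount of out-of-orderness; the event type and the 3500 ms bound are illustrative assumptions, not taken from the connector:

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Illustrative event type; in the Kafka case this would be whatever the
// DeserializationSchema produces.
class MyEvent {
    final long eventTime;
    MyEvent(long eventTime) { this.eventTime = eventTime; }
}

public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrdernessMs = 3500; // example bound, an assumption

    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(element.eventTime, currentMaxTimestamp);
        return element.eventTime;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // called on a timer; the watermark trails the highest timestamp seen so far
        return new Watermark(currentMaxTimestamp - maxOutOfOrdernessMs);
    }
}

An assigner like this would be attached to the consumer via assignTimestampsAndWatermarks(...), after which the fetcher evaluates it per partition, as seen later in AbstractFetcher.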
The CheckpointedFunction interface
It mainly implements the initializeState and snapshotState methods.
initializeState restores the offset state from the state backend into restoredState; this data is later used in open().
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); // read the state from the StateBackend

    if (context.isRestored()) {
        if (restoredState == null) {
            restoredState = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1); // copy offsetsStateForCheckpoint into restoredState
            }
        }
    }
}
snapshotState implements the snapshotting logic:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    offsetsStateForCheckpoint.clear(); // transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint

    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    if (fetcher == null) {
        //...
    } else {
        HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); // snapshot the latest offsets from the fetcher

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // the map cannot be asynchronously updated, because only one checkpoint call can happen
            // on this function at a time: either snapshotState() or notifyCheckpointComplete()
            pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); // record the offsets as pending for this checkpoint
        }

        for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
            offsetsStateForCheckpoint.add(
                    Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); // store the offsets in the state backend
        }
    }

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // truncate the map of pending offsets to commit, to prevent infinite growth
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { // too many pending checkpoints, drop the oldest
            pendingOffsetsToCommit.remove(0);
        }
    }
}
The CheckpointListener interface
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        try {
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); // look up this checkpoint in pendingOffsetsToCommit
            if (posInMap == -1) {
                LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                return;
            }

            @SuppressWarnings("unchecked")
            HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); // remove this checkpoint's entry

            // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) {
                pendingOffsetsToCommit.remove(0); // also drop every checkpoint older than this one
            }

            if (offsets == null || offsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
            fetcher.commitInternalOffsetsToKafka(offsets); // push the offsets to the Kafka consumer group
        } // (exception handling elided in this excerpt)
    }
}
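Taken together, snapshotState and notifyCheckpointComplete form a generic pattern: park the offsets keyed by checkpoint id, then commit them only once the checkpoint is confirmed, discarding anything older. A stripped-down sketch of just that bookkeeping, detached from the Kafka and Flink types (names are illustrative; the real code uses a commons-collections LinkedMap):

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Generic commit-on-checkpoint-complete bookkeeping.
public class PendingCommits<K, V> {

    private static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // same bound as the connector

    // insertion-ordered, so "older checkpoints" are simply the earlier entries
    private final LinkedHashMap<Long, Map<K, V>> pending = new LinkedHashMap<>();

    // called from snapshotState()
    public void stash(long checkpointId, Map<K, V> offsets) {
        pending.put(checkpointId, new HashMap<>(offsets));
        // truncate to prevent unbounded growth if checkpoints never complete
        Iterator<Long> it = pending.keySet().iterator();
        while (pending.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            it.next();
            it.remove();
        }
    }

    // called from notifyCheckpointComplete(); returns the offsets to commit,
    // dropping this checkpoint and every older one
    public Map<K, V> confirm(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return null; // unknown checkpoint, e.g. already truncated
        }
        Map<K, V> result = null;
        Iterator<Map.Entry<Long, Map<K, V>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<K, V>> e = it.next();
            boolean isTarget = e.getKey() == checkpointId;
            if (isTarget) {
                result = e.getValue();
            }
            it.remove(); // remove the target and all older entries
            if (isTarget) {
                break;
            }
        }
        return result;
    }
}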
Kafka010Fetcher
Each Kafka version has its own fetcher implementation; FlinkKafkaConsumer010 therefore only overrides createFetcher.
public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>
Here is what Kafka010Fetcher changes:
@Override
protected void emitRecord(
        T record,
        KafkaTopicPartitionState<TopicPartition> partition,
        long offset,
        ConsumerRecord<?, ?> consumerRecord) throws Exception {

    // we attach the Kafka 0.10 timestamp here
    emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); // Kafka 0.10 records carry a timestamp
}

/**
 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
 * changing binary signatures.
 */
@Override
protected KafkaConsumerCallBridge010 createCallBridge() {
    return new KafkaConsumerCallBridge010(); // the call bridge hides consumer API differences between Kafka versions
}
KafkaConsumerCallBridge010 encapsulates how the 0.10 assignPartitions and seek APIs differ from other versions:
public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {

    @Override
    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
        consumer.assign(topicPartitions);
    }

    @Override
    public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToBeginning(Collections.singletonList(partition));
    }

    @Override
    public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToEnd(Collections.singletonList(partition));
    }
}
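The reason the bridge hierarchy exists is that Kafka changed these signatures from varargs to collections between 0.9 and 0.10, which breaks binary compatibility even where source still compiles. For comparison, a sketch of what the base (0.9-era) bridge would look like against the 0.9 client API; this is an assumption-based reconstruction, not copied from Flink:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch of the base (0.9-era) call bridge; compiles only against the 0.9
// client, which used varargs where 0.10 switched to collections.
public class KafkaConsumerCallBridge {

    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
        consumer.assign(topicPartitions); // 0.9: assign(List<TopicPartition>)
    }

    public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToBeginning(partition); // 0.9: seekToBeginning(TopicPartition...)
    }

    public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
        consumer.seekToEnd(partition); // 0.9: seekToEnd(TopicPartition...)
    }
}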
Kafka09Fetcher
The key method is runFetchLoop, which starts the KafkaConsumerThread
and then takes records out of the handover, deserializes them, and emits them downstream.
@Override
public void runFetchLoop() throws Exception {
    try {
        final Handover handover = this.handover; // the handover passes data between the fetcher thread and the consumer thread

        // kick off the actual Kafka consumer
        consumerThread.start(); // a KafkaConsumerThread, the thread that really talks to Kafka

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the fetcher thread
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); // take the next batch out of the handover

            // get the records for each topic partition
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle()); // ConsumerRecords is internally a Map<TopicPartition, List<ConsumerRecord<K, V>>>

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(
                            record.key(), record.value(),
                            record.topic(), record.partition(), record.offset());

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

                    // emit the actual record. this also updates offset state atomically
                    // and deals with timestamps and watermark generation
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    } // (cleanup elided in this excerpt)
}
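The Handover used above is essentially a blocking, single-slot exchange point between the consumer thread (producer side) and the fetch loop (consumer side), with error forwarding. A minimal sketch of that idea, omitting the wakeup and close handling the real class has:

// Minimal single-slot handover between two threads, modeled on the idea
// (not the exact code) of the connector's Handover class.
public class MiniHandover<T> {

    private final Object lock = new Object();
    private T next;          // the single "slot"
    private Throwable error; // error reported by the producing thread

    // called by the consuming thread; blocks until an element or an error arrives
    public T pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (error != null) {
                throw new Exception("error in producing thread", error);
            }
            T result = next;
            next = null;
            lock.notifyAll(); // wake a producer blocked in produce()
            return result;
        }
    }

    // called by the producing thread; blocks while the slot is still full
    public void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null && error == null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll(); // wake the consumer blocked in pollNext()
        }
    }

    // makes pending and future pollNext() calls rethrow the error
    public void reportError(Throwable t) {
        synchronized (lock) {
            error = t;
            lock.notifyAll();
        }
    }
}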
An important structure here is subscribedPartitionStates.
AbstractFetcher
// create our partition state according to the timestamp/watermark mode
this.subscribedPartitionStates = initializeSubscribedPartitionStates(
        assignedPartitionsWithInitialOffsets,
        timestampWatermarkMode,
        watermarksPeriodic, watermarksPunctuated,
        userCodeClassLoader);
As you can see, all of this information, most importantly assignedPartitionsWithInitialOffsets, is merged into subscribedPartitionStates.
/**
 * Utility method that takes the topic partitions and creates the topic partition state
 * holders. If a watermark generator per partition exists, this will also initialize those.
 */
private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(
        Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets,
        int timestampWatermarkMode,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

    switch (timestampWatermarkMode) {

        case NO_TIMESTAMPS_WATERMARKS: {
            //.......
        }

        case PERIODIC_WATERMARKS: {
            @SuppressWarnings("unchecked")
            KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = // KafkaTopicPartitionStateWithPeriodicWatermarks is a subclass of KafkaTopicPartitionState
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                            new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; // same size as assignedPartitionsToInitialOffsets

            int pos = 0;
            for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); // the handle abstracts TopicPartition to hide structural differences between Kafka versions

                AssignerWithPeriodicWatermarks<T> assignerInstance = // the AssignerWithPeriodicWatermarks
                        watermarksPeriodic.deserializeValue(userCodeClassLoader);

                partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( // for PUNCTUATED_WATERMARKS this is a KafkaTopicPartitionStateWithPunctuatedWatermarks
                        partition.getKey(), kafkaHandle, assignerInstance); // for NO_TIMESTAMPS_WATERMARKS there is no assignerInstance argument

                partitions[pos].setOffset(partition.getValue()); // set the start offset

                pos++;
            }

            return partitions;
        }

        case PUNCTUATED_WATERMARKS: {
            //......
        }
    }
}
subscribedPartitionStates therefore holds, for each TopicPartition, both the current offset and the watermark-extraction logic.
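In the periodic mode, the fetcher must still fold the per-partition watermarks into one operator-level watermark, and it can only safely advance to the minimum across partitions, otherwise records from a slow partition would arrive behind the watermark. A tiny sketch of that aggregation rule, my simplification of what the fetcher's periodic emitter does:

// Aggregation rule for periodic watermarks across partitions: the emitted
// watermark is the minimum of the per-partition watermarks.
public final class WatermarkAggregation {

    private WatermarkAggregation() {}

    public static long minAcrossPartitions(long[] perPartitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : perPartitionWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // three partitions; the slowest one (ts 90) caps the operator watermark
        System.out.println(minAcrossPartitions(new long[] {120, 90, 200})); // 90
    }
}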
KafkaConsumerThread
@Override
public void run() {
    // this is the means to talk to FlinkKafkaConsumer's main thread
    final Handover handover = this.handover; // the structure used to exchange data between threads

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    final KafkaConsumer<byte[], byte[]> consumer;
    try {
        consumer = new KafkaConsumer<>(kafkaProperties); // create the actual Kafka consumer
    } catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    try {
        // The callback invoked by Kafka once an offset commit is complete
        final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); // this callback only resets commitInProgress = false, marking the commit as finished

        // offsets in the state may still be placeholder sentinel values if we are starting fresh, or the
        // checkpoint / savepoint state we were restored with had not completely been replaced with actual offset
        // values yet; replace those with actual offsets, according to what the sentinel value represent.
        for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) {
            if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                // first force the consumer position to earliest/latest, then read the resulting position back as the partition's current offset
                consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle());
                partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
            } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle());
                partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
            } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                // the KafkaConsumer by default will automatically seek the consumer position
                // to the committed group offset, so we do not need to do it.
                partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); // for GROUP_OFFSET, just read the position derived from the group's committed offset
            } else {
                // in all other cases, use the offset the partition state already carries, e.g. one restored from a checkpoint
                consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
            }
        }

        // from now on, external operations may call the consumer
        this.consumer = consumer;

        // the latest bulk of records. may carry across the loop if the thread is woken up
        // from blocking on the handover
        ConsumerRecords<byte[], byte[]> records = null;

        // main fetch loop
        while (running) {

            // check if there is something to commit
            if (!commitInProgress) { // only one commit may be in flight at a time
                // get and reset the work-to-be committed, so we don't repeatedly commit the same
                final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); // set during checkpointing, when the fetcher's offsets are snapshotted and handed over via fetcher.commitInternalOffsetsToKafka

                if (toCommit != null) {
                    // also record that a commit is already in progress
                    // the order here matters! first set the flag, then send the commit command.
                    commitInProgress = true;
                    consumer.commitAsync(toCommit, offsetCommitCallback); // commit the offsets asynchronously
                }
            }

            // get the next batch of records, unless we did not manage to hand the old batch over
            if (records == null) {
                try {
                    records = consumer.poll(pollTimeout); // pull data from Kafka
                } catch (WakeupException we) {
                    continue;
                }
            }

            try {
                handover.produce(records); // push the batch into the handover
                records = null;
            } catch (Handover.WakeupException e) {
                // fall through the loop
            }
        }
        // end main fetch loop
    } // (teardown elided in this excerpt)
}
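The nextOffsetsToCommit.getAndSet(null) call is the entire handoff protocol between the checkpoint thread and this consumer thread: the checkpoint side publishes a map of offsets, and the poll loop claims it at most once. A reduced sketch of that exchange (class and method names are illustrative):

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Lock-free handoff of "offsets to commit" from the checkpoint thread to the
// consumer's poll loop, mirroring the getAndSet(null) pattern above.
public class CommitHandoff<K, V> {

    private final AtomicReference<Map<K, V>> nextOffsetsToCommit = new AtomicReference<>();

    // checkpoint thread: publish the latest offsets; a newer checkpoint
    // simply overwrites an unclaimed older one
    public void publish(Map<K, V> offsets) {
        nextOffsetsToCommit.set(offsets);
    }

    // consumer thread, once per poll iteration: claim the offsets exactly once
    public Map<K, V> claim() {
        return nextOffsetsToCommit.getAndSet(null); // null if nothing is pending
    }
}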
