你真的了解Flink Kafka source嗎?


點擊上方藍字  關注我們

Flink 提供了專門的 Kafka 連接器,向 Kafka topic 中讀取或者寫入數據。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 機制,可提供 exactly-once 的處理語義。為此,Flink 並不完全依賴於跟蹤 Kafka 消費組的偏移量,而是在內部跟蹤和檢查偏移量。本文內容較長,可以關注收藏。

引言

當我們在使用Spark Streaming、Flink等計算框架進行數據實時處理時,使用Kafka作為一款發布與訂閱的消息系統成為了標配。Spark Streaming與Flink都提供了相對應的Kafka Consumer,使用起來非常的方便,只需要設置一下Kafka的參數,然后添加kafka的source就萬事大吉了。如果你真的覺得事情就是如此的so easy,感覺媽媽再也不用擔心你的學習了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source為討論對象,首先從基本的使用入手,然后深入源碼逐一剖析,一並為你撥開Flink Kafka connector的神秘面紗。值得注意的是,本文假定讀者具備了Kafka的相關知識,關於Kafka的相關細節問題,不在本文的討論范圍之內。

Flink Kafka Consumer介紹

Flink Kafka Connector有很多個版本,可以根據你的kafka和Flink的版本選擇相應的包(maven artifact id)和類名。本文所涉及的Flink版本為1.10,Kafka的版本為2.3.4。Flink所提供的Maven依賴於類名如下表所示:

Demo示例

添加Maven依賴

<!--本文使用的是通用型的connector-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

簡單代碼案例

public class KafkaConnector {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 開啟checkpoint,時間間隔為毫秒
        senv.enableCheckpointing(5000L);
        // 選擇狀態后端
        senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
        //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
        Properties props = new Properties();
        // kafka broker地址
        props.put("bootstrap.servers""kms-2:9092,kms-3:9092,kms-4:9092");
        // 僅kafka0.8版本需要配置
        props.put("zookeeper.connect""kms-2:2181,kms-3:2181,kms-4:2181");
        // 消費者組
        props.put("group.id""test");
        // 自動偏移量提交
        props.put("enable.auto.commit"true);
        // 偏移量提交的時間間隔,毫秒
        props.put("auto.commit.interval.ms"5000);
        // kafka 消息的key序列化器
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        // kafka 消息的value序列化器
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        // 指定kafka的消費者從哪里開始消費數據
        // 共有三種方式,
        // #earliest
        // 當各分區下有已提交的offset時,從提交的offset開始消費;
        // 無提交的offset時,從頭開始消費
        // #latest
        // 當各分區下有已提交的offset時,從提交的offset開始消費;
        // 無提交的offset時,消費新產生的該分區下的數據
        // #none
        // topic各分區都存在已提交的offset時,
        // 從offset后開始消費;
        // 只要有一個分區不存在已提交的offset,則拋出異常
        props.put("auto.offset.reset""latest");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "qfbap_ods.code_city",
                new SimpleStringSchema(),
                props);
        //設置checkpoint后在提交offset,即oncheckpoint模式
        // 該值默認為true,
        consumer.setCommitOffsetsOnCheckpoints(true);

        // 最早的數據開始消費
        // 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。
        //consumer.setStartFromEarliest();

        // 消費者組最近一次提交的偏移量,默認。
        // 如果找不到分區的偏移量,那么將會使用配置中的 auto.offset.reset 設置
        //consumer.setStartFromGroupOffsets();

        // 最新的數據開始消費
        // 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。
        //consumer.setStartFromLatest();

        // 指定具體的偏移量時間戳,毫秒
        // 對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。
        // 如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。
        // 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
        //consumer.setStartFromTimestamp(1585047859000L);

        // 為每個分區指定偏移量
        /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
        consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/

        /**
         *
         * 請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時,
         * 這些起始位置配置方法不會影響消費的起始位置。
         * 在恢復時,每個 Kafka 分區的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定
         *
         */


        DataStreamSource<String> source = senv.addSource(consumer);
        // TODO
        source.print();
        senv.execute("test kafka connector");
    }
}

參數配置解讀

在Demo示例中,給出了詳細的配置信息,下面將對上面的參數配置進行逐一分析。

kakfa的properties參數配置

  • bootstrap.servers:kafka broker地址

  • zookeeper.connect:僅kafka0.8版本需要配置

  • group.id:消費者組

  • enable.auto.commit

    自動偏移量提交,該值的配置不是最終的偏移量提交模式,需要考慮用戶是否開啟了checkpoint,

    在下面的源碼分析中會進行解讀

  • auto.commit.interval.ms:偏移量提交的時間間隔,毫秒

  • key.deserializer:

    kafka 消息的key序列化器,如果不指定會使用ByteArrayDeserializer序列化器

  • value.deserializer

kafka 消息的value序列化器,如果不指定會使用ByteArrayDeserializer序列化器

  • auto.offset.reset

    指定kafka的消費者從哪里開始消費數據,共有三種方式,

  • 第一種:earliest
    當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

  • 第二種:latest
    當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據

  • 第三種:none
    topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

    注意:上面的指定消費模式並不是最終的消費模式,取決於用戶在Flink程序中配置的消費模式

Flink程序用戶配置的參數

  • consumer.setCommitOffsetsOnCheckpoints(true)

解釋:設置checkpoint后在提交offset,即oncheckpoint模式,該值默認為true,該參數會影響偏移量的提交方式,下面的源碼中會進行分析

  • consumer.setStartFromEarliest()

    解釋:最早的數據開始消費 ,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromGroupOffsets()

    解釋:消費者組最近一次提交的偏移量,默認。如果找不到分區的偏移量,那么將會使用配置中的 auto.offset.reset 設置,該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromLatest()

    解釋:最新的數據開始消費,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromTimestamp(1585047859000L)

    解釋:指定具體的偏移量時間戳,毫秒。對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。

  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

解釋:為每個分區指定偏移量,該方法為繼承父類FlinkKafkaConsumerBase的方法。

請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時,這些起始位置配置方法不會影響消費的起始位置。在恢復時,每個 Kafka 分區的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定。

Flink Kafka Consumer源碼解讀

繼承關系

Flink Kafka Consumer繼承了FlinkKafkaConsumerBase抽象類,而FlinkKafkaConsumerBase抽象類又繼承了RichParallelSourceFunction,所以要實現一個自定義的source時,有兩種實現方式:一種是通過實現SourceFunction接口來自定義並行度為1的數據源;另一種是通過實現ParallelSourceFunction接口或者繼承RichParallelSourceFunction來自定義具有並行度的數據源。FlinkKafkaConsumer的繼承關系如下圖所示。

源碼解讀

FlinkKafkaConsumer源碼

先看一下FlinkKafkaConsumer的源碼,為了方面閱讀,本文將盡量給出本比較完整的源代碼片段,具體如下所示:代碼較長,在這里可以先有有一個總體的印象,下面會對重要的代碼片段詳細進行分析。

public class FlinkKafkaConsumer<Textends FlinkKafkaConsumerBase<T{

    // 配置輪詢超時超時時間,使用flink.poll-timeout參數在properties進行配置
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    // 如果沒有可用數據,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
    //默認輪詢超時時間
    public static final long DEFAULT_POLL_TIMEOUT = 100L;
    // 用戶提供的kafka 參數配置
    protected final Properties properties;
    // 如果沒有可用數據,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
    protected final long pollTimeout;
    /**
     * 創建一個kafka的consumer source
     * @param topic                   消費的主題名稱
     * @param valueDeserializer       反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props                   用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }
    /**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支持訪問kafka消費的額外信息
     * 比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param topic                消費的主題名稱
     * @param deserializer         反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props                用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }
    /**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支持消費多個主題
     * @param topics          消費的主題名稱,多個主題為List集合
     * @param deserializer    反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props           用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支持消費多個主題,
     * @param topics         消費的主題名稱,多個主題為List集合
     * @param deserializer   反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
     * @param props          用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(topics, null, deserializer, props);
    }
    /**
     * 基於正則表達式訂閱多個topic
     * 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
     * @param subscriptionPattern   主題的正則表達式
     * @param valueDeserializer   反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
     * @param props               用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基於正則表達式訂閱多個topic
     * 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
     * @param subscriptionPattern   主題的正則表達式
     * @param deserializer          該反序列化類支持訪問kafka消費的額外信息,比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param props                 用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(null, subscriptionPattern, deserializer, props);
    }
    private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props)
 
{
        // 調用父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個參數為Properties,第二個參數為key,第三個參數為value默認值
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(
                checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT參數,則返回具體的配置值,否則返回默認值DEFAULT_POLL_TIMEOUT
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
   // 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher實例,
    // fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
        // 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
        // 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
        // 這將覆蓋用戶在properties中配置的任何設置
        // 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
        // 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
        // 可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,
        // 就會將kafka properties的enable.auto.commit強制置為false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }
    //父類(FlinkKafkaConsumerBase)方法重寫
    // 返回一個分區發現類,分區發現可以使用kafka broker的高級consumer API發現topic和partition的元數據
    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks)
 
{

        return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
    }

    /**
     *判斷是否在kafka的參數開啟了自動提交,即enable.auto.commit=true,
     * 並且auto.commit.interval.ms>0,
     * 注意:如果沒有沒有設置enable.auto.commit的參數,則默認為true
     *       如果沒有設置auto.commit.interval.ms的參數,則默認為5000毫秒
     * @return
     */

    @Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }

    /**
     * 確保配置了kafka消息的key與value的反序列化方式,
     * 如果沒有配置,則使用ByteArrayDeserializer序列化器,
     * 該類的deserialize方法是直接將數據進行return,未做任何處理
     * @param props
     */

    private static void setDeserializer(Properties props) {
        final String deSerName = ByteArrayDeserializer.class.getName();

        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }
}

分析

上面的代碼已經給出了非常詳細的注釋,下面將對比較關鍵的部分進行分析。

  • 構造方法分析

FlinkKakfaConsumer提供了7種構造方法,如上圖所示。不同的構造方法分別具有不同的功能,通過傳遞的參數也可以大致分析出每種構造方法特有的功能,為了方便理解,本文將對其進行分組討論,具體如下:

單topic

/**
     * 創建一個kafka的consumer source
     * @param topic                   消費的主題名稱
     * @param valueDeserializer       反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props                   用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

/**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支持訪問kafka消費的額外信息
     * 比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param topic                消費的主題名稱
     * @param deserializer         反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props                用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

上面兩種構造方法只支持單個topic,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema參數的構造方法可以獲取更多的附屬信息,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等信息,可以選擇使用此方式的構造方法。以上兩種方法都調用了私有的構造方法,私有構造方法的分析見下面。

多topic

/**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支持消費多個主題
     * @param topics          消費的主題名稱,多個主題為List集合
     * @param deserializer    反序列化類型,用於將kafka的字節消息轉換為Flink的對象
     * @param props           用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 創建一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支持消費多個主題,
     * @param topics         消費的主題名稱,多個主題為List集合
     * @param deserializer   反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
     * @param props          用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(topics, null, deserializer, props);
    }

上面的兩種多topic的構造方法,可以使用一個list集合接收多個topic進行消費,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema參數的構造方法可以獲取更多的附屬信息,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等信息,可以選擇使用此方式的構造方法。以上兩種方法都調用了私有的構造方法,私有構造方法的分析見下面。

正則匹配topic

/**
     * 基於正則表達式訂閱多個topic
     * 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
     * @param subscriptionPattern   主題的正則表達式
     * @param valueDeserializer   反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
     * @param props               用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基於正則表達式訂閱多個topic
     * 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
     * @param subscriptionPattern   主題的正則表達式
     * @param deserializer          該反序列化類支持訪問kafka消費的額外信息,比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param props                 用戶傳入的kafka參數
     */

    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(null, subscriptionPattern, deserializer, props);
    }

實際的生產環境中可能有這樣一些需求,比如有一個flink作業需要將多種不同的數據聚合到一起,而這些數據對應着不同的kafka topic,隨着業務增長,新增一類數據,同時新增了一個kafka topic,如何在不重啟作業的情況下作業自動感知新的topic。首先需要在構建FlinkKafkaConsumer時的properties中設置flink.partition-discovery.interval-millis參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時FLinkKafkaConsumer內部會啟動一個單獨的線程定期去kafka獲取最新的meta信息。具體的調用執行信息,參見下面的私有構造方法

私有構造方法

    private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props)
 
{

        // 調用父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個參數為Properties,第二個參數為key,第三個參數為value默認值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值是開啟分區發現的配置參數,在properties里面配置flink.partition-discovery.interval-millis=5000(大於0的數),如果沒有配置則使用PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(表示禁用分區發現)
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(
                checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT參數,則返回具體的配置值,否則返回默認值DEFAULT_POLL_TIMEOUT
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
  • 其他方法分析

KafkaFetcher對象創建

   // 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher實例,
    // fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
        // 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
        // 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
        // 這將覆蓋用戶在properties中配置的任何設置
        // 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
        // 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
        // 可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,
        // 就會將kafka properties的enable.auto.commit強制置為false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }

該方法的作用是返回一個fetcher實例,fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream),在這里對自動偏移量提交模式進行了強制調整,即確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交。這將覆蓋用戶在properties中配置的任何設置,簡單可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,就會將kafka properties的enable.auto.commit強制置為false。關於offset的提交模式,見下文的偏移量提交模式分析。

判斷是否設置了自動提交

   @Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }

判斷是否在kafka的參數開啟了自動提交,即enable.auto.commit=true,並且auto.commit.interval.ms>0, 注意:如果沒有沒有設置enable.auto.commit的參數,則默認為true, 如果沒有設置auto.commit.interval.ms的參數,則默認為5000毫秒。該方法會在FlinkKafkaConsumerBase的open方法進行初始化的時候調用。

反序列化

private static void setDeserializer(Properties props) {
         // 默認的反序列化方式 
        final String deSerName = ByteArrayDeserializer.class.getName();
         //獲取用戶配置的properties關於key與value的反序列化模式
        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         // 如果配置了,則使用用戶配置的值
        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        // 沒有配置,則使用ByteArrayDeserializer進行反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }

確保配置了kafka消息的key與value的反序列化方式,如果沒有配置,則使用ByteArrayDeserializer序列化器,
ByteArrayDeserializer類的deserialize方法是直接將數據進行return,未做任何處理。

FlinkKafkaConsumerBase源碼

@Internal
public abstract class FlinkKafkaConsumerBase<Textends RichParallelSourceFunction<Timplements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction 
{

    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private boolean enableCommitOnCheckpoints = true;
    /**
     * 偏移量的提交模式,僅能通過在FlinkKafkaConsumerBase#open(Configuration)進行配置
     * 該值取決於用戶是否開啟了checkpoint

     */

    private OffsetCommitMode offsetCommitMode;
    /**
     * 配置從哪個位置開始消費kafka的消息,
     * 默認為StartupMode#GROUP_OFFSETS,即從當前提交的偏移量開始消費
     */

    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
    private Long startupOffsetsTimestamp;

    /**
     * 確保當偏移量的提交模式為ON_CHECKPOINTS時,禁用自動提交,
     * 這將覆蓋用戶在properties中配置的任何設置。
     * 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
     * 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false,即禁用自動提交
     * @param properties       kafka配置的properties,會通過該方法進行覆蓋
     * @param offsetCommitMode    offset提交模式
     */

    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        }
    }

    /**
     * 決定是否在開啟checkpoint時,在checkpoin之后提交偏移量,
     * 只有用戶配置了啟用checkpoint,該參數才會其作用
     * 如果沒有開啟checkpoint,則使用kafka的配置參數:enable.auto.commit
     * @param commitOnCheckpoints
     * @return
     */

    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
        this.enableCommitOnCheckpoints = commitOnCheckpoints;
        return this;
    }
    /**
     * 從最早的偏移量開始消費,
     *該模式下,Kafka 中的已經提交的偏移量將被忽略,不會用作起始位置。
     *可以通過consumer1.setStartFromEarliest()進行設置
     */

    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     * 從最新的數據開始消費,
     *  該模式下,Kafka 中的 已提交的偏移量將被忽略,不會用作起始位置。
     *
     */

    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *指定具體的偏移量時間戳,毫秒
     *對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。
     * 如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。
     * 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
     */

    protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
        checkArgument(startupOffsetsTimestamp >= 0"The provided value for the startup offsets timestamp is invalid.");

        long currentTimestamp = System.currentTimeMillis();
        checkArgument(startupOffsetsTimestamp <= currentTimestamp,
            "Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);

        this.startupMode = StartupMode.TIMESTAMP;
        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *
     * 從具體的消費者組最近提交的偏移量開始消費,為默認方式
     * 如果沒有發現分區的偏移量,使用auto.offset.reset參數配置的值
     * @return
     */

    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *為每個分區指定偏移量進行消費
     */

    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
        return this;
    }
    @Override
    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        // 決定偏移量的提交模式,
        // 第一個參數為是否開啟了自動提交,
        // 第二個參數為是否開啟了CommitOnCheckpoint模式
        // 第三個參數為是否開啟了checkpoint
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

       // 省略的代碼
    }

// 省略的代碼
    /**
     * 創建一個fetcher用於連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
     * @param sourceContext   數據輸出的上下文
     * @param subscribedPartitionsToStartOffsets  當前sub task需要處理的topic分區集合,即topic的partition與offset的Map集合
     * @param watermarksPeriodic    可選,一個序列化的時間戳提取器,生成periodic類型的 watermark
     * @param watermarksPunctuated  可選,一個序列化的時間戳提取器,生成punctuated類型的 watermark
     * @param runtimeContext        task的runtime context上下文
     * @param offsetCommitMode      offset的提交模式,有三種,分別為:DISABLED(禁用偏移量自動提交),ON_CHECKPOINTS(僅僅當checkpoints完成之后,才提交偏移量給kafka)
     * KAFKA_PERIODIC(使用kafka自動提交函數,周期性自動提交偏移量)
     * @param kafkaMetricGroup   Flink的Metric
     * @param useMetrics         是否使用Metric
     * @return                   返回一個fetcher實例
     * @throws Exception
     */

    protected abstract AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup kafkaMetricGroup,
            boolean useMetrics) throws Exception;
    protected abstract boolean getIsAutoCommitEnabled();
    // 省略的代碼
}

上述代碼是FlinkKafkaConsumerBase的部分代碼片段,基本上對其做了詳細注釋,里面的有些方法是FlinkKafkaConsumer繼承的,有些是重寫的。之所以在這里給出,可以對照FlinkKafkaConsumer的源碼,從而方便理解。

偏移量提交模式分析

Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行為。請注意:Flink Kafka Consumer 不依賴於提交的 offset 來實現容錯保證。提交的 offset 只是一種方法,用於公開 consumer 的進度以便進行監控。

配置 offset 提交行為的方法是否相同,取決於是否為 job 啟用了 checkpointing。在這里先給出提交模式的具體結論,下面會對兩種方式進行具體的分析。基本的結論為:

  • 開啟checkpoint

  • 情況1:用戶通過調用 consumer 上的 setCommitOffsetsOnCheckpoints(true) 方法來啟用 offset 的提交(默認情況下為 true )
    那么當 checkpointing 完成時,Flink Kafka Consumer 將提交的 offset 存儲在 checkpoint 狀態中。
    這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態中的 offset 一致。
    注意,在這個場景中,Properties 中的自動定期 offset 提交設置會被完全忽略。
    此情況使用的是ON_CHECKPOINTS

  • 情況2:用戶通過調用 consumer 上的 setCommitOffsetsOnCheckpoints("false") 方法來禁用 offset 的提交,則使用DISABLED模式提交offset

  • 未開啟checkpoint
    Flink Kafka Consumer 依賴於內部使用的 Kafka client 自動定期 offset 提交功能,因此,要禁用或啟用 offset 的提交

  • 情況1:配置了Kafka properties的參數配置了"enable.auto.commit" = "true"或者 Kafka 0.8 的 auto.commit.enable=true,使用KAFKA_PERIODIC模式提交offset,即自動提交offset

  • 情況2:沒有配置enable.auto.commit參數,使用DISABLED模式提交offset,這意味着kafka不知道當前的消費者組的消費者每次消費的偏移量。

提交模式源碼分析

  • offset的提交模式

public enum OffsetCommitMode {
    // 禁用偏移量自動提交
    DISABLED,
    // 僅僅當checkpoints完成之后,才提交偏移量給kafka
    ON_CHECKPOINTS,
    // 使用kafka自動提交函數,周期性自動提交偏移量
    KAFKA_PERIODIC;
}
  • 提交模式的調用

public class OffsetCommitModes {
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing)
 
{
        // 如果開啟了checkinpoint,執行下面判斷
        if (enableCheckpointing) {
            // 如果開啟了checkpoint,進一步判斷是否在checkpoin啟用時提交(setCommitOffsetsOnCheckpoints(true)),如果是則使用ON_CHECKPOINTS模式
            // 否則使用DISABLED模式
            return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        } else {
            // 若Kafka properties的參數配置了"enable.auto.commit" = "true",則使用KAFKA_PERIODIC模式提交offset
            // 否則使用DISABLED模式
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }
}

小結

本文主要介紹了Flink Kafka Consumer,首先對FlinkKafkaConsumer的不同版本進行了對比,然后給出了一個完整的Demo案例,並對案例的配置參數進行了詳細解釋,接着分析了FlinkKafkaConsumer的繼承關系,並分別對FlinkKafkaConsumer以及其父類FlinkKafkaConsumerBase的源碼進行了解讀,最后從源碼層面分析了Flink Kafka Consumer的偏移量提交模式,並對每一種提交模式進行了梳理。

往期 精彩回顧




Flink的八種分區策略源碼解讀
Flink1.10集成Hive快速入門
Flink運行架構剖析
Flink的狀態后端(State Backends)
基於Canal與Flink實現數據實時增量同步(一)
基於Canal與Flink實現數據實時增量同步(二)
掃碼關注更多精彩
你點的每個贊,我都認真當成了喜歡

本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM