Flink SQL Kafka Source: Custom Parallelism


This post is based on Kang Qi's blog: "使Flink SQL Kafka Source支持獨立設置並行度" (making the Flink SQL Kafka source support an independently configured parallelism).

I have long felt that Flink SQL needs a way to specify operator parallelism: even if the operators translated from SQL cannot each be assigned their own parallelism, at least the source, sink, and join operators should be configurable.

Having just come across that blog post, and since Kafka is the most commonly used source connector, I decided to add a corresponding implementation to sqlSubmit.

Setting Parallelism with the Streaming API

With the Flink Streaming API, specifying the parallelism of a Kafka source only requires calling setParallelism() after env.addSource(), as follows:


val kafkaSource = new FlinkKafkaConsumer[ObjectNode](topic, new JsonNodeDeserializationSchema(), Common.getProp)
val stream = env.addSource(kafkaSource)
        .setParallelism(12)

Setting Parallelism with the SQL API

First, look at a SQL job that reads from Kafka:

-- kafka source
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_log'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
);

CREATE TABLE user_log_sink (
   `day` string
   ,num bigint
   ,min_user_id bigint
   ,max_user_id bigint
) WITH (
   'connector' = 'print'
);

insert into user_log_sink
select `day`
  ,num
  ,min_user_id
  ,max_user_id
from (
  select DATE_FORMAT(ts,'yyyyMMdd') `day`
    ,count(distinct user_id) num
    ,min(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) min_user_id
    ,max(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) max_user_id
  from user_log
  -- where DATE_FORMAT(ts,'yyyyMMdd') = date_format(current_timestamp, 'yyyyMMdd')
  group by DATE_FORMAT(ts,'yyyyMMdd')
) t1
where num % 2 = 0
;

The job graph looks like this:

Looking closely at the job graph, every operator runs with the parallelism specified by the parameter table.exec.resource.default-parallelism.
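For reference, here is a minimal sketch of how table.exec.resource.default-parallelism is typically set on a TableEnvironment before the SQL is translated; the option key is the real Flink option, while the class name and the value 4 are just examples:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DefaultParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // every operator translated from SQL picks up this value,
        // unless something overrides it during plan translation
        tableEnv.getConfig()
                .getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 4);
    }
}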

To change only the source parallelism and leave the other operators untouched, the Streaming API experience suggests we just need to set the parallelism on the StreamSource that the SQL is translated into.

So let's go straight to where the Flink SQL source code creates the Kafka source: the KafkaDynamicSource.getScanRuntimeProvider method.


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);

    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            if (watermarkStrategy == null) {
                watermarkStrategy = WatermarkStrategy.noWatermarks();
            }
            // create the DataStreamSource
            return execEnv.fromSource(
                    kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
        }

        @Override
        public boolean isBounded() {
            return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
        }
    };
}

The source code shows that a KafkaSource is created and execEnv.fromSource is called. Following the Streaming API approach, we simply call setParallelism after execEnv.fromSource. The modified code looks like this:


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);

    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            if (watermarkStrategy == null) {
                watermarkStrategy = WatermarkStrategy.noWatermarks();
            }

            DataStreamSource<RowData> dataDataStreamSource = execEnv.fromSource(
                    kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
            int defaultParallelism = execEnv.getParallelism();

            // add by venn for custom source parallelism
            // many jobs do not need a custom parallelism, so check for null first
            // if the configured parallelism equals the global parallelism, do nothing either
            if (parallelism != null && parallelism > 0 && parallelism != defaultParallelism) {
                dataDataStreamSource.setParallelism(parallelism);
                // todo: the referenced blog also breaks the operator chain between the source and the downstream operators;
                // I don't think that is necessary: if the parallelism differs the chain is broken automatically,
                // and if the parallelism is the same there is no need to break it either
//                    dataDataStreamSource.disableChaining();
            }
            return dataDataStreamSource;
        }

        @Override
        public boolean isBounded() {
            return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
        }
    };

}

To make this configurable, a parallelism option has to be added to the Kafka table source's connector options. For how a custom source reads parameters from the SQL DDL, see the earlier post on a custom Flink HTTP table source:

  1. Add the config option to the KafkaConnectorOptions class:
public static final ConfigOption<Integer> SOURCE_PARALLELISM 
    = ConfigOptions.key("source.parallelism")
        .intType()
        .noDefaultValue()
        .withDescription("Defines a custom parallelism for the source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.");


  2. In KafkaDynamicTableFactory, register SOURCE_PARALLELISM as an optional option, read its value, and pass it on to where the Kafka source is created.

If the option is not registered as an optional option, it cannot be resolved when the SQL DDL is parsed:


@Override
public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(FactoryUtil.FORMAT);
    options.add(KEY_FORMAT);
    options.add(KEY_FIELDS);
    options.add(KEY_FIELDS_PREFIX);
    options.add(VALUE_FORMAT);
    options.add(VALUE_FIELDS_INCLUDE);
    options.add(TOPIC);
    options.add(TOPIC_PATTERN);
    options.add(PROPS_GROUP_ID);
    options.add(SCAN_STARTUP_MODE);
    options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
    options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
    options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
    options.add(SINK_PARTITIONER);
    options.add(SINK_PARALLELISM);
    options.add(DELIVERY_GUARANTEE);
    options.add(TRANSACTIONAL_ID_PREFIX);
    options.add(SINK_SEMANTIC);
    options.add(SOURCE_PARALLELISM);
    return options;
}

Reading the option value:


@Override
public DynamicTableSource createDynamicTableSource(Context context) {
   
    .....
    // the value may be absent, so the type is the boxed Integer
    final Integer parallelism = tableOptions.getOptional(SOURCE_PARALLELISM).orElse(null);

    return createKafkaTableSource(
            physicalDataType,
            keyDecodingFormat.orElse(null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            getSourceTopics(tableOptions),
            getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis,
            context.getObjectIdentifier().asSummaryString(),
            parallelism);
}

Add an overload of createKafkaTableSource that takes the parallelism as an additional parameter:


protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
            String tableIdentifier,
            Integer parallelism) {
    return new KafkaDynamicSource(
            physicalDataType,
            keyDecodingFormat,
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            topics,
            topicPattern,
            properties,
            startupMode,
            specificStartupOffsets,
            startupTimestampMillis,
            false,
            tableIdentifier,
            parallelism);
    }

In the KafkaDynamicSource constructor, add an Integer field parallelism to receive the value passed in from KafkaDynamicTableFactory; a sketch of the pattern is shown below.
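A minimal, self-contained sketch of that pattern (the class and method names below are illustrative, not the real KafkaDynamicSource): the optional parallelism travels in as a nullable Integer and is only applied when it is set, positive, and different from the default.

/** Illustrative sketch only; mirrors how KafkaDynamicSource can carry the optional parallelism. */
class ParallelismCarryingSource {

    /** null means the DDL did not set 'source.parallelism'. */
    private final Integer parallelism;

    ParallelismCarryingSource(Integer parallelism) {
        this.parallelism = parallelism;
    }

    /** Same condition as the check in produceDataStream above. */
    boolean shouldOverrideParallelism(int defaultParallelism) {
        return parallelism != null && parallelism > 0 && parallelism != defaultParallelism;
    }
}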

Done? Let's test it first.

The SQL is the same as above, with one extra property on the Kafka source: 'source.parallelism' = '2'.

  • Note: the global parallelism is 1.

CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_log'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
  ,'source.parallelism' = '2'
);


The job graph:

The operator parallelism is not quite what was expected: only the source parallelism was set, yet the downstream Calc operator's parallelism changed along with it.

Looking at the Calc operator's source code, we can see why:


@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final ExecEdge inputEdge = getInputEdges().get(0);
    final Transformation<RowData> inputTransform =
            (Transformation<RowData>) inputEdge.translateToPlan(planner);
    final CodeGeneratorContext ctx =
            new CodeGeneratorContext(planner.getTableConfig())
                    .setOperatorBaseClass(operatorBaseClass);

    final CodeGenOperatorFactory<RowData> substituteStreamOperator =
            CalcCodeGenerator.generateCalcOperator(
                    ctx,
                    inputTransform,
                    (RowType) getOutputType(),
                    JavaScalaConversionUtil.toScala(projection),
                    JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),
                    retainHeader,
                    getClass().getSimpleName());
    return new OneInputTransformation<>(
            inputTransform,
            getDescription(),
            substituteStreamOperator,
            InternalTypeInfo.of(getOutputType()),
            inputTransform.getParallelism());
}

substituteStreamOperator is the Calc operator produced by code generation, and its parallelism is taken directly from the input operator via inputTransform.getParallelism().

That is why the Calc operator ends up with the same parallelism as the Kafka source. Modify the source code to set its parallelism back to the global default:


@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final ExecEdge inputEdge = getInputEdges().get(0);
    final Transformation<RowData> inputTransform =
            (Transformation<RowData>) inputEdge.translateToPlan(planner);
    final CodeGeneratorContext ctx =
            new CodeGeneratorContext(planner.getTableConfig())
                    .setOperatorBaseClass(operatorBaseClass);

    final CodeGenOperatorFactory<RowData> substituteStreamOperator =
            CalcCodeGenerator.generateCalcOperator(
                    ctx,
                    inputTransform,
                    (RowType) getOutputType(),
                    JavaScalaConversionUtil.toScala(projection),
                    JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),
                    retainHeader,
                    getClass().getSimpleName());

    // add by venn for custom source parallelism
    int parallelism = inputTransform.getParallelism();
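    // ExecutionConfig.PARALLELISM_DEFAULT is the -1 sentinel: a transformation created with it
    // falls back to the globally configured default parallelism when the job graph is generated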
    int defaultParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
    if (parallelism != defaultParallelism) {
        // update calc operator parallelism to default parallelism
        parallelism = defaultParallelism;
    }

    return new OneInputTransformation<>(
            inputTransform,
            getDescription(),
            substituteStreamOperator,
            InternalTypeInfo.of(getOutputType()),
            parallelism);
}

The resulting job graph:

Now it is truly done.

For the complete code, see sqlSubmit on GitHub.

------------------------------------------------------------

  • Updated 2022-01-13

A new parameter, table.exec.source.force-break-chain, was added to the sqlSubmit configuration file to control whether the chain between the Kafka SQL source and its downstream operators should be broken.

Because the options defined in the DDL cannot be accessed from CommonExecCalc, a job-level parameter was added instead: table.exec.source.force-break-chain in sqlSubmit.properties, defaulting to false, with the chain broken when it is set to true. The value is put into GlobalJobParameters via the env; KafkaDynamicSource reads it back from GlobalJobParameters through the env, while CommonExecCalc can read job-level parameters directly through the planner.
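A minimal sketch of that round trip (the parameter key matches the text above; the reading and chain-breaking code is illustrative, not the exact sqlSubmit implementation):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceBreakChainSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // submit side: register the job-level flag in GlobalJobParameters
        Map<String, String> jobParams = new HashMap<>();
        jobParams.put("table.exec.source.force-break-chain", "true");
        env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(jobParams));

        // source side (conceptually inside produceDataStream): read the flag back from the env
        Map<String, String> globalParams = env.getConfig().getGlobalJobParameters().toMap();
        boolean forceBreakChain = Boolean.parseBoolean(
                globalParams.getOrDefault("table.exec.source.force-break-chain", "false"));

        System.out.println("force-break-chain = " + forceBreakChain);
        if (forceBreakChain) {
            // dataStreamSource.disableChaining();  // break the chain right after the source
        }
    }
}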

  • When a custom parallelism is specified on the Kafka SQL source, should the chain between the source and its downstream operators be broken automatically, by comparing the source parallelism against the default, or be controlled by a parameter? In the end I followed the referenced blog and use a parameter. The relationship between the source and its downstream operators varies too much for a decision based purely on parallelism, so an extra parameter is the more flexible choice: by default the operators stay chained and the downstream operators keep the same parallelism as the source; if the chain is broken, the downstream operators fall back to the global parallelism.

Feel free to follow the "Flink 菜鳥" WeChat official account for occasional posts on Flink development.

