Customizing the flink-kafka-connector
Background: we need to consume Kafka data between a specific start and end position, and the Flink job should stop on its own once the data up to the end position has been consumed. A batch job, however, does not support consuming Kafka data out of the box, and a streaming job never stops by itself, so a custom Kafka connector is required. In Flink 1.14 the DataStream connector has a setBounded method for specifying where consumption should end, but the Table API does not yet support an end position, so we can follow the DataStream implementation and modify the connector source code. The Flink version used here is 1.14.
This post is largely based on another article that helped me a lot; on top of it I adjusted a few details, two of which are key to making the whole thing work. First, the stream/batch check is changed so that Kafka data can still be consumed in batch mode. Second, the job is made to stop once it has consumed up to the specified position. Finally the connector is repackaged and tested: watch the code formatting during the build (network issues can also interfere), and the resulting jar is attached below so it can be used directly.
flink-connector-kafka_2.11-1.14.4.jar
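For reference, this is roughly what the bounded read already looks like with the DataStream API in 1.14 (a minimal sketch; the servers, topic and group id are placeholder values). The goal of the changes below is to get the same behaviour through the Table API:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("test02")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // setBounded marks the source as bounded and stops it at the given offsets
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");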
Main modifications
1. Handle stream data in batch mode
In KafkaSourceBuilder, set the bounded flag Boundedness.BOUNDED so that a batch job can process Kafka (stream) data. The flag can also be set elsewhere, depending on your needs.
2. Set the end offsets, mirroring how the starting offsets are set
In KafkaSourceBuilder, add a setEndOffsets method that assigns the stoppingOffsetsInitializer field.
3. Ways of specifying the end offsets
In KafkaConnectorOptionsUtil, add a getEndupOptions method modelled on the existing getStartupOptions method, with the specific-offsets case handled the same way as for the start position; see the details below.
Notes:
Watch the code formatting: no missing line breaks and no extra spaces, otherwise the code style check fails during packaging.
Also be careful with imports: avoid IDEA collapsing them into wildcard imports, or packaging fails as well. Configure IDEA as follows:
1. File -> Settings -> Editor -> Code Style -> Java -> Imports
2. Set 'Class count to use import with *' to 100
3. Set 'Names count to use static import with *' to 100
After that the custom jar builds successfully. Replace the flink-kafka-connector jar in your local project with it, making sure the file name matches exactly, so that later runs pick up the modified jar.
The concrete changes are as follows:
Download the Flink 1.14 source code from https://gitee.com/apache/flink.git (downloading from this Chinese mirror is usually faster). Seven places need to be changed:
1. KafkaSourceBuilder
Set the end offsets and the bounded flag.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
public KafkaSourceBuilder<OUT> setEndOffsets(OffsetsInitializer stoppingOffsetsInitializer) {
    // Setting the stopping offsets here is the core of the whole modification.
    this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
    // Bounded flag: lets a batch job process Kafka (stream) data.
    this.boundedness = Boundedness.BOUNDED;
    return this;
}
The end offsets are set the same way as the starting offsets. The reason for also setting this.boundedness = Boundedness.BOUNDED is that batch jobs do not support consuming Kafka (stream) data; without it the following error is thrown:
Querying an unbounded table '%s' in batch mode is not allowed. The table source is unbounded.
The flag can also be set somewhere else; adjust this to your own needs.
2. EndupMode
Add a new EndupMode enum, again modelled on the startup counterpart.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndupMode.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.config;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

/** End modes for the Kafka Consumer. */
@Internal
public enum EndupMode {

    /** End from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
    GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

    /** End from the latest offset. */
    LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

    /**
     * End from user-supplied timestamp for each partition. Since this mode will have specific
     * offsets to end with, we do not need a sentinel value; using Long.MIN_VALUE as a
     * placeholder.
     */
    TIMESTAMP(Long.MIN_VALUE),

    /**
     * End from user-supplied specific offsets for each partition. Since this mode will have
     * specific offsets to end with, we do not need a sentinel value; using Long.MIN_VALUE as a
     * placeholder.
     */
    SPECIFIC_OFFSETS(Long.MIN_VALUE);

    /** The sentinel offset value corresponding to this end mode. */
    private long stateSentinel;

    EndupMode(long stateSentinel) {
        this.stateSentinel = stateSentinel;
    }
}
3. KafkaConnectorOptions
Add the options that control where consumption of Kafka ends.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
Mirror the startup options when writing the end options. This part matters, because it defines how the connector is configured later from SQL:

Option | Value
---|---
scan.startup.mode | one of 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'
scan.startup.specific-offsets | offsets per partition, e.g. 'partition:0,offset:42;partition:1,offset:300'
scan.startup.timestamp-millis | start timestamp in milliseconds (long)
scan.endup.mode | one of 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'
scan.endup.specific-offsets | offsets per partition, e.g. 'partition:0,offset:42;partition:1,offset:300'
scan.endup.timestamp-millis | end timestamp in milliseconds (long)
public static final ConfigOption<ScanEndupMode> SCAN_ENDUP_MODE =
        ConfigOptions.key("scan.endup.mode")
                .enumType(ScanEndupMode.class)
                .defaultValue(ScanEndupMode.GROUP_OFFSETS)
                .withDescription("Endup mode for Kafka consumer.");

public static final ConfigOption<String> SCAN_ENDUP_SPECIFIC_OFFSETS =
        ConfigOptions.key("scan.endup.specific-offsets")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional offsets used in case of \"specific-offsets\" endup mode");

public static final ConfigOption<Long> SCAN_ENDUP_TIMESTAMP_MILLIS =
        ConfigOptions.key("scan.endup.timestamp-millis")
                .longType()
                .noDefaultValue()
                .withDescription("Optional timestamp used in case of \"timestamp\" endup mode");

/** Endup mode for the Kafka consumer, see {@link #SCAN_ENDUP_MODE}. */
public enum ScanEndupMode implements DescribedEnum {
    LATEST_OFFSET("latest-offset", text("End from the latest offset.")),
    GROUP_OFFSETS(
            "group-offsets",
            text(
                    "End from committed offsets in ZooKeeper / Kafka brokers of a specific consumer group.")),
    TIMESTAMP("timestamp", text("End from user-supplied timestamp for each partition.")),
    SPECIFIC_OFFSETS(
            "specific-offsets",
            text("End from user-supplied specific offsets for each partition."));

    private final String value;
    private final InlineElement description;

    ScanEndupMode(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}
4. KafkaConnectorOptionsUtil
Read the end position from the table options and build the corresponding offset objects.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
public static EndupOptions getEndupOptions(ReadableConfig tableOptions) {
    final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
    final EndupMode endupMode =
            tableOptions
                    .getOptional(KafkaConnectorOptions.SCAN_ENDUP_MODE)
                    .map(KafkaConnectorOptionsUtil::endfromOption)
                    .orElse(EndupMode.GROUP_OFFSETS);
    // Note: a dedicated method is needed here to build the end offsets for the
    // "specific-offsets" mode (see buildSpecificEndOffsets below).
    if (endupMode == EndupMode.SPECIFIC_OFFSETS) {
        buildSpecificEndOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
    }

    final EndupOptions options = new EndupOptions();
    options.endupMode = endupMode;
    options.specificOffsets = specificOffsets;
    if (endupMode == EndupMode.TIMESTAMP) {
        options.endupTimestampMillis = tableOptions.get(SCAN_ENDUP_TIMESTAMP_MILLIS);
    }
    return options;
}
private static void buildSpecificEndOffsets(
        ReadableConfig tableOptions,
        String topic,
        Map<KafkaTopicPartition, Long> specificOffsets) {
    String specificOffsetsStrOpt = tableOptions.get(SCAN_ENDUP_SPECIFIC_OFFSETS);
    final Map<Integer, Long> offsetMap =
            parseSpecificOffsets(specificOffsetsStrOpt, SCAN_ENDUP_SPECIFIC_OFFSETS.key());
    offsetMap.forEach(
            (partition, offset) -> {
                final KafkaTopicPartition topicPartition =
                        new KafkaTopicPartition(topic, partition);
                specificOffsets.put(topicPartition, offset);
            });
}
This part simply mirrors the startup side, which is easy to see in hindsight, but not understanding it at first cost me a lot of time. Model it on buildSpecificOffsets: where the start position reads SCAN_STARTUP_SPECIFIC_OFFSETS, the end position reads SCAN_ENDUP_SPECIFIC_OFFSETS, whose value is supplied from the CREATE TABLE statement.
private static EndupMode endfromOption(KafkaConnectorOptions.ScanEndupMode scanEndupMode) {
    switch (scanEndupMode) {
        case LATEST_OFFSET:
            return EndupMode.LATEST;
        case GROUP_OFFSETS:
            return EndupMode.GROUP_OFFSETS;
        case SPECIFIC_OFFSETS:
            return EndupMode.SPECIFIC_OFFSETS;
        case TIMESTAMP:
            return EndupMode.TIMESTAMP;
        default:
            throw new TableException(
                    "Unsupported endup mode. Validator should have checked that.");
    }
}

/** Kafka endup options. */
public static class EndupOptions {
    public EndupMode endupMode;
    public Map<KafkaTopicPartition, Long> specificOffsets;
    public long endupTimestampMillis;
}
5. KafkaDynamicSource
Following the changes above, the method that creates the source must accept the new parameters and apply them; a sketch follows the file path below.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
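The change itself is mechanical: carry the endup mode, the specific end offsets and the end timestamp as new constructor parameters/fields, and apply them in createKafkaSource next to the existing startup switch. A minimal sketch of that switch, assuming the new fields are called endupMode, specificEndOffsets and endupTimestampMillis (names chosen here to mirror the startup fields, not taken from the source):
// Inside KafkaDynamicSource#createKafkaSource(...), after the existing startup switch.
switch (endupMode) {
    case LATEST:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.latest());
        break;
    case GROUP_OFFSETS:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.committedOffsets());
        break;
    case SPECIFIC_OFFSETS:
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        specificEndOffsets.forEach(
                (tp, offset) ->
                        endOffsets.put(
                                new TopicPartition(tp.getTopic(), tp.getPartition()), offset));
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.offsets(endOffsets));
        break;
    case TIMESTAMP:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.timestamp(endupTimestampMillis));
        break;
}
Keep in mind that setEndOffsets also flips the source to Boundedness.BOUNDED, so if you still want unbounded streaming reads from the same connector you may want to skip the call for the default GROUP_OFFSETS case.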
6. KafkaDynamicTableFactory
Likewise, the factory must read the newly added options and pass them along when constructing the source; see the sketch after the file path.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
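Concretely, two things are usually needed here (a sketch assuming the file layout of vanilla 1.14): the new options must be declared in optionalOptions(), otherwise validation rejects the 'scan.endup.*' keys at CREATE TABLE time, and createDynamicTableSource must read them and hand them to the extended KafkaDynamicSource constructor:
// In optionalOptions(), alongside the existing scan.startup.* entries:
options.add(KafkaConnectorOptions.SCAN_ENDUP_MODE);
options.add(KafkaConnectorOptions.SCAN_ENDUP_SPECIFIC_OFFSETS);
options.add(KafkaConnectorOptions.SCAN_ENDUP_TIMESTAMP_MILLIS);

// In createDynamicTableSource(Context context), next to the getStartupOptions(...) call:
final KafkaConnectorOptionsUtil.EndupOptions endupOptions =
        KafkaConnectorOptionsUtil.getEndupOptions(tableOptions);
// ...then pass endupOptions.endupMode, endupOptions.specificOffsets and
// endupOptions.endupTimestampMillis on to the KafkaDynamicSource constructor,
// right after the existing startup arguments.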
7. UpsertKafkaDynamicTableFactory
Same here: the call that constructs the source needs the new parameters as well; see the sketch after the file path.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
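UpsertKafkaDynamicTableFactory instantiates KafkaDynamicSource directly, so its call site only needs neutral defaults for the three new parameters, keeping upsert-kafka's original unbounded behaviour (a sketch; the argument positions are assumed to follow the extended constructor from step 5):
new KafkaDynamicSource(
        /* existing arguments unchanged ... */
        EndupMode.GROUP_OFFSETS,   // new: default endup mode
        Collections.emptyMap(),    // new: no specific end offsets
        0,                         // new: no end timestamp
        /* remaining existing arguments ... */);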
Those are all the places that need to be modified.
The test files in the module will report errors during packaging; just fix them up accordingly.
The test code is as follows:
1. Create the execution environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,settings);
2. Create the table
There are three ways of specifying the end offsets; concrete examples follow.
// 1. CREATE TABLE statement: end at the latest offset
//    'scan.endup.mode' = 'latest-offset'
String connectSql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `age` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test02',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'scan.endup.mode' = 'latest-offset',\n" +
" 'format' = 'csv'\n" +
")";
// 2. CREATE TABLE statement: end at specific offsets
//    'scan.endup.mode' = 'specific-offsets'
//    'scan.endup.specific-offsets' = 'partition:0,offset:22'
String connectSql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `age` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test02',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'scan.endup.mode' = 'specific-offsets',\n" +
" 'scan.endup.specific-offsets' = 'partition:0,offset:22',\n" +
" 'format' = 'csv'\n" +
")";
// 3. CREATE TABLE statement: end at a specific timestamp
//    'scan.endup.mode' = 'timestamp'
//    'scan.endup.timestamp-millis' = '1648124880000'
String connectSql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `age` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test02',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'scan.endup.mode' = 'timestamp',\n" +
" 'scan.endup.timestamp-millis' = '1648124880000',\n" +
" 'format' = 'csv'\n" +
")";
// Execute the SQL statement to create the table
streamTableEnvironment.executeSql(connectSql);
3. Output logic
// Query
Table result = streamTableEnvironment.sqlQuery("select user_id, item_id, age from KafkaTable");
// Convert the Table to a DataStream so the rows can be printed
DataStream<Row> rowDataStream = streamTableEnvironment.toDataStream(result);
// Print the rows; the batch job finishes once the end offsets are reached
rowDataStream.print();
streamExecutionEnvironment.execute();