Flink 1.10 SQL: Writing a Custom KafkaUpsertSink


Flink SQL became production-ready in Flink 1.10. While trying it out, I ran into the following problem: the 'update-mode' of KafkaTableSink only supports 'append', as in this DDL:

CREATE TABLE user_log_sink (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior_sink',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',  -- only 'append' is supported
    'format.type' = 'json'
);

At first glance this looks reasonable, since Kafka is append-only: you can only write data into it, not delete from it.

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

However, as soon as the SQL contains a GROUP BY, it no longer works. For example:

SELECT item_id, category_id, behavior, max(ts), min(proctime), max(proctime), count(user_id)
FROM user_log
group by item_id, category_id, behavior;

The job fails with:

Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.

Someone asked about this in the community before, and the suggestion was to go through the DataStream API instead; a rough sketch of that idea is shown below.
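The thread did not include code for that workaround, so the following is only a hedged sketch of the idea, not the author's code. Hypothetical names: tableEnv is the StreamTableEnvironment and kafkaProps the producer Properties; the serialization is deliberately simplistic.

// Sketch: query -> retract stream -> plain DataStream -> Kafka
Table result = tableEnv.sqlQuery(
        "SELECT item_id, category_id, behavior, count(user_id) AS coun " +
        "FROM user_log GROUP BY item_id, category_id, behavior");

tableEnv.toRetractStream(result, Row.class)
        .filter(t -> t.f0)                  // keep upsert messages, drop retractions
        .map(t -> t.f1.toString())          // in practice you would serialize the Row to JSON here
        .addSink(new FlinkKafkaProducer<>("user_behavior_sink", new SimpleStringSchema(), kafkaProps));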

A quick look at the KafkaTableSink source code shows the following inheritance chain; it ultimately derives from AppendStreamTableSink:

public class KafkaTableSink extends KafkaTableSinkBase
public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row>

A (non-windowed) GROUP BY query produces updates and retractions keyed by the group key, so it needs a RetractStreamTableSink or an UpsertStreamTableSink:

public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>>
public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>>

Note: RetractStreamTableSink is generally used inside Flink itself, while UpsertStreamTableSink is better suited for connecting to external storage systems.
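For orientation, the UpsertStreamTableSink interface in Flink 1.10 requires roughly the following methods (simplified; see org.apache.flink.table.sinks.UpsertStreamTableSink for the exact definition):

public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {

    // unique key fields of the table, derived by the planner (for us: the GROUP BY columns)
    void setKeyFields(String[] keys);

    // true if the planner can guarantee the query only ever produces inserts
    void setIsAppendOnly(Boolean isAppendOnly);

    // type of the record wrapped inside Tuple2<Boolean, T>
    TypeInformation<T> getRecordType();

    // the boolean flag of each element marks upsert (true) or delete (false)
    DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}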

At this point we know why the result of a GROUP BY cannot be written directly with KafkaTableSink. The next step is to implement our own KafkaUpsertTableSink, which solves the problem.

The custom KafkaUpsertTableSink is implemented with reference to the following classes:

KafkaTableSink
KafkaTableSinkBase
KafkaTableSourceSinkFactory
KafkaTableSourceSinkFactoryBase
KafkaValidator
and
HBaseUpsertTableSink
Elasticsearch7UpsertTableSink
Elasticsearch7UpsertTableSinkFactory

See also the previous translated post: [Translation] Flink Table API & SQL Custom Source & Sink.

The approach is to copy the built-in Kafka connector classes and modify them:

MyKafkaValidator is a direct copy of KafkaValidator, with the validation of connector.type and update-mode changed:

public static final String CONNECTOR_TYPE_VALUE_KAFKA = "myKafka";

@Override
public void validate(DescriptorProperties properties) {
    super.validate(properties);
    // changed: only 'upsert' is accepted as update-mode (the original KafkaValidator only allows 'append')
    properties.validateEnumValues(UPDATE_MODE, true, Collections.singletonList(UPDATE_MODE_VALUE_UPSERT));

    // changed: connector.type must be 'myKafka', so this connector does not clash with the built-in Kafka connector
    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false);

    properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);

    validateStartupMode(properties);

    validateKafkaProperties(properties);

    validateSinkPartitioner(properties);
}

KafkaUpsertTableSinkBase is changed to implement UpsertStreamTableSink:

public abstract class KafkaUpsertTableSinkBase implements UpsertStreamTableSink<Row>

The consumeDataStream implementation is modified accordingly: the DataStream<Tuple2<Boolean, Row>> is converted into a DataStream<Row> so that the Kafka producer can accept it:

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {

    final SinkFunction<Row> kafkaProducer = createKafkaProducer(
            topic,
            properties,
            serializationSchema,
            partitioner);
    // update by venn: unwrap the upsert stream into a plain Row stream before writing to Kafka
    return dataStream
            .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception {
                    // true means upsert, false means delete; delete messages are simply dropped here
                    if (element.f0) {
                        out.collect(element.f1);
                    }
                }
            })
            .addSink(kafkaProducer)
            .setParallelism(dataStream.getParallelism())
            .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
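Because the base class implements UpsertStreamTableSink, it also has to provide setKeyFields and setIsAppendOnly. They are not needed for the Kafka write path itself, so a minimal sketch is enough (the field names here are illustrative, not necessarily what the repository uses):

// illustrative fields; the repository version may simply ignore these values
private String[] keyFields;
private boolean isAppendOnly;

@Override
public void setKeyFields(String[] keys) {
    // the planner passes the upsert keys, e.g. item_id, category_id, behavior for our GROUP BY
    this.keyFields = keys;
}

@Override
public void setIsAppendOnly(Boolean isAppendOnly) {
    // true only if the query is guaranteed to produce inserts only
    this.isAppendOnly = isAppendOnly;
}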

KafkaUpsertTableSink extends KafkaUpsertTableSinkBase:

public class KafkaUpsertTableSink extends KafkaUpsertTableSinkBase

with the corresponding implementation adjusted:

@Override
public TypeInformation<Row> getRecordType() {
    return TypeInformation.of(Row.class);
}
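Besides getRecordType, the concrete sink still has to build the actual Kafka producer. If createKafkaProducer is kept the same as in the built-in universal KafkaTableSink, it would look roughly like this (a sketch, not necessarily the repository's exact code):

@Override
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    // identical to the built-in universal KafkaTableSink: wrap the schema and create a FlinkKafkaProducer
    return new FlinkKafkaProducer<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner);
}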

KafkaUpsertTableSourceSinkFactoryBase, like KafkaTableSourceSinkFactoryBase, implements StreamTableSourceFactory<Row> and StreamTableSinkFactory<Row>:

public abstract class KafkaUpsertTableSourceSinkFactoryBase implements
        StreamTableSourceFactory<Row>,
        StreamTableSinkFactory<Row>

KafkaUpsertTableSourceSinkFactory extends KafkaUpsertTableSourceSinkFactoryBase:

public class KafkaUpsertTableSourceSinkFactory extends KafkaUpsertTableSourceSinkFactoryBase

Note: wherever the copied code uses KafkaValidator, change it to MyKafkaValidator.
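The planner matches the DDL's WITH properties against the factory through requiredContext(). Assuming the same structure as KafkaTableSourceSinkFactoryBase, it would look roughly like this after switching to MyKafkaValidator:

@Override
public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    // 'connector.type' = 'myKafka' in the DDL must match this value
    context.put(CONNECTOR_TYPE, MyKafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA);
    context.put(CONNECTOR_VERSION, kafkaVersion());
    context.put(CONNECTOR_PROPERTY_VERSION, "1");
    return context;
}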

The last, very important step is to add the directory META-INF/services under the resources directory, create a file named org.apache.flink.table.factories.TableFactory inside it, and list the newly created factory class there:
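For example, src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory would contain a single line with the fully qualified class name. The package below follows the repository linked at the end of the post; adjust it to wherever your factory actually lives:

com.venn.source.KafkaUpsertTableSourceSinkFactory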

A TableFactory allows different table-related instances to be created from string-based properties. All available factories are called to find the one matching the given set of properties.

Factories are discovered using Java's Service Provider Interface (SPI). This means every dependency and JAR file should contain a file org.apache.flink.table.factories.TableFactory in the META-INF/services resource directory that lists all table factories it provides.

Note: without this file, the new factory will not be loaded.

With the code changes done, let's try it out:

-- source table
CREATE TABLE user_log(
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3),
    proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
);

-- sink table
CREATE TABLE user_log_sink (
    item_id VARCHAR ,
    category_id VARCHAR ,
    behavior VARCHAR ,
    max_tx TIMESTAMP(3),
    min_prc TIMESTAMP(3),
    max_prc TIMESTAMP(3),
    coun BIGINT
) WITH (
    'connector.type' = 'myKafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior_sink',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

-- insert
INSERT INTO user_log_sink
SELECT item_id, category_id, behavior, max(ts), min(proctime), max(proctime), count(user_id)
FROM user_log
group by item_id, category_id, behavior;

Figure: execution graph of the running job.

Inspecting the boolean flag of the messages inside KafkaUpsertTableSinkBase, the upsert messages arrive with the flag set to true (figure omitted).

The somewhat awkward finding is that no false messages ever show up: even when the same record is sent repeatedly, the count keeps increasing, but there is never a false message to retract the previous result.

The output:

{"item_id":"3611281","category_id":"965809","behavior":"pv","max_tx":"2017-11-26T01:00:00Z","min_prc":"2020-04-08T05:20:26.694Z","max_prc":"2020-04-08T05:26:16.525Z","coun":10}
{"item_id":"3611281","category_id":"965809","behavior":"pv","max_tx":"2017-11-26T01:00:00Z","min_prc":"2020-04-08T05:20:26.694Z","max_prc":"2020-04-08T05:26:27.317Z","coun":11}

The complete code can be found here: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/source

My knowledge is limited, so this example is for reference only.

Feel free to follow the Flink 菜鳥 WeChat official account, which is updated from time to time with posts about Flink development.

