【sqlSubmit】1.0 正式發布


在這里,我很榮幸的宣布,開源項目 flink sqlSubmit 1.0 發布了。👏👏👏

乘着這幾天假,在家把很早就該完成的事情做了點,第一個就是把sqlSubmit 稍微修改了下,勉強發布1.0版本。

地址:https://github.com/springMoon/sqlSubmit/releases/tag/sqlSubmit-1.0

功能

  sqlSubmit 目前是基於 flink 1.10.1 開發,目前只測試過如下功能(就是簡單的跑了下)

  • kafka Source/Sink
  • Kafak Upsert Sink

項目中resources/sql/connector 中 kafkaToKafka.sql  和 kafkaUpserDemo.sql      

由於環境比較惡劣(只有一台老mac電腦),只裝了kafka,所以這次就只有 kafka 相關的功能。

HiveCatalog 也因為沒有 hive 暫時放棄。

等我在電腦上搗騰上mysql/hbase/elasticsearch 就加上對應的sql demo 文件。

KafkaUpsertSink

kafka source/sink 沒什么好說的,就簡單介紹下 KafkaUpsertSink,目前的 flink 版本中,kafka Sink 類:  KafkaTableSink 繼承自  AppendStreamTableSink,所以 kafka 表作為 sink upsert-mode 只支持 append,但是 sql 中使用了 group by 后,輸出的流就變成 UpsertStream ,然后不能執行。

對一般kafka sink 表,執行如下 sql

-- kafka sink
CREATE TABLE user_log_sink (
  user_id VARCHAR
  ,max_tx BIGINT
) WITH (
  'connector.type' = 'kafka'
  ,'connector.version' = 'universal'
  ,'connector.topic' = 'user_behavior_sink'                            -- required: topic name from which the table is read
  ,'connector.properties.zookeeper.connect' = 'localhost:2181'    -- required: specify the ZooKeeper connection string
  ,'connector.properties.bootstrap.servers' = 'localhost:9092'    -- required: specify the Kafka server connection string
  ,'connector.properties.group.id' = 'user_log'                   -- optional: required in Kafka consumer, specify consumer group
  ,'connector.startup-mode' = 'group-offsets'                     -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets",  "specific-offsets"
  ,'connector.sink-partitioner' = 'fixed'                         --optional fixed 每個 flink 分區數據只發到 一個 kafka 分區
                                                                          -- round-robin flink 分區輪詢分配到 kafka 分區
                                                                          -- custom 自定義分區策略
  --,'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'   -- 自定義分區類
  ,'format.type' = 'json'                 -- required:  'csv', 'json' and 'avro'.
);

-- insert
insert into user_log_sink
select user_id, count(user_id)
from user_log
group by user_id;

會報如下錯誤:

org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.

這種時候,很容易就想到自定義一個KafkaUsertSink,實現  UpsertStreamTableSink。(站在巨人的肩膀上)只需要參照Flink 中的KafkaTableSink 一套即可。

KafkaTableSink 
KafkaTableSinkBase
KafkaTableSourceSinkFactory
KafkaTableSourceSinkFactoryBase
KafkaValidator

將 KafkaTableSinkBase 實現的接口改為  UpsertStreamTableSink ,然后實現對應的方法即可。

我把KafkaTableSinkBase 和 KafkaTableSink 合並了一下,代碼目錄 com.rookie.submit.connector.kafka

KafkaUpsertTableSink
KafkaUpsertTableSinkFactory
KafkaUpsertTableSinkFactoryBase
UpsertKafkaValidator

KafkaUpserTableSink 實現 UpsertStreamTableSink

UpsertKafkaValidator 則是修改對應的 table properties,將  'connector.type' = 'kafka' , 改為  'connector.type' = 'upsertKafka'

最重要的一段邏輯是將 DataStream<Tuple2<Boolean, Row>> 轉為 DataStream<Row>  ,將 UpsertStream 的 標示列去掉(根據自己實際的業務需要修改)

@Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {

        final SinkFunction<Row> kafkaProducer = createKafkaProducer(
                topic,
                properties,
                serializationSchema,
                partitioner);

        // todo cast DataStream<Tuple2<Boolean, Row>> to DataStream<Row>
        return dataStream
                .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                    @Override
                    public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception {
                        // upsertStream include insert/update/delete change, true is upsert, false is delete
                        if (element.f0) {
                            out.collect(element.f1);
                        } else {
                            System.out.println("KafkaUpsertTableSinkBase : retract stream f0 will be false");
                        }
                    }
                })
                .addSink(kafkaProducer)
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

測試時,執行 com.rookie.submit.main.SqlSubmit ,需要傳入參數: --sql sql/connector/kafkaUpsertDemo.sql (--sql 參數指定要執行的sql 腳本路徑)

執行結果如下:

 

 

具體請移步 github 查看項目代碼。

 

注:本人水平有限,開發的代碼也比較雞肋,建議使用 flink on zeppelin,flink 社區大力支持中。

flink 1.10.1 web UI 也優化了很多,比之前好看多了

 

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM