在這里,我很榮幸的宣布,開源項目 flink sqlSubmit 1.0 發布了。👏👏👏
乘着這幾天假,在家把很早就該完成的事情做了點,第一個就是把sqlSubmit 稍微修改了下,勉強發布1.0版本。
地址:https://github.com/springMoon/sqlSubmit/releases/tag/sqlSubmit-1.0
功能
sqlSubmit 目前是基於 flink 1.10.1 開發,目前只測試過如下功能(就是簡單的跑了下)
- kafka Source/Sink
- Kafak Upsert Sink
項目中resources/sql/connector 中 kafkaToKafka.sql 和 kafkaUpserDemo.sql
由於環境比較惡劣(只有一台老mac電腦),只裝了kafka,所以這次就只有 kafka 相關的功能。
HiveCatalog 也因為沒有 hive 暫時放棄。
等我在電腦上搗騰上mysql/hbase/elasticsearch 就加上對應的sql demo 文件。
KafkaUpsertSink
kafka source/sink 沒什么好說的,就簡單介紹下 KafkaUpsertSink,目前的 flink 版本中,kafka Sink 類: KafkaTableSink 繼承自 AppendStreamTableSink,所以 kafka 表作為 sink upsert-mode 只支持 append,但是 sql 中使用了 group by 后,輸出的流就變成 UpsertStream ,然后不能執行。
對一般kafka sink 表,執行如下 sql
-- kafka sink CREATE TABLE user_log_sink ( user_id VARCHAR ,max_tx BIGINT ) WITH ( 'connector.type' = 'kafka' ,'connector.version' = 'universal' ,'connector.topic' = 'user_behavior_sink' -- required: topic name from which the table is read ,'connector.properties.zookeeper.connect' = 'localhost:2181' -- required: specify the ZooKeeper connection string ,'connector.properties.bootstrap.servers' = 'localhost:9092' -- required: specify the Kafka server connection string ,'connector.properties.group.id' = 'user_log' -- optional: required in Kafka consumer, specify consumer group ,'connector.startup-mode' = 'group-offsets' -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", "specific-offsets" ,'connector.sink-partitioner' = 'fixed' --optional fixed 每個 flink 分區數據只發到 一個 kafka 分區 -- round-robin flink 分區輪詢分配到 kafka 分區 -- custom 自定義分區策略 --,'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner' -- 自定義分區類 ,'format.type' = 'json' -- required: 'csv', 'json' and 'avro'. ); -- insert insert into user_log_sink select user_id, count(user_id) from user_log group by user_id;
會報如下錯誤:
org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
這種時候,很容易就想到自定義一個KafkaUsertSink,實現 UpsertStreamTableSink。(站在巨人的肩膀上)只需要參照Flink 中的KafkaTableSink 一套即可。
KafkaTableSink
KafkaTableSinkBase
KafkaTableSourceSinkFactory
KafkaTableSourceSinkFactoryBase
KafkaValidator
將 KafkaTableSinkBase 實現的接口改為 UpsertStreamTableSink ,然后實現對應的方法即可。
我把KafkaTableSinkBase 和 KafkaTableSink 合並了一下,代碼目錄 com.rookie.submit.connector.kafka
KafkaUpsertTableSink
KafkaUpsertTableSinkFactory
KafkaUpsertTableSinkFactoryBase
UpsertKafkaValidator
KafkaUpserTableSink 實現 UpsertStreamTableSink
UpsertKafkaValidator 則是修改對應的 table properties,將 'connector.type' = 'kafka' , 改為 'connector.type' = 'upsertKafka'
最重要的一段邏輯是將 DataStream<Tuple2<Boolean, Row>> 轉為 DataStream<Row> ,將 UpsertStream 的 標示列去掉(根據自己實際的業務需要修改)
@Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { final SinkFunction<Row> kafkaProducer = createKafkaProducer( topic, properties, serializationSchema, partitioner); // todo cast DataStream<Tuple2<Boolean, Row>> to DataStream<Row> return dataStream .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() { @Override public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception { // upsertStream include insert/update/delete change, true is upsert, false is delete if (element.f0) { out.collect(element.f1); } else { System.out.println("KafkaUpsertTableSinkBase : retract stream f0 will be false"); } } }) .addSink(kafkaProducer) .setParallelism(dataStream.getParallelism()) .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); }
測試時,執行 com.rookie.submit.main.SqlSubmit ,需要傳入參數: --sql sql/connector/kafkaUpsertDemo.sql (--sql 參數指定要執行的sql 腳本路徑)
執行結果如下:
具體請移步 github 查看項目代碼。
注:本人水平有限,開發的代碼也比較雞肋,建議使用 flink on zeppelin,flink 社區大力支持中。
flink 1.10.1 web UI 也優化了很多,比之前好看多了
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文