In Flink 1.10, SQL became production-ready. While trying it out, I hit the following problem: KafkaTableSink's 'update-mode' only supports 'append', as in:
CREATE TABLE user_log_sink (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior_sink',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'append',  -- only 'append' is supported
  'format.type' = 'json'
);
At first glance this seems fine, since Kafka itself is append-only: you can write data to a topic, but you cannot delete it.
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
However, as soon as the SQL contains a GROUP BY, it no longer works:
SELECT item_id, category_id, behavior,
       max(ts), min(proctime), max(proctime), count(user_id)
FROM user_log
GROUP BY item_id, category_id, behavior;
It fails with:
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
A colleague asked about this on the community mailing list a while back; the advice he got was to convert the result through the DataStream API.
A quick look at the KafkaTableSink source shows the following inheritance chain; it ultimately derives from AppendStreamTableSink:
public class KafkaTableSink extends KafkaTableSinkBase

public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row>
A (non-windowed) GROUP BY statement produces retractions per key, so it requires a RetractStreamTableSink or an UpsertStreamTableSink:
public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>>

public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>>
Note: RetractStreamTableSink is generally used inside Flink itself, while UpsertStreamTableSink is the better fit for connecting to external storage systems.
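To make the two encodings concrete, here is a minimal plain-Java simulation (no Flink dependency; the keys and values are made up for illustration) of how an upsert view materializes a retract-style changelog of `Tuple2<Boolean, T>` messages:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: materialize a retract changelog into an upsert view.
// Each message mirrors Flink's Tuple2<Boolean, Row>:
// true = add/upsert, false = retract/delete.
public class UpsertDemo {

    // Apply a changelog keyed by the part before the comma:
    // flag=true stores the latest value for the key, flag=false removes it.
    static Map<String, String> materialize(List<Map.Entry<Boolean, String>> changelog) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Map.Entry<Boolean, String> msg : changelog) {
            String key = msg.getValue().split(",")[0];
            if (msg.getKey()) {
                view.put(key, msg.getValue());   // add / update
            } else {
                view.remove(key);                // retract the old aggregate
            }
        }
        return view;
    }

    public static void main(String[] args) {
        // Retract encoding of count(user_id) for key "A" as three events arrive:
        List<Map.Entry<Boolean, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>(true,  "A,count=1"));
        changelog.add(new SimpleEntry<>(false, "A,count=1")); // retract old value
        changelog.add(new SimpleEntry<>(true,  "A,count=2"));
        changelog.add(new SimpleEntry<>(false, "A,count=2")); // retract old value
        changelog.add(new SimpleEntry<>(true,  "A,count=3"));

        System.out.println(materialize(changelog)); // prints {A=A,count=3}
    }
}
```

Each retract-and-add pair collapses into a single overwrite per key, which is why an upsert sink only needs the latest value per key.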
At this point we have found why a GROUP BY result cannot be written through KafkaTableSink directly. Next, we implement our own KafkaUpsertTableSink to solve the problem.
I implemented the custom KafkaUpsertTableSink by referring to the following classes:
KafkaTableSink
KafkaTableSinkBase
KafkaTableSourceSinkFactory
KafkaTableSourceSinkFactoryBase
KafkaValidator
and
HBaseUpsertTableSink
Elasticsearch7UpsertTableSink
Elasticsearch7UpsertTableSinkFactory
See also my previous translation: [Translation] Flink Table API & SQL Custom Source & Sink.
Copy the whole Kafka connector implementation and modify it:
MyKafkaValidator: a direct copy of KafkaValidator, with the connector type and the update-mode validation changed:
public static final String CONNECTOR_TYPE_VALUE_KAFKA = "myKafka";

@Override
public void validate(DescriptorProperties properties) {
    super.validate(properties);
    properties.validateEnumValues(UPDATE_MODE, true,
        Collections.singletonList(UPDATE_MODE_VALUE_UPSERT));
    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false);
    properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
    validateStartupMode(properties);
    validateKafkaProperties(properties);
    validateSinkPartitioner(properties);
}
KafkaUpsertTableSinkBase now implements UpsertStreamTableSink instead:
public abstract class KafkaUpsertTableSinkBase implements UpsertStreamTableSink<Row>
Change the corresponding consumeDataStream implementation: convert the DataStream<Tuple2<Boolean, Row>> into a DataStream<Row> that Kafka can accept.
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    final SinkFunction<Row> kafkaProducer = createKafkaProducer(
        topic, properties, serializationSchema, partitioner);
    // update by venn
    return dataStream
        .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception {
                // true means upsert, false means delete; delete messages are simply dropped here
                if (element.f0) {
                    out.collect(element.f1);
                } else {
                    //System.out.println("KafkaUpsertTableSinkBase : retract stream f0 will be false");
                }
            }
        })
        .addSink(kafkaProducer)
        .setParallelism(dataStream.getParallelism())
        .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
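The filtering step in the flatMap above can be exercised without Flink. This sketch mimics the same logic with plain Java collections, using Map.Entry in place of Flink's Tuple2 (the row values are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-in for the flatMap in consumeDataStream: forward only messages whose
// boolean flag is true (upserts); delete (false) messages are discarded.
public class DropRetractions {

    static List<String> keepUpserts(List<Map.Entry<Boolean, String>> messages) {
        return messages.stream()
                .filter(Map.Entry::getKey)     // true = upsert, false = delete
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages = List.of(
                Map.entry(true,  "row-1"),
                Map.entry(false, "row-1"),
                Map.entry(true,  "row-2"));
        System.out.println(keepUpserts(messages)); // prints [row-1, row-2]
    }
}
```

Note the design trade-off this encodes: deletes never reach the Kafka topic, so downstream consumers only ever see the latest upserted values.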
KafkaUpsertTableSink extends KafkaUpsertTableSinkBase:
public class KafkaUpsertTableSink extends KafkaUpsertTableSinkBase
Adjust the corresponding implementation:
@Override
public TypeInformation<Row> getRecordType() {
    return TypeInformation.of(Row.class);
}
KafkaUpsertTableSourceSinkFactoryBase, like KafkaTableSourceSinkFactoryBase, implements StreamTableSourceFactory<Row> and StreamTableSinkFactory<Row>:
public abstract class KafkaUpsertTableSourceSinkFactoryBase implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row>
KafkaUpsertTableSourceSinkFactory extends KafkaUpsertTableSourceSinkFactoryBase:
public class KafkaUpsertTableSourceSinkFactory extends KafkaUpsertTableSourceSinkFactoryBase
Note: replace every use of KafkaValidator in the copied code with MyKafkaValidator.
The last, crucial step is to create the directory META-INF/services under the resources directory, and in it a file named org.apache.flink.table.factories.TableFactory containing the fully-qualified name of the new factory class:
A TableFactory creates different table-related instances from string-based properties. All available factories are matched against the given set of properties and the desired factory class.
Factories are discovered via Java's Service Provider Interface (SPI). This means every dependency and JAR file should contain a file org.apache.flink.table.factories.TableFactory in its META-INF/services resource directory that lists all the table factories it provides.
Note: without this file, the new factory will not be loaded.
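As a sketch (the package name com.venn.source is an assumption based on the repository layout; use your factory's actual fully-qualified name), the registration file can be created like this:

```shell
# Create the SPI registration directory under the project's resources
mkdir -p src/main/resources/META-INF/services
# The file name must match the service interface exactly; its content lists
# the fully-qualified factory class (the package here is an assumption)
echo "com.venn.source.KafkaUpsertTableSourceSinkFactory" \
  > src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
```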
With the code changes done, let's try it out:
---source table
CREATE TABLE user_log (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json'
);
---sink table
CREATE TABLE user_log_sink (
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  max_tx TIMESTAMP(3),
  min_prc TIMESTAMP(3),
  max_prc TIMESTAMP(3),
  coun BIGINT
) WITH (
  'connector.type' = 'myKafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior_sink',
  'connector.properties.zookeeper.connect' = 'venn:2181',
  'connector.properties.bootstrap.servers' = 'venn:9092',
  'update-mode' = 'upsert',
  'format.type' = 'json'
);
---insert
INSERT INTO user_log_sink
SELECT item_id, category_id, behavior,
       max(ts), min(proctime), max(proctime), count(user_id)
FROM user_log
GROUP BY item_id, category_id, behavior;
The execution graph looks like this:
Inspecting the boolean flag of the messages inside KafkaUpsertTableSinkBase:
Upsert messages:
The awkward part is that I never saw a false message: even when the same message was sent repeatedly, the count kept increasing, but no false message arrived to delete the previous result. This is consistent with upsert semantics, though: when the sink's key matches the GROUP BY key, an updated aggregate is encoded as a new upsert for the same key, and a delete would only appear if a key were actually removed.
Output:
{"item_id":"3611281","category_id":"965809","behavior":"pv","max_tx":"2017-11-26T01:00:00Z","min_prc":"2020-04-08T05:20:26.694Z","max_prc":"2020-04-08T05:26:16.525Z","coun":10}
{"item_id":"3611281","category_id":"965809","behavior":"pv","max_tx":"2017-11-26T01:00:00Z","min_prc":"2020-04-08T05:20:26.694Z","max_prc":"2020-04-08T05:26:27.317Z","coun":11}
The full code is available here: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/source
My understanding is limited; this sample is for reference only.
Follow the Flink菜鳥 (Flink rookie) WeChat official account for occasional posts on Flink development topics.