Flink 1.11 發布了支持 MySQL CDC(Changelog Data Capture) 可以直接解析 Canal / Debezium 格式的 MySQL Binlog
對於實時處理程序,MySQL 的 Binlog 是很重要的數據源(上一個項目幾乎所有實時數據都是來自業務系統的數據庫,
也就是MySQL,算上分庫分表,接了上千個 MySQL 表的 binlog)
Flink 1.11 的 CDC 發布之后,第一時間就嘗試了一下 Canal 的 binlog 格式,不過感覺一般,還要部署解析的 Canal / Debezium,我們自己開發的解析 MySQL binlog 的組件,
比 Cancel 適合更適合我們,就放棄繼續研究這個東西了
最近一段時間,也看到很多同學在社區提起 Flink CDC ,一直以為是類似於 Stateful Functions 的 Flink 組件,今天看下下,才發現是雲邪大佬開源的 Flink-cdc-connector
GitHub 地址: https://github.com/ververica/flink-cdc-connectors
Flink-cdc-connector 就是個 CDC 組件,可以跳過 Canal / Debezium 等解析 Binlog 的工具,直接獲取 MySQL 的 Binlog,轉換成 Flink 的流表
Flink-cdc-connector 目前支持 MySQL 和 Postgres 兩種數據庫,詳情查看 GitHub
----------------進入主題--------------------------
這次是 cdc 的demo,所以需要將數據寫到 MySQL 中, 所以我就直接起了兩個任務:
1、 kafka to mysql
2、 mysql cdc to kafka
## kafka to mysql
數據還是使用的之前從天池公開數據集中下載的 user_log,kafka source 沒什么好說的,直接從 kafka 讀取數據,使用 jdbc sink 寫入到 mysql 中
mysql 表結構:
create table user_log ( id int auto_increment primary key, user_id varchar(20) not null, item_id varchar(20) null, category_id varchar(20) null, behavior varchar(10) null, ts datetime null ) comment '天池公開數據集-淘寶用戶數據';
flink jdbc sink 表如下:
CREATE TABLE mysql_table_venn_user_log_sink ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ,ts timestamp(3) ) WITH ( 'connector' = 'jdbc' ,'url' = 'jdbc:mysql://venn:3306/venn' ,'table-name' = 'user_log' ,'username' = 'root' ,'password' = '123456' ,'sink.buffer-flush.max-rows' = '100' -- default ,'sink.buffer-flush.interval' = '1s' ,'sink.max-retries' = '3' );
寫入數據如下:
## mysql cdc to kafka
這個要稍微麻煩一點,由於 cdc source 是 upsert 的,並且不支持 WindowGroupAgg,所以使用了自己開發的 kafka upsert sink(注意: 代碼中直接過濾了 delete 流)
The main method caused an error: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, cdc_mysql_venn_user_log]], fields=[id, user_id, item_id, category_id, behavior, ts]) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
源碼: KafkaUpsertTableSink
@Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { final SinkFunction<Row> kafkaProducer = createKafkaProducer( topic, properties, serializationSchema, partitioner); // todo cast DataStream<Tuple2<Boolean, Row>> to DataStream<Row> return dataStream .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() { @Override public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception { // upsertStream include insert/update/delete change, true is upsert, false is delete // create new row include upsert message if (element.f0) { out.collect(element.f1); } else { System.out.println("KafkaUpsertTableSinkBase : retract stream f0 will be false"); } } }) .addSink(kafkaProducer) .setParallelism(dataStream.getParallelism()) .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); }
下面來看 cdc 的 sql:
-- creates a mysql mysql table source drop table if exists cdc_mysql_venn_user_log; CREATE TABLE cdc_mysql_venn_user_log ( id varchar ,user_id VARCHAR ,item_id VARCHAR ,category_id VARCHAR ,behavior VARCHAR ,ts TIMESTAMP(3) ,proc_time as PROCTIME() ,PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'venn', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'venn', 'table-name' = 'user_log' ); -- kafka sink drop table if exists cdc_mysql_user_log_sink; CREATE TABLE cdc_mysql_user_log_sink ( id varchar ,user_id VARCHAR ,item_id VARCHAR ,category_id VARCHAR ,behavior VARCHAR ,ts TIMESTAMP(3) ) WITH ( 'connector.type' = 'upsertKafka' ,'connector.version' = 'universal' ,'connector.topic' = 'cdc_mysql_user_log_sink' ,'connector.properties.zookeeper.connect' = 'venn:2181' ,'connector.properties.bootstrap.servers' = 'venn:9092' ,'format.type' = 'json' ); -- sink to kafka insert into cdc_mysql_user_log_sink select id, user_id, item_id, category_id, behavior, ts from cdc_mysql_venn_user_log;
注: flink cdc connector 表不支持定義 watermark
java.lang.UnsupportedOperationException: Currently, defining WATERMARK on a changelog source is not supported.
刪除mysql 表數據時,taskmanager 打印 delete message:
完整代碼請查看: GitHub https://github.com/springMoon/sqlSubmit
拓展: Flink SQL CDC 上線!我們總結了 13 條生產實踐經驗 https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文