flink cdc demo


Flink 1.11 發布了支持 MySQL CDC(Changelog Data Capture) 可以直接解析 Canal / Debezium 格式的 MySQL Binlog

對於實時處理程序,MySQL 的 Binlog 是很重要的數據源(上一個項目幾乎所有實時數據都是來自業務系統的數據庫,
也就是MySQL,算上分庫分表,接了上千個 MySQL 表的 binlog)

Flink 1.11 的 CDC 發布之后,第一時間就嘗試了一下 Canal 的 binlog 格式,不過感覺一般,還要部署解析的 Canal / Debezium,我們自己開發的解析 MySQL binlog 的組件,
比 Cancel 適合更適合我們,就放棄繼續研究這個東西了

最近一段時間,也看到很多同學在社區提起 Flink CDC ,一直以為是類似於 Stateful Functions 的 Flink 組件,今天看下下,才發現是雲邪大佬開源的 Flink-cdc-connector

GitHub 地址: https://github.com/ververica/flink-cdc-connectors

Flink-cdc-connector 就是個 CDC 組件,可以跳過 Canal / Debezium 等解析 Binlog 的工具,直接獲取 MySQL 的 Binlog,轉換成 Flink 的流表

Flink-cdc-connector 目前支持 MySQL 和 Postgres 兩種數據庫,詳情查看 GitHub

----------------進入主題--------------------------

這次是 cdc 的demo,所以需要將數據寫到 MySQL 中, 所以我就直接起了兩個任務:

1、 kafka to mysql
2、 mysql cdc to kafka

## kafka to mysql

數據還是使用的之前從天池公開數據集中下載的 user_log,kafka source 沒什么好說的,直接從 kafka 讀取數據,使用 jdbc sink 寫入到 mysql 中

mysql 表結構:

create table user_log
(
    id          int auto_increment primary key,
    user_id     varchar(20) not null,
    item_id     varchar(20) null,
    category_id varchar(20) null,
    behavior    varchar(10) null,
    ts          datetime    null
)
comment '天池公開數據集-淘寶用戶數據';

flink jdbc sink 表如下:

CREATE TABLE mysql_table_venn_user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts timestamp(3)
) WITH (
  'connector' = 'jdbc'
  ,'url' = 'jdbc:mysql://venn:3306/venn'
  ,'table-name' = 'user_log'
  ,'username' = 'root'
  ,'password' = '123456'
  ,'sink.buffer-flush.max-rows' = '100' -- default
  ,'sink.buffer-flush.interval' = '1s'
  ,'sink.max-retries' = '3'
);


寫入數據如下:

 

 ## mysql cdc to kafka

這個要稍微麻煩一點,由於 cdc source 是 upsert 的,並且不支持 WindowGroupAgg,所以使用了自己開發的 kafka upsert sink(注意: 代碼中直接過濾了 delete 流)

The main method caused an error: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, cdc_mysql_venn_user_log]], fields=[id, user_id, item_id, category_id, behavior, ts])
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)

源碼: KafkaUpsertTableSink

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {

    final SinkFunction<Row> kafkaProducer = createKafkaProducer(
            topic,
            properties,
            serializationSchema,
            partitioner);

    // todo cast DataStream<Tuple2<Boolean, Row>> to DataStream<Row>
    return dataStream
            .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception {
                    // upsertStream include insert/update/delete change, true is upsert, false is delete
                    // create new row include upsert message
                    if (element.f0) {
                        out.collect(element.f1);
                    } else {
                        System.out.println("KafkaUpsertTableSinkBase : retract stream f0 will be false");
                    }
                }
            })
            .addSink(kafkaProducer)
            .setParallelism(dataStream.getParallelism())
            .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}

下面來看 cdc 的 sql:

-- creates a mysql mysql table source
drop table if exists cdc_mysql_venn_user_log;
CREATE TABLE cdc_mysql_venn_user_log (
  id varchar
  ,user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,proc_time as PROCTIME()
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'venn',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'venn',
 'table-name' = 'user_log'
);

-- kafka sink
drop table if exists cdc_mysql_user_log_sink;
CREATE TABLE cdc_mysql_user_log_sink (
  id varchar
  ,user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
) WITH (
  'connector.type' = 'upsertKafka'
  ,'connector.version' = 'universal'
  ,'connector.topic' = 'cdc_mysql_user_log_sink'
  ,'connector.properties.zookeeper.connect' = 'venn:2181'
  ,'connector.properties.bootstrap.servers' = 'venn:9092'
  ,'format.type' = 'json'
);

-- sink to kafka
insert into cdc_mysql_user_log_sink
select id, user_id, item_id, category_id, behavior, ts
from cdc_mysql_venn_user_log;

注: flink cdc connector 表不支持定義 watermark

java.lang.UnsupportedOperationException: Currently, defining WATERMARK on a changelog source is not supported.

刪除mysql 表數據時,taskmanager 打印 delete message:

 

 

完整代碼請查看: GitHub https://github.com/springMoon/sqlSubmit

拓展: Flink SQL CDC 上線!我們總結了 13 條生產實踐經驗 https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM