Flink1.11中的CDC Connectors操作實踐



Flink1.11引入了CDC的connector,通過這種方式可以很方便地捕獲變化的數據,大大簡化了數據處理的流程。Flink1.11的CDC connector主要包括:MySQL CDCPostgres CDC,同時對Kafka的Connector支持canal-jsondebezium-json以及changelog-json的format。本文主要分享以下內容:

  • CDC簡介
  • Flink提供的 table format
  • 使用過程中的注意點
  • mysql-cdc的操作實踐
  • canal-json的操作實踐
  • changelog-json的操作實踐

簡介

Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。因此,它可以充分利用Debezium的功能。

特點

  • 支持讀取數據庫快照,並且能夠持續讀取數據庫的變更日志,即使發生故障,也支持exactly-once 的處理語義

  • 對於DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業中使用多個數據庫和表上的變更數據。

  • 對於Table/SQL API 的CDC connector,用戶可以使用SQL DDL創建CDC數據源,來監視單個表上的數據變更。

使用場景

  • 數據庫之間的增量數據同步
  • 審計日志
  • 數據庫之上的實時物化視圖
  • 基於CDC的維表join

Flink提供的 table format

Flink提供了一系列可以用於table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用過程中的注意點

使用MySQL CDC的注意點

如果要使用MySQL CDC connector,對於程序而言,需要添加如下依賴:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

使用canal-json的注意點

如果要使用Kafka的canal-json,對於程序而言,需要添加如下依賴:

<!-- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>


如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由於Flink1.11的安裝包 的lib目錄下並沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc

使用changelog-json的注意點

如果要使用Kafka的changelog-json Format,對於程序而言,需要添加如下依賴:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

mysql-cdc的操作實踐

創建MySQL數據源表

在創建MySQL CDC表之前,需要先創建MySQL的數據表,如下:

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '編號',
  `consignee` varchar(100DEFAULT NULL COMMENT '收貨人',
  `consignee_tel` varchar(20DEFAULT NULL COMMENT '收件人電話',
  `total_amount` decimal(10,2DEFAULT NULL COMMENT '總金額',
  `order_status` varchar(20DEFAULT NULL COMMENT '訂單狀態,1表示下單,2表示支付',
  `user_id` bigint(20DEFAULT NULL COMMENT '用戶id',
  `payment_way` varchar(20DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000DEFAULT NULL COMMENT '送貨地址',
  `order_comment` varchar(200DEFAULT NULL COMMENT '訂單備注',
  `out_trade_no` varchar(50DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)',
  `trade_body` varchar(200DEFAULT NULL COMMENT '訂單描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '創建時間',
  `operate_time` datetime DEFAULT NULL COMMENT '操作時間',
  `expire_time` datetime DEFAULT NULL COMMENT '失效時間',
  `tracking_no` varchar(100DEFAULT NULL COMMENT '物流單編號',
  `parent_order_id` bigint(20DEFAULT NULL COMMENT '父訂單編號',
  `img_url` varchar(200DEFAULT NULL COMMENT '圖片路徑',
  `province_id` int(20DEFAULT NULL COMMENT '地區',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476'lAXjcL''13408115089'433.00'2'10'2''OYyAdSdLxedceqovndCD''ihjAYsSjrgJMQVdFQnSy''8728720206''''2020-06-18 02:21:38'NULLNULLNULLNULLNULL9);
INSERT INTO `order_info`
VALUES (477'QLiFDb''13415139984'772.00'1'90'2''OizYrQbKuWvrvdfpkeSZ''wiBhhqhMndCCgXwmWVQq''1679381473''''2020-06-18 09:12:25'NULLNULLNULLNULLNULL3);
INSERT INTO `order_info`
VALUES (478'iwKjQD''13320383859'88.00'1'107'1''cbXLKtNHWOcWzJVBWdAs''njjsnknHxsxhuCCeNDDi''0937074290''''2020-06-18 15:56:34'NULLNULLNULLNULLNULL7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '編號',
  `order_id` bigint(20DEFAULT NULL COMMENT '訂單編號',
  `sku_id` bigint(20DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200DEFAULT NULL COMMENT 'sku名稱(冗余)',
  `img_url` varchar(200DEFAULT NULL COMMENT '圖片名稱(冗余)',
  `order_price` decimal(10,2DEFAULT NULL COMMENT '購買價格(下單時sku價格)',
  `sku_num` varchar(200DEFAULT NULL COMMENT '購買個數',
  `create_time` datetime DEFAULT NULL COMMENT '創建時間',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細表';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (13294768'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz'8900.00'3''2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (13304779'榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍全網通 移動聯通電信''http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne'2452.00'4''2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (13314784'小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍 全網通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機''http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv'1442.00'1''2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (13324788'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV'8900.00'3''2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (13334788'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP'8900.00'1''2020-06-18 15:56:34');

Flink SQL Cli創建CDC數據源

啟動 Flink 集群,再啟動 SQL CLI,執行下面命令:

-- 創建訂單信息表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(105)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);

在Flink SQL Cli中查詢該表的數據:result-mode: tableau,+表示數據的insert

在SQL CLI中創建訂單詳情表:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(105),
 create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);

查詢結果如下:

執行JOIN操作:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
      order_status = '2'-- 已支付
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;

canal-json的操作實踐

關於cannal的使用方式,可以參考我的另一篇文章:基於Canal與Flink實現數據實時增量同步(一)。我已經將下面的表通過canal同步到了kafka,具體格式為:

{
    "data":[
        {
            "id":"1",
            "region_name":"華北"
        },
        {
            "id":"2",
            "region_name":"華東"
        },
        {
            "id":"3",
            "region_name":"東北"
        },
        {
            "id":"4",
            "region_name":"華中"
        },
        {
            "id":"5",
            "region_name":"華南"
        },
        {
            "id":"6",
            "region_name":"西南"
        },
        {
            "id":"7",
            "region_name":"西北"
        }
    ],
    "database":"mydw",
    "es":1597128441000,
    "id":102,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(20)",
        "region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "id":12,
        "region_name":12
    },
    "table":"base_region",
    "ts":1597128441424,
    "type":"INSERT"
}

在SQL CLI中創建該canal-json格式的表:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);

查詢結果如下:

changelog-json的操作實踐

創建MySQL數據源

參見上面的order_info

Flink SQL Cli創建changelog-json表

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(105)
WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd'as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 訂單已支付
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

查詢表看一下結果:

再查一下kafka的數據:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

當將另外兩個訂單的狀態order_status更新為2時,總金額=443+772+88=1293再觀察數據:

再看kafka中的數據:

總結

本文基於Flink1.11的SQL,對新添加的CDC connector的使用方式進行了闡述。主要包括MySQL CDC connector、canal-json及changelog-json的format,並指出了使用過程中的注意點。另外本文給出了完整的使用示例,如果你有現成的環境,那么可以直接進行測試使用。

如果覺得有用,請分享、點贊、在看


本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM