Debezium-Flink-Hudi: Real-Time Streaming CDC


1. What is Debezium?

Debezium is an open-source distributed platform for change data capture (CDC). It captures change events in a database (for example inserts, updates, and deletes on tables), converts them into an event stream, and lets downstream applications see these changes and react to them.

 

2. Typical Debezium Architecture

The Debezium documentation [1] describes the typical deployment architecture (the diagram itself is not reproduced here).

 

In that architecture, Kafka Connect is used to ingest data from the RDBMS source. Source connectors pull records from the database and publish them to Kafka; sink connectors propagate records from Kafka topics into other systems.

In the documented architecture, connectors are deployed for both MySQL and PostgreSQL:

  1. The MySQL connector uses a client library to access the binlog.
  2. The PostgreSQL connector reads from a logical replication stream.

 

The other option is to deploy only the Debezium Server, without Kafka (the architecture diagram for this mode is also in [1]).

In this mode, Debezium's built-in source connectors are used. Database events are serialized as JSON or Apache Avro and then sent to other messaging systems such as Amazon Kinesis or Apache Pulsar.

 

3. Deploying Debezium

This deployment uses AWS resources throughout:

  1. AWS RDS MySQL as the source database
  2. AWS EKS to run the Kafka Connector
  3. AWS MSK for Kafka
  4. Downstream of Kafka, AWS EMR runs Flink to incrementally load the data into a Hudi table

The steps for creating the AWS RDS, EKS, MSK, and EMR resources are omitted here; the focus is on the specific methods used to wire everything together.

 

3.1. Deploying the Kafka Connector on AWS EKS

3.1.1. Installing the Operator Framework and the Strimzi Apache Kafka Operator

First install the Operator Framework [2], an open-source toolkit for managing Kubernetes-native applications (operators). Kafka itself can then be installed with the Strimzi Apache Kafka Operator [3].

Install the latest operator-framework release [4], currently 0.18.1:

kubectl apply -f https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.18.1/crds.yaml

kubectl apply -f https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.18.1/olm.yaml

 

Install the Strimzi Apache Kafka Operator:

kubectl apply -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

$ kubectl get csv -n operators
NAME                               DISPLAY   VERSION   REPLACES                           PHASE
strimzi-cluster-operator.v0.23.0   Strimzi   0.23.0    strimzi-cluster-operator.v0.22.1   Succeeded

 

3.1.2. Packaging the Debezium MySQL Kafka Connector

Next, deploy Debezium's MySQL Kafka connector.

The source database is MySQL, so download debezium-connector-mysql, version 1.5.0.Final:

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.0.Final/debezium-connector-mysql-1.5.0.Final-plugin.tar.gz

tar -zxvf debezium-connector-mysql-1.5.0.Final-plugin.tar.gz

 

Then build a custom debezium-connector-mysql Docker image.

Create the Dockerfile:

FROM strimzi/kafka:0.20.1-kafka-2.6.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

 

Build the image and push it:

# Log in to AWS ECR
> aws ecr get-login --no-include-email

# Build the image
> sudo docker build . -t {ECR_Repository}/connect-debezium

# Push to ECR
> sudo docker push {ECR_Repository}/connect-debezium

 

3.1.3. Deploying the Debezium MySQL Connector

$ cat debezium-mysql-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connector
  namespace: kafka
#  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
#    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.8.0
  replicas: 1
  bootstrapServers: xxxx
  image: xxxxxx.dkr.ecr.cn-north-1.amazonaws.com.cn/connect-debezium:latest
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

$ kubectl apply -f debezium-mysql-connector.yaml

$ kubectl get pods -n kafka
NAME                                          READY   STATUS    RESTARTS   AGE
debezium-connector-connect-69c98cc784-kqvww   1/1     Running   0          5m44s

 

Replace bootstrapServers with the AWS MSK bootstrap servers, and image with the image URI built in step 3.1.2.

Use a local port-forward to reach the Kafka Connect service and verify the available connector plugins:

$ kubectl port-forward service/debezium-connector-connect-api 8083:8083 -n kafka

$ curl localhost:8083/connector-plugins
[{
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.5.0.Final"
}, {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.6.0"
}
…
]

 

Write the MySQL connector configuration file:

$ cat mysql-connector-tang.json
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxxxx",
    "database.port": "3306",
    "database.user": "xxxx",
    "database.password": "xxxx",
    "database.server.id": "184055",
    "database.server.name": "mysql-tang",
    "database.include.list": "tang ",
    "database.history.kafka.bootstrap.servers": "xxxxx",
    "database.history.kafka.topic": " changes.tang"
  }
}
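Note that the MySQL account referenced by database.user must have the binlog-related privileges listed in the Debezium MySQL connector documentation (at least SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT). A hypothetical grant, assuming the user already exists and using an illustrative user name:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium_user'@'%';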

 

Push the configuration to Kafka Connect:

$ cat mysql-connector-tang.json | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @-
HTTP/1.1 201 Created
Date: Fri, 21 May 2021 11:00:25 GMT
Location: http://localhost:8083/connectors/mysql-connector-tang
Content-Type: application/json
Content-Length: 733
Server: Jetty(9.4.24.v20191120)

# Verify that the connector has been created
$ curl localhost:8083/connectors/
["mysql-connector-tang"]

 

3.1.4. Verification

After the deployment completes, create a database and a test table in AWS RDS MySQL and write some test data (sketched below). At this point, however, no corresponding events appear in AWS MSK.
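Based on the schema-change events shown later (database inventory, table test), the test objects are along these lines; the inserted values are illustrative:

CREATE DATABASE inventory;
USE inventory;
CREATE TABLE test (id INT, name VARCHAR(10));
INSERT INTO test VALUES (1, 'tang'), (2, 'debezium');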

Check the connector pod's logs:

$ kubectl logs debezium-connector-connect-69c98cc784-kqvww -n kafka
….
io.debezium.DebeziumException: The MySQL server is not configured to use a ROW binlog_format, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.
        at io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(MySqlConnectorTask.java:203)
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:85)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)

 

The log shows that the MySQL connector requires the MySQL server to be configured with binlog_format=ROW.
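On AWS RDS MySQL, binlog_format cannot be changed with SET GLOBAL; it is set through the instance's DB parameter group. Once the change is applied, it can be verified from any client session:

-- confirm that row-based binary logging is in effect
SHOW VARIABLES LIKE 'binlog_format';   -- expected: ROW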

After changing this setting, verifying again with kafka-console-consumer.sh shows all the events from the test database:

$ ./kafka-console-consumer.sh --bootstrap-server xxxx --topic schema-changes.inventory --from-beginning
…
{
  "source" : {
    "server" : "mysql-tang"
  },
  "position" : {
    "ts_sec" : 1621585297,
    "file" : "mysql-bin-changelog.000015",
    "pos" : 511,
    "snapshot" : true
  },
  "databaseName" : "inventory",
  "ddl" : "CREATE DATABASE `inventory` CHARSET latin1 COLLATE latin1_swedish_ci",
  "tableChanges" : [ ]
}
…
{
  "source" : {
    "server" : "mysql-tang"
  },
  "position" : {
    "ts_sec" : 1621585297,
    "file" : "mysql-bin-changelog.000015",
    "pos" : 511,
    "snapshot" : true
  },
  "databaseName" : "inventory",
  "ddl" : "CREATE TABLE `test` (\n  `id` int(11) DEFAULT NULL,\n  `name` varchar(10) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=latin1",
  "tableChanges" : [ {
    "type" : "CREATE",
    "id" : "\"inventory\".\"test\"",
    "table" : {
      "defaultCharsetName" : "latin1",
      "primaryKeyColumnNames" : [ ],
      "columns" : [ {
        "name" : "id",
        "jdbcType" : 4,
        "typeName" : "INT",
        "typeExpression" : "INT",
        "charsetName" : null,
        "length" : 11,
        "position" : 1,
        "optional" : true,
        "autoIncremented" : false,
        "generated" : false
      }, {
        "name" : "name",
        "jdbcType" : 12,
        "typeName" : "VARCHAR",
        "typeExpression" : "VARCHAR",
        "charsetName" : "latin1",
        "length" : 10,
        "position" : 2,
        "optional" : true,
        "autoIncremented" : false,
        "generated" : false
      } ]
    }
  } ]
}

 

4. Consuming Debezium Messages with Flink

After the RDBMS data has been written to Kafka by the Debezium connector, it is first consumed by Flink. The Flink documentation provides sample code for handling the Debezium format [5]:

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 -- using 'debezium-json' as the format to interpret Debezium JSON messages
 -- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
 'format' = 'debezium-json'
)
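Once registered this way, the table behaves as a changelog source: queries over it stay consistent with the upstream MySQL table as rows are inserted, updated, and deleted. For example (adapted from the same documentation page [5]), a continuously maintained aggregation:

-- latest average weight per product, kept up to date by the Debezium changelog
SELECT name, AVG(weight) FROM topic_products GROUP BY name;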

 

5. Writing to a Hudi Table

After the RDBMS data has been written to Kafka by the Debezium connector, the next step is to write the stream into a Hudi table with Flink, so that the data lands in Hudi in real time. The Hudi documentation provides sample code for the Flink integration [6]:

CREATE TABLE t1(
  uuid VARCHAR(20), -- you can use 'PRIMARY KEY NOT ENFORCED' syntax to mark the field as record key
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'table_base_path',
  'write.tasks' = '1', -- default is 4 ,required more resource
  'compaction.tasks' = '1', -- default is 10 ,required more resource
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
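Before wiring in the Kafka source, the Hudi sink can be smoke-tested on its own. A minimal insert-and-read round trip, loosely following the Hudi quick start [6] (the values are illustrative):

-- write a test row, then read it back
INSERT INTO t1 VALUES ('id1', 'Danny', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1');
SELECT * FROM t1;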

 

5.1. Dependency Issues

One thing to watch out for: when using the hudi-flink-bundle_2.11-0.7.0.jar (or hudi-flink-bundle_2.11-0.8.0.jar) mentioned on the Hudi website, the following error occurs:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

 

Judging from the error, hudi-flink-bundle_2.11-0.7.0.jar does not provide the Flink integration with Hudi via 'connector' = 'hudi'. However, the latest Hudi tutorial (currently Hudi 0.9) mentions that hudi-flink-bundle_2.1?-*.*.*.jar is required.

So I tried building Hudi 0.9 manually to produce hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar, but hit the following problem during compilation:

[ERROR] Failed to execute goal on project hudi-hadoop-mr: Could not resolve dependencies for project org.apache.hudi:hudi-hadoop-mr:jar:0.9.0-SNAPSHOT: Failed to collect dependencies at org.apache.hive:hive-exec:jar:core:2.3.2 -> org.apache.calcite:calcite-core:jar:1.10.0 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [nexus-aliyun (http://maven.aliyun.com/nexus/content/groups/public/, default, releases), datanucleus (http://www.datanucleus.org/downloads/maven2, default, releases), glassfish-repository (http://maven.glassfish.org/content/groups/glassfish, default, disabled), glassfish-repo-archive (http://maven.glassfish.org/content/groups/glassfish, default, disabled), apache.snapshots (http://repository.apache.org/snapshots, default, snapshots), central (http://repo.maven.apache.org/maven2, default, releases), conjars (http://conjars.org/repo, default, releases+snapshots)] -> [Help 1]

 

This error means that the org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde artifact cannot be pulled from any of the configured Maven repositories.

The fix is to download the jar manually (from https://public.nexus.pentaho.org/repository/proxy-public-3rd-party-release/org/pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde/pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar), install it into the local Maven repository, and then add the dependency declaration to the pom file of the affected module.

The Maven install command looks like this:

../apache-maven-3.8.1/bin/mvn install:install-file -DgroupId=org.pentaho -DartifactId=pentaho-aggdesigner-algorithm -Dversion=5.1.5-jhyde -Dpackaging=jar -Dfile=/home/hadoop/.m2/repository/org/pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde/pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar

Once this is done, the problem of mapping the Hudi table in Flink SQL is resolved.

 

5.2. Flink Version Issues

On the latest AWS EMR release, emr-5.33.0, the Flink version is 1.12.1, whereas building Hudi 0.9 requires Flink 1.12.2.

After building hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar from Hudi 0.9 and using it on EMR-5.33.0, I ran into NoSuchMethod errors caused by the version mismatch, and swapping in various jars did not resolve them.

So in the end a self-managed Flink 1.12.2 cluster was used.

 

6. Testing Flink Consuming Debezium and Writing to Hudi

A simple table is used for the test.

Create the table in MySQL:

create table customer(id varchar(20), name varchar(10), age int, user_level varchar(10));
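The verification step later keeps writing rows into this table with a MySQL stored procedure. The procedure used in the original run is not shown; a minimal sketch that generates illustrative rows (names and values are hypothetical, with user_level values matching the lv* partitions seen later) could look like this:

DELIMITER //
CREATE PROCEDURE gen_customer(IN n INT)
BEGIN
  DECLARE i INT DEFAULT 0;
  WHILE i < n DO
    INSERT INTO customer (id, name, age, user_level)
    VALUES (CONCAT('id', i), CONCAT('name', i), 20 + (i MOD 50), CONCAT('lv', 1 + (i MOD 3)));
    SET i = i + 1;
  END WHILE;
END //
DELIMITER ;

-- e.g. CALL gen_customer(100000);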

 

Start the Flink program; the main code is:

package cdc

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DebeziumHudi {

  def main(args: Array[String]): Unit = {
    // Env settings
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val blinkStreamSetting = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv = StreamTableEnvironment.create(senv, blinkStreamSetting)
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT)

    val table_base_path = args(0)
    val table_type = args(1)

    // kafka config
    val topicName = "my-con.tangdb.customer"
    val bootstrapServers = "xxxx:9092"
    val groupID = "group_mysql_tangdb"

    // create kafka table
    val create_kafka_table_sql = "CREATE TABLE customer(\n" +
      "id VARCHAR(20),\n" +
      "name VARCHAR(10),\n" +
      "age int,\n" +
      "user_level VARCHAR(20) \n" +
      ") WITH (\n" +
      "  'connector' = 'kafka',\n" +
      "  'topic' = '" + topicName + "',\n" +
      "  'properties.bootstrap.servers' = '" + bootstrapServers + "',\n" +
      "  'properties.group.id' = '" + groupID + "',\n" +
      "  'debezium-json.schema-include' = 'true',\n" +
      "  'format' = 'debezium-json'\n" +
      ")"


    // hudi table config
    //val table_base_path = "s3://xxx-hudi/customer/"
    //val table_type = "COPY_ON_WRITE"

    // create hudi table
    val create_hudi_table_sql = "CREATE TABLE customers_hudi(\n" +
      "id VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n" +
      "name VARCHAR(10),\n" +
      "age INT,\n" +
      "ts TIMESTAMP(3), \n" +
      "`user_level` VARCHAR(20) ) \n" +
      "PARTITIONED BY (user_level) \n" +
      "WITH (\n" +
      "  'connector' = 'hudi',\n" +
      "  'path' = '" + table_base_path +"',\n" +
      "  'table.type' = '" + table_type + "',\n" +
      "  'read.tasks' = '1',\n" +
      "  'write.tasks' = '1',\n" +
      "  'compaction.tasks' = '1',\n" +
      "  'write.batch.size' = '8',\n" +
      "  'compaction.delta_commits' = '2',\n" +
      "  'compaction.delta_seconds' = '10' " +
      ")"

    // do sql query
    tableEnv.executeSql(create_kafka_table_sql)
    tableEnv.executeSql(create_hudi_table_sql)
    tableEnv.executeSql("insert into customers_hudi (id, name, age, ts, user_level) select id, name, age, current_timestamp, user_level from customer")

  }

}

 

After submission, the Flink job runs normally.

 

Use a MySQL stored procedure (such as the one sketched earlier) to keep writing rows into the customer table. The corresponding partition paths and data files then appear under the Hudi base path:

$ hdfs dfs -ls s3://xxx-hudi/customer/
Found 3 items
drwxrwxrwx   - hadoop hadoop          0 1970-01-01 00:00 s3://tang-hudi/customer/.hoodie
drwxrwxrwx   - hadoop hadoop          0 1970-01-01 00:00 s3://tang-hudi/customer/lv2
drwxrwxrwx   - hadoop hadoop          0 1970-01-01 00:00 s3://tang-hudi/customer/lv3

$ hdfs dfs -ls s3://xxx-hudi/customer/lv2/
Found 2 items
-rw-rw-rw-   1 hadoop hadoop         93 2021-05-24 13:52 s3://tang-hudi/customer/lv2/.hoodie_partition_metadata
-rw-rw-rw-   1 hadoop hadoop    2092019 2021-05-24 14:00 s3://tang-hudi/customer/lv2/e8195cc8-aae4-4462-8605-7f4eceac90ce_0-1-0_20210524134250.parquet

 

7. Verifying the Hudi Table

First, querying the target parquet file with AWS S3 Select returns the expected results.

 

 

However, mapping the Hudi table path with SparkSQL and with Hive and running read queries against it both failed. The cause of the failures has not yet been determined.
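For reference, the kind of mapping attempted looks roughly like the following Spark SQL (a sketch only, assuming the hudi-spark-bundle is on the Spark classpath; this is the step that failed in this environment):

CREATE TABLE customer_hudi
USING hudi
LOCATION 's3://xxx-hudi/customer/';

SELECT user_level, COUNT(*) FROM customer_hudi GROUP BY user_level;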

The initial guess is that this is related to dependency and classpath issues. Because the latest AWS EMR release, emr-5.33.0, ships Flink 1.12.1 while building Hudi 0.9 requires Flink 1.12.2, a self-managed Flink cluster was used; at the time only keeping the Flink and Hudi versions aligned was considered, and the Spark and Hive versions were not taken into account, which may be the cause.

 

8. Summary

Overall, Debezium is a CDC tool that is very easy to deploy and use, and it effectively extracts RDBMS data into a messaging system for different downstream applications to consume. Flink's built-in support for both Debezium and Hudi greatly simplifies real-time data ingestion in data lake scenarios.

 

References

[1] https://debezium.io/documentation/reference/1.5/architecture.html

[2] https://operatorhub.io

[3] https://operatorhub.io/operator/strimzi-kafka-operator

[4] https://github.com/operator-framework/operator-lifecycle-manager/releases/

[5] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/

[6] https://hudi.apache.org/docs/flink-quick-start-guide.html

 

