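Abstract: CDL is a simple, efficient real-time data integration service. It captures data change events from various OLTP databases and pushes them to Kafka. Sink Connectors then consume the data in the Kafka topics and load it into big data ecosystem applications, so that data enters the data lake in real time.
This article is shared from the Huawei Cloud Community post 《華為FusionInsight MRS CDL使用指南》 (Huawei FusionInsight MRS CDL User Guide) by 晉紅輕.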
Overview
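The CDL service has two key roles: CDLConnector and CDLService. CDLConnector instances execute the data capture tasks, and CDLService instances create and manage those tasks.
This walkthrough uses MySQL as the data source for change capture.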
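Prerequisites
- The CDL service is installed in the MRS cluster.
- Binary logging (binlog) is enabled on the MySQL database. It is on by default in MySQL 8.0; older versions such as 5.7 must be configured to enable it.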
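To check whether binlog is enabled:
Connect to the MySQL database with a client tool or the command line (this example uses Navicat) and run show variables like 'log_%'.
For example, in Navicat choose "File > New Query", enter the following SQL statement and click "Run". If "log_bin" is "ON" in the result, binlog is enabled.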
show variables like 'log_%'
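You can script the check instead of reading the result in Navicat. The rule above (log_bin must be ON) fits in a small Python function. This is a sketch only. It assumes the result rows have already been fetched with a MySQL client library of your choice (not shown) and that each row is a (Variable_name, Value) pair. binlog_enabled is a hypothetical helper name.

```python
# Sketch: decide whether binary logging is on from the rows returned by
# SHOW VARIABLES LIKE 'log_%'. Assumes each row is a (Variable_name, Value) pair.

def binlog_enabled(rows):
    """Return True if the log_bin variable is reported as ON."""
    variables = {str(name).lower(): str(value).upper() for name, value in rows}
    return variables.get("log_bin") == "ON"


if __name__ == "__main__":
    # Hypothetical sample rows, for illustration only.
    sample = [("log_bin", "ON"), ("log_bin_basename", "/var/lib/mysql/binlog")]
    print(binlog_enabled(sample))
```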
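Preparing the tools
At present, CDL jobs can only be submitted through the REST API, so install a tool for sending REST requests before you start debugging. This article uses VS Code.
Once VS Code is installed, install the REST Client extension.
Then create a file named cdl.http and write the requests in it.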
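Creating a CDL job
The flowchart for creating a CDL job is shown below:
Note: first create a MySQL link, then create a Kafka link, and finally create a CDL synchronization job and start it.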
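The order matters because the job refers to both links by name. The sketch below lists the requests in that order. The endpoint paths come from the cdl.http examples in this article, which validate each link before creating it. build_plan is a hypothetical helper: it only builds the list and sends nothing.

```python
# Sketch of the request order: MySQL link, Kafka link, then the CDL job,
# which is started last. build_plan is hypothetical, not part of CDL.

BASE = "/api/v1/cdl"


def build_plan(mysql_link: str, kafka_link: str, job_name: str) -> list[tuple[str, str, str]]:
    """Return (description, HTTP method, path) triples in the required order."""
    plan = []
    for link in (mysql_link, kafka_link):
        # The link name travels in the JSON body, so both links share the same path.
        plan.append((f"validate {link}", "POST", f"{BASE}/link?validate=true"))
        plan.append((f"create {link}", "POST", f"{BASE}/link"))
    # The job references both links, so it can only be created afterwards.
    plan.append((f"create {job_name}", "POST", f"{BASE}/job"))
    plan.append((f"start {job_name}", "PUT", f"{BASE}/job/{job_name}/start"))
    plan.append((f"status {job_name}", "GET", f"{BASE}/submissions?jobName={job_name}"))
    return plan


if __name__ == "__main__":
    for step, method, path in build_plan("MySQL_link", "kafka_link", "mysql_to_kafka"):
        print(f"{method:5} {path:45} # {step}")
```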
REST requests for the MySQL link
# The // annotations inside the JSON bodies are explanations only; delete them if the server rejects a body as invalid JSON.
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### get links
GET https://{{host}}/api/v1/cdl/link

### mysql link validate
POST https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
  "name": "MySQL_link",  //link name; must be globally unique
  "description": "MySQL connection",  //link description
  "link-type": "mysql",  //link type
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },  //IP of the database host
      { "name": "port", "value": {{mysql_port}} },  //port the database listens on
      { "name": "database.name", "value": {{mysql_database}} },  //database to connect to
      { "name": "user", "value": {{mysql_user}} },  //user
      { "name": "password", "value": {{mysql_password}} },  //password
      { "name": "schema", "value": {{mysql_database}} }  //same as the database name
    ]
  }
}

### mysql link create
POST https://{{host}}/api/v1/cdl/link
content-type: application/json

{
  "name": "MySQL_link",  //link name; must be globally unique
  "description": "MySQL connection",  //link description
  "link-type": "mysql",  //link type
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },  //IP of the database host
      { "name": "port", "value": {{mysql_port}} },  //port the database listens on
      { "name": "database.name", "value": {{mysql_database}} },  //database to connect to
      { "name": "user", "value": {{mysql_user}} },  //user
      { "name": "password", "value": {{mysql_password}} },  //password
      { "name": "schema", "value": {{mysql_database}} }  //same as the database name
    ]
  }
}

### mysql link update
PUT https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json

{
  "name": "MySQL_link",  //link name; must be globally unique
  "description": "MySQL connection",  //link description
  "link-type": "mysql",  //link type
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },  //IP of the database host
      { "name": "port", "value": {{mysql_port}} },  //port the database listens on
      { "name": "database.name", "value": {{mysql_database}} },  //database to connect to
      { "name": "user", "value": {{mysql_user}} },  //user
      { "name": "password", "value": {{mysql_password}} },  //password
      { "name": "schema", "value": {{mysql_database}} }  //same as the database name
    ]
  }
}
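The validate, create and update requests share one body. Generating that body in code keeps the three copies in sync and yields strict JSON without the // annotations. The sketch below is one way to do it. mysql_link_payload is a hypothetical helper whose field names are copied from the requests above, and the password is a placeholder.

```python
import json


def mysql_link_payload(name, host, port, database, user, password,
                       description="MySQL connection"):
    """Build the JSON body used by the validate/create/update MySQL link requests."""
    inputs = [
        ("host", host),
        ("port", port),            # sent as a string, as in cdl.http
        ("database.name", database),
        ("user", user),
        ("password", password),
        ("schema", database),      # the article sets schema to the database name
    ]
    return {
        "name": name,              # must be globally unique
        "description": description,
        "link-type": "mysql",
        "enabled": "true",         # a string, not a JSON boolean, as in the article
        "link-config-values": {
            "inputs": [{"name": key, "value": value} for key, value in inputs]
        },
    }


if __name__ == "__main__":
    body = mysql_link_payload("MySQL_link", "172.16.2.118", "3306", "hudi", "root", "<password>")
    print(json.dumps(body, indent=2))
```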
REST requests for the Kafka link
### get links
GET https://{{host}}/api/v1/cdl/link

### kafka link validate
POST https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }  //SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
    ]
  }
}

### kafka link create
POST https://{{host}}/api/v1/cdl/link
content-type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }  //SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
    ]
  }
}

### kafka link update
PUT https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }  //SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
    ]
  }
}
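Only security.protocol differs between a security-mode cluster and a normal-mode cluster, per the annotation in the requests above. The sketch below makes that switch explicit with a single flag. kafka_link_payload is a hypothetical helper. The pairing of port 21007 with security mode and 21005 with normal mode is inferred from the @bootstrap and @bootstrap_normal variables in cdl.http and is an assumption.

```python
import json


def kafka_link_payload(name, bootstrap_servers, secure=True,
                       description="test kafka link"):
    """Build the Kafka link body used by the validate/create/update requests."""
    # Security mode uses SASL_PLAINTEXT; normal mode uses PLAINTEXT.
    protocol = "SASL_PLAINTEXT" if secure else "PLAINTEXT"
    return {
        "name": name,
        "description": description,
        "link-type": "kafka",
        "enabled": "true",
        "link-config-values": {
            "inputs": [
                {"name": "bootstrap.servers", "value": bootstrap_servers},
                # Kept in both modes, exactly as in the requests above.
                {"name": "sasl.kerberos.service.name", "value": "kafka"},
                {"name": "security.protocol", "value": protocol},
            ]
        },
    }


if __name__ == "__main__":
    secure_body = kafka_link_payload("kafka_link", "172.16.9.113:21007")
    # Assumption: a normal-mode cluster is reached on port 21005 (@bootstrap_normal).
    normal_body = kafka_link_payload("kafka_link", "172.16.9.113:21005", secure=False)
    print(json.dumps(secure_body, indent=2))
    print(json.dumps(normal_body, indent=2))
```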
REST requests for the CDL job
# Uses the file variables (@host and so on) defined at the top of cdl.http.

### create job
POST https://{{host}}/api/v1/cdl/job
content-type: application/json

{
  "job_type": "CDL_JOB",  //job type; only CDL_JOB is currently supported
  "name": "mysql_to_kafka",  //job name
  "description": "mysql_to_kafka",  //job description
  "from-link-name": "MySQL_link",  //source link
  "to-link-name": "kafka_link",  //target link
  "from-config-values": {
    "inputs": [
      {"name": "connector.class", "value": "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
      {"name": "schema", "value": "hudi"},
      {"name": "db.name.alias", "value": "hudi"},
      {"name": "whitelist", "value": "hudisource"},
      {"name": "tables", "value": "hudisource"},
      {"name": "tasks.max", "value": "10"},
      {"name": "mode", "value": "insert,update,delete"},
      {"name": "parse.dml.data", "value": "true"},
      {"name": "schema.auto.creation", "value": "false"},
      {"name": "errors.tolerance", "value": "all"},
      {"name": "multiple.topic.partitions.enable", "value": "false"},
      {"name": "topic.table.mapping", "value": "[ {\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"} ]"},
      {"name": "producer.override.security.protocol", "value": "SASL_PLAINTEXT"},  //SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
      {"name": "consumer.override.security.protocol", "value": "SASL_PLAINTEXT"}  //SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
    ]
  },
  "to-config-values": {"inputs": []},
  "job-config-values": {
    "inputs": [
      {"name": "global.topic", "value": "demo"}
    ]
  }
}

### get all jobs
GET https://{{host}}/api/v1/cdl/job

### submit job
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start

### get job status
GET https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka

### stop job
# 13 is the submission ID of the author's run; use the ID of your own submission (for example, from the job status query).
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop

### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
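The topic.table.mapping value is a JSON array serialized into a string. That string is then embedded in the JSON job body, which is why its inner quotes appear escaped as \". The sketch below builds the value from a table-to-topic dictionary and lets json.dumps handle the escaping. topic_table_mapping and mapping_input are hypothetical helpers. The output differs from the request above only in insignificant whitespace.

```python
import json


def topic_table_mapping(pairs):
    """Serialize {table: topic} pairs into the string stored in topic.table.mapping."""
    return json.dumps([{"topicName": topic, "tableName": table}
                       for table, topic in pairs.items()])


def mapping_input(pairs):
    """Wrap the mapping string as one entry of from-config-values.inputs."""
    return {"name": "topic.table.mapping", "value": topic_table_mapping(pairs)}


if __name__ == "__main__":
    entry = mapping_input({"hudisource": "huditableout"})
    # Encoding the entry again, as happens when the whole job body is sent,
    # produces the escaped \"topicName\" form seen in the create job request.
    print(json.dumps(entry))
```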
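Scenario verification
The original data in the production MySQL database is as follows:
After the CDL job is submitted, run the following operations.
Insert: insert into hudi.hudisource values (11,'蔣語堂',38,'女','圖','播放器',28732);
Corresponding Kafka message:
Update: UPDATE hudi.hudisource SET uname='Anne Marie333' WHERE uid=11;
Corresponding Kafka message:
Delete: delete from hudi.hudisource where uid=11;
Corresponding Kafka message: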
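Before checking the topic, you can reason about which of the three test statements should produce a Kafka message by comparing each statement's DML type with the job's mode input ("insert,update,delete"). This sketch assumes, from the parameter's name and value, that mode lists the captured operation types; the exact semantics should be confirmed in the CDL documentation. The helper functions are hypothetical, and the sketch does not read any Kafka messages.

```python
def captured_operations(mode):
    """Split a mode string such as "insert,update,delete" into a set of operation names."""
    return {op.strip().lower() for op in mode.split(",") if op.strip()}


def dml_type(sql):
    """Return "insert", "update" or "delete" from a statement's first keyword, else None."""
    tokens = sql.split()
    if not tokens:
        return None
    keyword = tokens[0].lower()
    return keyword if keyword in {"insert", "update", "delete"} else None


def expected_events(mode, statements):
    """Pair each statement with whether it should yield a message under the assumption above."""
    ops = captured_operations(mode)
    return [(sql, dml_type(sql) in ops) for sql in statements]


if __name__ == "__main__":
    statements = [
        "insert into hudi.hudisource values (11,'蔣語堂',38,'女','圖','播放器',28732);",
        "UPDATE hudi.hudisource SET uname='Anne Marie333' WHERE uid=11;",
        "delete from hudi.hudisource where uid=11;",
    ]
    for sql, expected in expected_events("insert,update,delete", statements):
        print(expected, sql)
```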