Background
While supporting customers, we have found that some companies already have data pipelines in place and do not allow applications to consume the MySQL binlog directly; all data consumption must go through Kafka. This document shows how to consume Canal-format data from Kafka and write it to StarRocks with Flink SQL, implementing CDC.
Data Flow
MySQL Binlog --> Canal --> Kafka --> Flink SQL --> StarRocks
Environment Setup
1. Canal
Canal configuration is not covered in detail here; to install and configure Canal and its dependencies, see "Importing binlog from MySQL into StarRocks using Canal".
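That said, the settings that matter for this pipeline are the ones that point Canal at Kafka. A minimal sketch, assuming a Canal 1.1.x deployment; hosts, topic, and credentials are placeholders to adapt:

# conf/canal.properties
canal.serverMode = kafka
canal.mq.servers = $kafka_host:9092      # named kafka.bootstrap.servers in newer Canal releases
canal.mq.flatMessage = true              # flat JSON messages, which Flink's canal-json format expects

# conf/example/instance.properties
canal.instance.master.address = $mysql_host:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.mq.topic = canal_test              # topic consumed by the Flink source table below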
2. Flink
Here we use Flink SQL to read and write the data, so a Flink service is required. The following walks through a single-node Flink installation. (If your company already runs a Flink cluster, skip this part.)
- Download Flink; 1.13 is recommended, and 1.11 is the minimum supported version.
- Download the Flink CDC connector; make sure to pick the Flink-MySQL-CDC build that matches your Flink version.
- Download the Flink StarRocks connector; note that Flink 1.13 and Flink 1.11/1.12 use different connectors.
- Download the Flink SQL Kafka connector matching your Flink version; here we use flink-sql-connector-kafka_2.12-1.13.3.jar for Flink 1.13.3.
- Copy flink-sql-connector-kafka_2.12-1.13.3.jar, flink-sql-connector-mysql-cdc-xxx.jar, and flink-connector-starrocks-xxx.jar into flink-xxx/lib/ (a quick sanity check follows this list).
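After copying, the three connector jars should show up under lib/; a quick check (paths and version numbers are illustrative):

ls flink-xxx/lib/ | grep -E 'kafka|cdc|starrocks'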
Start the Flink service:
cd flink-xxx
./bin/start-cluster.sh
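To confirm the cluster is up, list jobs via the CLI or query the REST API (a standalone cluster listens on port 8081 by default):

./bin/flink list
curl http://localhost:8081/overview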
DDL
Sample data in Kafka
{ "data":[ { "id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc", "agent_id":"16", "http_port":"8031", "rpc_port":"9020", "query_port":"8306", "edit_log_port":"9010", "meta_dir":"", "absolute_meta_dir":"/home/disk1/sr/data/sr/meta", "log_dir":"", "absolute_log_dir":"/home/disk1/sr/starrocks-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log", "role":"FOLLOWER", "install_path":"/home/disk1/sr/starrocks-manager-20211008", "absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe", "deleted":"0", "deleted_at":"0", "created_at":"1633759183484", "updated_at":"1634240355691" } ], "database":"test", "es":1634240355000, "id":1076, "isDdl":false, "mysqlType":{ "id":"varchar(48)", "agent_id":"int(11)", "http_port":"int(11)", "rpc_port":"int(11)", "query_port":"int(11)", "edit_log_port":"int(11)", "meta_dir":"text", "absolute_meta_dir":"text", "log_dir":"text", "absolute_log_dir":"text", "role":"varchar(32)", "install_path":"text", "absolute_migrate_path":"text", "deleted":"tinyint(1)", "deleted_at":"bigint(20)", "created_at":"bigint(20)", "updated_at":"bigint(20)" }, "old":[ { "updated_at":"1634240295633" } ], "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":12, "agent_id":4, "http_port":4, "rpc_port":4, "query_port":4, "edit_log_port":4, "meta_dir":2005, "absolute_meta_dir":2005, "log_dir":2005, "absolute_log_dir":2005, "role":12, "install_path":2005, "absolute_migrate_path":2005, "deleted":-7, "deleted_at":-5, "created_at":-5, "updated_at":-5 }, "table":"fe_instances", "ts":1634240355886, "type":"UPDATE" }
StarRocks
CREATE DATABASE canaltest;

CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int(11) NULL,
    `http_port` int(11) NULL,
    `rpc_port` int(11) NULL,
    `query_port` int(11),
    `edit_log_port` int(11),
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` varchar(32),
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint(1),
    `deleted_at` bigint(20),
    `created_at` bigint(20),
    `updated_at` bigint(20)
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);
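You can confirm the table was created by connecting to the FE with a MySQL client (port 9030, the same endpoint used in the jdbc-url below) and describing it:

DESC `canaltest`.`canal_test_sink`;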
Flink SQL
Save the following statements to a file, flink-create.sql:
CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) with (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',  -- Kafka broker address
    'properties.group.id' = 'canal_group',                -- Kafka consumer group
    'format' = 'canal-json'                               -- use the canal-json format
);

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',                     -- table name in StarRocks
    'username' = 'root',
    'password' = '',
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
Start the job and test
cd flink-xxx
./bin/sql-client.sh -f flink-create.sql
# check the job status
./bin/flink list
# output like the following means the job started successfully
Waiting for response...
------------------ Running/Restarting Jobs -------------------
18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
Confirm that the data has been loaded into StarRocks:
select * from canaltest.canal_test_sink;
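To verify that updates also flow end to end, change a row in the source MySQL table and re-query StarRocks after the sink flush interval (15 s above). A sketch using the id and column from the sample message earlier:

-- in MySQL
UPDATE test.fe_instances SET http_port = 8032 WHERE id = '2f2192e9-f8b5-4332-a96f-192b05c9e6bc';

-- in StarRocks, after ~15 seconds
SELECT http_port FROM canaltest.canal_test_sink WHERE id = '2f2192e9-f8b5-4332-a96f-192b05c9e6bc';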

Troubleshooting common issues
1. The Flink job reports no errors (but no data arrives)
Step 1: Confirm that the binlog is enabled; check with SHOW VARIABLES LIKE 'log_bin';
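Canal additionally requires the binlog to be in ROW format; both settings can be checked in one go (expected values shown as comments):

SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- Canal requires ROW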
Step 2: Confirm that the flink, flink-cdc, flink-starrocks-connector, and MySQL versions meet the requirements (MySQL must be 5.7 or 8.0.x), and that the major versions of flink, flink-cdc, and flink-starrocks-connector match, e.g., all 1.13.
Step 3: Determine step by step whether the problem is in reading the source table or in writing to StarRocks. The SQL below is from the flink-create.sql file written above and is used here as a demonstration.
From the Flink installation directory, run the following to enter the Flink SQL client:
bin/sql-client.sh
First, verify that reading the source table works:
# paste the SQL statements above one at a time to tell whether the problem is in querying the source table or in writing to StarRocks
CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) with (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',  -- Kafka broker address
    'properties.group.id' = 'canal_group',                -- Kafka consumer group
    'format' = 'canal-json'                               -- use the canal-json format
);
# verify that the source works
select * from `testdb`.`canal_test_source`;
Then verify that writing to StarRocks works:
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',                     -- table name in StarRocks
    'username' = 'root',
    'password' = '',
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
2. The Flink job reports errors
Step 1: Confirm that the Flink cluster is running; a locally downloaded Flink may not have been started yet, in which case run ./bin/start-cluster.sh.
Step 2: Investigate further based on the specific error message.
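The full stack trace usually lands in the TaskManager log rather than in the SQL client output; with a default standalone deployment the logs live under the Flink directory:

./bin/flink list -a                          # include finished/failed jobs to find the job ID
tail -n 200 log/flink-*-taskexecutor-*.log   # TaskManager log with the full exception stack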