Flink Best Practices: Using Canal to Sync MySQL Data to TiDB


Summary: This article describes how to import data from MySQL into Kafka via Binlog + Canal, and then consume it with Flink.

1. Background

This article walks through a case of importing data from MySQL into Kafka via Binlog + Canal, where it is then consumed by Flink and written to TiDB.

To verify the functionality of the whole pipeline quickly, all components are deployed as single nodes. If physical resources are limited, all of the components in this article can be deployed in a single virtual machine with 4 GB of memory and 1 CPU.

For a production deployment, it is recommended to replace each component with a highly available cluster deployment.

A separate single-node Zookeeper environment is created, and Flink, Kafka, Canal, and the other components share this Zookeeper environment.

For all components that require a JRE (Flink, Kafka, Canal, Zookeeper), considering that upgrading a shared JRE could affect other applications, each component uses its own independent JRE environment.

This article is divided into two parts: the first seven sections cover setting up the base environment, and the final section describes how data flows through the components.

(Figure: overall deployment and data-flow diagram)

The data flows through the following components:

  • The MySQL data source generates Binlog.
  • Canal reads the Binlog, produces Canal JSON, and pushes it to the specified Kafka topic.
  • Flink consumes the data in the Kafka topic through the flink-sql-connector-kafka connector.
  • Flink then writes the data to TiDB through flink-connector-jdbc.

The TiDB + Flink architecture supports developing and running many different kinds of applications.

Its main features currently include:

  • Unified batch and stream processing.
  • Sophisticated state management.
  • Event-time semantics.
  • Exactly-once state consistency guarantees.

Flink can run on a variety of resource managers, including YARN, Mesos, and Kubernetes, and also supports standalone deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and also supports standalone deployment on bare-metal clusters with TiUP.

Common classes of applications built on the TiDB + Flink architecture include:

  • Event-driven applications:
    • Anti-fraud.
    • Anomaly detection.
    • Rule-based alerting.
    • Business process monitoring.
  • Data analytics applications:
    • Network quality monitoring.
    • Product update and experiment evaluation analysis.
    • Ad-hoc analysis of fact data.
    • Large-scale graph analysis.
  • Data pipeline applications:
    • Real-time search index building for e-commerce.
    • Continuous ETL for e-commerce.

2. Environment

2.1 Operating system environment

[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8

2.2 Software environment

Item Version Download link
TiDB v4.0.9 https://download.pingcap.org/tidb-community-server-v4.0.9-linux-amd64.tar.gz
Kafka v2.7.0 https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
Flink v1.12.1 https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz
JRE v1.8.0_281 https://javadl.oracle.com/webapps/download/AutoDL?BundleId=244058_89d678f2be164786b292527658ca1605
Zookeeper v3.6.2 https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
flink-sql-connector-kafka v1.12.0 https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.12.0/flink-sql-connector-kafka_2.12-1.12.0.jar
flink-connector-jdbc v1.12.0 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar
MySQL v8.0.23 https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
Canal v1.1.4 https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.3 Machine allocation

Hostname IP Component
r21 192.168.12.21 TiDB Cluster
r22 192.168.12.22 Kafka
r23 192.168.12.23 Flink
r24 192.168.12.24 Zookeeper
r25 192.168.12.25 MySQL
r26 192.168.12.26 Canal

3. Deploying the TiDB Cluster

Compared with a traditional standalone database, TiDB has the following advantages:

  • Purely distributed architecture with good scalability, supporting elastic scale-out and scale-in.
  • SQL support: it exposes the MySQL wire protocol and is compatible with most MySQL syntax, so in most scenarios it can directly replace MySQL.
  • High availability by default: when a minority of replicas fail, the database automatically repairs data and fails over, transparently to the application.
  • ACID transactions, friendly to scenarios with strong consistency requirements, such as bank transfers.
  • A rich tool-chain ecosystem covering data migration, replication, backup, and more.

In its kernel design, the TiDB distributed database is split into multiple modules that communicate with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

(Figure: TiDB architecture)

In this article we only perform the simplest functional tests, so a single-node, single-replica TiDB is deployed, involving the following three modules:

  • TiDB Server: the SQL layer. It exposes the MySQL protocol connection endpoint, accepts client connections, performs SQL parsing and optimization, and finally generates a distributed execution plan.
  • PD (Placement Driver) Server: the metadata management module of the whole TiDB cluster. It stores the real-time data distribution of every TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management UI, and allocates transaction IDs for distributed transactions.
  • TiKV Server: responsible for storing data. Externally, TiKV is a distributed, transactional key-value storage engine.

3.1 TiUP deployment topology file

# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"

# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"

# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"

#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"

#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
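The deploy step itself is not shown in the original walkthrough. Assuming the template above is saved as topology.yaml, a typical TiUP deployment looks roughly like the following (a sketch; adjust the SSH user and authentication options to your environment). The cluster name tidb-c1-v409 matches the display output in the next section:

## deploy and start the cluster with TiUP (sketch)
[root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml --user root -p
[root@r20 topology]# tiup cluster start tidb-c1-v409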

3.2 TiDB Cluster environment

Deploying a TiDB Cluster is not the focus of this article. As a quick test environment, a single-replica TiDB cluster is deployed on a single machine, and the monitoring components are not deployed.

[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4

Create a table for testing:

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

4. Deploying Zookeeper

In this experiment, a dedicated Zookeeper environment is configured to serve both Kafka and Flink.

As a demonstration setup, only a single-node environment is deployed.

4.1 Extract the Zookeeper package

[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper

4.2 Deploy the JRE for Zookeeper

[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable:

## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre

4.3 Create the Zookeeper configuration file

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
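The dataDir configured above is not created by the tarball. Assuming the layout used here, create it before starting Zookeeper:

[root@r24 conf]# mkdir -p /opt/zookeeper/data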

4.4 Start Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start

4.5 Check the Zookeeper status

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
tcp6       0      0 :::44505                :::*                    LISTEN      15062/java

## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181

4.6 A suggestion about Zookeeper

I have one small, tentative personal suggestion about Zookeeper:

For a clustered Zookeeper deployment, be sure to enable network monitoring, and in particular keep an eye on network bandwidth among the system metrics.

5. Deploying Kafka

Kafka is a distributed streaming platform, mainly used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably move data between systems or applications (essentially a message queue).
  • Building real-time streaming applications that transform or react to these data streams (i.e. stream processing, where data is transformed internally between topics via Kafka Streams).

(Figure: Kafka core APIs)

Kafka has four core APIs (a quick command-line illustration of the first two follows this list):

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more topics, effectively transforming input streams into output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database can capture every change to a table.
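As a quick hands-on illustration of the first two APIs (not part of the original setup; the topic name test is just an example, and the broker address assumes the configuration made later in this section), the console scripts that ship with Kafka exercise the Producer and Consumer APIs from the command line:

## publish records interactively with the console producer (Producer API)
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic test
## read them back with the console consumer (Consumer API)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic test --from-beginning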

Since this experiment only verifies functionality, only a single-node Kafka environment is set up.

5.1 Download and extract Kafka

[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka

5.2 Deploy the JRE for Kafka

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

Modify the JRE environment variable for Kafka:

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre

5.3 Modify the Kafka configuration file

Modify the Kafka configuration file /opt/kafka/config/server.properties:

## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181

5.4 Start Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
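As a quick sanity check (not part of the original steps), the broker can be probed by listing its topics; on a brand-new broker the list is simply empty:

[root@r22 bin]# /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.12.22:9092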

5.5 Check the Kafka version

Kafka does not offer an obvious dedicated version command; a simple way to check the version is to look at the names of the library jars:

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root   41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root  892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...

Here 2.13 is the Scala version and 2.7.0 is the Kafka version.

6. Deploying Flink

Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. It is a high-throughput, low-latency, high-performance distributed processing framework, designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.

(Figure: Apache Flink overview)

This experiment only tests functionality, so only a single-node Flink environment is deployed.

6.1 Download and extract Flink

[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink

6.2 Deploy the JRE for Flink

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

6.3 Add the libraries Flink needs

To consume data from Kafka, Flink needs the flink-sql-connector-kafka package.

To connect to MySQL/TiDB, Flink needs the flink-connector-jdbc package.

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/

6.4 Modify the Flink configuration file

## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre

6.5 Start Flink

[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
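The Flink SQL statements used in section 9 are entered through the Flink SQL client, which the original walkthrough does not show explicitly. With the standalone deployment above, it can typically be started as follows:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded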

6.6 View the Flink web UI

(Figure: Flink web UI)
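By default the Flink web UI listens on rest.port 8081, so with the configuration above it should be reachable at http://192.168.12.23:8081. A quick command-line check (not part of the original steps):

[root@r23 ~]# curl -s -o /dev/null -w "%{http_code}\n" http://192.168.12.23:8081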

7. Deploying MySQL

7.1 Extract the MySQL package

[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/

7.2 Create the MySQL service file

[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target

[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1

[Install]
WantedBy=multi-user.target

## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/

7.3 Create the my.cnf file

[root@r34 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/show.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables

7.4 Initialize and start MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-

7.5 Create a new MySQL user for Canal to connect with

## change mysql temp password firstly
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)

## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)

## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
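Canal relies on row-based binlog. MySQL 8.0 enables binary logging in ROW format with server_id=1 by default, but it is worth confirming before wiring up Canal; the values shown below are the expected defaults:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'server_id');
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
| log_bin       | ON    |
| server_id     | 1     |
+---------------+-------+
3 rows in set (0.00 sec)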

7.6 Create a table in MySQL for testing

mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table                                                                     |
+-------+----------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` (
  `id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)

8. Deploying Canal

Canal's main purpose is to parse MySQL incremental logs (binlog) and provide incremental data subscription and consumption.

In the early days, because Alibaba operated dual data centers in Hangzhou and the United States, there was a business need for cross-data-center synchronization, which was mainly implemented with business triggers to capture incremental changes.

Since 2010, the business gradually shifted to parsing database logs to capture incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption use cases.

(Figure: Canal architecture)

Use cases based on log-based incremental subscription and consumption include:

  • Database mirroring.
  • Real-time database backup.
  • Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, and so on).
  • Business cache refresh.
  • Incremental data processing with business logic.

Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

8.1 Extract the Canal package

[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal

8.2 Deploy the JRE for Canal

[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre

## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java

8.3 Modify the Canal configuration files

Modify the /opt/canal/conf/canal.properties configuration file:

## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example    ## an example directory must exist under /opt/canal/conf to hold this destination's configuration
canal.mq.servers = 192.168.12.22:9092

Modify the /opt/canal/conf/example/instance.properties configuration file:

## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*    ## filter for the databases/tables to capture
canal.mq.topic=canal-kafka

9. Configuring the data flow

9.1 The MySQL Binlog -> Canal -> Kafka path

9.1.1 Check the MySQL Binlog status

Check the MySQL Binlog status to make sure the Binlog is working normally:

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

9.1.2 Create a topic in Kafka

Create a topic named canal-kafka in Kafka. The topic name must match the canal.mq.topic=canal-kafka setting in the Canal configuration file /opt/canal/conf/example/instance.properties:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)

List all topics in Kafka:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test

View the details of a topic in Kafka:

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
Topic: ticdc-test   PartitionCount: 1   ReplicationFactor: 1   Configs: max.message.bytes=12800000,flush.messages=1
    Topic: ticdc-test   Partition: 0   Leader: 0   Replicas: 0   Isr: 0

9.1.3 Start Canal

Before starting Canal, check connectivity to the relevant ports from the Canal node:

## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306

## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092

## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181

Start Canal:

[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue

9.1.4 Check the Canal log

Check /opt/canal/logs/example/example.log:

2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

9.1.5 Check the messages in Kafka with a consumer

Insert a test record into MySQL:

mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)

Check the consumer output; the test data just inserted is already there:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

9.2 The Kafka -> Flink path

Create the t2 table in Flink with the kafka connector type:

## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t2;

Insert another test record into MySQL:

mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)

The data is synchronized to Flink in real time:

Flink SQL> select * from t2;
 Refresh: 1 s    Page: Last of 1    Updated: 02:49:27.366

  id
   2

9.3 The Flink -> TiDB path

9.3.1 Create a test table in the downstream TiDB

[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)

9.3.2 Create the test table in Flink

Flink SQL> CREATE TABLE t3 (
>     id int
> ) with (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>     'table-name' = 't3',
>     'username' = 'root',
>     'password' = 'mysql'
> );

Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e

9.3.3 Check the inserted data in the downstream TiDB

mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)
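This verifies each hop individually. To exercise the full MySQL -> Canal -> Kafka -> Flink -> TiDB path end to end, a continuous insert from the Kafka-backed table into the JDBC table can be submitted instead of inserting a literal value (a sketch, using the t2 and t3 tables defined above):

Flink SQL> insert into t3 select id from t2;

After this job is submitted, any row inserted into test.t2 on the MySQL side should appear in test.t3 in TiDB.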


This article is original content from Alibaba Cloud and may not be reproduced without permission.

