Canal 與 Kafka 集成安裝與配置
vim 編輯中 >>> 后為列出原內容其后緊接的 <<< 行為對其的更改,沒有前置符號的表示新添
主機環境
- CentOS 7.6 內存至少 1.5G,否則服務會啟動不起來
軟件版本
- MySQL 5.7.28
- OpenJDK 8
- Zookeeper 3.5.6-bin
- Kafka 2.12(Scala)-2.3.0
- Canal deployer-1.1.4
MySQL 安裝
采用從官方源直接安裝的方式
- 添加 MySQL 5.7 官方源
rpm -ivh https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm - 更新源
yum -y update - 安裝 MySQL 5.7
yum -y install mysql-community-server - 添加 Canal 所需 MySQL 配置
vim /etc/my.cnf[mysqld] 下新添配置
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
- 啟動 MySQL
systemctl start mysqld查看狀態systemctl status mysqld - 查看生成的臨時密碼
cat /var/log/mysqld.log | grep 'temporary password' - 初次配置
mysql_secure_installation按照提示完成配置 - 測試中允許 root 遠程連接並添加 Canal 所需的用戶,
mysql -uroot -p后執行 SQL
> USE mysql;
> UPDATE user SET Host = '%' WHERE User = 'root';
> CREATE USER canal IDENTIFIED BY 'Canal123!';
> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
> FLUSH PRIVILEGES;
> EXIT
Java 安裝
采用從官方源直接安裝的方式
- 安裝源中的 OpenJDK 8
yum install -y java-1.8.0-openjdk.x86_64 - 驗證安裝
java -version
Zookeeper 安裝
- 在
https://zookeeper.apache.org/releases.html中查找 zookeeper 最新 bin 版本壓縮包的 HTTP 方式的下載鏈接 - 下載壓縮包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz - 解壓縮
mkdir -p tmp/zookeeper/ && tar zxvf apache-zookeeper-3.5.6-bin.tar.gz -C tmp/zookeeper/ - 將解壓縮后的軟件移動至 /usr/local 下
mkdir /usr/local/zookeeper && mv tmp/zookeeper/apache-zookeeper-3.5.6-bin/* /usr/local/zookeeper/ - 添加 zookeeper 環境變量
vim /etc/profile末尾添加
# Set zookeeper env
export ZOOKEEPER_HOME=/usr/local/zookeeper/
export PATH=$PATH:$ZOOKEEPER_HOME/bin
- 使環境變量生效
source /etc/profile - 創建配置文件,從默認的配置文件創建,因為 zookeeper 啟動時會去找 conf/zoo.cfg 作為配置文件
cd /usr/local/zookeeper/
cp conf/zoo_sample.cfg conf/zoo.cfg
mkdir data
- 編輯配置
vim conf/zoo.cfg,此處配置的是單機,如果需要配置集群也是在這里
>>> dataDir=/tmp/zookeeper
<<< dataDir=/usr/local/zookeeper/data # 這是 zookeeper 的數據目錄
admin.serverPort=2191
- 啟動 zookeeper
zkServer.sh start查看狀態zkServer.sh status - 返回工作目錄
cd -
Kafka 安裝
- 在
https://kafka.apache.org/downloads中查找 kafka 最新版本的 HTTP 方式下載鏈接 - 下載壓縮包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz - 解壓縮
mkdir tmp/kafka && tar zxvf kafka_2.12-2.3.0.tgz -C tmp/kafka - 將解壓縮后的軟件移動至 /usr/local 下
mkdir /usr/local/kafka && mv tmp/kafka/kafka_2.12-2.3.0/* /usr/local/kafka && cd /usr/local/kafka - 添加 kafka 環境變量
vim /etc/profile末尾添加
# Set Kafka env
export KAFKA_HOME=/usr/local/kafka/
export PATH=$PATH:$KAFKA_HOME/bin
- 使環境變量生效
source /etc/profile - 修改配置文件
vim config/server.properties,配置集群的話還需要更改broker.id
>>> zookeeper.connect=localhost:2181 # 這里需要配置成 zookeeper 的地址,這里默認就是正確的
>>> #listeners=PLAINTEXT://:9092
<<< listeners=PLAINTEXT://:9092 # 刪除前面的注釋符號
- 啟動 Kafka 守護進程
kafka-server-start.sh -daemon config/server.properties & - 測試創建一個 topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test列出 topickafka-topics.sh --list --zookeeper localhost:2181 - 返回工作目錄
cd -
Canal 安裝
- 在
https://github.com/alibaba/canal/releases中查找 canal 最新deploy 版本的下載鏈接 - 下載壓縮包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz - 解壓
mkdir tmp/canal && tar zxvf canal.deployer-1.1.4.tar.gz -C tmp/canal/ - 將解壓后的文件復制進 /usr/local,
mkdir /usr/local/canal && mv tmp/canal/* /usr/local/canal/ - 創建測試配置直接使用自帶的樣例進行更改,並復制出默認配置以備后期增加庫時使用,這里的每一個文件夾就代表一個數據源,多個數據源就復制出多個文件夾后在 canal.properties 的 canal.destinations 里逗號分隔進行配置
mv conf/example/ conf/test && cp conf/test/instance.properties instance.properties.bak - 修改單個數據源的配置
vim conf/test/instance.properties
>>> canal.instance.master.address=127.0.0.1:3306 # 數據庫服務器地址
>>> canal.instance.dbUsername=canal # 數據庫用戶名
>>> canal.instance.dbPassword=canal
<<< canal.instance.dbPassword=Canal123! # 數據庫密碼
>>> canal.instance.filter.regex=.*\\..* # 數據表過濾正則,dbName.tbName,默認的是所有庫的所有表
>>> canal.mq.topic=example
<<< canal.mq.topic=test # 這個數據庫存儲進 Kafka 時使用的 topic
- 修改 Canal 全局設置
vim conf/canal.properties
>>> canal.destinations = example
<<< canal.destinations = test # 這里配置開啟的 instance,具體方法上面步驟有說明
>>> canal.serverMode = tcp
<<< canal.serverMode = kafka # 更改模式,直接把數據扔進 Kafka
>>> canal.mq.servers = 127.0.0.1:6667
<<< canal.mq.servers = 127.0.0.1:9092 # Kafka 的地址
>>> canal.mq.batchSize = 16384
<<< canal.mq.batchSize = 16384 # 這里沒有更改,值應該小於 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里沒設置,這里也就不更改了
>>> canal.mq.flatMessage = false
<<< canal.mq.flatMessage = true # 使用文本格式(JSON)進行傳輸,否則 Kafka 里扔進去的是二進制數據,雖然不影響,但是看起來不方便
- 開啟 Canal
bin/startup.sh,因為 Canal 的腳本名稱都太普通,所以沒有添加到 PATH 里 - 查看日志是否有異常
vim logs/canal/canal.logvim logs/test/test.log - 測試,連上數據庫嘗試執行更改 SQL
CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE test_tb(
c1 INT COMMENT "中文測試",
c2 VARCHAR(36)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- 查看 Kafka 中的數據,列出所有 topic,
kafka-topics.sh --list --zookeeper localhost:2181,消費其中的數據kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning此時應該已經可以列出創建表時的語句
后期 Bug 修復:
- 消費端 Kafka 不進數據,Canal 日志報錯
org.apache.kafka.common.errors.RecordTooLargeException,認為是 Kafka 消息體大小限制造成的,需要同時修改 Kafka 與 Canal 消息體的最大限制- 修改 Kafka 配置,
server.properties中修改或添加配置項message.max.bytes=100000000,producer.properties中修改或添加配置項max.request.size=100000000,consumer.properties中修改或添加配置項max.partition.fetch.bytes=100000000,重啟 Kafka - 修改 Canal 配置,
canal.properties修改canal.mq.maxRequestSize參數值為90000000,重啟 Canal - 查看 Canal 日志是否報錯
Could not find first log file name in binary log index file at...如果報錯則停止 Canal ,再刪除實例配置下的meta.dat文件,再啟動 Canal 即可
- 修改 Kafka 配置,
