cannl同步mysql數據到es中
canal組件介紹
canal-admin
(非必須但推薦使用):為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。
canal-server:
服務端,從mysql讀取binlog日志獲取增量日志,可以通過tcp、kafka、RocketMQ等方式與客戶端通信;通過zookeeper搭建集群。
canal-adapter
客戶端,根據canal-server獲取的增量日志執行適配到其他諸如elasticsearch、redis、mysql等端,實現數據同步。
本次實驗環境

首先我們需要下載canal的各個組件canal-server、canal-adapter、canal-admin,下載地址:https://github.com/alibaba/canal/releases

es及相關工具的部署可參考Elasticsearch詳解及部署
1、開啟mysql的binlog
使用canal-server需要先准備mysql,對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式
vi /etc/my.cnf
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
配置完成后重啟mysql,並查詢是否配置生效:ON就是開啟
systemctl restart mysqld
可以進入數據庫再次查看下
mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.01 sec) mysql> show variables like 'binlog_format%'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.00 sec)
2、mysql用戶數據准備
創建一個canal用戶並對其進行授權
CREATE USER canal IDENTIFIED BY 'Cjz123456.'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
創建一個測試用戶數據庫及表
create database canal;
USE canal; CREATE TABLE product ( id bigint(20) NOT NULL AUTO_INCREMENT, title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, sub_title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, price decimal(10, 2) NULL DEFAULT NULL, pic varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
3、創建存放canal的目錄
mkdir /usr/local/canal-server
mkdir /usr/local/canal-adpter
mkdir /usr/local/canal-admin
4、部署canal-server
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-server
vim /usr/local/canal-server/conf/example/instance.properties
# 需要同步數據的MySQL地址 canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # 用於同步數據的數據庫賬號 canal.instance.dbUsername=canal # 用於同步數據的數據庫密碼 canal.instance.dbPassword=Cjz123456. # 數據庫連接編碼 canal.instance.connectionCharset = UTF-8 # 需要訂閱binlog的表過濾正則表達式 canal.instance.filter.regex=.*\\..*
cd /usr/local/canal-server/bin
./startup.sh
查看日志查看是否正常啟動
cd /usr/local/canal-server/logs/
tail -f example/example.log

tail -f canal/canal.log

5、部署canal-adapter
tar -zxvf canal.adapter-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-adpter/
cd /usr/local/canal-adpter/conf
vim application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # 客戶端的模式,可選tcp kafka rocketMQ
flatMessage: true # 扁平message開關, 是否以json字符串形式投遞數據, 僅在kafka/rocketMQ模式下有效
zookeeperHosts: # 對應集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批數量
retries: 0 # 重試次數, -1為無限重試
timeout: # 同步超時時間, 單位毫秒
accessKey:
secretKey:
consumerProperties:
canal.tcp.server.host: 127.0.0.1:11111 #設置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.111.129:3306/canal?useUnicode=true&charaterEncoding=utf-8&useSSL=false
username: canal
password: Cjz123456.
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7
hosts: http://192.168.111.129:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
security.auth: elastic:elastic # only used for rest mode
cluster.name: my-es
修改 canal-adapter/conf/es7/mytest_user.yml 文件,用於配置MySQL中的表與Elasticsearch中索引的映射關系
vim es7/mytest_user.yml
dataSourceKey: defaultDS # 源數據源的key, 對應上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的數據
esMapping:
_index: canal_product # es 的索引名稱
_id: _id # 將Mysql表里的id對應上es上的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
sql: "SELECT
p.id AS _id,
p.title,
p.sub_title,
p.price,
p.pic
FROM
product p" # sql映射
etlCondition: "where p.id>={}" #etl的條件參數
commitBatch: 3000 # 提交批大小
cd /usr/local/canal-adpter/bin/
./startup.sh
查看日志看是否正常啟動
cd /usr/local/canal-adpter/logs/adapter/
tail -f adapter.log

6、創建ES索引
可以直接通過Kibana頁面進行創建,不難看出,索引的屬性對應着就是我們表的屬性(Mysql表id與es的_id做了對應,所以這里不需要設置)
PUT canal_product
{
"mappings": {
"properties": {
"title": {
"type": "text"
},
"sub_title": {
"type": "text"
},
"pic": {
"type": "text"
},
"price": {
"type": "double"
}
}
}
}

7、驗證
canal初始化是不會全量同步數據的,所以我們需要登錄mysql,插入一條數據
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 15, '小米8', ' 全面屏游戲智能手機 6GB+64GB', 1999.00, NULL );
創建完成后在Kibana上查看即可發現數據已經同步
GET canal_product/_search

在elasticsearch-head上查看也是,可以看出,我們的表id變成了es的_id

在Mysql上修改數據看看
UPDATE product SET title='小米10' WHERE id=15;
可以看到我們修改的數據也進行了同步

8、canal-admin的安裝
tar -zxvf canal.admin-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-admin/
創建 canal-admin 需要使用的數據庫canal_manager,創建的SQL腳本在canal-admin包下的conf/canal_manager.sql
cd /usr/local/canal-admin/conf/
導入sql腳本
mysql -uroot -p < canal_manager.sql
修改配置文件conf/application.yml,按如下配置即可,主要是修改數據源配置和canal-admin的管理賬號配置,注意需要用一個有讀寫權限的數據庫賬號;
vim application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: root #數據庫管理員賬號
password: Cjz123456. #數據庫管理員賬號密碼
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
接下來對之前搭建的canal-server的conf/canal_local.properties文件進行配置,主要是修改canal-admin的配置
cd /usr/local/canal-server/conf/
vim canal_local.properties
# register ip canal.register.ip = 192.168.111.129 # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
啟動canal-admin
cd /usr/local/canal-admin/bin/
./startup.sh
啟動完好后即可訪問頁面,訪問ip:8089,初始用戶密碼admin/123456

重啟canal-server
cd /usr/local/canal-server/bin/
./restart.sh local
再次回到我們的canal-admin管理頁面,即可看到該節點信息

該文章參考於:https://blog.csdn.net/zh1998wx/article/details/123101442
