Points of confusion
Versions used in this tutorial
- Canal: 1.1.5
- Canal-Adapter: 1.1.5
- Elasticsearch: 7.10.2
- MySQL: 5.7.32
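Background
Once the MySQL tables grew to millions of rows, complex queries became slow, so the plan is to use ES as a second-level cache.
The overall architecture is as follows:
Canal listens to the MySQL binlog and parses the incremental change log; the ClientAdapter then syncs those changes to ES in real time.
[Figure: Canal workflow diagram]
How Canal works:
- Canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master;
- the MySQL master receives the dump request and starts pushing the binary log to the slave (that is, Canal);
- Canal parses the binary log objects (originally a byte stream).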
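Procedure
- Preparation:
  - MySQL: holds the source data;
  - canal: parses the database log and captures the incremental changes;
  - canal-adapter: transforms the parsed data and writes it to ES;
  - ES: receives the incremental data;
- Create the index in ES
  - The field names and types defined in the mapping must match the data being synced;
- Install the JDK: Canal is written in Java and requires version 1.8.0 or later
- Install and start canal-server
  - Install canal-server, then edit its configuration to connect it to MySQL. canal-server poses as a slave of the MySQL cluster, fetches the binary log from the master node, and pushes it to canal-adapter.
- Install and start canal-adapter
  - Install canal-adapter, then edit its configuration to connect it to MySQL and ES and to define how MySQL columns map to ES fields, so that the data can be synced to ES.
- Verify incremental data sync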
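As a minimal sketch of the "create the index in ES" step: the snippet below creates an index whose field names mirror the column aliases used in this tutorial's mapping SQL (_name, _role_id, and so on). The field types, the ES_URL variable, and the create_index helper are assumptions made for illustration; adjust the types to the real MySQL columns.

```shell
# Assumption: ES is reachable at this address without authentication.
ES_URL="${ES_URL:-http://127.0.0.1:9200}"
INDEX="mytest_user"

# Field names follow the aliases in the mapping SQL; the types are guesses.
MAPPING='{
  "mappings": {
    "properties": {
      "_name":      { "type": "text" },
      "_role_id":   { "type": "long" },
      "_role_name": { "type": "text" },
      "_c_time":    { "type": "date" },
      "_labels":    { "type": "text" }
    }
  }
}'

# Hypothetical helper: sends the create-index request with the mapping above.
create_index() {
  curl -s -X PUT "$ES_URL/$INDEX" \
    -H 'Content-Type: application/json' \
    -d "$MAPPING"
}

# Usage, once ES is running:
#   create_index
```

Creating the index before starting the adapter avoids relying on ES dynamic mapping, which may infer different types than intended.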
Configuring MySQL
Check whether binlog is enabled in MySQL:
SHOW VARIABLES LIKE 'log_bin';
Binlog writing must be enabled first, with binlog-format set to ROW. Configure my.cnf as follows:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
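After restarting mysqld, the three settings above can be checked in one query. This is a sketch that assumes the mysql command-line client is installed; MYSQL_HOST, MYSQL_USER, and the check_binlog helper are illustrative names, not part of Canal.

```shell
# Assumption: the mysql CLI can reach the server configured above.
MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}"
MYSQL_USER="${MYSQL_USER:-root}"

# Hypothetical helper: prints the settings Canal depends on.
# -p makes the client prompt for the password.
check_binlog() {
  mysql -h "$MYSQL_HOST" -u "$MYSQL_USER" -p \
    -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id');"
}

# Usage:
#   check_binlog
```

With the my.cnf shown above, log_bin should be ON and binlog_format should be ROW.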
Grant the account that Canal connects with the privileges of a MySQL slave. If the account already exists, run the GRANT directly:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
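To confirm the grants took effect, one option is to log in as the new account and run a statement that needs the replication privileges. The sketch below assumes the mysql CLI is available; MYSQL_HOST and check_canal_grants are illustrative names.

```shell
MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}"

# Hypothetical helper: logs in as the canal account (password prompt via -p),
# lists its grants, then runs SHOW MASTER STATUS, which requires
# REPLICATION CLIENT and therefore fails if the grant is missing.
check_canal_grants() {
  mysql -h "$MYSQL_HOST" -u canal -p \
    -e "SHOW GRANTS FOR 'canal'@'%'; SHOW MASTER STATUS;"
}

# Usage:
#   check_canal_grants
```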
Installing and configuring Canal
- Download Canal
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
canal.deployer is the Canal server; it subscribes to and parses the MySQL binlog.
Directory layout after unpacking:
canal1.1.5
├── bin
│   ├── restart.sh   # restart
│   ├── startup.sh   # start canal
│   ├── stop.sh      # stop canal
│   └── ...
├── conf
│   ├── canal.properties   # configuration file
│   ├── example
│   │   ├── h2.mv.db
│   │   ├── instance.properties   # instance configuration file
│   │   └── meta.dat
│   ├── logback.xml
│   └── ...
├── lib
├── logs
│   ├── canal
│   │   ├── canal.log
│   │   └── canal_stdout.log
│   └── example
│       ├── meta.log
│       └── example.log
└── plugin
- Configure Canal
Edit conf/example/instance.properties:
# position info
# database address (comments go on their own line; a .properties value runs to the end of the line)
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# database username
canal.instance.dbUsername=root
# database password
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=blog.user
- Start
./bin/startup.sh
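A quick way to see whether the server came up is to read the two log files shown in the directory layout above. This is a sketch; CANAL_HOME and show_canal_logs are illustrative names, and CANAL_HOME is assumed to point at the unpacked canal.deployer directory.

```shell
# Assumption: run from, or point CANAL_HOME at, the unpacked canal.deployer directory.
CANAL_HOME="${CANAL_HOME:-.}"

# Hypothetical helper: prints the tail of the server log and of the
# "example" instance log so startup errors are easy to spot.
show_canal_logs() {
  tail -n 50 "$CANAL_HOME/logs/canal/canal.log"
  tail -n 50 "$CANAL_HOME/logs/example/example.log"
}

# Usage:
#   show_canal_logs
```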
Installing and configuring Canal-adapter
- Download the Canal adapter
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
Directory layout after unpacking:
canal.adapter-1.1.5
├── bin
│   ├── restart.sh   # restart
│   ├── startup.sh   # start the adapter
│   ├── stop.sh      # stop the adapter
│   └── ...
├── conf
│   ├── application.yml   # configuration file
│   ├── bootstrap.yml
│   ├── es7
│   │   ├── customer.yml
│   │   └── ...
│   └── ...
├── lib
├── logs
└── plugin
- Configure the adapter
Edit conf/application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/blog?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://127.0.0.1:9200,http://10.192.8.204:9200,http://10.192.8.209:9200
        properties:
          mode: rest # or transport
          # security.auth: test:123456 # only used for rest mode
          cluster.name: dev-cluster
Delete all files under conf/es7/ and create your own yml mapping file:
dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources above
outerAdapterKey: exampleKey # matches the key of the ES adapter configured in application.yml
destination: example # canal instance name or MQ topic
groupId: # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user # ES index name
  _type: _doc # ES type name; not needed on ES7
  _id: _id # ES _id; if omitted, the pk item below must be set and ES assigns _id automatically
  # pk: id # if _id is not used, designate one property as the primary key
  # SQL mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
  # objFields:
  #   _labels: array:; # array or object property; array:; means the field holds values separated by ;
  #   _obj: object # JSON object
  etlCondition: "where a.c_time>='{0}'" # condition parameter for ETL
  commitBatch: 3000 # commit batch size
- Start the adapter
./bin/startup.sh
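The "verify incremental data sync" step from the procedure can be sketched as follows: write a row to MySQL, wait briefly, then fetch the document with the same id from ES. The column list is taken from the mapping SQL; the test id, the sample values, the two-second wait, and the helper name are assumptions, and the insert will fail if the real table has other required columns.

```shell
MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}"
ES_URL="${ES_URL:-http://127.0.0.1:9200}"
TEST_ID="${TEST_ID:-900001}"   # hypothetical id, assumed not to exist yet

# Hypothetical helper: inserts one row into blog.user, then reads the
# document with the same id back from the mytest_user index.
verify_incremental_sync() {
  mysql -h "$MYSQL_HOST" -u root -p blog \
    -e "INSERT INTO blog.user (id, name, role_id, c_time) VALUES ($TEST_ID, 'canal_test', 1, NOW());"
  sleep 2   # give canal-server and the adapter a moment to propagate the change
  curl -s "$ES_URL/mytest_user/_doc/$TEST_ID?pretty"
}

# Usage:
#   verify_incremental_sync
```

If the pipeline works, the response should contain the inserted values under _source; a "found": false response suggests the change did not reach ES.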
Startup errors
1. cannot be cast to com.alibaba.druid.pool.DruidDataSource
2022-02-28 14:52:57.684 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2022-02-28 14:52:57.734 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-02-28 14:52:58.037 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
...
at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[client-adapter.launcher-1.1.5.jar:na]
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
... 42 common frames omitted
Caused by: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:146) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
... 43 common frames omitted
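The fix was found in GitHub issue #3717.
Download the source archive canal-canal-1.1.5.zip, unpack it, open the project in an IDE, and edit the druid dependency in canal-canal-1.1.5/client-adapter/escore/pom.xml so that it reads: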
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>
</dependency>
Then run mvn clean package -Dmaven.test.skip=true in the project root, find client-adapter.es7x-1.1.5-jar-with-dependencies.jar under canal-canal-1.1.5\client-adapter\es7x\target\, copy it to ./canal_adapter/plugin/, and restart the adapter.
2. NullPointerException
2022-03-01 09:45:29.443 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2022-03-01 09:45:29.464 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /usr/local/canal/canal.adapter-1.1.5/plugin
2022-03-01 09:45:29.524 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## something goes wrong when starting up the canal client adapters:
java.lang.NullPointerException: null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:782) ~[guava-22.0.jar:na]
...
at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:28) ~[canal.common-1.1.5.jar:na]
at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.init(CanalTCPConsumer.java:57) ~[connector.tcp-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.<init>(AdapterProcessor.java:74) ~[client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.lambda$init$0(CanalAdapterLoader.java:65) ~[client-adapter.launcher-1.1.5.jar:na]
at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[na:1.8.0_161]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:60) ~[client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) ~[client-adapter.launcher-1.1.5.jar:na]
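A long search on Google turned up no similar reports, so I suspected a mistake in my own configuration files. Comparing them against the originals showed that I had deleted too much of the configuration in application.yml.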
3. Connection refused
2022-03-01 11:21:49.823 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2022-03-01 11:21:49.824 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error!
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198) ~[na:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115) ~[na:na]
at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:63) ~[na:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_161]
at sun.nio.ch.Net.connect(Net.java:454) ~[na:1.8.0_161]
at sun.nio.ch.Net.connect(Net.java:446) ~[na:1.8.0_161]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[na:1.8.0_161]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150) ~[na:na]
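canal.serverMode on the canal server was set to kafka; after changing it to tcp, everything worked.
Troubleshooting approach
- The message says a connection was refused, so first work out which connection was refused;
- Check whether the service is actually listening on that port. Command: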
netstat -lntp
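The two checks above can be combined into a short sketch: print the server mode and port from conf/canal.properties, then list the listening sockets for the ports this tutorial uses. Port 11111 is assumed to be Canal's default TCP port; CANAL_HOME and the helper names are illustrative.

```shell
# Assumption: CANAL_HOME points at the unpacked canal.deployer directory.
CANAL_HOME="${CANAL_HOME:-.}"

# Hypothetical helper: shows canal.serverMode and canal.port as configured.
check_canal_mode() {
  grep -E '^canal\.(serverMode|port)' "$CANAL_HOME/conf/canal.properties"
}

# Hypothetical helper: lists listening TCP sockets for MySQL (3306),
# ES (9200), canal-server (11111, assumed default) and the adapter (8081).
check_ports() {
  netstat -lnt | grep -E ':(3306|9200|11111|8081)[[:space:]]'
}

# Usage:
#   check_canal_mode
#   check_ports
```

If check_canal_mode reports kafka while the adapter runs in tcp mode, the adapter has no TCP endpoint to connect to, which matches the error above.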
Kafka mode
After setting canal.serverMode to kafka, update application.yml:
canal.conf:
mode: kafka #tcp kafka rocketMQ rabbitMQ
kafka.bootstrap.servers: 10.192.1.5:9092,10.192.1.6:9092,10.192.1.7:9092,10.192.1.8:9092
canalAdapters:
- instance: topic_dev # mq topic name
Edit the mapping file, e.g. vim es7/user.yml:
dataSourceKey: defaultDS
destination: topic_dev # mq topic name
groupId: g1
esMapping:
Initial data import
Full import:
curl http://127.0.0.1:8081/etl/es7/crm_opportunity.yml -X POST
Count the imported rows:
curl http://127.0.0.1:8081/count/es7/crm_opportunity.yml
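The etlCondition in the mapping file contains a {0} placeholder, so the import can also be limited to rows after a given time by passing a params form field to the same endpoint. The sketch below assumes the adapter listens on port 8081 as configured earlier; ADAPTER_URL, ES_URL, the helper names, and the sample file and index names are illustrative.

```shell
ADAPTER_URL="${ADAPTER_URL:-http://127.0.0.1:8081}"
ES_URL="${ES_URL:-http://127.0.0.1:9200}"

# Hypothetical helper: runs the ETL for one mapping file, filling {0} in
# etlCondition with the given value.
#   $1 = mapping file name under conf/es7/, $2 = value for {0}
etl_since() {
  curl -s -X POST "$ADAPTER_URL/etl/es7/$1" --data-urlencode "params=$2"
}

# Hypothetical helper: asks ES how many documents the index holds, to
# compare against the count reported by the adapter.
#   $1 = index name
es_count() {
  curl -s "$ES_URL/$1/_count"
}

# Usage:
#   etl_since mytest_user.yml "2022-01-01 00:00:00"
#   es_count mytest_user
```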