Canal in Practice


Versions used in this tutorial

  • Canal: 1.1.5
  • Canal-Adapter: 1.1.5
  • Elasticsearch: 7.10.2
  • MySQL: 5.7.32

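Background

Once the MySQL tables reached millions of rows, complex queries became slow to respond, so the plan is to use Elasticsearch (ES) as a second-level cache.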

The overall architecture is as follows:

[Figure: overall architecture]

Canal listens to the MySQL binlog and parses the incremental changes it contains; the ClientAdapter then synchronizes that data to ES in real time.

Canal workflow:

[Figure: Canal workflow diagram]

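How Canal works:

  • Canal emulates the MySQL slave interaction protocol: it poses as a MySQL slave and sends a dump request to the MySQL master;
  • the MySQL master receives the dump request and starts pushing the binary log to the slave (that is, Canal);
  • Canal parses the binary log objects (originally a byte stream).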
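
To make the end of that pipeline more concrete, here is a minimal sketch of how parsed row changes could be turned into Elasticsearch bulk API lines. The event shape (a dict with "type" and "row") and the function name are assumptions made for illustration; they are not Canal's real classes, and the adapter's actual implementation is more involved.

```python
import json


def to_bulk_lines(index, events):
    """Turn simplified row-change events into Elasticsearch bulk API lines.

    Each event is a hypothetical dict: {"type": "INSERT"|"UPDATE"|"DELETE",
    "row": {...}} where row["id"] is the primary key.
    """
    lines = []
    for event in events:
        doc_id = str(event["row"]["id"])
        if event["type"] == "DELETE":
            # A delete needs only the action line.
            lines.append(json.dumps({"delete": {"_index": index, "_id": doc_id}}))
        elif event["type"] in ("INSERT", "UPDATE"):
            # An index action is followed by the document source on the next line.
            lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
            lines.append(json.dumps(event["row"]))
        else:
            raise ValueError("unsupported event type: %r" % event["type"])
    return lines


if __name__ == "__main__":
    sample = [
        {"type": "INSERT", "row": {"id": 1, "name": "alice"}},
        {"type": "DELETE", "row": {"id": 2}},
    ]
    # The bulk API expects newline-delimited JSON ending with a newline.
    print("\n".join(to_bulk_lines("mytest_user", sample)) + "\n")
```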

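Procedure


  1. Prerequisites:
    1. MySQL: stores the source data;
    2. canal: parses the database log and picks up the incremental changes;
    3. canal-adapter: transforms the parsed data and writes it to ES;
    4. ES: receives the incremental data;
  2. Create the index in ES
    1. The field names and types defined in the mapping must match the data to be synchronized;
  3. Install the JDK: Canal is written in Java and requires version 1.8.0 or later
  4. Install and start canal-server
    1. Install canal-server, then edit its configuration to point at MySQL. canal-server poses as a slave of the MySQL cluster, fetches the binary log from the master node, and pushes it to canal-adapter.
  5. Install and start canal-adapter
    1. Install canal-adapter, then edit its configuration to point at MySQL and ES and define how MySQL columns map to ES fields, so that the data can be synchronized to ES.
  6. Verify incremental synchronization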
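
For step 2, the following sketch builds a request body for creating the index. The field names are borrowed from the sample SQL mapping in the adapter section of this article; the field types are assumptions, since the article does not show the table schema, and should be adjusted to the real columns.

```python
import json


def build_index_body(fields):
    """Build an ES 7.x index-creation body from a {field name: ES type} dict."""
    return {
        "mappings": {
            "properties": {name: {"type": es_type} for name, es_type in fields.items()}
        }
    }


# Assumed types for illustration only; match them to the MySQL columns.
USER_FIELDS = {
    "_name": "text",
    "_role_id": "long",
    "_role_name": "text",
    "_c_time": "date",
    "_labels": "text",
}

if __name__ == "__main__":
    # This JSON would be sent as the body of: PUT http://127.0.0.1:9200/mytest_user
    print(json.dumps(build_index_body(USER_FIELDS), indent=2))
```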

Configuring MySQL

Check whether binlog is enabled in MySQL:

SHOW VARIABLES LIKE 'log_bin';

Binlog writing must be enabled first, with binlog-format set to ROW. Configure my.cnf as follows:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
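
As a quick sanity check before restarting MySQL, a small script can confirm that these three settings are present. This is only a sketch: the function name is made up for this example, and it inspects the text of a my.cnf file rather than the running server (SHOW VARIABLES remains the authoritative check).

```python
import configparser


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of a my.cnf text."""
    parser = configparser.ConfigParser(
        allow_no_value=True,            # my.cnf allows bare options such as "log-bin"
        inline_comment_prefixes=("#",),
        interpolation=None,
        strict=False,
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL accepts dashes and underscores interchangeably in option names.
    opts = {key.replace("_", "-"): value for key, value in parser["mysqld"].items()}

    problems = []
    if "log-bin" not in opts:
        problems.append("log-bin is not set, so binlog is disabled")
    if (opts.get("binlog-format") or "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    if not (opts.get("server-id") or "").isdigit():
        problems.append("server_id must be set to a number")
    return problems


if __name__ == "__main__":
    sample = "[mysqld]\nlog-bin=mysql-bin # enable binlog\nbinlog-format=ROW\nserver_id=1\n"
    print(check_binlog_settings(sample) or "binlog settings look fine")
```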

Grant the account that canal uses to connect to MySQL the privileges of a MySQL slave. If the account already exists, just run the GRANT:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Installing and configuring Canal

  1. Download Canal

https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

canal.deployer is the server side of canal; it subscribes to and parses the MySQL binlog.

Directory layout after extraction:

canal1.1.5
├── bin
│   ├── restart.sh    # restart canal
│   ├── startup.sh    # start canal
│   ├── stop.sh       # stop canal
│   └── ...
├── conf
│   ├── canal.properties  # server configuration file
│   ├── example
│   │   ├── h2.mv.db
│   │   ├── instance.properties  # instance configuration file
│   │   └── meta.dat
│   ├── logback.xml
│   └── ...
├── lib
├── logs
│   ├── canal
│   │   ├── canal.log
│   │   └── canal_stdout.log
│   └── example
│       ├── meta.log
│       └── example.log
└── plugin

  2. Configure Canal

Edit conf/example/instance.properties:

# position info
# (.properties files have no inline comments, so each comment sits on its own line)
# database address
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username and password used to connect to the database
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8

# tables to capture, as schema.table
canal.instance.filter.regex=blog.user

  3. Start Canal

./bin/startup.sh
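
The value of canal.instance.filter.regex is a comma-separated list of regular expressions matched against schema.table names. The sketch below approximates that matching with Python's re module so that candidate patterns can be tried out; it assumes whole-name, case-insensitive matching, which is my understanding of Canal's behaviour rather than something confirmed in this article, and the function name is hypothetical.

```python
import re


def table_matches(filter_regex, schema, table):
    """Approximate check of whether schema.table is selected by a filter value."""
    name = "%s.%s" % (schema, table)
    patterns = [p for p in filter_regex.split(",") if p]
    # Assumption: each pattern must match the whole name, ignoring case.
    return any(re.fullmatch(p, name, flags=re.IGNORECASE) for p in patterns)


if __name__ == "__main__":
    # In a .properties file the backslash is doubled: blog\\..*
    for pattern in ["blog.user", r"blog\..*", "blog.user,blog.role"]:
        for table in ["user", "role", "label"]:
            print(pattern, "blog." + table, table_matches(pattern, "blog", table))
```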

Installing and configuring Canal-adapter

  1. Download Canal adapter

https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

Directory layout after extraction:

canal.adapter-1.1.5
├── bin
│   ├── restart.sh    # restart the adapter
│   ├── startup.sh    # start the adapter
│   ├── stop.sh       # stop the adapter
│   └── ...
├── conf
│   ├── application.yml  # configuration file
│   ├── bootstrap.yml
│   ├── es7
│   │   ├── customer.yml
│   │   └── ...
│   └── ...
├── lib
├── logs
└── plugin

  2. Configure the adapter

Edit conf/application.yml:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
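  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111 # canal-server address (11111 is the default port); needed in tcp mode when zookeeperHosts is empty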
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/blog?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
    - instance: example # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              hosts: http://127.0.0.1:9200,http://10.192.8.204:9200,http://10.192.8.209:9200
              properties:
                mode: rest # or transport
                # security.auth: test:123456 #  only used for rest mode
                cluster.name: dev-cluster

Delete all files under conf/es7/ and create your own yml mapping file.

dataSourceKey: defaultDS        # key of the source data source; matches an entry under srcDataSources above
outerAdapterKey: exampleKey     # key of the es adapter configured in application.yml
destination: example            # canal instance name or MQ topic
groupId:                        # groupId in MQ mode; only data for the matching groupId is synchronized
esMapping:
  _index: mytest_user           # ES index name
  _type: _doc                   # ES type name; not needed for es7
  _id: _id                      # ES _id; if omitted, the pk entry below is required and ES assigns _id automatically
#  pk: id                       # if _id is not used, name a column to act as the primary key
  # SQL mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # array or object field; array:; means the value is split on ;
#    _obj: object               # JSON object
  etlCondition: "where a.c_time>='{0}'"     # condition parameter for ETL
  commitBatch: 3000                         # commit batch size
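
To illustrate what this mapping describes, the sketch below converts one result row of the SQL above into a document id plus a document body, including the optional array:; rule from objFields. It is a toy model with a made-up function name, not the adapter's code, and the sample values are invented.

```python
def row_to_document(row, id_column="_id", obj_fields=None):
    """Split a SQL result row into (document id, document body).

    obj_fields mirrors the objFields section: a rule such as "array:;" means
    the column value is a string to be split on ";".
    """
    obj_fields = obj_fields or {}
    doc = {}
    for column, value in row.items():
        if column == id_column:
            continue  # the id column becomes the ES _id, not a field
        rule = obj_fields.get(column, "")
        if rule.startswith("array:") and value is not None:
            separator = rule[len("array:"):]
            value = value.split(separator)
        doc[column] = value
    return str(row[id_column]), doc


if __name__ == "__main__":
    sample_row = {"_id": 1, "_name": "alice", "_role_id": 2, "_labels": "b;a"}
    print(row_to_document(sample_row, obj_fields={"_labels": "array:;"}))
```
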

  3. Start canal-adapter

./bin/startup.sh

Startup errors

1. cannot be cast to com.alibaba.druid.pool.DruidDataSource


2022-02-28 14:52:57.684 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2022-02-28 14:52:57.734 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-02-28 14:52:58.037 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
        at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
        ...
        at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[client-adapter.launcher-1.1.5.jar:na]
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
        at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        ... 42 common frames omitted
Caused by: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
        at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:146) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        ... 43 common frames omitted

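The fix comes from GitHub issue #3717. A class that "cannot be cast" to itself has been loaded by two different class loaders, which points to druid being bundled a second time inside the es7x plugin jar.

Download the source archive canal-canal-1.1.5.zip, extract it, open the project in an IDE, and change the druid dependency in canal-canal-1.1.5/client-adapter/escore/pom.xml so that its scope is provided: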

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <scope>provided</scope>
</dependency>

Then run mvn clean package -Dmaven.test.skip=true in the project root and pick up client-adapter.es7x-1.1.5-jar-with-dependencies.jar from canal-canal-1.1.5\client-adapter\es7x\target\.

Copy that file to ./canal_adapter/plugin/ and restart the adapter.

2. NullPointerException

2022-03-01 09:45:29.443 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2022-03-01 09:45:29.464 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /usr/local/canal/canal.adapter-1.1.5/plugin
2022-03-01 09:45:29.524 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## something goes wrong when starting up the canal client adapters:
java.lang.NullPointerException: null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:782) ~[guava-22.0.jar:na]
        ...
        at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:28) ~[canal.common-1.1.5.jar:na]
        at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.init(CanalTCPConsumer.java:57) ~[connector.tcp-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.<init>(AdapterProcessor.java:74) ~[client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.lambda$init$0(CanalAdapterLoader.java:65) ~[client-adapter.launcher-1.1.5.jar:na]
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[na:1.8.0_161]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:60) ~[client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) ~[client-adapter.launcher-1.1.5.jar:na]

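After a long search I could not find a similar report online, so I suspected a mistake in my own configuration. A comparison showed that I had deleted too much from application.yml. The trace passes through ZkClientx.getZkClient from CanalTCPConsumer.init, which suggests the adapter fell back to a ZooKeeper lookup because no canal-server address was left in the configuration.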

3. Connection refused

2022-03-01 11:21:49.823 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2022-03-01 11:21:49.824 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error!
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
        at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198) ~[na:na]
        at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115) ~[na:na]
        at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:63) ~[na:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5.jar:na]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_161]
        at sun.nio.ch.Net.connect(Net.java:454) ~[na:1.8.0_161]
        at sun.nio.ch.Net.connect(Net.java:446) ~[na:1.8.0_161]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[na:1.8.0_161]
        at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150) ~[na:na]

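canal.serverMode on the canal server was set to kafka; after changing it to tcp, everything worked.

Troubleshooting approach

  1. When a connection is refused, first work out which connection was refused;
  2. Check whether the service is actually listening on that port, for example with netstat -lntp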
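
The same check can be scripted from the machine that runs the adapter. This is a minimal sketch: ports 8081 and 9200 come from the configuration in this article, while 11111 is assumed to be the canal-server TCP port (its usual default) and should be replaced if your canal.properties says otherwise.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "connection refused", timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    targets = [
        ("canal-server", "127.0.0.1", 11111),   # assumed default port
        ("canal-adapter", "127.0.0.1", 8081),
        ("Elasticsearch", "127.0.0.1", 9200),
    ]
    for name, host, port in targets:
        state = "open" if is_port_open(host, port) else "refused or unreachable"
        print("%s %s:%d is %s" % (name, host, port, state))
```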

Kafka mode

After setting canal.serverMode to kafka, both application.yml and the mapping file need to be changed. In application.yml:

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  consumerProperties:
    kafka.bootstrap.servers: 10.192.1.5:9092,10.192.1.6:9092,10.192.1.7:9092,10.192.1.8:9092
  canalAdapters:
  - instance: topic_dev  # mq topic name

Edit the mapping file, for example vim es7/user.yml

dataSourceKey: defaultDS
destination: topic_dev      # mq topic name
groupId: g1
esMapping: 

Initial data import

Full import:

curl http://127.0.0.1:8081/etl/es7/crm_opportunity.yml -X POST

Check the import count:

curl http://127.0.0.1:8081/count/es7/crm_opportunity.yml
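
The full import can also be triggered from a script. The sketch below only builds the HTTP request and leaves sending it to the caller. The helper name is made up, and the optional params form field, which as far as I understand the adapter's REST interface supplies the value for the {0} placeholder in etlCondition, is an assumption to verify against the adapter documentation.

```python
from urllib import parse, request


def build_etl_request(base_url, adapter_type, task, params=None):
    """Build (but do not send) the POST request that starts a full import."""
    url = "%s/etl/%s/%s" % (base_url.rstrip("/"), adapter_type, task)
    # Assumption: "params" carries the value(s) for the etlCondition placeholders.
    data = parse.urlencode({"params": params}).encode() if params else b""
    return request.Request(url, data=data, method="POST")


if __name__ == "__main__":
    req = build_etl_request(
        "http://127.0.0.1:8081", "es7", "crm_opportunity.yml",
        params="2022-01-01 00:00:00",
    )
    print(req.get_method(), req.full_url, req.data)
    # To actually run the import: request.urlopen(req)
```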
