使用canal同步MySQL數據至Elasticsearch


版本使用

軟件 版本
MySQL 8.0.18
canal 1.1.5
canal-adapter 1.1.5
Elasticsearch 7.4.0
Kibana 7.4.0
JDK 1.8.0_181

基礎配置

安裝寶塔面板

偷懶了,所以直接安裝寶塔面板

Centos安裝命令:

yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh

登錄后安裝LNMP套件:

安裝jdk 8

下載

wget https://mirrors.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.rpm

安裝

rpm -ivh jdk-8u181-linux-x64.rpm

檢查是否安裝成功

[root@VM-0-9-centos ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

安裝docker

軟件商店中找到docker管理器,點擊安裝

配置docker國內鏡像

創建並修改/etc/docker/daemon.json文件

touch /etc/docker/daemon.json
vim /etc/docker/daemon.json

內容如下

{
  "registry-mirrors": [
    "https://registry.docker-cn.com",
    "http://hub-mirror.c.163.com",
    "https://docker.mirrors.ustc.edu.cn"
  ]
}

也可以直接在寶塔面板中配置

安裝ElasticSearch

從docker下載es鏡像

最后實際上用的是7.4.0,懶得改圖了

docker pull elasticsearch:7.4.0

檢查鏡像

配置

創建配置文件elasticsearch.yml

cluster.name: "my-aliyun-cluster"   
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
xpack.security.enabled: false

啟動

sudo docker run -d --name es7 -p 9200:9200 -p 9300:9300 -v /tmp/mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms256m -Xmx256m" elasticsearch:7.4.0

可以看到docker已經啟動了

9200端口可以被訪問到

安裝Kibana

從docker下載kibana鏡像

docker pull kibana:7.4.0

配置kibana.yml

注意,這里的ip地址需要docker inspect查看,用這里看到的那個

server.name: kibana
server.host: 0.0.0.0
elasticsearch.hosts: [ "http://內網ip:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"

啟動

sudo docker run --name kibana6 -d -p 5601:5601 -v /tmp/mydata/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.9.0

可以正常訪問kibana

配置canal

倉庫地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量訂閱&消費組件

准備

  • 對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 開啟 binlog
    binlog-format=ROW # 選擇 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
    
    • 注意:針對阿里雲 RDS for MySQL , 默認打開了 binlog , 並且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步
    • my.cnf在/etc/my.cnf
    • 設置完之后重啟mysql服務,可以無腦在寶塔面板點一下
  • 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant

    后來直接使用的阿里雲的RDS,所以這一步我沒有做

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

下載

  • 下載 canal, 訪問 release 頁面 , 選擇需要的包下載, 如以 1.1.15 版本為例

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
    
  • 解壓縮

    mkdir /tmp/canal
    tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal
    
    • 解壓完成后,進入 /tmp/canal 目錄,可以看到如下結構

      [root@VM-0-9-centos canal]# ll
      total 20
      drwxr-xr-x 2 root root 4096 Dec 26 15:03 bin
      drwxr-xr-x 5 root root 4096 Dec 26 15:03 conf
      drwxr-xr-x 2 root root 4096 Dec 26 15:03 lib
      drwxrwxrwx 2 root root 4096 Aug 22 13:17 logs
      drwxrwxrwx 2 root root 4096 Aug 22 13:17 plugin
      

修改配置

創建測試數據庫

CREATE TABLE user (
    `id` int(10) NOT NULL,
    `name` varchar(100) DEFAULT NULL,
    `role_id` int(10) NOT NULL,
    `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
);

修改配置

修改配置 conf/example/instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=數據庫連接地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=數據庫用戶名
canal.instance.dbPassword=數據庫密碼
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

canal.properties貌似也要改一下,是一個玄學問題

需要把這一行的注釋去掉

canal.instance.parser.parallelThreadSize = 16

啟動

[root@VM-0-9-centos bin]# sh ./startup.sh 

配置canal-adapter

准備

添加student以及映射關系

PUT /canal_user
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "role_id": {
        "type": "long"
      },
      "c_time": {
          "type": "date"
      }
    }
  }
}

下載

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz

解壓adapter壓縮包,

mkdir /tmp/canal-adapter
tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal-adapter

配置application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql所在服務器外網ip:3306/canal?useUnicode=true
      username: aliyun_user
      password: Huigu12345!!
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: es所在服務器IP:9300
        properties:
          cluster.name: es的cluster名字

adapter將會自動加載 conf/es 下的所有.yml結尾的配置文件

適配器表映射文件

創建並修改 conf/es/canal_user.yml文件:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: canal_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000

測試

建表以及sql語句來源:https://segmentfault.com/a/1190000019066098?utm_source=tag-newest

插入數據

嘗試在數據庫中插入一條數據:

可以在日志中找到了:

在kibana中查詢,可以得到這條結果:

GET /canal_user/_doc/7

更新數據

嘗試更新一條數據

UPDATE user
SET name = 'zhengguo'
WHERE id = 7;

日志中可以看到

再次在kibana查詢,可以看到已經變化了

GET /canal_user/_doc/7

刪除數據

嘗試刪除一條數據

DELETE FROM user
WHERE id = 7;

日志中可以看到

再次在kibana查詢,可以無法查詢到這條記錄了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM