使用canal同步MySQL数据至Elasticsearch


版本使用

软件 版本
MySQL 8.0.18
canal 1.1.5
canal-adapter 1.1.5
Elasticsearch 7.4.0
Kibana 7.4.0
JDK 1.8.0_181

基础配置

安装宝塔面板

偷懒了,所以直接安装宝塔面板

Centos安装命令:

yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh

登录后安装LNMP套件:

安装jdk 8

下载

wget https://mirrors.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.rpm

安装

rpm -ivh jdk-8u181-linux-x64.rpm

检查是否安装成功

[root@VM-0-9-centos ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

安装docker

软件商店中找到docker管理器,点击安装

配置docker国内镜像

创建并修改/etc/docker/daemon.json文件

touch /etc/docker/daemon.json
vim /etc/docker/daemon.json

内容如下

{
  "registry-mirrors": [
    "https://registry.docker-cn.com",
    "http://hub-mirror.c.163.com",
    "https://docker.mirrors.ustc.edu.cn"
  ]
}

也可以直接在宝塔面板中配置

安装ElasticSearch

从docker下载es镜像

最后实际上用的是7.4.0,懒得改图了

docker pull elasticsearch:7.4.0

检查镜像

配置

创建配置文件elasticsearch.yml

cluster.name: "my-aliyun-cluster"   
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
xpack.security.enabled: false

启动

sudo docker run -d --name es7 -p 9200:9200 -p 9300:9300 -v /tmp/mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms256m -Xmx256m" elasticsearch:7.4.0

可以看到docker已经启动了

9200端口可以被访问到

安装Kibana

从docker下载kibana镜像

docker pull kibana:7.4.0

配置kibana.yml

注意,这里的ip地址需要docker inspect查看,用这里看到的那个

server.name: kibana
server.host: 0.0.0.0
elasticsearch.hosts: [ "http://内网ip:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"

启动

sudo docker run --name kibana6 -d -p 5601:5601 -v /tmp/mydata/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.9.0

可以正常访问kibana

配置canal

仓库地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
    • my.cnf在/etc/my.cnf
    • 设置完之后重启mysql服务,可以无脑在宝塔面板点一下
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    后来直接使用的阿里云的RDS,所以这一步我没有做

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

下载

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.15 版本为例

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
    
  • 解压缩

    mkdir /tmp/canal
    tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal
    
    • 解压完成后,进入 /tmp/canal 目录,可以看到如下结构

      [root@VM-0-9-centos canal]# ll
      total 20
      drwxr-xr-x 2 root root 4096 Dec 26 15:03 bin
      drwxr-xr-x 5 root root 4096 Dec 26 15:03 conf
      drwxr-xr-x 2 root root 4096 Dec 26 15:03 lib
      drwxrwxrwx 2 root root 4096 Aug 22 13:17 logs
      drwxrwxrwx 2 root root 4096 Aug 22 13:17 plugin
      

修改配置

创建测试数据库

CREATE TABLE user (
    `id` int(10) NOT NULL,
    `name` varchar(100) DEFAULT NULL,
    `role_id` int(10) NOT NULL,
    `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
);

修改配置

修改配置 conf/example/instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=数据库连接地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=数据库用户名
canal.instance.dbPassword=数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

canal.properties貌似也要改一下,是一个玄学问题

需要把这一行的注释去掉

canal.instance.parser.parallelThreadSize = 16

启动

[root@VM-0-9-centos bin]# sh ./startup.sh 

配置canal-adapter

准备

添加student以及映射关系

PUT /canal_user
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "role_id": {
        "type": "long"
      },
      "c_time": {
          "type": "date"
      }
    }
  }
}

下载

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz

解压adapter压缩包,

mkdir /tmp/canal-adapter
tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal-adapter

配置application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql所在服务器外网ip:3306/canal?useUnicode=true
      username: aliyun_user
      password: Huigu12345!!
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: es所在服务器IP:9300
        properties:
          cluster.name: es的cluster名字

adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件

适配器表映射文件

创建并修改 conf/es/canal_user.yml文件:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: canal_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000

测试

建表以及sql语句来源:https://segmentfault.com/a/1190000019066098?utm_source=tag-newest

插入数据

尝试在数据库中插入一条数据:

可以在日志中找到了:

在kibana中查询,可以得到这条结果:

GET /canal_user/_doc/7

更新数据

尝试更新一条数据

UPDATE user
SET name = 'zhengguo'
WHERE id = 7;

日志中可以看到

再次在kibana查询,可以看到已经变化了

GET /canal_user/_doc/7

删除数据

尝试删除一条数据

DELETE FROM user
WHERE id = 7;

日志中可以看到

再次在kibana查询,可以无法查询到这条记录了


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM