4、canal同步mysql數據到es中


cannl同步mysql數據到es中

canal組件介紹

canal-admin
(非必須但推薦使用):為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。

canal-server:
服務端,從mysql讀取binlog日志獲取增量日志,可以通過tcp、kafka、RocketMQ等方式與客戶端通信;通過zookeeper搭建集群。

canal-adapter
客戶端,根據canal-server獲取的增量日志執行適配到其他諸如elasticsearch、redis、mysql等端,實現數據同步。


本次實驗環境

首先我們需要下載canal的各個組件canal-servercanal-adaptercanal-admin,下載地址https://github.com/alibaba/canal/releases

es及相關工具的部署可參考Elasticsearch詳解及部署

1、開啟mysql的binlog

使用canal-server需要先准備mysql,對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式

vi /etc/my.cnf

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

配置完成后重啟mysql,並查詢是否配置生效:ON就是開啟

systemctl restart mysqld

可以進入數據庫再次查看下

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

2、mysql用戶數據准備

 創建一個canal用戶並對其進行授權

CREATE USER canal IDENTIFIED BY 'Cjz123456.';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

創建一個測試用戶數據庫及表

create database canal;
USE canal; CREATE TABLE product ( id bigint(20) NOT NULL AUTO_INCREMENT, title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, sub_title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, price decimal(10, 2) NULL DEFAULT NULL, pic varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

3、創建存放canal的目錄  

mkdir /usr/local/canal-server

mkdir /usr/local/canal-adpter

mkdir /usr/local/canal-admin  

4、部署canal-server

tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-server

vim /usr/local/canal-server/conf/example/instance.properties

# 需要同步數據的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用於同步數據的數據庫賬號
canal.instance.dbUsername=canal
# 用於同步數據的數據庫密碼
canal.instance.dbPassword=Cjz123456.
# 數據庫連接編碼
canal.instance.connectionCharset = UTF-8
# 需要訂閱binlog的表過濾正則表達式
canal.instance.filter.regex=.*\\..*

cd /usr/local/canal-server/bin

./startup.sh

查看日志查看是否正常啟動

cd /usr/local/canal-server/logs/

tail -f example/example.log

tail -f canal/canal.log

5、部署canal-adapter

tar -zxvf canal.adapter-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-adpter/

cd /usr/local/canal-adpter/conf

vim application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # 客戶端的模式,可選tcp kafka rocketMQ
  flatMessage: true # 扁平message開關, 是否以json字符串形式投遞數據, 僅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 對應集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批數量
  retries: 0 # 重試次數, -1為無限重試
  timeout: # 同步超時時間, 單位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111 #設置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.111.129:3306/canal?useUnicode=true&charaterEncoding=utf-8&useSSL=false
      username: canal
      password: Cjz123456.
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://192.168.111.129:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          security.auth: elastic:elastic #  only used for rest mode
          cluster.name: my-es

修改 canal-adapter/conf/es7/mytest_user.yml 文件,用於配置MySQL中的表與Elasticsearch中索引的映射關系

vim es7/mytest_user.yml

dataSourceKey: defaultDS # 源數據源的key, 對應上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的數據
esMapping:
  _index: canal_product # es 的索引名稱
  _id: _id  # 將Mysql表里的id對應上es上的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
  sql: "SELECT
         p.id AS _id,
         p.title,
         p.sub_title,
         p.price,
         p.pic
        FROM
         product p"        # sql映射
  etlCondition: "where p.id>={}"   #etl的條件參數
  commitBatch: 3000   # 提交批大小

cd /usr/local/canal-adpter/bin/

./startup.sh

查看日志看是否正常啟動 

cd /usr/local/canal-adpter/logs/adapter/

tail -f adapter.log

6、創建ES索引

可以直接通過Kibana頁面進行創建,不難看出,索引的屬性對應着就是我們表的屬性(Mysql表id與es的_id做了對應,所以這里不需要設置)

PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}

7、驗證

canal初始化是不會全量同步數據的,所以我們需要登錄mysql,插入一條數據

INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 15, '小米8', ' 全面屏游戲智能手機 6GB+64GB', 1999.00, NULL );

創建完成后在Kibana上查看即可發現數據已經同步

GET canal_product/_search

在elasticsearch-head上查看也是,可以看出,我們的表id變成了es的_id

在Mysql上修改數據看看

UPDATE product SET title='小米10' WHERE id=15;

 可以看到我們修改的數據也進行了同步

 

8、canal-admin的安裝

tar -zxvf canal.admin-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-admin/

創建 canal-admin 需要使用的數據庫canal_manager,創建的SQL腳本在canal-admin包下的conf/canal_manager.sql

cd /usr/local/canal-admin/conf/ 

導入sql腳本

mysql -uroot -p < canal_manager.sql

修改配置文件conf/application.yml,按如下配置即可,主要是修改數據源配置和canal-admin的管理賬號配置,注意需要用一個有讀寫權限的數據庫賬號;

vim application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root             #數據庫管理員賬號
  password: Cjz123456.       #數據庫管理員賬號密碼
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

接下來對之前搭建的canal-server的conf/canal_local.properties文件進行配置,主要是修改canal-admin的配置  

cd /usr/local/canal-server/conf/

vim canal_local.properties

# register ip
canal.register.ip = 192.168.111.129

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

啟動canal-admin

cd /usr/local/canal-admin/bin/

./startup.sh

啟動完好后即可訪問頁面,訪問ip:8089,初始用戶密碼admin/123456

重啟canal-server

cd /usr/local/canal-server/bin/

./restart.sh local

再次回到我們的canal-admin管理頁面,即可看到該節點信息


該文章參考於:https://blog.csdn.net/zh1998wx/article/details/123101442

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM