本文介紹如何使用canal增量同步mysql數據庫信息到ElasticSearch。(注意:是增量!!!)
1.簡介
1.1 canal介紹
Canal是一個基於MySQL二進制日志的高性能數據同步系統。Canal廣泛用於阿里巴巴集團(包括https://www.taobao.com),以提供可靠的低延遲增量數據管道,github地址:https://github.com/alibaba/canal
Canal Server能夠解析MySQL binlog並訂閱數據更改,而Canal Client可以實現將更改廣播到任何地方,例如數據庫和Apache Kafka。
它具有以下功能:
- 支持所有平台。
- 支持由Prometheus提供支持的細粒度系統監控。
- 支持通過不同方式解析和訂閱MySQL binlog,例如通過GTID。
- 支持高性能,實時數據同步。(詳見Performance)
- Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持
- Docker支持。
缺點:
不支持全量更新,只支持增量更新。
完整wiki地址:https://github.com/alibaba/canal/wiki
1.2 運作原理
原理很簡單:
- Canal模擬MySQL的slave的交互協議,偽裝成mysql slave,並將轉發協議發送到MySQL Master服務器。
- MySQL Master接收到轉儲請求並開始將二進制日志推送到slave(即canal)。
- Canal將二進制日志對象解析為自己的數據類型(原始字節流)
如圖所示:
1.3 同步es
在同步數據到es的時候需要使用適配器:canal adapter。目前最新版本1.1.3,下載地址:https://github.com/alibaba/canal/releases。
目前es貌似支持6.x版本,不支持7.x版本!!!
2.准備工作
2.1 es和jdk
安裝es可以參考:https://www.dalaoyang.cn/article/78
安裝jdk可以參考:https://www.dalaoyang.cn/article/16
2.2 安裝canal server
下載canal.deployer-1.1.3.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
解壓文件
tar -zxvf canal.deployer-1.1.3.tar.gz
進入解壓后的文件夾
cd canal.deployer-1.1.3
修改conf/example/instance.properties文件,主要注意以下幾處:
- canal.instance.master.address:數據庫地址,例如127.0.0.1:3306
- canal.instance.dbUsername:數據庫用戶
- canal.instance.dbPassword:數據庫密碼
完整內容如下:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
回到canal.deployer-1.1.3目錄下,啟動canal:
sh bin/startup.sh
查看日志:
vi logs/canal/canal.log
查看具體instance日志:
vi logs/example/example.log
關閉命令
sh bin/stop.sh
2.3 安裝canal-adapter
下載canal.adapter-1.1.3.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
解壓
tar -zxvf canal.adapter-1.1.3.tar.gz
進入解壓后的文件夾
cd canal.adapter-1.1.3
修改conf/application.yml文件,主要注意如下內容,由於是yml文件,注意我這里說明的屬性名稱:
- server.port:canal-adapter端口號
- canal.conf.canalServerHost:canal-server地址和ip
- canal.conf.srcDataSources.defaultDS.url:數據庫地址
- canal.conf.srcDataSources.defaultDS.username:數據庫用戶名
- canal.conf.srcDataSources.defaultDS.password:數據庫密碼
- canal.conf.canalAdapters.groups.outerAdapters.hosts:es主機地址,tcp端口
完整內容如下:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp
canalServerHost: 127.0.0.1:11111
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
username: root
password: 12345678
canalAdapters:
- instance: example
groups:
- groupId: g1
outerAdapters:
- name: es
hosts: 127.0.0.1:9300
properties:
cluster.name: elasticsearch
另外需要配置conf/es/*.yml文件,adapter將會自動加載conf / es下的所有.yml結尾的配置文件。在介紹配置前,需要先介紹一下本案例使用的表結構,如下:
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`name` varchar(200) NOT NULL,
`address` varchar(1000) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
需要手動去es中創建索引,比如這里使用es-head創建,如下圖:
test索引結構如下:
{
"mappings":{
"_doc":{
"properties":{
"name":{
"type":"text"
},
"address":{
"type":"text"
}
}
}
}
}
接下來創建test.yml(文件名隨意),內容很好理解_index為索引名稱,sql為對應語句,內容如下:
dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
_index: test
_type: _doc
_id: _id
upsert: true
sql: "select a.id as _id,a.name,a.address from test a"
commitBatch: 3000
配置完成后,回到canal-adapter根目錄,執行命令啟動
bin/startup.sh
查看日志
vi logs/adapter/adapter.log
關閉canal-adapter命令
bin/stop.sh
3.測試
都啟動成功后,先查看一下es-head,如圖,現在是沒有任何數據的。
接下來,我們在數據庫中插入一條數據進行測試,語句如下:
INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝陽區');
然后在看一下es-head,如下
接下來看一下日志,如下:
2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝陽區"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"}
Affected indexes: test
小知識點:上面介紹的查看日志的方法可能不是很好用,推薦使用如下語法,比如查看日志最后200行:
tail -200f logs/adapter/adapter.log
4.總結
1.全量更新不能實現,但是增刪改都是可以的。
2.一定要提前創建好索引。
3.es配置的是tcp端口,比如默認的9300