前言
本篇只介紹 TCP模式 下詳細的canal相關配置。
- 高可用 請參考文章:【Canal——高可用架構設計與應用】

1.准備
1.1.組件
JDK:1.8版本及以上;
ElasticSearch:6.x版本,目前貌似不支持7.x版本;
Kibana:6.x版本;
Canal.deployer:1.1.4
Canal.Adapter:1.1.4
1.2.配置
- 需要先開啟MySQL的 binlog 寫入功能,配置 binlog-format 為 ROW 模式
找到my.cnf文件,我的目錄是/etc/my.cnf,添加以下配置:
log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

然后重啟mysql,用以下命令檢查一下binlog是否正確啟動:
mysql> show variables like 'log_bin%'; +---------------------------------+----------------------------------+ | Variable_name | Value | +---------------------------------+----------------------------------+ | log_bin | ON | | log_bin_basename | /data/mysql/data/mysql-bin | | log_bin_index | /data/mysql/data/mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | +---------------------------------+----------------------------------+ 5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.00 sec)
- 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'Aa123456.'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
2.安裝
2.1.ElasticSearch
安裝配置方法:https://www.cnblogs.com/caoweixiong/p/11826295.html
2.2.canal.deployer
2.2.1.下載解壓
直接下載 訪問:https://github.com/alibaba/canal/releases ,會列出所有歷史的發布版本包 下載方式,比如以1.1.4版本為例子: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz 自己編譯 git clone git@github.com:alibaba/canal.git cd canal; mvn clean install -Dmaven.test.skip -Denv=release 編譯完成后,會在根目錄下產生target/canal.deployer-$version.tar.gz
mkdir /usr/local/canal tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal
解壓完成后,進入 /usr/local/canal目錄,可以看到如下結構:

2.2.2.配置
- 配置server
cd /usr/local/canal/conf
vi canal.properties
標紅的需要我們重點關注的,也是平常修改最多的參數:
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = #canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務,默認:無 # register ip to zookeeper canal.register.ip = #運行canal-server服務的主機IP,可以不用配置,他會自動綁定一個本機的IP canal.port = 11111 #canal-server監聽的端口(TCP模式下,非TCP模式不監聽1111端口) canal.metrics.pull.port = 11112 #canal-server metrics.pull監聽的端口 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.zkServers = #canal server鏈接zookeeper集群的鏈接信息,集群模式下要配置zookeeper進行協調配置,單機模式可以不用配置 # flush data to zk canal.zookeeper.flush.period = 1000 #canal持久化數據到zookeeper上的更新頻率,單位毫秒 canal.withoutNetty = false # tcp, kafka, RocketMQ canal.serverMode = tcp #canal-server運行的模式,TCP模式就是直連客戶端,不經過中間件。kafka和mq是消息隊列的模式 # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} #存放數據的路徑 canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb #下面是一些系統參數的配置,包括內存、網絡等 canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config #這里是心跳檢查的配置,做HA時會用到 canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config #binlog過濾的配置,指定過濾那些SQL canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false #是否忽略DCL的query語句,比如grant/create user等,默認false canal.instance.filter.query.dml = false #是否忽略DML的query語句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query記錄),默認false canal.instance.filter.query.ddl = false #是否忽略DDL的query語句,比如create table/alater table/drop table/rename table/create index/drop index.
(目前支持的ddl類型主要為table級別的操作,create databases/trigger/procedure暫時划分為dcl類型),默認false
canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check #binlog格式檢測,使用ROW模式,非ROW模式也不會報錯,但是同步不到數據 canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true #並行解析配置,如果是單個CPU就把下面這個true改為false ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# canal.destinations = example #canal-server創建的實例,在這里指定你要創建的實例的名字,比如test1,test2等,逗號隔開 # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
- 配置example
在根配置文件中創建了實例名稱之后,需要在根配置的同級目錄下創建該實例目錄,canal-server為我們提供了一個示例的實例配置,因此我們可以直接復制該示例,舉個例子吧:根配置配置了如下實例:
[root@aliyun conf]# vim canal.properties ... canal.destinations = user_order,delivery_info ... 我們需要在根配置的同級目錄下創建這兩個實例 [root@aliyun conf]# pwd /usr/local/canal-server/conf [root@aliyun conf]# cp -a example/ user_order [root@aliyun conf]# cp -a example/ delivery_info
這里只舉例1個example的配置:
vi /usr/local/canal/conf/example/instance.properties
標紅的需要我們重點關注的,也是平常修改最多的參數:
###################################################
mysql serverId , v1.0.26+ will autoGencanal.instance.mysql.slaveId=11
# enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=172.16.10.26:3306 #指定要讀取binlog的MySQL的IP地址和端口 canal.instance.master.journal.name= #從指定的binlog文件開始讀取數據 canal.instance.master.position= #指定偏移量,做過主從復制的應該都理解這兩個參數。
#tips:binlog和偏移量也可以不指定,則canal-server會從當前的位置開始讀取。我建議不設置
canal.instance.master.timestamp= #mysql主庫鏈接時起始的binlog的時間戳,默認:無 canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal
#這幾個參數是設置高可用配置的,可以配置mysql從庫的信息 #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal #指定連接mysql的用戶密碼 canal.instance.dbPassword=Aa123456. canal.instance.connectionCharset = UTF-8 #字符集 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex
# mysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\\)
# 常見例子:
# 1. 所有表:.* or .*\\..*
# 2. canal schema下所有表: canal\\..*
# 3. canal下的以canal打頭的表:canal\\.canal.*
# 4. canal schema下的一張表:canal.test1
# 5. 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
canal.instance.filter.regex=risk.canal,risk.cwx #這個是比較重要的參數,匹配庫表白名單,比如我只要test庫的user表的增量數據,則這樣寫 test.user
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
2.2.3.啟動
bin/startup.sh
- 查看 server 日志
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111] 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
- 查看 instance 的日志
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
- 關閉
bin/stop.sh
2.3.canal.adapter
2.3.1.下載解壓
訪問:https://github.com/alibaba/canal/releases ,會列出所有歷史的發布版本包 下載方式,比如以1.1.4版本為例子: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
mkdir /usr/local/canal-adapter tar zxvf canal.adapter-1.1.4.tar.gz -C /usr/local/canal-adapter
2.3.2.配置
- adapter配置
cd /usr/local/canal-adapter
vim conf/application.yml
標紅的需要我們重點關注的,也是平常修改最多的參數:
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #模式 canalServerHost: 127.0.0.1:11111 #指定canal-server的地址和端口 # zookeeperHosts: slave1:2181 # mqServers: 127.0.0.1:9092 #or rocketmq # flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: #數據源配置,從哪里獲取數據 defaultDS: #指定一個名字,在ES的配置中會用到,唯一 url: jdbc:mysql://172.16.10.26:3306/risk?useUnicode=true username: root password: 123456 canalAdapters: - instance: example # canal instance Name or mq topic name #指定在canal-server配置的實例 groups: - groupId: g1 #默認就好,組標識 outerAdapters: - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es #輸出到哪里,指定es hosts: 172.16.99.2:40265 #指定es的地址,注意端口為es的傳輸端口9300 properties: # mode: transport # or rest # security.auth: test:123456 # only used for rest mode cluster.name: log-es-cluster #指定es的集群名稱
- es配置
[root@aliyun es]# pwd /usr/local/canal-adapter/conf/es [root@aliyun es]# ll total 12 -rwxrwxrwx 1 root root 466 Apr 4 10:27 biz_order.yml #這三個配置文件是自帶的,可以刪除,不過最好不要刪除,因為可以參考他的格式 -rwxrwxrwx 1 root root 855 Apr 4 10:27 customer.yml -rwxrwxrwx 1 root root 416 Apr 4 10:27 mytest_user.yml
創建canal.yml文件:
cp customer.yml canal.yml
vim conf/es/canal.yml
標紅的需要我們重點關注的,也是平常修改最多的參數:
dataSourceKey: defaultDS #指定數據源,這個值和adapter的application.yml文件中配置的srcDataSources值對應。 destination: example #指定canal-server中配置的某個實例的名字,注意:我們可能配置多個實例,你要清楚的知道每個實例收集的是那些數據,不要瞎搞。 groupId: g1 #組ID,默認就好 esMapping: #ES的mapping(映射) _index: canal #要同步到的ES的索引名稱(自定義),需要自己在ES上創建哦! _type: _doc #ES索引的類型名稱(自定義) _id: _id #ES標示文檔的唯一標示,通常對應數據表中的主鍵ID字段,注意我這里寫成的是"_id",有個下划線哦!
#pk: id #如果不需要_id, 則需要指定一個屬性為主鍵屬性
sql: "select t.id as _id, t.name, t.sex, t.age, t.amount, t.email, t.occur_time from canal t" #這里就是數據表中的每個字段到ES索引中叫什么名字的sql映射,注意映射到es中的每個字段都要是唯一的,不能重復。
#etlCondition: "where t.occur_time>='{0}'"
commitBatch: 3000
sql映射文件寫完之后,要去ES上面創建對應的索引和映射,映射要求要和sql文件的映射保持一致,即sql映射中有的字段在ES的索引映射中必須要有,否則同步會報字段錯誤,導致失敗。
2.3.3.創建mysql表和es索引
CREATE TABLE `canal` ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(20) NULL COMMENT '名稱', sex varchar(2) NULL COMMENT '性別', age int NULL COMMENT '年齡', amount decimal(12,2) NULL COMMENT '資產', email varchar(50) NULL COMMENT '郵箱', occur_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
{ "mappings": { "_doc": { "properties": { "id": { "type": "long" }, "name": { "type": "text" }, "sex": { "type": "text" }, "age": { "type": "long" }, "amount": { "type": "text" }, "email": { "type": "text" }, "occur_time": { "type": "date" } } } } }
2.3.4.啟動
cd /usr/local/canal-adapter
./bin/startup.sh
查看日志:
cat logs/adapter/adapter.log

2.4.Kibana
安裝配置方法:https://www.cnblogs.com/caoweixiong/p/11826655.html
3.驗證
- 沒有數據時:

- 插入1條數據:
insert into canal(id,name,sex,age,amount,email,occur_time) values(null,'cwx','男',18,100000000,'249299170@qq.com',now());


- 更新1條數據:
update canal set name='cwx1',sex='女',age=28,amount=200000,email='asdf',occur_time=now() where id=16;


- 刪除1條數據:
delete from canal where id=16;


4.總結
4.1.全量更新不能實現,但是增刪改都是可以的;
4.2.一定要提前創建好es索引;
4.3.es配置的是tcp端口,比如默認的9300;
4.4.目前es貌似支持6.x版本,不支持7.x版本;
