canal is an open-source MySQL binlog parser from Alibaba. With it you can implement incremental subscription/consumption of MySQL data; a typical scenario looks like this:
canal parses the binlog and delivers MySQL data changes to Kafka (or RocketMQ); the MQ consumers can then apply those changes to different business scenarios, for example:
1. Sync to Redis (database changes automatically propagate to the cache)
2. Sync to an ES search engine (database changes automatically refresh the ES index)
3. Sync to other heterogeneous databases (MySQL changes automatically replicate to PostgreSQL, Oracle, and other database types)
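To make use case 1 concrete, here is a minimal, hypothetical sketch of what an MQ consumer might do with a canal change event: map it to cache operations. The event fields (database, table, type, data, pkNames) match the sample payloads shown later in this post; the key format db:table:pk and the function name cache_ops_for_event are my own invention, and real code would hand the resulting ops to a Redis client.

```python
import json

def cache_ops_for_event(event: dict) -> list:
    """Translate one canal change event into cache operations.

    Assumed key format "db:table:pk" is illustrative only; canal
    itself prescribes nothing about cache keys.
    """
    ops = []
    pk = event["pkNames"][0]
    for row in event["data"] or []:
        key = f'{event["database"]}:{event["table"]}:{row[pk]}'
        if event["type"] in ("INSERT", "UPDATE"):
            # Cache the full row image as JSON
            ops.append(("SET", key, json.dumps(row)))
        elif event["type"] == "DELETE":
            ops.append(("DEL", key))
    return ops

# A trimmed-down canal UPDATE event:
event = {
    "database": "sample", "table": "employee", "type": "UPDATE",
    "pkNames": ["id"], "data": [{"id": "6", "name": "Jimmy"}],
}
print(cache_ops_for_event(event))
```

A real consumer would loop over messages from the sample-data topic, json.loads each one, and execute the ops against Redis.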
Below is the process of setting up standalone (single-machine) mode on a Mac:
1. Install ZooKeeper
Note: both canal and Kafka depend on zk, so install zk first.
1.1 wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
1.2 tar -zxvf zookeeper-3.4.14.tar.gz
1.3 cd zookeeper-3.4.14
1.4 cp conf/zoo_sample.cfg conf/zoo.cfg
1.5 vim conf/zoo.cfg and edit it along these lines:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/jimmy/soft/zookeeper-3.4.14/data
dataLogDir=/Users/jimmy/soft/zookeeper-3.4.14/logs
clientPort=2181
Note: adjust the dataDir and dataLogDir directories to taste; create them if they do not exist, and make sure zk has write permission to them.
1.6 bin/zkServer.sh start-foreground — if the terminal prints something like:
2019-05-26 13:27:38,667 [myid:] - INFO [main:ServerCnxnFactory@117] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
2019-05-26 13:27:38,682 [myid:] - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
then startup succeeded.
Note: start-foreground runs ZooKeeper in the foreground, so any startup error is printed straight to the terminal, which makes first-run troubleshooting fast. Once it starts cleanly, press Ctrl+C and relaunch in background mode with bin/zkServer.sh start.
2. Install Kafka
2.1 wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
2.2 tar -zxvf kafka_2.11-2.1.1.tgz
2.3 cd kafka_2.11-2.1.1, then vim config/server.properties and change the following:
...
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
...
log.dirs=/Users/jimmy/soft/kafka_2.11-2.1.1/logs
Note: the main changes are the listener addresses and the log directory; zookeeper.connect can stay at its default of localhost:2181 for this setup.
2.4 bin/kafka-server-start.sh config/server.properties to start it.
If the console prints no errors and lsof -i:9092 (run in another terminal) shows the port listening, startup succeeded. Press Ctrl+C to stop it, then rerun in background mode with bin/kafka-server-start.sh -daemon config/server.properties.
3. Install MySQL
3.1 brew install mysql@5.7
Note: MySQL 8.0 is not recommended here, because 8.0 switched to a new default authentication plugin (caching_sha2_password) that this version of canal cannot negotiate (support may come in a later canal release).
3.2 brew services start mysql@5.7 (note: replace start with stop to stop it)
3.3 Connect with any MySQL client and create a dedicated account for canal:
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal.1.1.3.x';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Note: Canal.1.1.3.x above is the password for the canal account; change it to suit your needs.
3.4 Adjust my.cnf to enable binlog:
vim /usr/local/etc/my.cnf
# Default Homebrew MySQL server config
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
# Only allow connections from localhost
bind-address=127.0.0.1
After the change, restart MySQL, reconnect, and run:
show master status;
to verify. If it returns a row showing the current binlog file (e.g. mysql-bin.000001) and its position, binlog is enabled.
4. Install canal
4.1 wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
4.2 tar -zxvf canal.deployer-1.1.3.tar.gz
4.3 cd canal-1.1.3 (note: if the extracted directory has a different name, adjust accordingly)
4.4 vim conf/example/instance.properties
....
canal.instance.mysql.slaveId=1234
....
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=154
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal.1.1.3.x
canal.instance.connectionCharset=UTF-8
...
# table regex (sync the employee and city tables in the sample database)
canal.instance.filter.regex=sample\\.employee,sample\\.city
# mq config (changes to the two tables above go into sample-data)
canal.mq.topic=sample-data
...
The key settings are the MySQL address and binlog start position (note: these are best kept consistent with the show master status output from the previous step), the connection username/password, and the Kafka topic. With the configuration above, changes to the employee and city tables in the sample database are all delivered to the sample-data topic.
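The filter is a comma-separated list of regular expressions matched against "schema.table". As an illustration of that semantics (this is a simplified re-implementation, not canal's actual matcher), a sketch:

```python
import re

def table_matches(filter_regex: str, schema: str, table: str) -> bool:
    """Check whether schema.table is captured by a canal-style filter:
    a comma-separated list of regexes, each matched against the full
    "schema.table" string. Simplified illustration only.
    """
    target = f"{schema}.{table}"
    for pattern in filter_regex.split(","):
        if re.fullmatch(pattern, target):
            return True
    return False

# The filter from instance.properties. Note: the properties file doubles
# the backslash (sample\\.employee); in Python source a single backslash
# in a raw string escapes the dot.
flt = r"sample\.employee,sample\.city"
print(table_matches(flt, "sample", "employee"))  # True
print(table_matches(flt, "sample", "orders"))    # False
```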
4.5 vim conf/canal.properties
...
canal.serverMode=kafka
...
canal.destinations = example
...
canal.mq.servers = 127.0.0.1:9092
Note: these key settings tell canal to use Kafka as the MQ, with conf/example as one of the destinations.
4.6 bin/startup.sh to start it.
Whether startup actually succeeded has to be verified from the log:
cat logs/canal/canal.log — if you see output like the following:
2019-05-26 14:43:36.468 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-05-26 14:43:36.468 [destination = metrics , address = null , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - parse events has an error
com.alibaba.otter.canal.parse.exception.CanalParseException: illegal connection is null
2019-05-26 14:43:36.482 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop metrics successful.
2019-05-26 14:43:36.713 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4435,serverId=1,gtid=,timestamp=1558841400000] cost : 409ms , the next step is binlog dump
then canal is up and running.
Note: if startup fails, common causes are:
1. canal cannot connect to MySQL; the log will say so. Try connecting with a MySQL client using the username and password from canal's configuration.
2. The log complains that the binlog cannot be read or the binlog file cannot be found. If show master status looks normal on the MySQL side, canal most likely persisted a stale binlog start position from a previous run. Try adjusting the values in conf/example/meta.dat:
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000004","position":2805,"serverId":1,"timestamp":1558880055000}}}],"destination":"example"}
This file is plain JSON recording the binlog file name and start position. Alternatively, the blunt fix is to delete the .dat and .db files under conf/example.
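The adjustment can be sketched as below. This is a hypothetical helper, not a canal tool: the field names follow the meta.dat sample above (including "postion", which is how canal actually spells the key), and canal should be stopped before the real file is rewritten.

```python
import json

def reset_cursor(meta: dict, journal_name: str, position: int) -> dict:
    """Point the persisted cursor in a meta.dat structure at a new
    binlog file and offset."""
    for client in meta["clientDatas"]:
        cursor = client["cursor"]["postion"]  # sic: canal spells it "postion"
        cursor["journalName"] = journal_name
        cursor["position"] = position
    return meta

# The meta.dat content from above, loaded as JSON:
meta = json.loads(
    '{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},'
    '"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},'
    '"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000004","position":2805,'
    '"serverId":1,"timestamp":1558880055000}}}],"destination":"example"}'
)
meta = reset_cursor(meta, "mysql-bin.000001", 154)
print(meta["clientDatas"][0]["cursor"]["postion"]["journalName"])
```

In practice you would read the file with json.load, call the helper with the file/position reported by show master status, and write it back with json.dump.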
4.7 Check the messages in Kafka
First, in the sample database, create two tables and modify a few rows. The table definitions:
CREATE TABLE `employee` (
  `id` bigint(20) unsigned zerofill NOT NULL AUTO_INCREMENT COMMENT 'primary key id',
  `name` varchar(100) NOT NULL DEFAULT '' COMMENT 'name',
  `update_at` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'last update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4
and
CREATE TABLE `city` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `city_name` varchar(100) DEFAULT '' COMMENT 'city name',
  `create_at` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),
  `update_at` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='city table'
Then go back to the terminal.
4.7.1 cd kafka_2.11-2.1.1 to enter the Kafka directory.
4.7.2 bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 lists all topics; if sample-data appears in the output, the topic is in place.
4.7.3 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic sample-data dumps all messages on the sample-data topic.
You should see output like the following (note: pretty-printed here for readability):
{
  "data": [{
    "id": "6",
    "name": "楊俊明",
    "update_at": "2019-05-26 14:51:30.094"
  }],
  "database": "sample",
  "es": 1558853490000,
  "id": 4,
  "isDdl": false,
  "mysqlType": {
    "id": "bigint(20) unsigned zerofill",
    "name": "varchar(100)",
    "update_at": "datetime(3)"
  },
  "old": [{
    "name": "新名稱",
    "update_at": "2019-05-26 11:30:00.899"
  }],
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": -5,
    "name": 12,
    "update_at": 93
  },
  "table": "employee",
  "ts": 1558853490132,
  "type": "UPDATE"
}
or
{
  "data": [{
    "id": "2",
    "city_name": "北京",
    "create_at": "2019-05-26 22:14:15.508",
    "update_at": "2019-05-26 22:14:15.508"
  }],
  "database": "sample",
  "es": 1558880055000,
  "id": 5,
  "isDdl": false,
  "mysqlType": {
    "id": "bigint",
    "city_name": "varchar(100)",
    "create_at": "datetime(3)",
    "update_at": "datetime(3)"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": -5,
    "city_name": 12,
    "create_at": 93,
    "update_at": 93
  },
  "table": "city",
  "ts": 1558880055712,
  "type": "INSERT"
}
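As the UPDATE payload above suggests, canal's "old" array appears to carry only the pre-images of the columns that changed, while "data" carries the full new row ("old" is null for INSERT/DELETE). A small hypothetical helper (the function name is mine) that pairs each changed column's old and new values:

```python
def changed_columns(event: dict) -> dict:
    """For an UPDATE event, map each column listed in "old" to a
    (old_value, new_value) pair taken from "data". Returns {} for
    events without an "old" image (INSERT/DELETE)."""
    if event.get("type") != "UPDATE" or not event.get("old"):
        return {}
    new_row = event["data"][0]
    return {col: (old_val, new_row[col])
            for col, old_val in event["old"][0].items()}

# Trimmed-down version of the UPDATE payload above:
update_event = {
    "type": "UPDATE",
    "data": [{"id": "6", "name": "Jimmy Yang",
              "update_at": "2019-05-26 14:51:30.094"}],
    "old": [{"name": "old name", "update_at": "2019-05-26 11:30:00.899"}],
}
print(changed_columns(update_event))
```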
References:
https://github.com/alibaba/canal/wiki/Kafka-QuickStart
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
https://www.jianshu.com/p/93d9018e2fa1