1、什么是阿里canal?
canal是阿里开源的,对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。
2、canal环境搭建
本文基于Window系统。
使用canal需要确保数据库开启了binlog:
show variables like'log_%';
如果没开启,在mysql my.ini配置文件添加配置,注意文件内存为的时候,注意编码格式必须为ANSI,不然会编译报错
[mysqld] # 开启 binlog log-bin=mysql-bin # 选择 ROW 模式 binlog-format=ROW # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 server_id=1
配置文件修改是否正确,使用命令,查看日志
mysqld --console
重启MySQL实例
net stop mysql
net start mysql
binlog开启后,创建一个canal用户并授权,官网配置是@%,表示所有服务器,因为本地测试的,所以改为localhost就可以
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' identified by 'canal'; FLUSH PRIVILEGES;
下载canal服务端,到官网releases下载对应资料,canal.deployer-1.1.5.tar.gz是服务端,解压后在conf文件夹里找到\example\instance.properties,修改数据库配置信息,dbUsername,dbPassword数据库账号密码
# position info(master数据库配置) canal.instance.master.address=gz-cdb-l5ixwzm1.sql.tencentcdb.com:59039 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password(用户名密码) canal.instance.dbUsername=canal canal.instance.dbPassword=canal1uo#A#9R # 加上默认数据库 canal.instance.defaultDatabaseName=tajiax_canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
查看canal实例名称,文件在D:\dev\canal.deployer-1.1.5\conf\canal.properties:
canal.destination=example
到canal服务器安装目录D:\dev\canal.deployer-1.1.5\bin,找到startup.bat执行。
在码云有示例->\ly\canal-application
3、Canal客户端测试
JDK 1.8
SpringBoot2.2.1
Maven 3.2+
开发工具
IntelliJ IDEA
smartGit
3.1、引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
3.2、配置canal
# canal实例名称,要跟canal-server运行时设置的destination一致
# D:\dev\canal.deployer-1.1.5\conf\canal.properties canal.destination=example # canal默认监听端口 canal.server=127.0.0.1:11111 logging.level.top.javatool.canal.client=warn
3.3、编写监听器,监听Canal消息
package com.lynch.entity; import org.springframework.stereotype.Component; import top.javatool.canal.client.annotation.CanalTable; import top.javatool.canal.client.handler.EntryHandler; @CanalTable("tb_user") @Component public class UserHandler implements EntryHandler<UserEntity> { // @Autowired // private RedisHandler redisHandler; // @Autowired // private Cache<Long, Item> itemCache;
@Override public void insert(UserEntity item) { System.out.println("insert," + item); // 写数据到JVM进程缓存 //itemCache.put(item.getId(), item); // 写数据到redis //redisHandler.saveItem(item);
} @Override public void update(UserEntity before, UserEntity after) { System.out.println("update before," + before); System.out.println("update after," + after); // 写数据到JVM进程缓存 //itemCache.put(after.getId(), after); // 写数据到redis //redisHandler.saveItem(after);
} @Override public void delete(UserEntity item) { System.out.println("delete," + item); // 删除数据到JVM进程缓存 //itemCache.invalidate(item.getId()); // 删除数据到redis //redisHandler.deleteItemById(item.getId());
} }
当表tb_user进行insert、update、delete操作时,会发现监控已生效。
注意:更改canal实例名称为canaltest
1、在D:\dev\canal.deployer-1.1.5\conf目录下,把example文件夹改成canaltest
2、在D:\dev\canal.deployer-1.1.5\conf\canal.properties文件中,找到canal.destination配置项改为canaltest
3、修改canal配置
canal.destination=canaltest