canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
起源:早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
工作原理:mysql 的主從備份
偽裝自己是mysql 的slave 解析binlog
開啟mysql 的binlog
找到mysql的my.ini文件
打開文件后,添加以下內容
server_id=1 ###代表集群模式第一台機器 binlog_format=ROW ###行模式 log_bin=mysql_bin.log ###binlog的文件名稱
執行sql語句
show variables like '%log_bin%'
查看是否開啟binlog
create user canal identified by ‘canal’; grant select,replication slave,replication client on*.* to 'canal'@'%'; grant all privileges on . TO 'canal'@'%'; flush privileges
創建新的mysql用戶 並賦予權限
下載canal服務https://github.com/alibaba/canal/releases/
打開canal服務conf下的配置文件canal.properties
這里配置端口號
復制conf目錄下的example
粘貼到此目錄下 修改文件名
進入你新建的文件夾 打開instance.properties
接着運行bin目錄下的starup.bat
如果錯誤 刪除starup.bat 中11行的@Rem
自此canal 的服務 就已經配置好了 接着需要搭建一個canal的客戶端工程
package com.aila; import com.xpand.starter.canal.annotation.EnableCanalClient; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author: {---chenzhichao---} * @Date: 2020/6/2 15:11 */ @SpringBootApplication @EnableCanalClient public class MyCanalApplication { public static void main(String[] args) { SpringApplication.run(MyCanalApplication.class, args); } }
package com.aila.Listener; import com.alibaba.otter.canal.protocol.CanalEntry; import com.xpand.starter.canal.annotation.CanalEventListener; import com.xpand.starter.canal.annotation.ListenPoint; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.HashMap; import java.util.Map; import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.DELETE; import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.INSERT; import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.UPDATE; /** * @Author: {---chenzhichao---} * @Date: 2020/6/8 11:00 */ @CanalEventListener public class NewsListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisTemplate redisTemplate; @ListenPoint(schema = "class19",table = "city") public void mytest1(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ Map<String,String> newData = new HashMap<>(); rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue())); for (String s : newData.keySet()) { System.out.println(s+":"+String.valueOf(newData.get(s))); } } @ListenPoint(schema = "class19",table = "hot_news") public void mytest2(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ switch (eventType) { case INSERT: System.out.println("INSERT "); break; case UPDATE: System.out.println("UPDATE "); break; case DELETE: System.out.println("DELETE "); break; default: break; } } @ListenPoint(schema = "class19",table = "hot_news", eventType= {INSERT, UPDATE})//對熱點信息表新增或更新 public void mytest3(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ Map<String,String> newData = new HashMap<>(); rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue())); for (String s : newData.keySet()) { System.out.println(s+":"+String.valueOf(newData.get(s))); } String id = newData.get("id"); String content = newData.get("content"); String name = newData.get("name"); //stringRedisTemplate.boundValueOps(id).set(content); redisTemplate.boundHashOps(id+"hash").put(name,content); } @ListenPoint(schema = "class19",table = "hot_news",eventType= DELETE)//對熱點信息表刪除 public void mytest4(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ Map<String,String> newData = new HashMap<>(); rowData.getBeforeColumnsList().forEach((c)->newData.put(c.getName(),c.getValue())); for (String s : newData.keySet()) { System.out.println(s+":"+String.valueOf(newData.get(s))); } String id = newData.get("id"); String content = newData.get("content"); String name = newData.get("name"); //stringRedisTemplate.delete(id); redisTemplate.delete(id+"hash"); } }
canal.client.instances.example.host=127.0.0.1 canal.client.instances.example.port=11111 canal.client.instances.example.batchSize=1000 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=czc spring.rabbitmq.password=qpalzm spring.redis.host=127.0.0.1 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/class19?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
經過測試沒有問題