canal解決緩存穿透 對數據庫同步數據至redis 或EleasticSearch


canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。

起源:早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。

工作原理:mysql 的主從備份

偽裝自己是mysql 的slave 解析binlog 

 

開啟mysql 的binlog

找到mysql的my.ini文件

打開文件后,添加以下內容

server_id=1 ###代表集群模式第一台機器
binlog_format=ROW ###行模式
log_bin=mysql_bin.log ###binlog的文件名稱

 


執行sql語句
show variables like '%log_bin%'

查看是否開啟binlog

create user canal identified by ‘canal’;

grant select,replication slave,replication client on*.* to 'canal'@'%';
grant all privileges on . TO 'canal'@'%';
flush privileges

 

創建新的mysql用戶  並賦予權限

下載canal服務https://github.com/alibaba/canal/releases/

打開canal服務conf下的配置文件canal.properties

這里配置端口號

復制conf目錄下的example 

粘貼到此目錄下 修改文件名

進入你新建的文件夾 打開instance.properties

接着運行bin目錄下的starup.bat

如果錯誤  刪除starup.bat 中11行的@Rem

自此canal 的服務 就已經配置好了 接着需要搭建一個canal的客戶端工程

package com.aila;

import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: {---chenzhichao---}
 * @Date: 2020/6/2 15:11
 */
@SpringBootApplication
@EnableCanalClient
public class MyCanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyCanalApplication.class, args);
    }

}
package com.aila.Listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.HashMap;
import java.util.Map;

import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.DELETE;
import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.INSERT;
import static com.alibaba.otter.canal.protocol.CanalEntry.EventType.UPDATE;

/**
 * @Author: {---chenzhichao---}
 * @Date: 2020/6/8 11:00
 */
@CanalEventListener
public class NewsListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    @ListenPoint(schema = "class19",table = "city")
    public void mytest1(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        Map<String,String> newData = new HashMap<>();
        rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue()));
        for (String s : newData.keySet()) {
            System.out.println(s+":"+String.valueOf(newData.get(s)));
        }
    }
    @ListenPoint(schema = "class19",table = "hot_news")
    public void mytest2(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
            switch (eventType) {
                case INSERT:
                    System.out.println("INSERT ");
                    break;
                case UPDATE:
                    System.out.println("UPDATE ");
                    break;
                case DELETE:
                    System.out.println("DELETE ");
                    break;
                default:
                    break;
            }
    }
    @ListenPoint(schema = "class19",table = "hot_news",
            eventType= {INSERT, UPDATE})//對熱點信息表新增或更新
    public void mytest3(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        Map<String,String> newData = new HashMap<>();
        rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue()));
        for (String s : newData.keySet()) {
            System.out.println(s+":"+String.valueOf(newData.get(s)));
        }
        String id = newData.get("id");
        String content = newData.get("content");
        String name = newData.get("name");
        //stringRedisTemplate.boundValueOps(id).set(content);
        redisTemplate.boundHashOps(id+"hash").put(name,content);
    }
    @ListenPoint(schema = "class19",table = "hot_news",eventType= DELETE)//對熱點信息表刪除
    public void mytest4(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        Map<String,String> newData = new HashMap<>();
        rowData.getBeforeColumnsList().forEach((c)->newData.put(c.getName(),c.getValue()));
        for (String s : newData.keySet()) {
            System.out.println(s+":"+String.valueOf(newData.get(s)));
        }
        String id = newData.get("id");
        String content = newData.get("content");
        String name = newData.get("name");
        //stringRedisTemplate.delete(id);
        redisTemplate.delete(id+"hash");
    }

}
canal.client.instances.example.host=127.0.0.1
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=czc
spring.rabbitmq.password=qpalzm
spring.redis.host=127.0.0.1
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/class19?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC

 

經過測試沒有問題  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM