canal实现当数据库改变时,同步数据到redis


思路

canal感知sql的改变,作为消息的提供者将消息(图片的postion属性,指图片位于网页的位置)放到rabbitmq的队列,nginx作为消息的消费者,获取消息,并通过Lua脚本更新数据

第一步,将消息放到消息队列

  启动类上加上 @EnableCanalClient //声明当前服务是canal的客户端

   配置文件

canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128

  编写rabbitmq的配置类  

@Configuration
public class RabbitMQConfig {
//定义队列的名称
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//声明队列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}

注意Queue的包时springframework的amqp.core里的

  编写监听类,监听canal的消息,并发送到mq

 1 @CanalEventListener//声明当前的类是canal的监听类
 2 public class BusinessListener {
 3 
 4     @Autowired
 5     private RabbitTemplate rabbitTemplate;
 6 
 7     /**
 8      *
 9      * @param eventType  当前操作数据库的类型
10      * @param rowData   当前操作数据库的数据 changgou_business
11      */
12     @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//声明监听哪个库的哪个表
13     public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
14         System.err.println("广告数据发生变化");
15 
16         //修改前数据
17         //rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));
18         /* for(CanalEntry.Column column: rowData.getBeforeColumnsList()) {
19             if(column.getName().equals("position")){
20                 System.out.println("发送消息到mq  ad_update_queue:"+column.getValue());
21                 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue());  //发送消息到mq
22                 break;
23             }
24         } */
25 
26         //修改后数据
27         //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+"::"+c.getValue()));
28         for(CanalEntry.Column column: rowData.getAfterColumnsList()) {
29             if(column.getName().equals("position")){
30                 System.out.println("发送消息到mq  ad_update_queue:"+column.getValue());
31                 //发送消息到mq 没有交换机,路由Key直接写队列名
32                 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue());
33                 break;
34             }
35         }
36     }
37 }

  这样,当数据库发生改变,canal就会通知这个类,这个类就会把position这个消息放到消息队列里

第二步,监听消息队列,如果有消息,就通知nginx调用lua更新redis

  在消息的消费者模块引入坐标

 <!--用于从消息队列获取数据-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--用于远程调用nginx-->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.9.0</version>
        </dependency>

 

  创建监听者,代码如下

@Component
public class AdListener {

    @RabbitListener(queues = "ad_update_queue")
    public void receiveMessage(String message){
        System.out.println("接收到的消息未:" + message);

        //发起远程调用nginx,进行更新
        OkHttpClient okHttpClient = new OkHttpClient();
        String url = "http://192.168.200.128/ad_update?position"+message;
        Request request = new Request.Builder().url(url).build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                //如果请求失败
                e.printStackTrace();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                //如果请求成功
                System.out.println("请求成功:"+ response.message());
            }
        });

    }
}

 

商城项目中还有商品行家同步到es的实例

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM