思路
canal感知sql的改变,作为消息的提供者将消息(图片的postion属性,指图片位于网页的位置)放到rabbitmq的队列,nginx作为消息的消费者,获取消息,并通过Lua脚本更新数据
第一步,将消息放到消息队列
启动类上加上 @EnableCanalClient //声明当前服务是canal的客户端
配置文件
canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128
编写rabbitmq的配置类
@Configuration
public class RabbitMQConfig {
//定义队列的名称
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//声明队列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}
注意Queue的包时springframework的amqp.core里的
编写监听类,监听canal的消息,并发送到mq
1 @CanalEventListener//声明当前的类是canal的监听类 2 public class BusinessListener { 3 4 @Autowired 5 private RabbitTemplate rabbitTemplate; 6 7 /** 8 * 9 * @param eventType 当前操作数据库的类型 10 * @param rowData 当前操作数据库的数据 changgou_business 11 */ 12 @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//声明监听哪个库的哪个表 13 public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { 14 System.err.println("广告数据发生变化"); 15 16 //修改前数据 17 //rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue())); 18 /* for(CanalEntry.Column column: rowData.getBeforeColumnsList()) { 19 if(column.getName().equals("position")){ 20 System.out.println("发送消息到mq ad_update_queue:"+column.getValue()); 21 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //发送消息到mq 22 break; 23 } 24 } */ 25 26 //修改后数据 27 //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+"::"+c.getValue())); 28 for(CanalEntry.Column column: rowData.getAfterColumnsList()) { 29 if(column.getName().equals("position")){ 30 System.out.println("发送消息到mq ad_update_queue:"+column.getValue()); 31 //发送消息到mq 没有交换机,路由Key直接写队列名 32 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); 33 break; 34 } 35 } 36 } 37 }
这样,当数据库发生改变,canal就会通知这个类,这个类就会把position这个消息放到消息队列里
第二步,监听消息队列,如果有消息,就通知nginx调用lua更新redis
在消息的消费者模块引入坐标
<!--用于从消息队列获取数据--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用于远程调用nginx--> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.9.0</version> </dependency>
创建监听者,代码如下
@Component public class AdListener { @RabbitListener(queues = "ad_update_queue") public void receiveMessage(String message){ System.out.println("接收到的消息未:" + message); //发起远程调用nginx,进行更新 OkHttpClient okHttpClient = new OkHttpClient(); String url = "http://192.168.200.128/ad_update?position"+message; Request request = new Request.Builder().url(url).build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { //如果请求失败 e.printStackTrace(); } @Override public void onResponse(Call call, Response response) throws IOException { //如果请求成功 System.out.println("请求成功:"+ response.message()); } }); } }
商城项目中还有商品行家同步到es的实例