思路
canal感知sql的改變,作為消息的提供者將消息(圖片的postion屬性,指圖片位於網頁的位置)放到rabbitmq的隊列,nginx作為消息的消費者,獲取消息,並通過Lua腳本更新數據
第一步,將消息放到消息隊列
啟動類上加上 @EnableCanalClient //聲明當前服務是canal的客戶端
配置文件
canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128
編寫rabbitmq的配置類
@Configuration
public class RabbitMQConfig {
//定義隊列的名稱
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//聲明隊列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}
注意Queue的包時springframework的amqp.core里的
編寫監聽類,監聽canal的消息,並發送到mq
1 @CanalEventListener//聲明當前的類是canal的監聽類 2 public class BusinessListener { 3 4 @Autowired 5 private RabbitTemplate rabbitTemplate; 6 7 /** 8 * 9 * @param eventType 當前操作數據庫的類型 10 * @param rowData 當前操作數據庫的數據 changgou_business 11 */ 12 @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//聲明監聽哪個庫的哪個表 13 public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { 14 System.err.println("廣告數據發生變化"); 15 16 //修改前數據 17 //rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的數據:"+c.getName()+"::"+c.getValue())); 18 /* for(CanalEntry.Column column: rowData.getBeforeColumnsList()) { 19 if(column.getName().equals("position")){ 20 System.out.println("發送消息到mq ad_update_queue:"+column.getValue()); 21 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //發送消息到mq 22 break; 23 } 24 } */ 25 26 //修改后數據 27 //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改變后的數據"+c.getName()+"::"+c.getValue())); 28 for(CanalEntry.Column column: rowData.getAfterColumnsList()) { 29 if(column.getName().equals("position")){ 30 System.out.println("發送消息到mq ad_update_queue:"+column.getValue()); 31 //發送消息到mq 沒有交換機,路由Key直接寫隊列名 32 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); 33 break; 34 } 35 } 36 } 37 }
這樣,當數據庫發生改變,canal就會通知這個類,這個類就會把position這個消息放到消息隊列里
第二步,監聽消息隊列,如果有消息,就通知nginx調用lua更新redis
在消息的消費者模塊引入坐標
<!--用於從消息隊列獲取數據--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用於遠程調用nginx--> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.9.0</version> </dependency>
創建監聽者,代碼如下
@Component public class AdListener { @RabbitListener(queues = "ad_update_queue") public void receiveMessage(String message){ System.out.println("接收到的消息未:" + message); //發起遠程調用nginx,進行更新 OkHttpClient okHttpClient = new OkHttpClient(); String url = "http://192.168.200.128/ad_update?position"+message; Request request = new Request.Builder().url(url).build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { //如果請求失敗 e.printStackTrace(); } @Override public void onResponse(Call call, Response response) throws IOException { //如果請求成功 System.out.println("請求成功:"+ response.message()); } }); } }
商城項目中還有商品行家同步到es的實例