延時隊列-使用rabbitMq實現消費者延時消費


1  背景

  app服務端在入庫一個單號的時候會推送一條消息到消息隊列,之后由我這兒獲取到消息以后去公司內部調取接口獲得數據並入庫。

2  問題

  現在出現一個問題,接口提供的數據由於需要數據入庫----腳本同步數據到csv文件 間隔大概5分鍾 --> 腳本讀取csv文件到緩存 間隔大概也是五分鍾,所以我在獲取到消息隊列的消息並去調取接口時,數據是不完整的。

3 解決

  1).  “生產者”(也可以是消費者再轉發)把消息丟到一個無人消費的隊列Q1,並為其設置存活時長,也就是需要延時的時長,例如10秒,

  2)  . 再為其設置消息死信交換機和死信路由,為了讓消息死亡后可以通過設置的路由和路由鍵去往真正的消費隊列Q2

4 代碼

   首先是Q1

////獲取app到件時的消息-延時隊列
public static final String QUEUE_NEW_MANAGE_PUTSHIP_CACHE = "shipMiddleService-new-manage-PutShip-Cache";

//獲取app到件時的消息
public static final String QUEUE_NEW_MANAGE_PUTSHIP = "shipMiddleService-new-manage-PutShip";

//Q1
@Bean
Queue shipMiddleServiceNewManagePutShipCache(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",10000);//ms
arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT);//指定一個普通的交換機
arguments.put("x-dead-letter-routing-key",QUEUE_NEW_MANAGE_PUTSHIP);//指定到Q2,Q2是真正監聽的隊列
return new Queue(QUEUE_NEW_MANAGE_PUTSHIP_CACHE,true,false,false,arguments);
}

//Q2
@Bean
Queue shipMiddleServiceNewManagePutShip(){
return new Queue(QUEUE_NEW_MANAGE_PUTSHIP);
}

//交換機
@Bean
DirectExchange directExchange(){
return new DirectExchange(EXCHANGE_DIRECT);
}

//交換機綁定到Q1
@Bean
public Binding deadPutShipBinding(){
return BindingBuilder.bind(shipMiddleServiceNewManagePutShipCache()).to(directExchange()).with(QUEUE_NEW_MANAGE_PUTSHIP_CACHE);
}

//交換機綁定到Q2
@Bean
public Binding queueNewManagePutship() {
return BindingBuilder.bind(shipMiddleServiceNewManagePutShip()).to(directExchange()).with(QUEUE_NEW_MANAGE_PUTSHIP);
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM