延时队列-使用rabbitMq实现消费者延时消费


1  背景

  app服务端在入库一个单号的时候会推送一条消息到消息队列,之后由我这儿获取到消息以后去公司内部调取接口获得数据并入库。

2  问题

  现在出现一个问题,接口提供的数据由于需要数据入库----脚本同步数据到csv文件 间隔大概5分钟 --> 脚本读取csv文件到缓存 间隔大概也是五分钟,所以我在获取到消息队列的消息并去调取接口时,数据是不完整的。

3 解决

  1).  “生产者”(也可以是消费者再转发)把消息丢到一个无人消费的队列Q1,并为其设置存活时长,也就是需要延时的时长,例如10秒,

  2)  . 再为其设置消息死信交换机和死信路由,为了让消息死亡后可以通过设置的路由和路由键去往真正的消费队列Q2

4 代码

   首先是Q1

////获取app到件时的消息-延时队列
public static final String QUEUE_NEW_MANAGE_PUTSHIP_CACHE = "shipMiddleService-new-manage-PutShip-Cache";

//获取app到件时的消息
public static final String QUEUE_NEW_MANAGE_PUTSHIP = "shipMiddleService-new-manage-PutShip";

//Q1
@Bean
Queue shipMiddleServiceNewManagePutShipCache(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",10000);//ms
arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT);//指定一个普通的交换机
arguments.put("x-dead-letter-routing-key",QUEUE_NEW_MANAGE_PUTSHIP);//指定到Q2,Q2是真正监听的队列
return new Queue(QUEUE_NEW_MANAGE_PUTSHIP_CACHE,true,false,false,arguments);
}

//Q2
@Bean
Queue shipMiddleServiceNewManagePutShip(){
return new Queue(QUEUE_NEW_MANAGE_PUTSHIP);
}

//交换机
@Bean
DirectExchange directExchange(){
return new DirectExchange(EXCHANGE_DIRECT);
}

//交换机绑定到Q1
@Bean
public Binding deadPutShipBinding(){
return BindingBuilder.bind(shipMiddleServiceNewManagePutShipCache()).to(directExchange()).with(QUEUE_NEW_MANAGE_PUTSHIP_CACHE);
}

//交换机绑定到Q2
@Bean
public Binding queueNewManagePutship() {
return BindingBuilder.bind(shipMiddleServiceNewManagePutShip()).to(directExchange()).with(QUEUE_NEW_MANAGE_PUTSHIP);
}



免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM