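- After a user places an order, we need to remind them to use it 30 minutes before the order's expiry time, and mark the order as expired once that time is reached. A delay queue handles this well: once the order is placed, compute the time remaining until 30 minutes before expiry and send a delayed message with that delay to remind the user. Then send a second delayed message, with the delay set to the time remaining until expiry, announcing that the order has expired.
Entry point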
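The two delays described above can be sketched as follows. This is a minimal illustration, not code from the project: the class `OrderDelayCalculator` and its method names are hypothetical, and it assumes the expiry time is available as an `Instant`. A delay that would be negative (the moment has already passed) is clamped to 0 so the message is delivered immediately.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: computes the delay values for an order's two delayed messages. */
final class OrderDelayCalculator {

    /** How long before expiry the reminder should fire (30 minutes, per the scenario above). */
    static final Duration REMINDER_LEAD = Duration.ofMinutes(30);

    private OrderDelayCalculator() {
    }

    /** Milliseconds from now until 30 minutes before expiry; 0 if that moment has passed. */
    static long reminderDelayMillis(Instant now, Instant expiresAt) {
        return nonNegativeMillis(now, expiresAt.minus(REMINDER_LEAD));
    }

    /** Milliseconds from now until expiry; 0 if the order has already expired. */
    static long expiryDelayMillis(Instant now, Instant expiresAt) {
        return nonNegativeMillis(now, expiresAt);
    }

    private static long nonNegativeMillis(Instant from, Instant to) {
        return Math.max(0L, Duration.between(from, to).toMillis());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-02-21T09:00:00Z");
        Instant expiresAt = now.plus(Duration.ofHours(2));
        System.out.println(reminderDelayMillis(now, expiresAt)); // 5400000 (90 minutes)
        System.out.println(expiryDelayMillis(now, expiresAt));   // 7200000 (120 minutes)
    }
}
```

Each value would then be passed as the `delay` field of a separate request to the endpoint shown in the next section.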
/**
 * Hot-product help status reminder.
 *
 * @param req request body for the hot-product help status (e.g. a failed help)
 */
@RequestMapping(path = "/mq/product/sendProductHelpStatusMessage", method = RequestMethod.POST)
Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req);
Producer
@Override
public Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req){
    // Record the send time in the payload so it can be compared with the consumer's receive time
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    req.put("sendTime",sdf.format(new Date()));
    String beanToJson = JsonUtils.beanToJson(req);
    log.info("sendProductHelpStatusMessage:{}",beanToJson);
    // The x-delay header carries the delay in milliseconds, taken from the request
    productHelpStatusMessageChannel.productHelpStatusOutput().send(MessageBuilder.withPayload(beanToJson).setHeader("x-delay", req.get("delay")).build());
    return 1;
}
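The message is sent with a delay of `delay` milliseconds, and the send time is recorded in the payload. The length of the delay is therefore determined by the parameter passed in the request.
Consumer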
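The producer above passes `req.get("delay")` through as a raw string, so a missing or non-numeric value would reach the broker unchecked. One option is to validate it first. The following is a minimal sketch under that assumption: `DelayHeaderParser` is a hypothetical helper, not part of the project or of Spring, and the choice to reject values above `Integer.MAX_VALUE` is only an illustrative bound.

```java
/** Hypothetical helper: validates the "delay" request field before it is used as the x-delay header. */
final class DelayHeaderParser {

    private DelayHeaderParser() {
    }

    /** Parses a delay in milliseconds; rejects missing, non-numeric, negative, or oversized values. */
    static int parseDelayMillis(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("delay is required");
        }
        long value;
        try {
            value = Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("delay must be a whole number of milliseconds: " + raw, e);
        }
        if (value < 0 || value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay out of range: " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(parseDelayMillis("10000")); // 10000
    }
}
```

In the producer, the header could then be set with `.setHeader("x-delay", DelayHeaderParser.parseDelayMillis(req.get("delay")))`, so that an invalid request fails before anything is sent.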
package org.xxx.mq.provider.consumer.product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.xxx.mq.api.channel.consumer.ProductHelpStatusMessageChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
@Slf4j
@EnableBinding(value = {ProductHelpStatusMessageChannel.class})
public class ProductHelpStatusConsumer {
    @StreamListener(target = ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    public void receiveProductHelpStatusWxMessage(String message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("receiveProductHelpStatusWxMessage:{},receiveTime,{}",message,sdf.format(new Date()));
    }
}
The consumer receives the message and logs the time at which it was received.
Channel
package org.xxx.mq.api.channel.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface ProductHelpStatusMessageChannel {
    // Hot-product help
    String PRODUCT_HELP_STATUS_OUTPUT = "productHelpStatusOutput";
    String PRODUCT_HELP_STATUS_WX_INPUT = "productHelpStatusWxInput";
    /**
     * Output channel for sending hot-product help messages.
     *
     * @return MessageChannel, the message output channel
     */
    @Output(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_OUTPUT)
    MessageChannel productHelpStatusOutput();
    /**
     * Subscription channel for hot-product help messages (WeChat messages).
     *
     * @return SubscribableChannel, the message subscription channel
     */
    @Input(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    SubscribableChannel getProductHelpStatusWxInputChannel();
}
Configuration: application.yml
spring:
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 50
  cloud:
    stream:
      rabbit:
        bindings:
          # Subscription channel (test channel)
          productHelpStatusOutput:
            producer:
              delayed-exchange: true
          productHelpStatusWxInput:
            consumer:
              auto-bind-dlq: true
              republishToDlq: true
              requeueRejected: true
              delayed-exchange: true
              dlq-ttl: ${queue.dlq.ttl}
              dlq-dead-letter-exchange:
      bindings:
        # Producer: output channel for hot-product help status messages
        productHelpStatusOutput:
          destination: productHelpStatusExchange
          group: productHelpFailQueueGroup
        # Consumer
        productHelpStatusWxInput:
          destination: productHelpStatusExchange
          group: productHelpStatusWxGroup
          consumer:
            max-attempts: 3
            backOffInitialInterval: 1000
            backOffMaxInterval: 10000
            backOffMultiplier: 2.0
As the configuration above shows, the essential setting is delayed-exchange, which must be enabled:
spring.cloud.stream.rabbit.bindings.productHelpStatusOutput.producer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.productHelpStatusWxInput.consumer.delayed-exchange=true
Testing
Open Postman and send a request to the endpoint with the following body:
{
"delay": 10000,
"orderSn":1
}
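`orderSn` is the order number and `delay` is the delay in milliseconds. Send one request for orderSn=1 with a 10-second delay (delay=10000), then one for orderSn=2 with a 5-second delay (delay=5000).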
2020-02-21 17:03:45.125 INFO [mq,b4e13b1dc86b0d7f,b4e13b1dc86b0d7f,true] 90220 --- [0.0-1205-exec-3] o.a.m.p.controller.ProductMqController : sendProductHelpStatusMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"}
2020-02-21 17:03:51.769 INFO [mq,820007cc648d3e43,820007cc648d3e43,true] 90220 --- [0.0-1205-exec-4] o.a.m.p.controller.ProductMqController : sendProductHelpStatusMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"}
2020-02-21 17:03:55.369 INFO [mq,b4e13b1dc86b0d7f,2b113702a181d49b,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer : receiveProductHelpStatusWxMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"},receiveTime,2020-02-21 17:03:55
2020-02-21 17:03:56.847 INFO [mq,820007cc648d3e43,60f5a71795043ac2,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer : receiveProductHelpStatusWxMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"},receiveTime,2020-02-21 17:03:56
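As the logs show, the message for order 1 was sent at 17:03:45 and reached the consumer 10 seconds later, at 17:03:55. The message for order 2 likewise arrived 5 seconds after it was sent.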
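The same check can be done programmatically rather than by reading the logs. The sketch below is only an illustration: `ObservedDelay` is a hypothetical class, and it assumes both timestamps use the `yyyy-MM-dd HH:mm:ss` format from the producer and consumer code, so the result is accurate only to the second.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: measures the delay between the logged sendTime and receiveTime. */
final class ObservedDelay {

    // Same pattern the producer and consumer use when formatting their timestamps
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private ObservedDelay() {
    }

    /** Whole seconds elapsed between the two timestamps. */
    static long seconds(String sendTime, String receiveTime) {
        LocalDateTime sent = LocalDateTime.parse(sendTime, FORMAT);
        LocalDateTime received = LocalDateTime.parse(receiveTime, FORMAT);
        return Duration.between(sent, received).getSeconds();
    }

    public static void main(String[] args) {
        // Timestamps taken from the log lines above
        System.out.println(seconds("2020-02-21 17:03:45", "2020-02-21 17:03:55")); // 10 (order 1)
        System.out.println(seconds("2020-02-21 17:03:51", "2020-02-21 17:03:56")); // 5 (order 2)
    }
}
```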