摘要:
本篇博文是“Java秒殺系統實戰系列文章”的第十篇,本篇博文我們將采用RabbitMQ的死信隊列的方式處理“用戶秒殺成功生成訂單后,卻遲遲沒有支付”的情況,一起來見識一下RabbitMQ死信隊列在實際業務環境下的強大之處!
內容:
對於消息中間件RabbitMQ,Debug其實在前面的篇章中已經簡單分享介紹過了,在這里就不再贅述了!在本文我們將采用RabbitMQ的死信隊列實現這樣的業務需求:“用戶在秒殺成功並成功創建一筆訂單記錄后,理論上應該是執行去支付的操作,但是卻存在着一種情況是用戶遲遲不肯去支付~至於原因,不得而知!”
對於這種場景,各位小伙伴可以在一些商城平台體驗一下,即挑選完商品,加入購物車后,點擊去結算,這個時候會有個倒計時,提醒你需要在指定的時間內完成付款,否則訂單將失效!
對於這種業務邏輯的處理,傳統的做法是采用“定時器的方式”,定時輪詢獲取已經超過指定時間的訂單,然后執行一系列的處理措施(比如再爭取給用戶發送短信,提醒超過多長時間訂單就要失效了等等。。。),在這個秒殺系統中,我們將借助RabbitMQ死信隊列這一組件,對該訂單執行“失效”的措施!
“死信隊列”,顧明思議,是可以延時、延遲一定的時間再處理消息的一種特殊隊列,它相對於“普通的隊列”而言,可以實現“進入死信隊列的消息不立即處理,而是可以等待一定的時間再進行處理”的功能!而普通的隊列則不行,即進入隊列后的消息會立即被對應的消費者監聽消費,如下圖所示為普通隊列的基本消息模型:
而對於“死信隊列”,它的構成以及使用相對而言比較復雜一點,在正常情況,死信隊列由三大核心組件組成:死信交換機+死信路由+TTL(消息存活時間~非必需的),而死信隊列又可以由“面向生產者的基本交換機+基本路由”綁定而成,故而生產者首先是將消息發送至“基本交換機+基本路由”所綁定而成的消息模型中,即間接性地進入到死信隊列中,當過了TTL,消息將“掛掉”,從而進入下一個中轉站,即“面下那個消費者的死信交換機+死信路由”所綁定而成的消息模型中。如下圖所示:
下面,我們以實際的代碼來構建死信隊列的消息模型,並將此消息模型應用到秒殺系統的上述功能模塊中。
(1)首先,需要在RabbitmqConfig配置類創建死信隊列的消息模型,其完整的源代碼如下所示:
//構建秒殺成功之后-訂單超時未支付的死信隊列消息模型
@Bean public Queue successKillDeadQueue(){ Map<String, Object> argsMap= Maps.newHashMap(); argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange")); argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key")); return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap); } //基本交換機
@Bean public TopicExchange successKillDeadProdExchange(){ return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false); } //創建基本交換機+基本路由 -> 死信隊列 的綁定
@Bean public Binding successKillDeadProdBinding(){ return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key")); } //真正的隊列
@Bean public Queue successKillRealQueue(){ return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true); } //死信交換機
@Bean public TopicExchange successKillDeadExchange(){ return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false); } //死信交換機+死信路由->真正隊列 的綁定
@Bean public Binding successKillDeadBinding(){ return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key")); }
其中,環境變量對象實例env讀取的變量是配置在application.properties配置文件中的,取值如下所示:
#訂單超時未支付自動失效-死信隊列消息模型 mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key #單位為ms mq.kill.item.success.kill.expire=20000
(2)成功創建了消息模型之后,緊接着,我們需要在通用的RabbitMQ發送消息服務類RabbitSenderService中開發“發送消息入死信隊列”的功能,在該功能方法中,我們指定了消息的存活時間TTL,取值為配置的變量:mq.kill.item.success.kill.expire 的值,即20s;其完整的源代碼如下所示:
//秒殺成功后生成搶購訂單-發送信息入死信隊列,等待着一定時間失效超時未支付的訂單
public void sendKillSuccessOrderExpireMsg(final String orderCode){ try { if (StringUtils.isNotBlank(orderCode)){ KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode); if (info!=null){ rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange")); rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key")); rabbitTemplate.convertAndSend(info, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties mp=message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class); //TODO:動態設置TTL(為了測試方便,暫且設置20s)
mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire")); return message; } }); } } }catch (Exception e){ log.error("秒殺成功后生成搶購訂單-發送信息入死信隊列,等待着一定時間失效超時未支付的訂單-發生異常,消息為:{}",orderCode,e.fillInStackTrace()); } }
從該“發送消息入死信隊列”的代碼中,我們可以看到,消息首先是先入到“基本交換機+基本路由”所綁定的死信隊列的消息模型中的!當消息到了TTL,自然會從死信隊列中出來(即“解脫了”),然后進入下一個中轉站,即:“死信交換機+死信路由” 所綁定而成的真正隊列的消息模型中,最終真正被消費者監聽消費!
此時,可以將整個項目、系統運行在外置的tomcat服務器中,然后打開RabbitMQ后端控制台應用,找到該死信隊列,可以看到該死信隊列的詳細信息,如下圖所示:
(3)最后,是需要在RabbitMQ通用的消息監聽服務類RabbitReceiverService 中監聽“真正隊列”中的消息並進行處理:在這里我們是對該訂單進行失效處理(前提是還沒付款的情況下!),其完整的源代碼如下所示:
//用戶秒殺成功后超時未支付-監聽者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer") public void consumeExpireOrder(KillSuccessUserInfo info){ try { log.info("用戶秒殺成功后超時未支付-監聽者-接收消息:{}",info); if (info!=null){ ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode()); if (entity!=null && entity.getStatus().intValue()==0){ itemKillSuccessMapper.expireOrder(info.getCode()); } } }catch (Exception e){ log.error("用戶秒殺成功后超時未支付-監聽者-發生異常:",e.fillInStackTrace()); } }
其中,失效更新訂單的記錄的操作由 itemKillSuccessMapper.expireOrder(info.getCode()); 來實現,其對應的動態Sql的寫法如下所示:
<!--失效更新訂單信息-->
<update id="expireOrder"> UPDATE item_kill_success SET status = -1 WHERE code = #{code} AND status = 0
</update>
(4)至此,關於RabbitMQ死信隊列消息模型的代碼實戰已經完畢了!最后我只需要在“用戶秒殺成功創建訂單的那一刻,發送消息入死信隊列”的地方調用即可,其調用代碼如下所示:
/** * 通用的方法-記錄用戶秒殺成功后生成的訂單-並進行異步郵件消息的通知 * @param kill * @param userId * @throws Exception */
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{ //TODO:記錄搶購成功后生成的秒殺訂單記錄
ItemKillSuccess entity=new ItemKillSuccess(); String orderNo=String.valueOf(snowFlake.nextId()); //entity.setCode(RandomUtil.generateOrderCode()); //傳統時間戳+N位隨機數
entity.setCode(orderNo); //雪花算法
entity.setItemId(kill.getItemId()); entity.setKillId(kill.getId()); entity.setUserId(userId.toString()); entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue()); entity.setCreateTime(DateTime.now().toDate()); //TODO:學以致用,舉一反三 -> 仿照單例模式的雙重檢驗鎖寫法
if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){ int res=itemKillSuccessMapper.insertSelective(entity); if (res>0){ //TODO:進行異步郵件消息的通知=rabbitmq+mail
rabbitSenderService.sendKillSuccessEmailMsg(orderNo); //TODO:入死信隊列,用於 “失效” 超過指定的TTL時間時仍然未支付的訂單
rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo); } } }
最后,是進行自測:點擊“搶購”按鈕,用戶秒殺成功后,會發送一條消息入死信隊列(這一點可以在RabbitMQ后端控制台中可以看到一條正Ready好的消息),等待20s,即可看到消息轉移到真正的隊列,並被真正的消費者監聽消費,如下所示:
好了,關於“RabbitMQ死信隊列”的介紹以及應用實戰本文就暫且介紹到這里了,此種方式可以很靈活對“超時未支付的訂單”,進行很好的處理,而且整個過程是“自動、自然”的,而無需人為去手動點擊按鈕觸發了!當然啦,萬事萬物都並非十全十美的,死信隊列也是如此,在一篇文章中我們將介紹此種方式的瑕疵之處,並采用相應的解決方案進行處理!
補充:
1、目前,這一秒殺系統的整體構建與代碼實戰已經全部完成了,完整的源代碼數據庫地址可以來這里下載:gitee.com/steadyjack/… 記得Fork跟Star啊!! 技術交流群:605610429(Java實戰基地交流1群)
2、最后,不要忘記了關注一下Debug的技術微信公眾號: