應用場景
通常在應用開發中我們會碰到定時任務的需求,比如未付款訂單,超過一定時間后,系統自動取消訂單並釋放占有物品。
許多同學的第一反應就是通過spring的schedule定時任務輪詢數據庫來實現,這種方案有一下幾點劣勢:
(1)消耗系統內存,由於定時任務一直在系統中占着進程,比較消耗內存
(2)增加了數據庫的壓力,這個提現在兩方面,一是長時間占着數據庫的連接,而是查詢基數大
(3)存在較大的時間誤差
如果我們利用第三方插件如rabbitmq來實現,就可以解決以上幾種問題。
對於任務的執行時間通常都是有規律性的,可能是每隔半小時執行一次,或者每天凌晨一點執行一次。然而實際業務中還存在另外一種定時任務,它可能需要一些觸發條件才開始定時,比如:編寫博文時候,設置2小時之后發送。對於這些開始時間不確定的定時任務,我們也可以通過Spring Cloud Stream來很好的處理。
為了實現開始時間不確定的定時任務觸發,我們將引入延遲消息的使用。RabbitMQ中提供了關於延遲消息的插件,所以本文就來具體介紹以下如何利用Spring Cloud Stream以及RabbitMQ輕松的處理上述問題。
RabbitMQ延遲消息的插件安裝
關於RabbitMQ延遲消息的插件介紹可以查看官方網站:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
安裝方式很簡單,只需要在這個頁面:http://www.rabbitmq.com/community-plugins.html 中找到rabbitmq_delayed_message_exchange
插件,根據您使用的RabbitMQ版本選擇對應的插件版本下載即可。
注意:只有RabbitMQ 3.6.x以上才支持
在下載好之后,解壓得到.ez
結尾的插件包,將其復制到RabbitMQ安裝目錄下的plugins
文件夾。
然后cd到sbin目錄下,啟動插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟動成功,不需要重啟rabbitmq
接下來,代碼使用
config中心加入配置
# 消息隊列配置
spring:
cloud:
# spring cloud strem 消息隊列分組持久化(Input輸入通道)
stream:
rabbit:
bindings:
commodityOrderInvalidInput:
consumer:
delayed-exchange: true
commodityOrderInvalidOutput:
producer:
delayed-exchange: true
bindings:
#延時的待付款訂單
commodityOrderInvalidInput: #通道名
group: commodityOrderInvalidGroup #組名
destination: commodityOrderInvalidTheme #主題名
commodityOrderInvalidOutput:
destination: commodityOrderInvalidTheme #指定生產者的通道主題
spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange
,用來開啟延遲消息的功能,這樣在創建exchange的時候,會將其設置為具有延遲特性的exchange,也就是用到上面我們安裝的延遲消息插件的功能。
spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true
消費端
生產端
一條消息的頭信息中包含了x-delay
字段,該字段用來指定消息延遲的時間,單位為毫秒。所以上述代碼發送的消息會在7秒之后被消費
可以發現,該通道的type不一樣
如果報以下錯誤
ERROR [o.s.a.rabbit.connection.CachingConnectionFactory] CachingConnectionFactory.java:1517 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'commodityOrderInvalidTheme' in vhost '/': received ''x-delayed-message'' but current is 'topic', class-id=40, method-id=10)
那是之前沒配置正確,通道創建的type類型是topic的
解決方式,檢查配置是否正確,修改后,刪除之前的通道或者修改通道的名稱,重新啟動創建通道,查看rabbitmq控制台,通道type類型為 x-delayed-message,說明成功了