Spring Cloud Stream 使用延遲消息實現定時任務(RabbitMQ)


應用場景

通常在應用開發中我們會碰到定時任務的需求,比如未付款訂單,超過一定時間后,系統自動取消訂單並釋放占有物品。

許多同學的第一反應就是通過spring的schedule定時任務輪詢數據庫來實現,這種方案有一下幾點劣勢:

(1)消耗系統內存,由於定時任務一直在系統中占着進程,比較消耗內存

(2)增加了數據庫的壓力,這個提現在兩方面,一是長時間占着數據庫的連接,而是查詢基數大

(3)存在較大的時間誤差

如果我們利用第三方插件如rabbitmq來實現,就可以解決以上幾種問題。

對於任務的執行時間通常都是有規律性的,可能是每隔半小時執行一次,或者每天凌晨一點執行一次。然而實際業務中還存在另外一種定時任務,它可能需要一些觸發條件才開始定時,比如:編寫博文時候,設置2小時之后發送。對於這些開始時間不確定的定時任務,我們也可以通過Spring Cloud Stream來很好的處理。

為了實現開始時間不確定的定時任務觸發,我們將引入延遲消息的使用。RabbitMQ中提供了關於延遲消息的插件,所以本文就來具體介紹以下如何利用Spring Cloud Stream以及RabbitMQ輕松的處理上述問題。

RabbitMQ延遲消息的插件安裝

關於RabbitMQ延遲消息的插件介紹可以查看官方網站:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

安裝方式很簡單,只需要在這個頁面:http://www.rabbitmq.com/community-plugins.html 中找到rabbitmq_delayed_message_exchange插件,根據您使用的RabbitMQ版本選擇對應的插件版本下載即可。

注意:只有RabbitMQ 3.6.x以上才支持

在下載好之后,解壓得到.ez結尾的插件包,將其復制到RabbitMQ安裝目錄下的plugins文件夾。

 

 然后cd到sbin目錄下,啟動插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

 

 啟動成功,不需要重啟rabbitmq

 

接下來,代碼使用

config中心加入配置

# 消息隊列配置
spring:
  cloud:
#    spring cloud strem  消息隊列分組持久化(Input輸入通道)
    stream:
      rabbit:
        bindings:
          commodityOrderInvalidInput:
            consumer:
              delayed-exchange: true
          commodityOrderInvalidOutput:
            producer:
              delayed-exchange: true
      bindings:
        #延時的待付款訂單
        commodityOrderInvalidInput: #通道名
          group: commodityOrderInvalidGroup #組名
          destination: commodityOrderInvalidTheme #主題名
        commodityOrderInvalidOutput:
          destination: commodityOrderInvalidTheme #指定生產者的通道主題
注意這里的一個新參數spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用來開啟延遲消息的功能,這樣在創建exchange的時候,會將其設置為具有延遲特性的exchange,也就是用到上面我們安裝的延遲消息插件的功能。
在消費端也一樣,需要設置spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true

 

 

 

 

消費端

 

 

 生產端

 

 一條消息的頭信息中包含了x-delay字段,該字段用來指定消息延遲的時間,單位為毫秒。所以上述代碼發送的消息會在7秒之后被消費

 

 

可以發現,該通道的type不一樣

如果報以下錯誤

ERROR [o.s.a.rabbit.connection.CachingConnectionFactory] CachingConnectionFactory.java:1517 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'commodityOrderInvalidTheme' in vhost '/': received ''x-delayed-message'' but current is 'topic', class-id=40, method-id=10)

 

那是之前沒配置正確,通道創建的type類型是topic的

解決方式,檢查配置是否正確,修改后,刪除之前的通道或者修改通道的名稱,重新啟動創建通道,查看rabbitmq控制台,通道type類型為 x-delayed-message,說明成功了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM