rabbitmq 實現延遲隊列的兩種方式


原文地址:https://blog.csdn.net/u014308482/article/details/53036770

ps: 文章里面延遲隊列=延時隊列

什么是延遲隊列

延遲隊列存儲的對象肯定是對應的延時消息,所謂”延時消息”是指當消息被發送以后,並不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進行消費。

場景一:在訂單系統中,一個用戶下單之后通常有30分鍾的時間進行支付,如果30分鍾之內沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延時隊列將訂單信息發送到延時隊列。

場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發送到延時隊列,當指令設定的時間到了再將指令推送到只能設備。


RabbitMQ如何實現遲隊列

方法一

AMQP協議和RabbitMQ隊列本身沒有直接支持延遲隊列功能,但是可以通過以下特性模擬出延遲隊列的功能。 
但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲隊列:

RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為准),則消息變為dead letter(死信)

RabbitMQ針對隊列中的消息過期時間有兩種方法可以設置。

  • A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
  • B: 對消息進行單獨設置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為准。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead letter

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。

  • x-dead-letter-exchange:出現dead letter之后將dead letter重新發送到指定exchange
  • x-dead-letter-routing-key:出現dead letter之后將dead letter重新按照指定的routing-key發送

隊列出現dead letter的情況有:

  • 消息或者隊列的TTL過期

  • 隊列達到最大長度

  • 消息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

綜合上述兩個特性,設置了TTL規則之后當消息在一個隊列中變成死信時,利用DLX特性它能被重新轉發到另一個Exchange或者Routing Key,這時候消息就可以重新被消費了。

設置方法:

第一步:設置TTL產生死信,有兩種方式Per-Message TTL和 Queue TTL,第一種可以針對每一條消息設置一個過期時間使用於大多數場景,第二種針對隊列設置過期時間、適用於一次性延時任務的場景

還有其他產生死信的方式比如消費者拒絕消費 basic.reject 或者 basic.nack ( 前提要設置消費者的屬性requeue=false) 
- Per-Message TTL (對每一條消息設置一個過期時間)(官方文檔

java client發送一條只能駐留60秒的消息到隊列:

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000");//設置消息的過期時間為60秒 channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes); //這條消息發送到相應的隊列之后,如果60秒內沒有被消費,則變為死信
  • 1
  • 2
  • 3
  • 4
  • 5
  • Queue TTL (對整個隊列設置一個過期時間)

創建一個隊列,隊列的消息過期時間為30分鍾(這個隊列30分鍾內沒有消費者消費消息則刪除,刪除后隊列內的消息變為死信)

java client方式:

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args); rabbitmqctl命令方式(.* 為所有隊列, 可以替換為指定隊列): rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues rabbitmqctl (Windows): rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

第二步:設置死信的轉發規則(如果沒有任何規則,則直接丟棄死信) 
- Dead Letter Exchanges設置方法(官方文檔

Java Client方式:
//聲明一個直連模式的exchange channel.exchangeDeclare("some.exchange.name", "direct"); //聲明一個隊列,當myqueue隊列中有死信產生時,會轉發到交換器some.exchange.name Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); //如果設置死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設默認為消息發送到本隊列時用的routing key //args.put("x-dead-letter-routing-key", "some-routing-key"); channel.queueDeclare("myqueue", false, false, false, args); 命令行方式(.* 為所有隊列, 可以替換為指定隊列): 設置 "dead-letter-exchange" rabbitmqctl: rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues rabbitmqctl (Windows): rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues 設置 "dead-letter-routing-key" rabbitmqctl: rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queues rabbitmqctl (Windows): rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

方法二

在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上。

插件源碼地址: 
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下載地址: 
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安裝:

進入插件安裝目錄 
{rabbitmq-server}/plugins/(可以查看一下當前已存在的插件) 
下載插件 
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
  • 1

(如果下載的文件名稱不規則就手動重命名一下如: 
rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange (關閉插件) rabbitmq-plugins disable rabbitmq_delayed_message_exchange
  • 1
  • 2
  • 3
  • 4

插件使用

通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性 
x-delayed-message是插件提供的類型,並不是rabbitmq本身的

// ... elided code ... Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5

發送消息的時候通過在header添加”x-delay”參數來控制消息的延時時間

// ... elided code ... byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes); // ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用示例:

消息發送端:

import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { // 隊列名稱 private final static String EXCHANGE_NAME="delay_exchange"; private final static String ROUTING_KEY="key_delay"; @SuppressWarnings("deprecation") public static void main(String[] argv) throws Exception { /** * 創建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 聲明x-delayed-type類型的exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, args); Map<String, Object> headers = new HashMap<String, Object>(); //設置在2016/11/04,16:45:12向消費端推送本條消息 Date now = new Date(); Date timeToPublish = new Date("2016/11/04,16:45:12"); String readyToPushContent = "publish at " + sf.format(now) + " \t deliver at " + sf.format(timeToPublish); headers.put("x-delay", timeToPublish.getTime() - now.getTime()); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() .headers(headers); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(), readyToPushContent.getBytes()); // 關閉頻道和連接 channel.close(); connection.close(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

消息接收端:

import java.text.SimpleDateFormat; import java.util.Date; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Recv { // 隊列名稱 private final static String QUEUE_NAME = "delay_queue"; private final static String EXCHANGE_NAME="delay_exchange"; public static void main(String[] argv) throws Exception, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.queueDeclare(QUEUE_NAME, true,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE_NAME, true, queueingConsumer); SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); try { System.out.println("****************WAIT***************"); while(true){ QueueingConsumer.Delivery delivery = queueingConsumer .nextDelivery(); // String message = (new String(delivery.getBody())); System.out.println("message:"+message); System.out.println("now:\t"+sf.format(new Date())); } } catch (Exception exception) { exception.printStackTrace(); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

啟動接收端,啟動發送端 
運行結果:

****************WAIT*************** message:publish at 2016-11-04 16:44:16.887 deliver at 2016-11-04 16:45:12.000 now: 2016-11-04 16:45:12.023
  • 1
  • 2
  • 3

結果顯示在我們2016-11-04 16:45:12.023接收到了消息,距離我們設定的時間2016-11-04 16:45:12.023有23毫秒的延遲

Note:使用rabbitmq-delayed-message-exchange插件時發送到隊列的消息數量在web管理界面可能不可見,不影響正常功能使用

Note :使用過程中發現,當一台啟用了rabbitmq-delayed-message-exchange插件的RAM節點在重啟的時候會無法啟動,查看日志發現了一個Timeout異常,開發者解釋說這是節點在啟動過程會同步集群相關數據造成啟動超時,並建議不要使用Ram節點

插件開發者: 
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM