http://blog.csdn.net/zhu_tianwei/article/details/53563311
在實際的業務中我們會遇見生產者產生的消息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支持延遲隊列功能,但是我們可以根據其特性Per-Queue Message TTL和 Dead Letter Exchanges實現延時隊列。也可以通過改特性設置消息的優先級。
1.Per-Queue Message TTL
RabbitMQ可以針對消息和隊列設置TTL(過期時間)。隊列中的消息過期時間(Time To Live, TTL)有兩種方法可以設置。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息進行單獨設置,每條消息TTL可以不同。如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為准。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead message,消費者將無法再收到該消息。
2.Dead Letter Exchanges
當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。消息變成Dead Letter一向有以下幾種情況:
消息被拒絕(basic.reject or basic.nack)並且requeue=false
消息TTL過期
隊列達到最大長度
實際上就是設置某個隊列的屬性,當這個隊列中有Dead Letter時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列,publish可以監聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支持的immediate參數中的向publish確認的功能。
一、在隊列上設置TTL
1.建立delay.exchange
這里Internal設置為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
2.建立延時隊列(delay queue)
如上配置延時5min隊列(x-message-ttl=300000)
x-max-length:最大積壓的消息個數,可以根據自己的實際情況設置,超過限制消息不會丟失,會立即轉向delay.exchange進行投遞
x-dead-letter-exchange:設置為剛剛配置好的delay.exchange,消息過期后會通過delay.exchange進行投遞
這里不需要配置"dead letter routing key"否則會覆蓋掉消息發送時攜帶的routingkey,導致后面無法路由為剛才配置的delay.exchange
3.配置延時路由規則
需要延時的消息到exchange后先路由到指定的延時隊列
1)創建delaysync.exchange通過Routing key將消息路由到延時隊列
2.配置delay.exchange 將消息投遞到正常的消費隊列
配置完成。
下面使用代碼測試一下:
生產者:
- package cn.slimsmart.study.rabbitmq.delayqueue.queue;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class Producer {
- private static String queue_name = "test.queue";
- public static void main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.1.199.169");
- factory.setUsername("admin");
- factory.setPassword("123456");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明隊列
- channel.queueDeclare(queue_name, true, false, false, null);
- String message = "hello world!" + System.currentTimeMillis();
- channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
- System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
- // 關閉頻道和連接
- channel.close();
- connection.close();
- }
- }
消費者:
- package cn.slimsmart.study.rabbitmq.delayqueue.queue;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Consumer {
- private static String queue_name = "test.queue";
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.1.199.169");
- factory.setUsername("admin");
- factory.setPassword("123456");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明隊列
- channel.queueDeclare(queue_name, true, false, false, null);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定消費隊列
- channel.basicConsume(queue_name, true, consumer);
- while (true) {
- // nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
- }
- }
- }
二、在消息上設置TTL
實現代碼:
生產者:
- package cn.slimsmart.study.rabbitmq.delayqueue.message;
- import java.io.IOException;
- import java.util.HashMap;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class Producer {
- private static String queue_name = "message_ttl_queue";
- public static void main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.1.199.169");
- factory.setUsername("admin");
- factory.setPassword("123456");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- HashMap<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("x-dead-letter-exchange", "amq.direct");
- arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
- channel.queueDeclare("delay_queue", true, false, false, arguments);
- // 聲明隊列
- channel.queueDeclare(queue_name, true, false, false, null);
- // 綁定路由
- channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
- String message = "hello world!" + System.currentTimeMillis();
- // 設置延時屬性
- AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- // 持久性 non-persistent (1) or persistent (2)
- AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
- // routingKey =delay_queue 進行轉發
- channel.basicPublish("", "delay_queue", properties, message.getBytes());
- System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
- // 關閉頻道和連接
- channel.close();
- connection.close();
- }
- }
消費者:
- package cn.slimsmart.study.rabbitmq.delayqueue.message;
- import java.util.HashMap;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Consumer {
- private static String queue_name = "message_ttl_queue";
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.1.199.169");
- factory.setUsername("admin");
- factory.setPassword("123456");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- HashMap<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("x-dead-letter-exchange", "amq.direct");
- arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
- channel.queueDeclare("delay_queue", true, false, false, arguments);
- // 聲明隊列
- channel.queueDeclare(queue_name, true, false, false, null);
- // 綁定路由
- channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定消費隊列
- channel.basicConsume(queue_name, true, consumer);
- while (true) {
- // nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
- }
- }
- }
查看資料:
http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
http://www.rabbitmq.com/maxlength.html
https://www.cloudamqp.com/docs/delayed-messages.html