RabbitMQ死信隊列與延遲隊列


  簡單研究下消息、隊列的生存時間,以及死信隊列、延遲隊列。

簡單的說:

(1) 死信隊列就是消息進入另一個交換機,可以修改其routingKey進入另一個隊列。發生的情況為:當程序手動basicReject(false) 、消息TTL過期、隊列達到最大長度。

(2)隊列和消息都有個TTL生存時間,隊列的TTL到達后隊列會自動刪除,消息不會進入死信隊列;消息的生存時間到達后會進入死信隊列。消息的生存時間可以在隊列設置所有消息的TTL,也可以對某個消息單獨設置TTL

(3) 延遲隊列就是利用死信隊列,給消息設置TTL,到期后進入另一個死信隊列,我們可以監聽另一個死信隊列。

1. 簡介

死信隊列:DLX,dead-letter-exchange。利用DLX,當消息在一個隊列中變成死信 (dead message) 之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。DLX是一個正常的交換機,它們可以像普通的交換機一樣使用。

 延遲隊列:利用死信可以實現延遲隊列。 比如設置隊列有限期為60s,到期移動到另一個隊列。比如訂單30s,30s之后移動到另一個死信隊列,我們可以監聽另一個死信隊列。

1. 進入死信隊列的情況:

消息被拒絕(basic.reject / basic.nack),並且requeue = false

消息TTL過期。TTL:Time To Live的簡稱,即過期時間。RabbitMQ可以對消息和隊列設置TTL。

隊列達到最大長度

2.處理過程

DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。

當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列。可以監聽這個隊列中的消息做相應的處理。

2.隊列和消息的生存時間

  參考:https://www.rabbitmq.com/ttl.html

  首先簡單研究下隊列和消息的生存時間的使用。

1.給隊列中的消息設置ttl為1min

  Define Message TTL for Queues Using x-arguments During Declaration

package rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            Map<String, Object> arg = new HashMap<String, Object>();
            arg.put("x-message-ttl", 60000);
            createChannel.queueDeclare("myQueue", true, false, false,arg);
            
            String message = "測試消息";
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            createChannel.basicPublish("", "myQueue", null, message.getBytes());
            
            System.out.println("消息發送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (createChannel != null) {
                createChannel.close();
            }
            if (newConnection != null) {
                newConnection.close();
            }
        }
        
    }
}

結果:可以看到隊列有ttl屬性,並且1 min后消息自動丟棄。

 2. 設置消息的過期時間

 只對單個消息有效。核心代碼如下:

            createChannel.queueDeclare("myQueue", true, false, false, null);
            String message = "測試消息";
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("60000")
                    .build();
            createChannel.basicPublish("", "myQueue", properties, message.getBytes());

  結果1min后消息自動丟棄。

3.給隊列設置生存時間(了解即可,超時之后不進入死信)

This example in Java creates a queue which expires after it has been unused for 1 minutes.

            Map<String, Object> arg = new HashMap<String, Object>();
            arg.put("x-expires", 60000);
            createChannel.queueDeclare("myQueue", true, false, false, arg);
            String message = "測試消息";
            createChannel.basicPublish("", "myQueue", null, message.getBytes());

結果:隊列有生存時間,1min后隊列自動刪除。

 3.死信隊列Dead Letter Exchanges (DLX)

參考: https://www.rabbitmq.com/dlx.html

 官網中隊發生死信的情況解釋如下:

The message is negatively acknowledged by a consumer using basic.reject or basic.nack with requeue parameter set to false.

The message expires due to per-message TTL; or

The message is dropped because its queue exceeded a length limit

(Note that expiration of a queue will not dead letter the messages in it.)也就是隊列生存時間達到之后不會進入死信隊列。

 1. 死信隊列的簡單使用

1. 生產者:

  聲明隊列的時候用屬性指定其死信隊列交換機名稱。

測試:

package rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            
            // 聲明一個正常的direct類型的交換機
            createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
            // 聲明死信交換機為===order.dead.exchange
            String dlxName = "order.dead.exchange";
            createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
            // 聲明隊列並指定死信交換機為上面死信交換機
            Map<String, Object> arg = new HashMap<String, Object>();
            arg.put("x-dead-letter-exchange", dlxName);
            createChannel.queueDeclare("myQueue", true, false, false, arg);
            
            String message = "測試消息";
            createChannel.basicPublish("order.exchange", "routing_key_myQueue", null, message.getBytes());
            System.out.println("消息發送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (createChannel != null) {
                createChannel.close();
            }
            if (newConnection != null) {
                newConnection.close();
            }
        }
        
    }
}

結果:

(1)生成兩個Exchange

 (2)隊列myQueue的死信隊列有屬性

2. 消費者: 

  一個消費者監聽正常隊列,一個消費者監聽死信隊列。(只是綁定的交換機不同)

消費者一:監聽正常隊列

package rabbitmq;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();

            // 隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            createChannel.queueBind("myQueue", "order.exchange", "routing_key_myQueue");

            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    System.out.println("consumerTag: " + consumerTag);
                    System.out.println("envelope: " + envelope);
                    System.out.println("properties: " + properties);
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到消息: -》 " + string);

                    long deliveryTag = envelope.getDeliveryTag();
                    Channel channel = this.getChannel();
                    System.out.println("拒絕消息, 使之進入死信隊列");
                    System.out.println("時間: " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                    }
                    
                    // basicReject第二個參數為false進入死信隊列或丟棄
                    channel.basicReject(deliveryTag, false);
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }

    }
}

消費者二:監聽死信隊列

package rabbitmq;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer2 {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();

            // 隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue");

            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    System.out.println("時間: " + new Date());
                    
                    System.out.println("consumerTag: " + consumerTag);
                    System.out.println("envelope: " + envelope);
                    System.out.println("properties: " + properties);
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到消息: -》 " + string);

                    long deliveryTag = envelope.getDeliveryTag();
                    Channel channel = this.getChannel();
                    channel.basicAck(deliveryTag, true);
                    System.out.println("死信隊列中處理完消息息");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }

    }
}

結果: 消費者一先正常監聽到,basicReject為false拒絕后進入死信隊列;消費者二監聽的死信隊列收到消息。

消費者一打出的日志如下:

consumerTag: amq.ctag-0noHs24F0FsGe-dfwwqWNw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測試消息
拒絕消息, 使之進入死信隊列
時間: Sat Nov 07 12:18:44 CST 2020

 

消費者二打出的日志如下:

時間: Sat Nov 07 12:18:47 CST 2020
consumerTag: amq.ctag-ajYMpMFkXHDiYWkD3XFJ7Q
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 01:52:19 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測試消息 死信隊列中處理完消息息

注意:

  進入死信隊列之后,headers 加了一些死信相關的信息,包括原隊列以及進入死信的原因。

補充:在隊列進入死信隊列之前也可以修改其routingKey,而且只有在指定x-dead-letter-exchange的前提下才能修改下面屬性,否則會報錯

(1)修改生產者聲明隊列的方式,如下:

            // 聲明一個正常的direct類型的交換機
            createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
            // 聲明死信交換機為===order.dead.exchange
            String dlxName = "order.dead.exchange";
            createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
            // 聲明隊列並指定死信交換機為上面死信交換機
            Map<String, Object> arg = new HashMap<String, Object>();
            arg.put("x-dead-letter-exchange", dlxName);
            // 修改進入死信隊列的routingkey,如果不修改會使用默認的routingKey
            arg.put("x-dead-letter-routing-key", "routing_key_myQueue_dead");
            createChannel.queueDeclare("myQueue", true, false, false, arg);

(2)修改監聽死信隊列的消費者二:

            // 隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue_dead");

結果,收到消費者二收到的信息如下:

時間: Sat Nov 07 12:27:08 CST 2020
consumerTag: amq.ctag-THqpEdYH_-iNeCIccgpuaw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue_dead)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 02:00:41 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 測試消息
死信隊列中處理完消息

4. 延時隊列

  有時我們需要用到延時隊列,比如說一個場景我們在使用抖音搶購有時候五分鍾未下單我們就可以再次搶單。簡單的說,用戶下單了,庫存減一;5分鍾未支付,獲取到該訂單,將商品庫存加一。

  rabbitMQ本身沒提供延時隊列,我們可以利用消息的生存時間和死信隊列實現延時。

1.生產者:聲明隊列的消息生存時間、聲明死信交換機和路由key

package rabbitmq;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();

            // 聲明一個正常的direct類型的交換機
            createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
            // 聲明死信交換機為===order.dead.exchange
            String dlxName = "order.dead.exchange";
            createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
            // 聲明隊列並指定死信交換機為上面死信交換機
            Map<String, Object> arg = new HashMap<String, Object>();
            arg.put("x-dead-letter-exchange", dlxName);
            // 修改進入死信隊列的routingkey,如果不修改會使用默認的routingKey
            arg.put("x-dead-letter-routing-key", "routing_key_myQueue_dead");
            // 設置消息的生存時間是1分鍾,超時進入死信隊列
            arg.put("x-message-ttl", 60000);
            createChannel.queueDeclare("myQueue", true, false, false, arg);
            
            // 綁定正常的queue
            createChannel.queueBind("myQueue", "order.exchange", "routing_key_myQueue");

            String message = "訂單編號: 001, 訂單生成時間: " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            createChannel.basicPublish("order.exchange", "routing_key_myQueue", null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (createChannel != null) {
                createChannel.close();
            }
            if (newConnection != null) {
                newConnection.close();
            }
        }

    }
}

2.消費者: 監聽死信交換機和路由key。

package rabbitmq;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer2 {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();

            createChannel.queueDeclare("order.expiredQueue", true, false, false, null);
            // 隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            createChannel.queueBind("order.expiredQueue", "order.dead.exchange", "routing_key_myQueue_dead");
            createChannel.basicConsume("order.expiredQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    String msg = new String(body, "UTF-8");
                    System.out.println("當前時間: " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                    System.out.println("死信隊列中收到的訂單信息: " + msg);
                    // 處理超時訂單,庫存加一
                    
                    // 應答
                    long deliveryTag = envelope.getDeliveryTag();
                    Channel channel = this.getChannel();
                    channel.basicAck(deliveryTag, true);
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }

    }
}

結果:

當前時間: 2020-11-07 12:52:48
死信隊列中收到的訂單信息: 訂單編號: 001, 訂單生成時間: 2020-11-07 12:51:48

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM