八、RabbitMq死信隊列與延遲隊列


1 死信隊列

1.1 死信的概念

先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由於特定的原因導致 queue 中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,有死信自然就有了死信隊列。

應用場景:為了保證訂單業務的消息數據不丟失,需要使用到 RabbitMQ 的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中.還有比如說: 用戶在商城下單成功並點擊去支付后在指定時間未支付時自動失效

1.2 死信的來源

  1. 消息 TTL 過期,表示消息在隊列中等待的最大時間。如果不設置TTL,表示消息永遠不會過期
  2. 隊列達到最大長度(隊列滿了,無法再添加數據到 mq 中)
  3. 消息被拒絕(basic.reject 或 basic.nack)並且 requeue=false.

1.3 死信實戰

代碼架構圖

死信架構

1.3.1 消息 TTL 過期

生產者代碼

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //設置消息的 TTL 時間
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            //該信息是用作演示隊列個數限制
            for (int i = 1; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
                System.out.println("生產者發送消息:" + message);
            }
        }
    }
}

消費者 C1 代碼(啟動之后關閉該消費者 模擬其接收不到消息)

public class Consumer01 {
    //普通交換機名稱
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //聲明死信和普通交換機 類型為 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //聲明死信隊列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信隊列綁定死信交換機與 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常隊列綁定死信隊列信息
        Map<String, Object> params = new HashMap<>();
        //正常隊列設置死信交換機 參數 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常隊列設置死信 routing-key 參數 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信隊列效果0

消費者 C2 代碼(以上步驟完成后 啟動 C2 消費者 它消費死信隊列里面的消息)

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信隊列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信隊列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信隊列效果1.png

1.3.2 隊列達到最大長度

  1. 消息生產者代碼去掉 TTL 屬性
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //該信息是用作演示隊列個數限制
            for (int i = 1; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
                System.out.println("生產者發送消息:" + message);
            }
        }
    }
}
  1. C1 消費者修改以下代碼(啟動之后關閉該消費者 模擬其接收不到消息)

params.put("x-max-length", 6);

//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設置死信交換機 參數 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設置死信 routing-key 參數 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 設置正常隊列長度的限制
params.put("x-max-length", 6);

注意此時需要把原先隊列刪除 因為參數改變了

  1. C2 消費者代碼不變(啟動 C2 消費者)

死信隊列效果2

1.3.3 消息被拒

  1. 消息生產者代碼同上生產者一致

  2. C1 消費者代碼(啟動之后關閉該消費者 模擬其接收不到消息)

public class Consumer01 {
    //普通交換機名稱
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //聲明死信和普通交換機 類型為 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //聲明死信隊列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信隊列綁定死信交換機與 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常隊列綁定死信隊列信息
        Map<String, Object> params = new HashMap<>();
        //正常隊列設置死信交換機 參數 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常隊列設置死信 routing-key 參數 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "並拒絕簽收該消息");
                //requeue 設置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發送到死信隊列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
        });
    }
}

死信隊列效果3.png

  1. C2 消費者代碼不變

啟動消費者 1 然后再啟動消費者 2

死信隊列效果4.png

2 延遲隊列

2.1 延遲隊列概念

延時隊列,隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。

2.2 延遲隊列使用場景

  1. 訂單在十分鍾之內未支付則自動取消

  2. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。

  3. 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。

  4. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。

  5. 預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議

這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鍾之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉;看起來似乎使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然后處理不就完事了嗎?如果數據量比較少,確實可以這樣做,比如:對於“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對於時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對於數據量比較大,並且時效性較強的場景,如:“訂單十分鍾內未支付則關閉“,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下

延遲隊列流程

2.3 RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設置了 TTL 屬性或者進入了設置 TTL 屬性的隊列,那么這條消息如果在 TTL 設置的時間內沒有被消費,則會成為"死信"。如果同時配置了隊列的 TTL 和消息的TTL,那么較小的那個值將會被使用,有兩種方式設置 TTL。

2.3.1 消息設置 TTL

這一種方式便是針對每條消息設置 TTL

//設置消息的 TTL 時間
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

2.3.2 隊列設置 TTL

第一種是在創建隊列的時候設置隊列的“x-message-ttl”屬性

params.put("x-message-ttl", 10000);

//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設置死信交換機 參數 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設置死信 routing-key 參數 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 設置正常隊列長度的限制
params.put("x-max-length", 6);
// 設置隊列超時時間為10秒
params.put("x-message-ttl", 10000);

String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);

2.3.3 兩者的區別

如果設置了隊列的 TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列被丟到死信隊列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需要注意的一點是,如果不設置 TTL,表示消息永遠不會過期,如果將 TTL 設置為 0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會被丟棄。

3 總結

延遲隊列,其實就是應用了TTL+死信隊列,來達到延遲隊列小效果,想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL 則剛好能讓消息在延遲多久之后成為死信,另一方面,成為死信的消息都會被投遞到死信隊列里,這樣只需要消費者一直消費死信隊列里的消息就完事了,因為里面的消息都是希望被立即處理的消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM