RabbitMQ 死信隊列


一、死信隊列

死信,顧名思義就是無法被消費的消息,一般來說 Producer 將消息投遞到 broker 或者直接丟到 queue 中,Consumer 從 Queue 中取出消息進行消費,但是某些時候由於特定的原因導致 Queue 中的某些消息無法被消費,這樣的消息如果沒有后續的處理就變成了死信,有死信自然就有了死信隊列

死信隊列有其特殊的應用場景,例如用戶在商城下單成功並點擊去支付的時候,如果在指定的時間內未支付,那么就可以將該下單消息投遞到死信隊列中,至於后續怎么處理死信隊列需要結合具體的應用場景

 

二、死信的來源

通常死信的來源有下面幾種方式

1、消息 TTL (Time To Live) 過期

2、隊列達到了最大長度,無法再添加消息到 MQ 中了

3、消息被拒,並且沒有重新入隊(basic.reject || basic.Nack) && (requeue = false)

 

三、消息 TTL 過期

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明正常消息的交換機(類型為 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交換機綁定正常消息隊列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回調
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消費者的回調
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回調接口");
        };
        // 消費者消費消息
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費消息");
    }
}

2、Consumer02

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道對象
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明死信交換機(topic 類型)
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
        // 死信交換機綁定死信隊列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        // 消息成功之后的回調
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消費者的回調
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回調接口");
        };
        // 消費者消費消息
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
        System.out.println("Consumer02 開始消費消息");
    }
}

3、Producer

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明一個 direct 類型的交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 消息發送 10 s 之后,如果沒有消費者進行消費,那么該消息就稱為死信,它就會進入死信隊列中
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        // 待發送的消息
        String message = "我是一只機智的小毛毛,很可愛,很機智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、測試過程及結果

啟動 Consumer01 將普通交換機、普通隊列注冊到 RabbitMQ 上,啟動 Consumer02 將死信交換機、死信隊列注冊到 RabbitMQ 上

然后為了演示消息超時之后可以進入死信隊列,我們關閉 Consumer01,模擬其接收不到消息,為了不讓死信消息被消費者消費掉,我們關閉 Consumer02,然后啟動生產者 Producer

10 s 之后普通隊列里的消息進入死信隊列中

接着啟動消費者 Consumer02 消費掉死信隊列中的消息

 

四、隊列達到最大長度

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明正常消息的交換機(類型為 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 設置正常隊列的最大長度
        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交換機綁定正常消息隊列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回調
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消費者的回調
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回調接口");
        };
        // 消費者消費消息
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費消息");
    }
}

2、Consumer02 代碼不變

3、Producer 

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明一個 direct 類型的交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 待發送的消息
        String message = "我是一只機智的小毛毛,很可愛,很機智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、測試過程及結果

刪除掉原先的正常交換機、正常隊列、死信交換機、死信隊列,然后按照上面的方式啟動 Consumer01、Consumer02 重新注冊正常交換機、正常隊列、死信交換機、死信隊列,接着關閉 Consumer01、Consumer02,最后啟動 Producer 發送消息(如果 Consumer01 是一直打開的情況下,正常隊列的消息就不會堆積到 6 條)

啟動 Consumer01、Consumer02,發現 Consumer01 消費了 6 條消息,Consumer02 消費了四條消息

 

五、消息被拒

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明正常消息的交換機(類型為 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交換機綁定正常消息隊列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回調
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            if (msg.contains("很機智4")) {
                System.out.println("Consumer01 接收到消息" + msg + "並拒絕簽收該消息");
                //requeue 設置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發送到死信隊列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 取消消費者的回調
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回調接口");
        };
        // 消費者消費消息(一定要開啟手動應答,如果你開啟了自動應答,根本不存在拒絕消息的情況)
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費消息");
    }
}

2、Consumer02

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道對象
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明死信交換機(topic 類型)
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
        // 死信交換機綁定死信隊列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        // 消息成功之后的回調
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println(msg);
        };
        // 取消消費者的回調
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回調接口");
        };
        // 消費者消費消息
        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
        System.out.println("Consumer02 開始消費消息");
    }
}

3、Producer

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 聲明一個 direct 類型的交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 待發送的消息
        String message = "我是一只機智的小毛毛,很可愛,很機智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、測試過程及結果

刪除掉原先的正常交換機、正常隊列、死信交換機、死信隊列,然后重新啟動 Consumer01、Consumer02 注冊正常交換機、正常隊列、死信交換機、死信隊列,接着關閉 Consumer02,啟動 Producer 發送消息

這里有幾點需要注意一下

1、因為只有被拒絕的消息才能進入死信隊列中,所以 Consumer01 不能關閉,為了能看到死信隊列里的消息,不讓它被消費掉,所以需要關閉 Consumer02

2、Consumer01 一定要開啟手動確認,因為自動確認的場景下根本不存在消息被拒絕的情況

打開死信隊列查看被拒絕的消息

啟動 Consumer02 消費死信消息

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM