RabbitMQ配置死信隊列


死信隊列

消息傳輸過程中難免會產生一些無法及時處理的消息,這些暫時無法處理的消息有時候也是需要被保留下來的,於是這些無法被及時處理的消息就變成了死信。
既然需要保留這些死信,那么就需要一個容器來存儲它們以便后續需要時將它們取出來進行處理,於是就有了死信隊列。
在RabbitMQ中當一個消息變成死信后會被重新發送到一個死信交換機(DLXs)中,當下列情況發生時隊列中的消息會變成死信:
1:當消費端使用手動ack時,requeue屬性為false時,消息被拒絕(basic.reject, basic.nack),換句話說就是消息被拒絕接收又不能回到原始隊列中去
2:消息過期
3:隊列超出最大限制導致消息無法發送到隊列

死信交換機

死信交換機只是一個普通的交換機,它的聲明使用與普通交換機沒有什么區別
對於任意的隊列,一個死信交換機可以被定義通過客戶端使用隊列參數,或者在服務端使用策略(polices),強烈建議使用polices的方式配置死信交換機,因為它不需要修改客戶端代碼,也不需要重啟服務。

下面介紹使用RabbitMQ的管理界面進行死信隊列的配置

  1. 進入policies配置頁面
  2. 配置死信策略
  3. 在交換機界面配置死信交換機
  4. 在隊列頁面新建一個用於接收死信的死信隊列
  5. 配置死信交換機的路由規則
  6. 配置完成

測試

生產者代碼

public class DeadSender {

    private static final String EXCHANGE_NAME = "amqp.car";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.135.88");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明一個名為amqp.car的交換機,將消息發送至該交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //每隔一秒發送一條消息,設置routind Key 為 car
        while (true) {
            channel.basicPublish(EXCHANGE_NAME, "car", null, "我是死信消息".getBytes("UTF-8"));
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
  1. 消費者代碼,拒絕所有消息用於測試
public class DeadReceive {

    private static final String EXCHANGE_NAME = "amqp.car";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.135.88");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //聲明一個隊列用於接收消息
        String queueName = "car.queue";
        channel.queueDeclare(queueName, true, false, false, null);
        //綁定隊列,設置routing key為car
        channel.queueBind(queueName, EXCHANGE_NAME, "car");
        //消息接收后的回調方法
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 收到信息:" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            //收到消息后直接拒絕,並設置requeue屬性為false,這樣被拒絕的消息就不會重新回到原始隊列中而是轉發到死信交換機
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        };
        //接收消息,關閉自動ack
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    }
}
  1. 分別運行消費者與生產者,在RabbitMQ的管理界面觀察隊列的狀態


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM