RabbitMQ 死信隊列 + TTL介紹


一、RabbitMQ的的死信隊列+ TTL

1、什么是TTL

  • time to live 消息存活時間
  • 如果消息在存活時間內未被消費,則會被清除
  • RabbitMQ支持兩種ttl設置
    • 單獨消息進行配置ttl
    • 整個隊列進行配置ttl(居多)

2、什么是rabbitmq的死信隊列

  • 沒有被及時消費的消息存放的隊列

3、什么是rabbitmq的死信交換機

  • Dead Letter Exchange(死信交換機,縮寫: DLX)當消息成為死信后,會被重新發送到另⼀個交換機,這個交換機就是DLX死信交換機。

4、消息有哪幾種情況成為死信

  • 消費者拒收消息basic.reject/ basic.nack,並且沒有重新入隊 requeue=false
  • 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
  • 隊列的消息長度達到極限
  • 結果:消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

二、RabbitMQ管控台消息TTL測試

隊列過期時間使用參數,對整個隊列消息統⼀過期

  • x-message-ttl
  • 單位ms(毫秒)

消息過期時間使用參數(如果隊列頭部消息未過期,隊列中級消息已經過期,消息還在隊列里面)

  • expiration
  • 單位ms(毫秒)

兩者都配置的話,時間短的先觸發

1、RabbitMQ Web控制台測試

新建死信交換機(和普通沒區別)

新建死信隊列 (和普通沒區別)

死信交換機和隊列綁定

 

新建普通隊列,設置過期時間、指定死信交換機

測試:直接web控制台往product_qeueu發送消息即可,會看到消息先是在product_qeueu隊列停留10秒(因為沒有消費者消費),然后該消息從product_qeueu移入到dead_queue。

三、RabbitMQ的延遲隊列和應用場景

1、什么是延遲隊列

⼀種帶有延遲功能的消息隊列, Producer 將消息發送到消息隊列服務端,但並不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某⼀個時間投遞到Consumer進行消費,該消息即定時消息。

2、使用場景

  • 通過消息觸發⼀些定時任務,比如在某⼀固定時間點向用戶發送提醒消息
  • 用戶登錄之后5分鍾給用戶做分類推送、用戶多少天未登錄給用戶做召回推送;
  • 消息生產和消費有時間窗⼝要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送⼀條延時消息。這條消息將會在30分鍾以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略

四、實戰

1、背景

JD、淘系、天貓、拼多多電商平台,規定新注冊的商家,審核通過后需要在【規定時間】內上架商品,否則凍結賬號。

2、代碼開發

死信交換機和死信隊列開發,topic交換機和隊列開發,綁定死信交換機

package net.xdclass.xdclasssp.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
    //死信隊列
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";

    //死信交換機
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";

    //進入死信隊列的路由key
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";

    //創建死信交換機
    @Bean
    public Exchange lockMerchantDeadExchange() {
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
    }

    //創建死信隊列
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    //綁定死信交換機和死信隊列
    @Bean
    public Binding lockMerchantBinding() {
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
    }

    //普通隊列,綁定的個死信交換機
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";

    //普通的topic交換機
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";

    //路由key
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";

    //創建普通交換機
    @Bean
    public Exchange newMerchantExchange() {
        return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
    }

    //創建普通隊列
    @Bean
    public Queue newMerchantQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息過期后,進入到死信交換機
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息過期后,進入到死信交換機的路由key
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
        //過期時間,單位毫秒
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    //綁定交換機和隊列
    @Bean
    public Binding newMerchantBinding() {
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null);
    }
}

消息生產和消費

  • 消息生產
    • 投遞到普通的topic交換機
    • 消息過期,進入死信交換機
  • 消息消費
    • 消費者監聽死信交換機的隊列

MerchantAccountController 模擬請求

@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("check")
    public Object check(){

        //修改數據庫的商家賬號狀態  TODO

        rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家賬號通過審核");

        Map<String,Object> map = new HashMap<>();
        map.put("code",0);
        map.put("msg","賬號審核通過,請10秒內上傳1個商品");
        return map;
    }
}

MerchantMQListener消費

package net.xdclass.xdclasssp.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {

    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("body="+body);
        //做復雜業務邏輯  TODO

        //告訴broker,消息已經被確認
        channel.basicAck(msgTag,false);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM