RabbitMQ死信隊列


死信隊列是什么

死信,Dead Letter,一種消息機制,當消費者去消費隊列中的消息時,如果隊列中的消息出現了以下的情況:

  • 消費端執行nack或者reject時,設置requeue=false;
  • 消息在隊列中的時間超過設置的TTL(Time To Live)時間;
  • 隊列中消息的數量超過設置的最大數量;

那么這些消息就可以被稱之為死信消息,在配置了死信隊列的情況下,死信消息會進入死信隊列,如果沒有配置死信隊列,這些死信消息會被丟棄。

理解死信隊列

死信隊列並不僅僅只是一個queue,還包含死信交換機(Dead Letter Exchange),關於死信隊列和死信交換機要說明幾點:

死信交換機可以是fanout、direct、topic等類型,和普通交換機並無不同;

死信交換機要綁定要業務隊列上才會生效;

給死信交換機綁定的隊列稱之為死信隊列,其實就是普通的隊列,沒有任何特殊之處;

並不是整個項目只能設置一個死信交換機和死信隊列,可以根據業務需要設置多個或者單個死信交換機使用不同的routing-key;

代碼示例

配置文件

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: lzm
    password: lzm
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual  # 手動ack
        default-requeue-rejected: false # 設置為false,requeue或reject

創建交換機和隊列以及綁定

 /**
 * 死信交換機
 */
@Bean
public DirectExchange dlxExchange(){
	return new DirectExchange(dlxExchangeName);
}

/**
 * 死信隊列
 */
@Bean
public Queue dlxQueue(){
	return new Queue(dlxQueueName);
}

/**
 * 死信隊列綁定死信交換機
 */
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
	return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}

/**
 * 業務隊列
 */
@Bean
public Queue queue(){
	Map<String,Object> params = new HashMap<>();
	params.put("x-dead-letter-exchange",dlxExchangeName);//聲明當前隊列綁定的死信交換機
	params.put("x-dead-letter-routing-key",dlxRoutingKey);//聲明當前隊列的死信路由鍵
	params.put("x-message-ttl",10000);//設置隊列消息的超時時間,單位毫秒,超過時間進入死信隊列
	params.put("x-max-length", 10);//生命隊列的最大長度,超過長度的消息進入死信隊列
	return QueueBuilder.durable(queueName).withArguments(params).build();
}

/**
 * 業務交換機
 */
@Bean
public FanoutExchange fanoutExchange(){
	return new FanoutExchange(exchangeName,true,false);
}

/**
 * 業務隊列和業務交換機的綁定
 */
@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange){
	return  BindingBuilder.bind(queue).to(fanoutExchange);
}

注意創建業務隊列的部分,設置業務隊列的超時時間是10s,隊列中消息最大數量為10。

上面代碼中,業務交換機為fanout類型的交換機,死信交換機為Direct類型的交換機。

生產者

public void send(){
	for (int i = 0; i < 5; i++) {
		CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,message -> {
			message.getMessageProperties().setExpiration(3000+"");//發送消息時設置消息的超時時間
			return message;
		},correlationData);
	}
}

注意:

隊列中消息的超時時間可以是在創建隊列時設置,表示對隊列中所有的消息生效,也可以在發送消息時設置,兩者相比取最小值作為TTL的值。

先不啟動消費者,此時啟動生產者並向其中發送消息,剛發送完消息時如下所示:

三秒后消息自動進入死信隊列中

這也就驗證了上述所說的,當消息在隊列中的時間超過TTL的時間時,消息會自動進入死信隊列。針對這一特性,可以給消息設置過期時間后發送到某個隊列,從而來進行延遲消費

注意看上圖的紅框中的內容:

Lim:表示設置了隊列中消息數量x-max-length參數

DLX:表示設置了死信交換機x-dead-letter-exchange參數

DLK:表示設置了死信路由鍵x-dead-letter-routing-key參數,不設置該值時,消息在進入死信隊列后,路由鍵保持原來的不變,設置了該值,消息的路由鍵就變為新設置的值。

下面我們啟動消費者,並且模擬在某些情況下執行nack操作,先看消費者代碼

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
	try {
		if(msg.indexOf("5")>-1){
			throw new RuntimeException("拋出異常");
		}
		log.info("消息{}消費成功",msg);
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	} catch (Exception e) {
		log.error("接收消息過程中出現異常,執行nack");
		//第三個參數為true表示異常消息重新返回隊列,會導致一直在刷新消息,且返回的消息處於隊列頭部,影響后續消息的處理
		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
		log.error("消息{}異常",message.getMessageProperties().getHeaders());
	}
}

當消息中包含5時拋出異常,執行nack,其他消息都執行ack,生產者發送0-9共10條消息,執行結果如下:

同時查看死信隊列中的數據,確實只有1條

並且消息的交換機以及路由鍵都是我們在代碼中設置好的值

同時消息的headers中也會將進入死信隊列的原因以及次數等進行說明

也就是說在執行nack,同時設置requeue=false時,消息會自動進入死信隊列

最后我們再測試一下最大數量的問題,前面我們設置隊列中最大數量是10,此時關閉消費者,同時刪除隊列的TTL,然后發送20條數據到業務隊列中

可以看到業務隊列和死信隊列各有10條數據,也就是說隊列中的消息數量超過設置的最大數量時,消息會進入死信隊列

總結

死信交換機和死信隊列都只是普通的交換機和隊列,只不過被用來處理死信消息,而死信消息的產生是由於TTL過期或者隊列中的消息數超過最大消息數,再或者時消費端reject或者nack消息時設置了requeue=false,消息變為死信后,由死信交換機路由到死信隊列,再由專門的消費者消費死信隊列中的消息。

死信隊列更多的是用來保證消息的可靠性,主要用於比較重要的隊列,用以確保未被正確消費的消息不會丟失,其實也可以不用死信隊列,在消費端出現異常時,可以將消息從當前隊列ack掉,再將其發送到其他隊列,然后再單獨處理其他隊列,這都是可以的。

本節測試代碼參考碼雲.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM