死信消息和死信隊列


死信消息和死信隊列定義

Dead Letter Exchange 死信隊列(DLX)隊列的簡稱。

另外對於死信消息:通常如果我們的一個消息存在以下的情況下的話則這消息被稱為死信消息:

  • 1: 消息被消費端拒絕,使用 channel.basicNackchannel.basicReject ,並且此時requeue 屬性被設置為false

  • 2: 消息在隊列的存活時間超過設置的TTL時間

  • 3:消息隊列的消息數量已經超過最大隊列長度,無法再繼續新增消息到MQ中

  • 4:一個隊列中的消息的TTL對其他隊列中同一條消息的TTL沒有影響

對於死信消息的處理,Rabbitmq會依據是否配置死信隊列的配置來決定消息的去留! 如果開啟了配置死信隊列信息,則消息會被轉移到這個 死信隊列(DLX)中,如果沒有配置,則此消息會被丟棄!

死信隊列配置

  • 可以為每一個需要使用死信業務的隊列配置一個死信交換機

  • 每個隊列都可以配置專屬自己的死信隊列,相關消息的進入死信隊列需要經過死信交換機來進程歸納處理

  • 死信交換機也只是一個普通的交換機,只是它是用來專門處理死信的交換機

  • 創建隊列時可以給這個隊列附帶一個死信的交換機,在這個隊列里因各自情況出現問題的作廢的消息會被重新發到附帶的交換機,然后讓這個交換機重新路由這條消息。

具體的圖示:

 

 若要使用策略指定DLX,請將鍵“死信交換”添加到策略定義中。例如:

rabbitmqctl    
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

Rabbitmqctl(Windows)    
rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues

上面的策略將DLX隊列“my-dlx”應用於所有隊列。上面只是一個例子,實際上不同的隊列可能會使用不同的死字設置(或者根本不使用)。

其他配置死信隊里的方式有:

x-dead-letter-exchange:出現死信(dead letter)之后將死信(dead letter)重新發送到指定exchange

x-dead-letter-routing-key:出現死信(dead letter)之后將死信(dead letter)重新按照指定的routing-key發送
PS:當指定了死信交換機后時,除了通常對聲明隊列的配置權限外,用戶還需要對該隊列具有讀取權限,並對死信交換機具有寫權限。權限在隊列聲明時進行驗證。

完整的一個簡單的示例:

下面的示例主要是演示里: 1:設置消息的過期的時間為2s,2s之后就變為我們的死信

2:變為死信的消息,會被轉移到我們的另一個死信交換機的隊列上

# !/usr/bin/env python
import pika
import sys

# 創建用戶登入的憑證,使用rabbitmq用戶密碼登錄
credentials = pika.PlainCredentials("guest","guest")
# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通過連接創建信道
channel = connection.channel()

# ========
#   創建異常交換器和隊列,用於存放沒有正常處理的消息。
channel.exchange_declare(exchange='xz-dead-letter-exchange',exchange_type='fanout',durable=True)
channel.queue_declare(queue='xz-dead-letter-queue',durable=True)
# 綁定隊列到指定的交換機
channel.queue_bind(queue='xz-dead-letter-queue',exchange= 'xz-dead-letter-exchange',routing_key= 'xz-dead-letter-queue')

# =========

# 通過信道創建我們的隊列 其中名稱是task_queue,並且這個隊列的消息是需要持久化的!PS:持久化存儲存到磁盤會占空間,
# 隊列不能由持久化變為普通隊列,反過來也是!否則會報錯!所以隊列類型創建的開始必須確定的!
arguments = {}
# TTL: ttl的單位是us,ttl=60000 表示 60s
# arguments['x-message-ttl'] = 2000
# 指定死信轉移到另一個交換機上具體的交換機的名稱
arguments['x-dead-letter-exchange'] = 'xz-dead-letter-exchange'
#  auto_delete=False,  # 最后一個隊列解綁則刪除  durable
# durable 和 x-message-ttl 不能同時的存在
channel.queue_declare(queue='task_queue', durable=True,arguments=arguments,auto_delete=False)
# 定義需要發的消息內容
# 開始發布消息到我們的代理服務器上,注意這里沒有對發生消息進行確認發生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message
    properties.expiration='2000'
    body = '小鍾同學你好!{}'.format(i).encode('utf-8')
    print(body.decode('utf-8'))
    channel.basic_publish(
        # 默認使用的/的交換機
        exchange='',
        # 默認的匹配的key
        routing_key='task_queue',
        # 發送的消息的內容
        body=body,
        # 發現的消息的類型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化
    )

connection.close()

運行上面的生產者的代碼后觀察我們的輸出: 中國發出了8個消息

小鍾同學你好!1
小鍾同學你好!2
小鍾同學你好!3
小鍾同學你好!4
小鍾同學你好!5
小鍾同學你好!6
小鍾同學你好!7
小鍾同學你好!8

結果這個8個消息都沒有人去消費的時候:最后都轉移到了死信的隊列里面:

 

 

 

 關於死信隊列需要注意的點(來自官網的說明):

消息在發布到死信隊列后DLX目標隊列后會立即從原始隊列中刪除。這確保沒有可能出現過多的消息積累,從而耗盡代理資源,但這確實意味着,如果目標隊列無法接受消息,消息可能會丟失

死信隊列里面的死信的消費

當我們的死信消費者去消費死信消息時候,需要注意點有:

我們的“死信”消息消息的properties里面的header字段信息中增加一個叫做“x-death"的數組內容,包含了以下字段內容:

<BasicProperties(['delivery_mode=2', "headers={'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}"])>

其中我們的'x-death'內容為::

{'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}

具體每個字段的意思是:

  • queue :進入死信隊列之前來自於哪個的消息隊列名稱
  • reason:這個消息變為死信的原因?expired 表示是因為過期!變為死信!
  • count:這個消息在這個隊列中被死了多少次
  • time:該消息發布時間
  • exchange :消息已發布到哪些交換機上,PS:如果這個消息是多次變為死信的話,這個地方最后就是死信的交換機
  • routing-keys 消息發不來來源的路由keys
  • original-expiration:原消息的過期時間屬性,PS:(如果消息是死信的話)每條消息ttl):。這個過期屬性將從死信中刪除,以防止它在被路由到的任何隊列中再次過期。
  • x-first-death-exchange:第一次變成死死信的時候來源的交換機
  • x-first-death-queue:第一次變成死信的時候來源隊列
  • x-first-death-reason:第一次變成死信的原因:expired 表示是因為過期!
其他變為死信的原因的說明:
rejected: 消息被消費者拒收且回放到消息獨立
expired: 消息的設置來TTL時間到期
maxlen: 超過了隊列運行的最大的值

延遲隊列

RabbitMQ本身沒有直接支持延遲隊列功能,但是通過對死信隊列和過期時間的使用,其實我們可以綜合起上面的兩個特性來實現一個所謂的延遲隊列,延遲隊列的意思就是:

某個消息再某個固定的時間后失效后,則進入到死信隊列里面,其他死信的消費者實時的處理這些過期的消息,這個就可以起到一個延遲處理的效果!

延遲隊列加上惰性隊列這種組合吧!其實也是可以考慮的!,即可以減小內存占用,又可以實現消息的延遲處理

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM