關於消息的Time To Live(TTL)生存時間


TTL 其實就是一個消息存在有效時間,也可以說是最大存活時間,通常單位是毫秒

RabbitMQ的TTL的設置,RabbitMQ可以針對消息也可以針對隊列來設置TTL:

  • 關於消息的設置:對於特定消息的過期時間的設置,在消息發送的時候可以進行指定,每條消息的過期時間可以不同。

  • 關於隊列的設置RabbitMQ支持設置隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的超時時間配置,那么消息會變成死信,自動清除(沒配死信隊列的情況下)

  • 混合雙打的情況設置:如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為准。

  • 不設置TT的情況:,不設置表示消息不會過期;如果TTL設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟棄。

配置TTL的時間和方式:

使用策略為隊列定義消息TTL 使用命令行進行配置設置:

rabbitmqctl rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

rabbitmqctl (Windows)    
rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues

上面的設置將對對所有隊列應用60秒的TTL配置!

還可以通過使用接口請求進行設置:

curl -i -u guest:guest -H "content-type:application/json"  -XPUT 
-d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://localhost:15672/api/queues/{vhost}/{queuename}
代碼設置python代碼: queue_declare 中設置 x-message-ttl 參數,可以控制被 publish 到 queue 中的 message 被丟棄前能夠存活的時間,當某個 message 在 queue 留存的時間超過了配置的 TTL 值時,我們說該 message “已死”。

import pika
import sys

# 創建用戶登入的憑證,使用rabbitmq用戶密碼登錄
credentials = pika.PlainCredentials("guest","guest")
# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通過連接創建信道
channel = connection.channel()
# 通過信道創建我們的隊列 其中名稱是task_queue,並且這個隊列的消息是需要持久化的!PS:持久化存儲存到磁盤會占空間,
# 隊列不能由持久化變為普通隊列,反過來也是!否則會報錯!所以隊列類型創建的開始必須確定的!
channel.queue_declare(queue='task_queue', durable=True)
# 定義需要發的消息內容
# 開始發布消息到我們的代理服務器上,注意這里沒有對發生消息進行確認發生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message
    properties.expiration='2000'
    body = '小鍾同學你好!{}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        # 默認使用的/的交換機
        exchange='',
        # 默認的匹配的key
        routing_key='task_queue',
        # 發送的消息的內容
        body=body,
        # 發現的消息的類型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化
    )

# 關於消息持久化需要注意幾個點,並非百分百的可達!主要原因有幾個點:
# 1:消息可達率,和網絡可通性和抖動之類有關!
# 2:還有和我們的RabbitMQ寫入磁盤時候的有關
# 可持久的需要條件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,)
# 關閉鏈接
connection.close()

上面的代碼是一個生產者端的代碼:其中最關鍵的代碼設置是:

   properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message
    properties.expiration='2000'

設置每個消息的過期時間是2秒,觀察我們的隊列的信息,一直發的情況下,他的待消費的內容還是比較少,那是因為的已經過期了!被丟棄了!

單獨對某消息的設置過期時間,和隊列的持久化的特性不沖突!!!

但是對隊列設置過期的時間話,那么和隊列的持久化的特性就會產生沖突!!!

針對隊列中所有消息設置TTL的方式:

出現的問題:

pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'task_queue' in vhost '/': received the value '2000' of type 'longstr' but current is none")

原因是:

我們即設置隊列為需持久化,但是又設置了過期時間!所以產生的沖突!!!

修改后還是繼續出現:

pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'task_queue' in vhost '/': received 'false' but current is 'true'")

這個是因為我們的一開始創建的隊列,本來就有這個屬性了!它是不能動態的修改這個隊列的屬性的!最好的的方式就是刪除這個隊列咯!如果還想繼續用這個隊列名稱的話!!或重新的新建一個! 還有一點:生產者和消費者對queue的聲明函數里,這個durable也記得需要保持一致!!!!

刪除隊列后出現新的問題:

pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - invalid arg \'x-message-ttl\' for queue \'task_queue\' in vhost \'/\': "expected integer, got longstr"')

原因是:

不能設置為字符設置為字符串的類型!!!

完整的針對隊列設置TTL的示例代碼:

# !/usr/bin/env python
import pika
import sys

# 創建用戶登入的憑證,使用rabbitmq用戶密碼登錄
credentials = pika.PlainCredentials("guest","guest")
# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通過連接創建信道
channel = connection.channel()
# 通過信道創建我們的隊列 其中名稱是task_queue,並且這個隊列的消息是需要持久化的!PS:持久化存儲存到磁盤會占空間,
# 隊列不能由持久化變為普通隊列,反過來也是!否則會報錯!所以隊列類型創建的開始必須確定的!
arguments = {}
# TTL: ttl的單位是us,ttl=60000 表示 60s
arguments['x-message-ttl'] = 2000
#  auto_delete=False,  # 最后一個隊列解綁則刪除  durable
# durable 和 x-message-ttl 不能同時的存在
channel.queue_declare(queue='task_queue', durable=False,arguments=arguments,auto_delete=False)
# 定義需要發的消息內容
# 開始發布消息到我們的代理服務器上,注意這里沒有對發生消息進行確認發生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message
    # properties.expiration='2000'
    body = '小鍾同學你好!{}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        # 默認使用的/的交換機
        exchange='',
        # 默認的匹配的key
        routing_key='task_queue',
        # 發送的消息的內容
        body=body,
        # 發現的消息的類型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化
    )

# 關於消息持久化需要注意幾個點,並非百分百的可達!主要原因有幾個點:
# 1:消息可達率,和網絡可通性和抖動之類有關!
# 2:還有和我們的RabbitMQ寫入磁盤時候的有關
# 可持久的需要條件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,)
# 關閉鏈接
connection.close()

如果綜合存在的話,驗證一下,比如我設置隊列的過期時間是1秒,消息的時間2秒: 混合雙打的情況設置:如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為准。這個是可以看得到!不貼代碼了!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM