TTL 其實就是一個消息存在有效時間,也可以說是最大存活時間,通常單位是毫秒
RabbitMQ的TTL的設置,RabbitMQ可以針對消息也可以針對隊列來設置TTL:
-
關於消息的設置:對於特定消息的過期時間的設置,在消息發送的時候可以進行指定,每條消息的過期時間可以不同。
-
關於隊列的設置:RabbitMQ支持設置隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的超時時間配置,那么消息會變成死信,自動清除(沒配死信隊列的情況下)。
-
混合雙打的情況設置:如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為准。
-
不設置TT的情況:,不設置表示消息不會過期;如果TTL設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟棄。
配置TTL的時間和方式:
使用策略為隊列定義消息TTL 使用命令行進行配置設置:
rabbitmqctl rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues rabbitmqctl (Windows) rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues
上面的設置將對對所有隊列應用60秒的TTL配置!
還可以通過使用接口請求進行設置:
curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' http://localhost:15672/api/queues/{vhost}/{queuename}
import pika import sys # 創建用戶登入的憑證,使用rabbitmq用戶密碼登錄 credentials = pika.PlainCredentials("guest","guest") # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials)) # 通過連接創建信道 channel = connection.channel() # 通過信道創建我們的隊列 其中名稱是task_queue,並且這個隊列的消息是需要持久化的!PS:持久化存儲存到磁盤會占空間, # 隊列不能由持久化變為普通隊列,反過來也是!否則會報錯!所以隊列類型創建的開始必須確定的! channel.queue_declare(queue='task_queue', durable=True) # 定義需要發的消息內容 # 開始發布消息到我們的代理服務器上,注意這里沒有對發生消息進行確認發生成功!!! import time for i in range(1,100): time.sleep(1) properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message properties.expiration='2000' body = '小鍾同學你好!{}'.format(i).encode('utf-8') print(body) channel.basic_publish( # 默認使用的/的交換機 exchange='', # 默認的匹配的key routing_key='task_queue', # 發送的消息的內容 body=body, # 發現的消息的類型 properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化 ) # 關於消息持久化需要注意幾個點,並非百分百的可達!主要原因有幾個點: # 1:消息可達率,和網絡可通性和抖動之類有關! # 2:還有和我們的RabbitMQ寫入磁盤時候的有關 # 可持久的需要條件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,) # 關閉鏈接 connection.close()
上面的代碼是一個生產者端的代碼:其中最關鍵的代碼設置是:
properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message properties.expiration='2000'
設置每個消息的過期時間是2秒,觀察我們的隊列的信息,一直發的情況下,他的待消費的內容還是比較少,那是因為的已經過期了!被丟棄了!
單獨對某消息的設置過期時間,和隊列的持久化的特性不沖突!!!
但是對隊列設置過期的時間話,那么和隊列的持久化的特性就會產生沖突!!!
針對隊列中所有消息設置TTL的方式:
出現的問題:
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'task_queue' in vhost '/': received the value '2000' of type 'longstr' but current is none")
原因是:
我們即設置隊列為需持久化,但是又設置了過期時間!所以產生的沖突!!!
修改后還是繼續出現:
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'task_queue' in vhost '/': received 'false' but current is 'true'")
這個是因為我們的一開始創建的隊列,本來就有這個屬性了!它是不能動態的修改這個隊列的屬性的!最好的的方式就是刪除這個隊列咯!如果還想繼續用這個隊列名稱的話!!或重新的新建一個! 還有一點:生產者和消費者對queue的聲明函數里,這個durable也記得需要保持一致!!!!
刪除隊列后出現新的問題:
pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - invalid arg \'x-message-ttl\' for queue \'task_queue\' in vhost \'/\': "expected integer, got longstr"')
原因是:
不能設置為字符設置為字符串的類型!!!
完整的針對隊列設置TTL的示例代碼:
# !/usr/bin/env python import pika import sys # 創建用戶登入的憑證,使用rabbitmq用戶密碼登錄 credentials = pika.PlainCredentials("guest","guest") # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials)) # 通過連接創建信道 channel = connection.channel() # 通過信道創建我們的隊列 其中名稱是task_queue,並且這個隊列的消息是需要持久化的!PS:持久化存儲存到磁盤會占空間, # 隊列不能由持久化變為普通隊列,反過來也是!否則會報錯!所以隊列類型創建的開始必須確定的! arguments = {} # TTL: ttl的單位是us,ttl=60000 表示 60s arguments['x-message-ttl'] = 2000 # auto_delete=False, # 最后一個隊列解綁則刪除 durable # durable 和 x-message-ttl 不能同時的存在 channel.queue_declare(queue='task_queue', durable=False,arguments=arguments,auto_delete=False) # 定義需要發的消息內容 # 開始發布消息到我們的代理服務器上,注意這里沒有對發生消息進行確認發生成功!!! import time for i in range(1,100): time.sleep(1) properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒為單位表示 TTL 值,6 秒的 message # properties.expiration='2000' body = '小鍾同學你好!{}'.format(i).encode('utf-8') print(body) channel.basic_publish( # 默認使用的/的交換機 exchange='', # 默認的匹配的key routing_key='task_queue', # 發送的消息的內容 body=body, # 發現的消息的類型 properties=properties# pika.BasicProperties中的delivery_mode=2指明message為持久的,1 的話 表示不是持久化 2:表示持久化 ) # 關於消息持久化需要注意幾個點,並非百分百的可達!主要原因有幾個點: # 1:消息可達率,和網絡可通性和抖動之類有關! # 2:還有和我們的RabbitMQ寫入磁盤時候的有關 # 可持久的需要條件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,) # 關閉鏈接 connection.close()
如果綜合存在的話,驗證一下,比如我設置隊列的過期時間是1秒,消息的時間2秒: 混合雙打的情況設置:如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為准。這個是可以看得到!不貼代碼了!