RabbitMQ延遲隊列(Python版)


作者:Bge的博客

原文鏈接:https://blog.csdn.net/weixin_43437629/article/details/87196729

最近在做一任務時,遇到需要延遲處理的數據,最開始的做法是現將數據存儲在數據庫,然后寫個腳本,隔五分鍾掃描數據表再處理數據,實際效果並不好。因為系統本身一直在用RabbitMQ做異步處理任務的中間件,所以想到是否可以利用RabbitMQ實現延遲隊列。功夫不負有心人,RabbitMQ雖然沒有現成可用的延遲隊列,但是可以利用其兩個重要特性來實現之:1、Time To Live(TTL)消息超時機制;2、Dead Letter Exchanges(DLX)死信隊列。下面將具體描述實現原理以及實現代

延遲隊列的基礎原理

Time To Live(TTL)

RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為准),則消息變為dead letter(死信)
RabbitMQ消息的過期時間有兩種方法設置。

  • 通過隊列(Queue)的屬性設置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)
  • 對消息單獨設置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為准。消息在隊列的生存時間一旦超過設置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。

  • x-dead-letter-exchange:出現死信(dead letter)之后將dead letter重新發送到指定exchange
  • x-dead-letter-routing-key:出現死信(dead letter)之后將dead letter重新按照指定的routing-key發送

隊列中出現死信(dead letter)的情況有:

  • 消息或者隊列的TTL過期。(延遲隊列利用的特性)
  • 隊列達到最大長度
  • 消息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

綜合上面兩個特性,將隊列設置TTL規則,隊列TTL過期后消息會變成死信,然后利用DLX特性將其轉發到另外的交換機和隊列就可以被重新消費,達到延遲消費效果。

 

 

延遲隊列設計及實現(Python)

從上面描述,延遲隊列的實現大致分為兩步:

  1. 產生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的消息延遲處理時間相同,所以本實現中采用 Queue TTL設置隊列的TTL,如果需要將隊列中的消息設置不同的延遲處理時間,則設置Per-Message TTL(官方文檔

  2. 設置死信的轉發規則,Dead Letter Exchanges設置方法(官方文檔
    完整代碼如下:

"""
Created on Fri Aug  3 17:00:44 2018

@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
    def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
        self.exchange_type = "direct"
        self.connection_string = conn_str
        self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
        self.channel = self.connection.channel()
        self._declare_retry_queue() #RetryQueue and RetryExchange
        logging.debug("connection established")
    def close_connection(self):
        self.connection.close()
        logging.debug("connection closed")
    def declare_exchange(self, exchange):
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type=self.exchange_type,
                                      durable=True)
    def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue,
                                   durable=True,)
    def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
        """
        創建延遲隊列
        :param TTL: ttl的單位是us,ttl=60000 表示 60s
        :param queue:
        :param DLX:死信轉發的exchange
        :return:
        """
        arguments={}
        if DLX:
            #設置死信轉發的exchange
            arguments[ 'x-dead-letter-exchange']=DLX
        if TTL:
            arguments['x-message-ttl']=TTL
        print(arguments)
        self.channel.queue_declare(queue=queue,
                                   durable=True,
                                   arguments=arguments)
    def _declare_retry_queue(self):
        """
        創建異常交換器和隊列,用於存放沒有正常處理的消息。
        :return:
        """
        self.channel.exchange_declare(exchange='RetryExchange',
                                      exchange_type='fanout',
                                      durable=True)
        self.channel.queue_declare(queue='RetryQueue',
                                   durable=True)
        self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
    def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
        """
        發送消息到指定的交換器
        :param exchange: RabbitMQ交換器
        :param msg: 消息實體,是一個序列化的JSON字符串
        :return:
        """
        if delay==0:
            self.declare_queue(routing_key)
        else:
            self.declare_delay_queue(routing_key,TTL=TTL)
        if exchange!='':
            self.declare_exchange(exchange)
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=routing_key,
                                   body=msg,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       type=exchange
                                   ))
        self.close_connection()
        print("message send out to %s" % exchange)
        logging.debug("message send out to %s" % exchange)
    def start_consume(self,callback,queue='#',delay=1):
        """
        啟動消費者,開始消費RabbitMQ中的消息
        :return:
        """
        if delay==1:
            queue='RetryQueue'
        else:
            self.declare_queue(queue)
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(  # 消費消息
                callback,  # 如果收到消息,就調用callback函數來處理消息
                queue=queue,  # 你要從那個隊列里收消息
            )
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()
    def stop_consuming(self):
        self.channel.stop_consuming()
        self.close_connection()
    def message_handle_successfully(channel, method):
        """
        如果消息處理正常完成,必須調用此方法,
        否則RabbitMQ會認為消息處理不成功,重新將消息放回待執行隊列中
        :param channel: 回調函數的channel參數
        :param method: 回調函數的method參數
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)
    def message_handle_failed(channel, method):
        """
        如果消息處理失敗,應該調用此方法,會自動將消息放入異常隊列
        :param channel: 回調函數的channel參數
        :param method: 回調函數的method參數
        :return:
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")

消費者代碼如下:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
        msg = body.decode()
        print(msg)
        # 如果處理成功,則調用此消息回復ack,表示消息成功處理完成。
        RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)

 

更多參考:

【RabbitMQ】一文帶你搞定RabbitMQ死信隊列

【RabbitMQ】一文帶你搞定RabbitMQ延遲隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM