RabbitMQ死循環-延長ACK時間


一、應用背景

  今天做一個需求,要將RabbitMQ中的任務取出並執行,為防止任務執行期間出錯,設置NO_ACK=FALSE標志,這樣、一旦任務沒有應答的話,相應的任務就會被RabbitMQ自動Re-Queue,避免丟失任務。然而、由於任務執行時間較長,通常需要五、六分鍾,甚至更長;我們都知道一旦一個任務被取出執行,該任務就從Ready狀態更改成Unacked狀態。如圖所示:

  當這個任務執行完之后,程序將向RabbitMQ發送ACK消息確認,RabbitMQ在收到ACK消息后,會將該任務移出隊列;然而、問題出在任務尚未執行完畢【執行時間太久】,RabbitMQ再等了一段時間【大約兩三分鍾】后,一直沒有收到ACK確認消息,就將該任務自動Re-Queue了【我是一個生產者,一個消費者模式】,也就是說、我們這里發生了死循環【任務永遠也執行不完,因為會一直Re-Queue】。

二、延長RabbitMQ ACK應答時間

  到這里,我們急需解決的問題就是,怎么能設置RabbitMQ延長等待ACK的時間,百度一下、兩下,各種讀網絡文檔,研究操作RabbitMQ工作的文檔,查了一圈資料也沒查出怎么延長RabbitMQ ACK時間【廢柴啊】。至此、一直查不出來,就想問一下網友的你,你知道怎么延長RabbitMQ接受ACK應答時間么?

三、改變解決問題方式

  在查不出如何延長ACK應答時間后,我將注意力轉向如何檢測當前任務操作超時的,后來在官網看到這么一段話:

  鏈接官網位置:http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout

  

   后面、就簡單測試下將heartbeat參數設置為0,以禁用心跳檢測,這樣基本解決了我的問題;雖然官方不建議這么做,但也是一種解決思路,如果大家有什么更好的解決辦法,煩請在下面留言【先謝謝啦】。

  至此、這個問題基本闡述清楚了,如果有遇到的小伙伴,也請參考下上面的操作。

  測試代碼:

 

# import json
# from concurrent.futures import ThreadPoolExecutor
from queue import Queue
# from threading import Thread

from pika import BasicProperties, BlockingConnection, URLParameters
from pika.exceptions import ConnectionClosed


# from automation.aiclient.aiclient import AsyncAIRequestManager


class RabbitMQManager:
    def __init__(self, host = 'localhost', qname = 'queue'):
        self.params = URLParameters(host)
        self.qname = qname
        self.prod_conn = None
        self.prod_chan = None
        self.cons_conn = None
        self.cons_chan = None
        self.ai_signton = None

    def init_prod_conn(self):
        # create send connection
        self.prod_conn = BlockingConnection(self.params)
        self.prod_chan = self.prod_conn.channel()
        self.prod_chan.queue_declare(queue = self.qname, durable = True)

    def init_cons_conn(self):
        # create receive connection
        self.cons_conn = BlockingConnection(self.params)
        self.cons_chan = self.cons_conn.channel()
        self.cons_chan.basic_qos(prefetch_count = 1)
        self.cons_chan.basic_consume(self.callback, queue = self.qname)

    def produceMessages(self, msg = None):
        try:
            if isinstance(msg, str):
                self.prod_chan.basic_publish(exchange = '',
                                             routing_key = self.qname,
                                             body = msg,
                                             properties = BasicProperties(
                                                 delivery_mode = 2,  # make message persistent
                                             ))
            elif isinstance(msg, Queue):
                while 0 != msg.qsize():
                    item = msg.get()
                    self.prod_chan.basic_publish(exchange = '',
                                                 routing_key = self.qname,
                                                 body = item,
                                                 properties = BasicProperties(
                                                     delivery_mode = 2,  # make message persistent
                                                 ))
            else:
                pass

        except Exception as e:
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_prod_conn()
                # last connection close, re-produce msg
                self.produceMessages(msg)
            else:
                print('Produce msg exception Occur, please check following error message:')
                print(e)

    def consumeMessages(self):
        try:
            self.cons_chan.start_consuming()
        except Exception as e:
            print('Consume msg exception Occur, please check following error message:')
            print(e)
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_cons_conn()
                self.consumeMessages()

    def callback(self, ch, method, properties, body):
        # handle message body
        print('callback....')
        print(body)
        try:
            print('Consuming....')
            self.cons_conn.process_data_events()
            # 模擬處理任務時間
            import time
            time.sleep(300)
            # if None == self.ai_signton:
            #     self.ai_signton = AsyncAIRequestManager()
            # self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding = 'utf-8')))
            ch.basic_ack(delivery_tag = method.delivery_tag)
            # t = Thread(target = self.ai_signton.syncToDatabase())
            # t.start()

        except Exception as e:
            if isinstance(e, ConnectionClosed):
                raise ConnectionClosed('Connection has been closed, send to reconnection.')
            else:
                print('Current error msg:')
                print(e)

    def close_prod_conn(self):
        if None != self.prod_conn:
            self.prod_conn.close()

    def close_cons_conn(self):
        if None != self.cons_conn:
            self.cons_conn.close()

    def close(self):
        self.close_prod_conn()
        self.close_cons_conn()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM