1. Application Background
I was working on a requirement today: pull tasks out of RabbitMQ and execute them. To avoid losing a task if something goes wrong during execution, I set the no_ack=False flag, so that any task that is never acknowledged gets automatically re-queued by RabbitMQ. However, these tasks run for a long time, usually five or six minutes and sometimes longer. As we know, once a task is pulled out for execution, it moves from the Ready state to the Unacked state.
When the task finishes, the program sends an ACK back to RabbitMQ, and once RabbitMQ receives it, the task is removed from the queue. The problem was that the task had not finished yet (it simply runs too long): after waiting a while (roughly two or three minutes) without receiving an ACK, RabbitMQ automatically re-queued the task (my setup is one producer and one consumer). In other words, we were stuck in an endless loop: the task could never complete, because it kept getting re-queued.
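For reference, the manual-acknowledgement pattern described above looks roughly like this with pika (a minimal sketch using the old pika 0.x basic_consume signature, matching the test code at the end; the URL, queue name, and sleep time are placeholders, and time.sleep stands in for the real task):

import time
from pika import BlockingConnection, URLParameters

conn = BlockingConnection(URLParameters('amqp://guest:guest@localhost:5672/%2F'))
chan = conn.channel()
chan.queue_declare(queue='queue', durable=True)

def on_message(ch, method, properties, body):
    # While this callback runs, the delivery sits in the Unacked state.
    time.sleep(300)                                 # stand-in for the real long-running task
    ch.basic_ack(delivery_tag=method.delivery_tag)  # only now is the message removed from the queue

chan.basic_consume(on_message, queue='queue', no_ack=False)  # manual acknowledgement
chan.start_consuming()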
2. Extending the RabbitMQ ACK Timeout
At this point the pressing question was: how do I make RabbitMQ wait longer for the ACK? I searched Baidu over and over, read all sorts of online articles and the docs on operating RabbitMQ, and still could not find a way to extend the ACK timeout. So, having come up empty, let me ask you, the reader: do you know how to extend the time RabbitMQ waits for an ACK?
3. Changing the Approach
Having failed to find a way to extend the ACK timeout, I turned my attention to figuring out what was treating my running task as timed out, and eventually found the relevant passage on the official site.
Link to the official documentation: http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout
I then ran a quick test with the heartbeat parameter set to 0 to disable heartbeat detection, and that essentially solved my problem. The official docs advise against doing this, but it is one workable approach; if you have a better solution, please leave a comment below (thanks in advance).
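For what it's worth, disabling heartbeats on the pika side looks roughly like this (a sketch; the host and credentials are placeholders, and some older pika 0.x releases name the parameter heartbeat_interval instead of heartbeat):

from pika import BlockingConnection, ConnectionParameters, PlainCredentials

# heartbeat=0 asks the broker to disable heartbeats for this connection
# (older pika 0.x releases use heartbeat_interval=0 instead).
params = ConnectionParameters(
    host='localhost',
    credentials=PlainCredentials('guest', 'guest'),
    heartbeat=0,
)
conn = BlockingConnection(params)

# Equivalent form with a URL, as used in the test code below:
# URLParameters('amqp://guest:guest@localhost:5672/%2F?heartbeat=0')

Keep in mind that with heartbeats disabled, a dead TCP connection may go unnoticed for a long time, which is why the official docs discourage this.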
That pretty much covers the problem. If you run into the same issue, feel free to use the steps above as a reference.
Test code:
# import json
# from concurrent.futures import ThreadPoolExecutor
import time
from queue import Queue
# from threading import Thread

from pika import BasicProperties, BlockingConnection, URLParameters
from pika.exceptions import ConnectionClosed

# from automation.aiclient.aiclient import AsyncAIRequestManager


class RabbitMQManager:
    # host must be a full AMQP URL because URLParameters is used below;
    # heartbeat=0 in the query string disables heartbeats (see section 3).
    def __init__(self, host='amqp://guest:guest@localhost:5672/%2F?heartbeat=0', qname='queue'):
        self.params = URLParameters(host)
        self.qname = qname
        self.prod_conn = None
        self.prod_chan = None
        self.cons_conn = None
        self.cons_chan = None
        self.ai_signton = None

    def init_prod_conn(self):
        # create the send (producer) connection
        self.prod_conn = BlockingConnection(self.params)
        self.prod_chan = self.prod_conn.channel()
        self.prod_chan.queue_declare(queue=self.qname, durable=True)

    def init_cons_conn(self):
        # create the receive (consumer) connection
        self.cons_conn = BlockingConnection(self.params)
        self.cons_chan = self.cons_conn.channel()
        self.cons_chan.basic_qos(prefetch_count=1)
        # pika 0.x signature; pika >= 1.0 uses basic_consume(queue=..., on_message_callback=...)
        self.cons_chan.basic_consume(self.callback, queue=self.qname)

    def produceMessages(self, msg=None):
        try:
            if isinstance(msg, str):
                self.prod_chan.basic_publish(
                    exchange='',
                    routing_key=self.qname,
                    body=msg,
                    properties=BasicProperties(
                        delivery_mode=2,  # make message persistent
                    ))
            elif isinstance(msg, Queue):
                while msg.qsize() != 0:
                    item = msg.get()
                    self.prod_chan.basic_publish(
                        exchange='',
                        routing_key=self.qname,
                        body=item,
                        properties=BasicProperties(
                            delivery_mode=2,  # make message persistent
                        ))
            else:
                pass
        except Exception as e:
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_prod_conn()
                # last connection closed, re-produce the message
                self.produceMessages(msg)
            else:
                print('Produce msg exception occurred, please check the following error message:')
                print(e)

    def consumeMessages(self):
        try:
            self.cons_chan.start_consuming()
        except Exception as e:
            print('Consume msg exception occurred, please check the following error message:')
            print(e)
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_cons_conn()
                self.consumeMessages()

    def callback(self, ch, method, properties, body):
        # handle the message body
        print('callback....')
        print(body)
        try:
            print('Consuming....')
            # flush pending I/O on the consumer connection before the long task
            self.cons_conn.process_data_events()
            # simulate a long-running task (5 minutes)
            time.sleep(300)
            # if self.ai_signton is None:
            #     self.ai_signton = AsyncAIRequestManager()
            # self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding='utf-8')))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            # t = Thread(target=self.ai_signton.syncToDatabase)
            # t.start()
        except Exception as e:
            if isinstance(e, ConnectionClosed):
                raise ConnectionClosed('Connection has been closed, send to reconnection.')
            else:
                print('Current error msg:')
                print(e)

    def close_prod_conn(self):
        if self.prod_conn is not None:
            self.prod_conn.close()

    def close_cons_conn(self):
        if self.cons_conn is not None:
            self.cons_conn.close()

    def close(self):
        self.close_prod_conn()
        self.close_cons_conn()
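A rough usage sketch of the class above (the AMQP URL and queue name are placeholders; the demo publishes a couple of messages first and then blocks in the consumer until interrupted):

if __name__ == '__main__':
    mgr = RabbitMQManager(host='amqp://guest:guest@localhost:5672/%2F?heartbeat=0',
                          qname='queue')

    # Publish a couple of demo tasks.
    mgr.init_prod_conn()
    mgr.produceMessages('{"task_id": 1}')
    mgr.produceMessages('{"task_id": 2}')

    # Consume them; start_consuming() blocks until the process is interrupted.
    mgr.init_cons_conn()
    try:
        mgr.consumeMessages()
    finally:
        mgr.close()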