一、應用背景
今天做一個需求,要將RabbitMQ中的任務取出並執行,為防止任務執行期間出錯,設置NO_ACK=FALSE標志,這樣、一旦任務沒有應答的話,相應的任務就會被RabbitMQ自動Re-Queue,避免丟失任務。然而、由於任務執行時間較長,通常需要五、六分鍾,甚至更長;我們都知道一旦一個任務被取出執行,該任務就從Ready狀態更改成Unacked狀態。如圖所示:

當這個任務執行完之后,程序將向RabbitMQ發送ACK消息確認,RabbitMQ在收到ACK消息后,會將該任務移出隊列;然而、問題出在任務尚未執行完畢【執行時間太久】,RabbitMQ再等了一段時間【大約兩三分鍾】后,一直沒有收到ACK確認消息,就將該任務自動Re-Queue了【我是一個生產者,一個消費者模式】,也就是說、我們這里發生了死循環【任務永遠也執行不完,因為會一直Re-Queue】。
二、延長RabbitMQ ACK應答時間
到這里,我們急需解決的問題就是,怎么能設置RabbitMQ延長等待ACK的時間,百度一下、兩下,各種讀網絡文檔,研究操作RabbitMQ工作的文檔,查了一圈資料也沒查出怎么延長RabbitMQ ACK時間【廢柴啊】。至此、一直查不出來,就想問一下網友的你,你知道怎么延長RabbitMQ接受ACK應答時間么?
三、改變解決問題方式
在查不出如何延長ACK應答時間后,我將注意力轉向如何檢測當前任務操作超時的,后來在官網看到這么一段話:
鏈接官網位置:http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout

后面、就簡單測試下將heartbeat參數設置為0,以禁用心跳檢測,這樣基本解決了我的問題;雖然官方不建議這么做,但也是一種解決思路,如果大家有什么更好的解決辦法,煩請在下面留言【先謝謝啦】。
至此、這個問題基本闡述清楚了,如果有遇到的小伙伴,也請參考下上面的操作。
測試代碼:

# import json
# from concurrent.futures import ThreadPoolExecutor
from queue import Queue
# from threading import Thread
from pika import BasicProperties, BlockingConnection, URLParameters
from pika.exceptions import ConnectionClosed
# from automation.aiclient.aiclient import AsyncAIRequestManager
class RabbitMQManager:
def __init__(self, host = 'localhost', qname = 'queue'):
self.params = URLParameters(host)
self.qname = qname
self.prod_conn = None
self.prod_chan = None
self.cons_conn = None
self.cons_chan = None
self.ai_signton = None
def init_prod_conn(self):
# create send connection
self.prod_conn = BlockingConnection(self.params)
self.prod_chan = self.prod_conn.channel()
self.prod_chan.queue_declare(queue = self.qname, durable = True)
def init_cons_conn(self):
# create receive connection
self.cons_conn = BlockingConnection(self.params)
self.cons_chan = self.cons_conn.channel()
self.cons_chan.basic_qos(prefetch_count = 1)
self.cons_chan.basic_consume(self.callback, queue = self.qname)
def produceMessages(self, msg = None):
try:
if isinstance(msg, str):
self.prod_chan.basic_publish(exchange = '',
routing_key = self.qname,
body = msg,
properties = BasicProperties(
delivery_mode = 2, # make message persistent
))
elif isinstance(msg, Queue):
while 0 != msg.qsize():
item = msg.get()
self.prod_chan.basic_publish(exchange = '',
routing_key = self.qname,
body = item,
properties = BasicProperties(
delivery_mode = 2, # make message persistent
))
else:
pass
except Exception as e:
if isinstance(e, ConnectionClosed):
print('Reconnection established!')
self.init_prod_conn()
# last connection close, re-produce msg
self.produceMessages(msg)
else:
print('Produce msg exception Occur, please check following error message:')
print(e)
def consumeMessages(self):
try:
self.cons_chan.start_consuming()
except Exception as e:
print('Consume msg exception Occur, please check following error message:')
print(e)
if isinstance(e, ConnectionClosed):
print('Reconnection established!')
self.init_cons_conn()
self.consumeMessages()
def callback(self, ch, method, properties, body):
# handle message body
print('callback....')
print(body)
try:
print('Consuming....')
self.cons_conn.process_data_events()
# 模擬處理任務時間
import time
time.sleep(300)
# if None == self.ai_signton:
# self.ai_signton = AsyncAIRequestManager()
# self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding = 'utf-8')))
ch.basic_ack(delivery_tag = method.delivery_tag)
# t = Thread(target = self.ai_signton.syncToDatabase())
# t.start()
except Exception as e:
if isinstance(e, ConnectionClosed):
raise ConnectionClosed('Connection has been closed, send to reconnection.')
else:
print('Current error msg:')
print(e)
def close_prod_conn(self):
if None != self.prod_conn:
self.prod_conn.close()
def close_cons_conn(self):
if None != self.cons_conn:
self.cons_conn.close()
def close(self):
self.close_prod_conn()
self.close_cons_conn()
