之前只是用celery, 這次用一下pika
參考rabbitMQ官網的python版,https://www.rabbitmq.com/tutorials/tutorial-one-python.html
沒想到各種坑.
如果說rabbitMQ官網是為了讓新人入門,所以刻意忽略掉細節, 那么必須吐槽pika的官方文檔, 很不好.遠不如celery
1 Stream connection lost: BrokenPipeError(32, 'Broken pipe')
使用pika 的BlockingConnection
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: BrokenPipeError(32, 'Broken pipe')
根據https://www.cnblogs.com/zhaof/p/9774390.html
是要在連接時設置心跳為0,就不會超時自動下線了, 否則RabbitMQ服務器會發過來默認值580
#--------------rabbitMQ------------------ import pika connection = pika.BlockingConnection( pika.ConnectionParameters( host='localhost', heartbeat=0, #never exit after start )) channel = connection.channel() channel.queue_declare(queue='update_sql')
這個錯誤在測試消費端時沒測出來,因為測試使用的發布者和官方文檔里一樣,發完就下線退出了. 這樣當然看不出這個心跳問題.
但是聯調時就暴露了. 真無語.
2 content_type
默認的body是二進制的. 然后消費端要
channel.basic_publish('exchange_name', 'routing_key', 'Test Message', pika.BasicProperties(content_type='text/plain', type='example'))
這似乎時可以發文本的嗎?
然后,看見別人還可以這么寫https://blog.csdn.net/fzlulee/article/details/98480724
self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
似乎就是html請求頭常見的寫法了? 但是pika里沒有對BasicProperties的詳細文檔,
,源碼里也看不出注釋https://pika.readthedocs.io/en/stable/_modules/pika/spec.html#BasicProperties
3 ack和durable
ack防止消費者出問題, durable防止rabbitMQ服務器本身出問題
所以ack在消費端定義
channel.basic_consume(queue='update_sql', auto_ack=False, on_message_callback=callback)
而durable在channel里隊列聲明里 在 生產端,消費端都要統一聲明隊列
channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False)
引用 https://blog.csdn.net/hlxx55/article/details/80964440
ack
rabbitMQ是默認開啟自動應答的,這樣當rabbitMQ將消息發給消費者,就會從內存中將消息刪除,這樣會帶來一個問題,如果消費者未處理完消息而宕機,那么消息就會丟失。所以,我們將自動應答關閉,當rabbitMQ收到消費者處理完消息的回應后才會從內存中刪除消息。
durable
rabbitMQ默認將消息存儲在內存中,若rabbitMQ宕機,那么所有數據就會丟失,所以在聲明隊列的時候可以聲明將數據持久化,但是如果已經聲明了一個未持久化的隊列,那么不能修改,只能將這個隊列刪除或重新聲明一個持久化數據。
4防止消息積壓
只在消費者這里加上basic_qos就可以了
connection = pika.BlockingConnection( pika.ConnectionParameters( host= self.HOST_RABBITMQ, heartbeat = 0, #never exit after start )) channel = connection.channel() #durable 隊列中消息持久化 #exclusive (bool) – Don’t allow other consumers on the queue #./ exchange 不支持 exclusive channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False) #1次1條消息 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='update_sql', auto_ack=False, #不自動確認 在callback最后確認 等於 no_ack on_message_callback=self.callback) print(' [*] wg-Executor waiting for sql cmds. To exit press CTRL+C') channel.start_consuming()
此外,在消費者的callback函數里,
最好在最外層用 異常處理包裹起來,確保無論執行結果如何,都在finally里執行ack
try:
except:
else:
finally: