pika 與 rabbitMQ 阻塞連接


 

之前只是用celery, 這次用一下pika

參考rabbitMQ官網的python版,https://www.rabbitmq.com/tutorials/tutorial-one-python.html

沒想到各種坑.

如果說rabbitMQ官網是為了讓新人入門,所以刻意忽略掉細節, 那么必須吐槽pika的官方文檔, 很不好.遠不如celery

 

1 Stream connection lost: BrokenPipeError(32, 'Broken pipe')

使用pika 的BlockingConnection

但啟動后不久, 作為publish的生產端就會掉線:

raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: BrokenPipeError(32, 'Broken pipe')

根據https://www.cnblogs.com/zhaof/p/9774390.html

是要在連接時設置心跳為0,就不會超時自動下線了, 否則RabbitMQ服務器會發過來默認值580

#--------------rabbitMQ------------------
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='localhost',
    heartbeat=0, #never exit after start
    ))

channel = connection.channel()


channel.queue_declare(queue='update_sql')

 

這個錯誤在測試消費端時沒測出來,因為測試使用的發布者和官方文檔里一樣,發完就下線退出了. 這樣當然看不出這個心跳問題.

但是聯調時就暴露了. 真無語.

 

2  content_type

默認的body是二進制的. 然后消費端要

body.decode('utf-8')
 
結果忽然發現 官方代碼示例里這么寫
    channel.basic_publish('exchange_name',
                          'routing_key',
                          'Test Message',
                          pika.BasicProperties(content_type='text/plain',
                                               type='example'))

這似乎時可以發文本的嗎?

然后,看見別人還可以這么寫https://blog.csdn.net/fzlulee/article/details/98480724

        self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))

 

似乎就是html請求頭常見的寫法了? 但是pika里沒有對BasicProperties的詳細文檔,

,源碼里也看不出注釋https://pika.readthedocs.io/en/stable/_modules/pika/spec.html#BasicProperties

 

 

3 ack和durable

ack防止消費者出問題, durable防止rabbitMQ服務器本身出問題

所以ack在消費端定義

    channel.basic_consume(queue='update_sql',
                        auto_ack=False,
                        on_message_callback=callback)

 

而durable在channel里隊列聲明里 在 生產端,消費端都要統一聲明隊列

channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False)

 

引用 https://blog.csdn.net/hlxx55/article/details/80964440

ack

rabbitMQ是默認開啟自動應答的,這樣當rabbitMQ將消息發給消費者,就會從內存中將消息刪除,這樣會帶來一個問題,如果消費者未處理完消息而宕機,那么消息就會丟失。所以,我們將自動應答關閉,當rabbitMQ收到消費者處理完消息的回應后才會從內存中刪除消息。

 

durable

rabbitMQ默認將消息存儲在內存中,若rabbitMQ宕機,那么所有數據就會丟失,所以在聲明隊列的時候可以聲明將數據持久化,但是如果已經聲明了一個未持久化的隊列,那么不能修改,只能將這個隊列刪除或重新聲明一個持久化數據。

4防止消息積壓 

只在消費者這里加上basic_qos就可以了

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
            host= self.HOST_RABBITMQ,
            heartbeat = 0, #never exit after start
            ))
        channel = connection.channel()
        #durable 隊列中消息持久化
        #exclusive (bool) – Don’t allow other consumers on the queue
        #./ exchange 不支持 exclusive
        channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False)
        #1次1條消息
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue='update_sql',
                            auto_ack=False, #不自動確認 在callback最后確認 等於 no_ack
                            on_message_callback=self.callback)



        print(' [*] wg-Executor waiting for sql cmds. To exit press CTRL+C')
        channel.start_consuming()

 此外,在消費者的callback函數里,   

最好在最外層用 異常處理包裹起來,確保無論執行結果如何,都在finally里執行ack

try:

except:

else:

finally:

      #不論當前消息是否成功,都表示消息確實處理完了 手動確認 否則沒有ack不再發送新消息 保證確實被處理了再確認
      ch.basic_ack(delivery_tag = method.delivery_tag)

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM