Celery連接rabbitmq報Connection reset by peer


問題復現

在使用celery + rabbitmq作為broker時,啟動一定時間后 會 由於celery和rabbitmq的心跳檢測機制 認為連接有問題,先報 如下錯誤

Too many heartbeats missed

再過一段時間,由於認為心跳有問題,會斷開tcp連接,就會報 如下錯誤:

ConnectionResetError: [Errno 104] Connection reset by peer

這時 tcp連接被斷開,但是 celery的worker還在繼續處理任務,等worker處理完任務 需要回寫ack告訴rabbitmq此任務已經完成,由於tcp已經斷開連接,就會報如下錯誤(原因是 tcp斷開連接,向一個已經關閉的socket寫入數據時就是這個Broken pipe錯誤)

解決方法

  • 既然心跳機制有問題,就去掉心跳機制, celery配置中 將 broker_heartbeat 值改為0 即可;

解決此問題的限制情景

  • celery連接的broker必須為rabbitmq(不能為redis等等)
  • celery的transport必須是 pyamqp,一般默認就是pyamqp,而不是librabbitmq(只要不安裝librabbitmq庫)

其他

  • 此bug即使在目前最新的celery 5.1.2版本仍然無法解決,只能通過設置參數禁用心跳機制;
  • 經過1周的測試,禁用心跳機制后,celery worker正常消費任務,無任何問題;

heartbeat通常用來檢測通信的對端是否存活(未正常關閉socket連接而異常crash)。其基本原理是檢測對應的socket連接上數據的收發是否正常,如果一段時間內沒有收發數據,則向對端發送一個心跳檢測包,如果一段時間內沒有回應則認為心跳超時,即認為對端可能異常crash了。

rabbitmq也不例外,heatbeat在客戶端和服務端之間用於檢測對端是否正常,即客戶端與服務端之間的tcp鏈接是否正常。

關於rabbitmq心跳

1.heartbeat檢測時間間隔可在配置文件rabbitmq.config中增加配置項{heartbeat,Timeout}進行配置,其中Timeout指定時間間隔,單位為秒,另外客戶端也可以配置heartbeat時間。

如果服務端沒有配置
默認代理心跳時間:
RabbitMQ 3.2.2:580秒
RabbitMQ 3.5.5:60秒

3.心跳每 heartbeat timeout / 2 秒發送一次,服務器兩次沒有接收到則斷開tcp連接,以前的連接將失效,客戶端需要重新連接。
4.如果你使用Java, .NET and Erlang clients,服務器與客戶端會協商heartbeat時間

  • 如果其中一個值為0,則使用兩者中較大的一個
  • 否則,使用兩者中較小的一個
  • 兩個值都為0,則表示要禁用心跳,則服務端與客戶端維持此tcp連接,不會斷開。
  • 注意:在python客戶端上直接設置為0,則禁用心跳。

禁用心跳在python客戶端該如何設置:

  • 在py3:ConnectionParameters設置heartbeat_interval=0即可。
  • 在py2:ConnectionParameters設置heartbeat=0即可。

5.連接上的任何流量(傳輸的有效數據、確認等)都將被計入有效心跳,當然也包括心跳幀。

6.我在網上看到有人問到這個問題:

為什么服務端宕機,在心跳檢測機制下,服務器側斷開連接,而客戶端這邊不能檢測到tcp斷開,我測試過,客戶端確實不能檢測到tcp連接斷開,只有當客戶端在這個tcp有操作后,才能檢測到,當然在一個斷開的tcp連接上做操作會報錯(如發送消息)。

import pika  
import time  

credit = pika.PlainCredentials(username='cloud', password='cloud')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.32.1.12', credentials=credit))
channel = connection.channel()  
while True:
    connect_close = connection.is_closed
    connect_open = connection.is_open
    channel_close = channel.is_closed
    channel_open = channel.is_open
    
    print("connection is_closed ", connect_close)
    print("connection is_open ", connect_open)
    print("channel is_closed ", channel_close)
    print("channel is_open ", channel_open)
    print("")
    time.sleep(5)

7.一些RabbitMQ客戶端(Bunny,Java,.NET,Objective-C,Swift)提供了一種在網絡故障后自動恢復連接的機制,而pika只能通過檢測連接異常后再重新創建連接的方式。

示例代碼:通過檢測連接異常,重新創建連接:

import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue
from retry import retry

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)
    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass
consume()

heartbeat的實現

rabbitmq在收到來自客戶端的connection.tune-ok信令后,啟用心跳檢測,rabbitmq會為每個tcp連接創建兩個進程用於心跳檢測,一個進程定時檢測tcp連接上是否有數據發送(這里的發送是指rabbitmq發送數據給客戶端),如果一段時間內沒有數據發送給客戶端,則發送一個心跳包給客戶端,然后循環進行下一次檢測;

另一個進程定時檢測tcp連接上是否有數據的接收,如果一段時間內沒有收到任何數據,則判定為心跳超時,最終會關閉tcp連接。另外,rabbitmq的流量控制機制可能會暫停heartbeat檢測,這里不展開描述。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM