Dramatiq遇到的坑


Dramatiq

Bogdanp/dramatiq: A fast and reliable background task processing library for Python 3. (github.com)

是一個Python3的任務隊列框架, 比較輕量化, 使用RabbitMQRedis作為存儲介質, 當時看有2.xk的start就使用了, 結果在使用中發現了幾個問題, 這里記錄一下.

任務"不發送"

測試時, 發現總有些任務會遺留在管道中, work好像不接收, 也沒有執行, 當重啟work后就立刻接受了.

當時使用的存儲介質是redis, 查看了源代碼后發現問題

因為redis本身沒有ack機制, dramatiq就自己實現了ACK, 在接受到任務后, 執行任務, 執行完成后放入到另一個xx.ack管道中同時刪除原有管道的這一條數據.(消費者端實現ACK)

那么問題就出現了, 首先, 如果一個任務運行需要1小時, 那么在這1小時中, 查看redis會發現一直在原有管道中, 不知道是否是正在執行.

另外, 如果在這個任務執行中, 因為某些原因導致當前進程崩潰了, 如果沒有來得及將中斷的任務重新放入管道中(dramatiq雖然有處理措施, 但是畢竟是Python的多進程方式, 並不是很能保證可靠性), 就會造成這個任務"丟失"

最后決定更換存儲介質RabbitMQ, Dramatiq在使用RabbitMQ作為broker時使用的是RabbitMQ自帶的ACK機制, 更加的成熟

ACK錯誤

客戶端報錯

Consumer encountered a connection error: (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out......

更換為RabbitMQ之后, 發現消費者會報ACK錯誤, 在RabbitMQ中體現為生產者獲取了任務后沒有返回ACK, 在超時后被RabbitMQ重新放回任務隊列

在上一次排查問題的過程中, 我發現消費者需要等待代碼執行完畢后再ACK, 這本身也是沒有問題的, 我搭配Dramatiq自身的超時來設置RabbitMQACK超時, 比如這個任務可能會執行2小時, 我設置Dramatiq的超時為2.5小時, 設置RabbitMQ的超時為3小時, 這樣讓程序能自己處理, 但是還是出現問題

后來查看源代碼時, 發現, Dramatiq有一個功能是預讀, 即每次提前獲取n個任務, 放在內存中, 當某一個任務完成后直接從內存中獲取新任務, 省去網絡io. 那這樣就會出現問題, 因為在獲取到任務后, RabbitMQ就會認為該任務處於等待ACK的狀態, 如果每個任務需要1小時, Dramatiq獲取了兩個任務, 那么第二個任務就會超時, 當第二個任務執行完成后, Dramatiq進行ACK時就會被RabbitMQ攔截

看了代碼后發現這個預讀的數量參數可調, 從環境變量中獲取, 我們設置

export dramatiq_queue_prefetch=1

讓他只獲取一個然后立刻執行這一個即可

心跳超時

修復了ACK之后, 又發現了一個問題, 隔一段時間RabbitMQ就會有錯誤輸出

[erro] <0.2398.0> missed heartbeats from client, timeout: 60s

這代表的是客戶端的心跳出現了問題, 當心跳超時后, RabbitMQ會主動斷開與這個客戶端的連接

查看項目的日志, 沒有發現有任何錯誤或者警告輸出

之前兩個問題都是消費者的錯誤, 於是先排查的消費者, 發現消費者有心跳的維持且正常運行了

又回來看生產者的源代碼, 發現Dramatiq根本沒有實現生產者的心跳, 但是因為每次生產者發送任務時, 發送任務的代碼寫的是死循環, 連接被RabbitMQ斷開后,或者因為別的原因沒有被發送, 就再次生成新的連接, 再次發送, 而當發送失敗時, 只打了一個Debug檔的日志, 可能作者也知道這個問題, 然后就粗暴的使用捕捉錯誤然后重新建立連接的方式去做了😅

如果要解決, 可以設置心跳為0來去除心跳檢測, url連接支持這個參數, 比如

amqp://worker:BEQFRAmC5@127.0.0.1:5672?heartbeat=0

創建broker失效

我們是將FastApiDramatiq結合使用, 當接受到請求后做處理, 然后通過Dramatiq發送出去

為了更好的代碼結構, 我們將代碼整合了一下, 結果會出現連接不到RabbitMQ的問題

Dramatiq的使用一般是

import dramatiq

@dramatiq.actor(max_retries=0, queue_name="test", actor_name="test")
def scan(work):
  pass

建立如上的scan函數, 主要是加入裝飾器dramatiq.actor

調用時是

from . import work
worker.iot.scan.send(send_info)

設置broker是

import dramatiq

rabbitmq_broker = RabbitmqBroker(url=RABBITMQ_URL)
rabbitmq_broker.add_middleware(ConfigMiddleware())
dramatiq.set_broker(rabbitmq_broker)

改動主要是將設置broker的部分放置進了fastapi的啟動事件, 原先是放置進了work文件夾的__init__.py

剛開始以為是不是broker的設置比較晚了, 導致broker沒有正常生效

后來發現在Dramatiq中broker是一個Golobal變量, 測試也發現即使在后面set_broker也可以

繼續深入, 發現問題所在了

因為dramatiq.actor是作為裝飾器使用, 而Python的裝飾器內的代碼, 實際上在導入時會執行, 舉個例子

registry = []

def register(func):
    print('running register(%s)' % func)
    registry.append(func)
    return func

@register
def f1():
    print('running f1()')

@register
def f2():
    print('running f2()')

def f3():
    print('running f3()')

def main():
    print('running main()')
    print('registry ->', registry)
    f1()
    f2()
    f3()

if __name__ == '__main__':
    main()

打印結果為

running register(<function f1 at 0x0000027913AA8708>)
running register(<function f2 at 0x0000027913AA8E58>)
running main()
registry -> [<function f1 at 0x0000027913AA8708>, <function f2 at 0x0000027913AA8E58>]
running f1()
running f2()
running f3()

沒錯, 假如有個裝飾器函數 a, 將函數 b 包裹在 a 中, 也就是

def a():
  pass

@a
def b():
  pass

在Python導入到這個代碼時, 會將被 a 包裹的 b 變成 a(b), 會執行函數 a, 生成一個新的函數 a(b), 然后每次調用 b 時, 實際上在調用這個新的函數

Dramatiq中, 在裝飾器 dramatiq.actor 中的代碼進行了初始化操作, 此時就將全局的broker生成了客戶端供自己使用, 此時的全局broker還沒有人為的設置, 變成了默認的127.0.0.1RabbitMQ, 而后運行send時直接使用此客戶端發送, 因為我們在修改代碼后, 將創建broker的部分放置在了引用work之后, 導致了先生成了客戶端, 而后來的自定義broker沒有正確的被actor使用, 使用的是本地的RabbitMQ, 因為本地沒有, 理所當然的就發送失敗了

解決方法是排查項目的運行順序, 在導入work之前先進行set_broker操作, 在work__init__.py

from app.task.broker import setup_broker; setup_broker()    # noqa
from . import xx

當導入包時運行__init__.py, 優先set_broker


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM