Dramatiq
Bogdanp/dramatiq: A fast and reliable background task processing library for Python 3. (github.com)
是一個Python3的任務隊列框架, 比較輕量化, 使用RabbitMQ
或Redis
作為存儲介質, 當時看有2.xk的start就使用了, 結果在使用中發現了幾個問題, 這里記錄一下.
任務"不發送"
測試時, 發現總有些任務會遺留在管道中, work好像不接收, 也沒有執行, 當重啟work后就立刻接受了.
當時使用的存儲介質是redis
, 查看了源代碼后發現問題
因為redis
本身沒有ack
機制, dramatiq
就自己實現了ACK
, 在接受到任務后, 執行任務, 執行完成后放入到另一個xx.ack
管道中同時刪除原有管道的這一條數據.(消費者端實現ACK
)
那么問題就出現了, 首先, 如果一個任務運行需要1小時, 那么在這1小時中, 查看redis
會發現一直在原有管道中, 不知道是否是正在執行.
另外, 如果在這個任務執行中, 因為某些原因導致當前進程崩潰了, 如果沒有來得及將中斷的任務重新放入管道中(dramatiq
雖然有處理措施, 但是畢竟是Python
的多進程方式, 並不是很能保證可靠性), 就會造成這個任務"丟失"
最后決定更換存儲介質為RabbitMQ
, Dramatiq
在使用RabbitMQ
作為broker
時使用的是RabbitMQ
自帶的ACK
機制, 更加的成熟
ACK錯誤
客戶端報錯
Consumer encountered a connection error: (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out......
更換為RabbitMQ
之后, 發現消費者會報ACK錯誤
, 在RabbitMQ
中體現為生產者獲取了任務后沒有返回ACK
, 在超時后被RabbitMQ
重新放回任務隊列
在上一次排查問題的過程中, 我發現消費者需要等待代碼執行完畢后再ACK
, 這本身也是沒有問題的, 我搭配Dramatiq
自身的超時來設置RabbitMQ
的ACK
超時, 比如這個任務可能會執行2小時, 我設置Dramatiq
的超時為2.5小時, 設置RabbitMQ
的超時為3小時, 這樣讓程序能自己處理, 但是還是出現問題
后來查看源代碼時, 發現, Dramatiq
有一個功能是預讀
, 即每次提前獲取n個任務, 放在內存中, 當某一個任務完成后直接從內存中獲取新任務, 省去網絡io. 那這樣就會出現問題, 因為在獲取到任務后, RabbitMQ
就會認為該任務處於等待ACK
的狀態, 如果每個任務需要1小時, Dramatiq
獲取了兩個任務, 那么第二個任務就會超時, 當第二個任務執行完成后, Dramatiq
進行ACK
時就會被RabbitMQ
攔截
看了代碼后發現這個預讀的數量參數可調, 從環境變量中獲取, 我們設置
export dramatiq_queue_prefetch=1
讓他只獲取一個然后立刻執行這一個即可
心跳超時
修復了ACK
之后, 又發現了一個問題, 隔一段時間RabbitMQ
就會有錯誤輸出
[erro] <0.2398.0> missed heartbeats from client, timeout: 60s
這代表的是客戶端的心跳出現了問題, 當心跳超時后, RabbitMQ
會主動斷開與這個客戶端的連接
查看項目的日志, 沒有發現有任何錯誤或者警告輸出
之前兩個問題都是消費者的錯誤, 於是先排查的消費者, 發現消費者有心跳的維持且正常運行了
又回來看生產者的源代碼, 發現Dramatiq
根本沒有實現生產者的心跳, 但是因為每次生產者發送任務時, 發送任務的代碼寫的是死循環, 連接被RabbitMQ
斷開后,或者因為別的原因沒有被發送, 就再次生成新的連接, 再次發送, 而當發送失敗時, 只打了一個Debug
檔的日志, 可能作者也知道這個問題, 然后就粗暴的使用捕捉錯誤然后重新建立連接的方式去做了😅
如果要解決, 可以設置心跳為0來去除心跳檢測, url連接支持這個參數, 比如
amqp://worker:BEQFRAmC5@127.0.0.1:5672?heartbeat=0
創建broker失效
我們是將FastApi
與Dramatiq
結合使用, 當接受到請求后做處理, 然后通過Dramatiq
發送出去
為了更好的代碼結構, 我們將代碼整合了一下, 結果會出現連接不到RabbitMQ
的問題
Dramatiq
的使用一般是
import dramatiq
@dramatiq.actor(max_retries=0, queue_name="test", actor_name="test")
def scan(work):
pass
建立如上的scan函數, 主要是加入裝飾器dramatiq.actor
調用時是
from . import work
worker.iot.scan.send(send_info)
設置broker是
import dramatiq
rabbitmq_broker = RabbitmqBroker(url=RABBITMQ_URL)
rabbitmq_broker.add_middleware(ConfigMiddleware())
dramatiq.set_broker(rabbitmq_broker)
改動主要是將設置broker的部分放置進了fastapi
的啟動事件, 原先是放置進了work
文件夾的__init__.py
里
剛開始以為是不是broker的設置比較晚了, 導致broker沒有正常生效
后來發現在Dramatiq
中broker是一個Golobal變量, 測試也發現即使在后面set_broker
也可以
繼續深入, 發現問題所在了
因為dramatiq.actor
是作為裝飾器使用, 而Python
的裝飾器內的代碼, 實際上在導入時會執行, 舉個例子
registry = []
def register(func):
print('running register(%s)' % func)
registry.append(func)
return func
@register
def f1():
print('running f1()')
@register
def f2():
print('running f2()')
def f3():
print('running f3()')
def main():
print('running main()')
print('registry ->', registry)
f1()
f2()
f3()
if __name__ == '__main__':
main()
打印結果為
running register(<function f1 at 0x0000027913AA8708>)
running register(<function f2 at 0x0000027913AA8E58>)
running main()
registry -> [<function f1 at 0x0000027913AA8708>, <function f2 at 0x0000027913AA8E58>]
running f1()
running f2()
running f3()
沒錯, 假如有個裝飾器函數 a, 將函數 b 包裹在 a 中, 也就是
def a():
pass
@a
def b():
pass
在Python導入到這個代碼時, 會將被 a 包裹的 b 變成 a(b)
, 會執行函數 a, 生成一個新的函數 a(b)
, 然后每次調用 b 時, 實際上在調用這個新的函數
而Dramatiq
中, 在裝飾器 dramatiq.actor
中的代碼進行了初始化操作, 此時就將全局的broker
生成了客戶端供自己使用, 此時的全局broker
還沒有人為的設置, 變成了默認的127.0.0.1
的RabbitMQ
, 而后運行send
時直接使用此客戶端發送, 因為我們在修改代碼后, 將創建broker
的部分放置在了引用work
之后, 導致了先生成了客戶端, 而后來的自定義broker
沒有正確的被actor
使用, 使用的是本地的RabbitMQ
, 因為本地沒有, 理所當然的就發送失敗了
解決方法是排查項目的運行順序, 在導入work
之前先進行set_broker
操作, 在work
的__init__.py
中
from app.task.broker import setup_broker; setup_broker() # noqa
from . import xx
當導入包時運行__init__.py
, 優先set_broker