django-celery 創建多個broker隊列 異步執行任務時指定隊列


一.這里不再詳細述說 django 框架中如何使用celery, 重點放在如何實現創建多個隊列, 並指定隊列存放異步任務

    筆者使用   django-celery==3.2.2 模塊, 配置項及配置參數, 啟動方式請查看官方文檔.

二.在settings.py配置文件中添加配置, 開啟多隊列(前提是已經配置好啟動celery worker的相關配置項, 建議使用多隊列, 一個隊列對應一個worker, 而不是多個worker對應一個隊列)

from kombu import Exchange, Queue

# 定義celery各個隊列的名稱
CELERY_QUEUES = (
    Queue("import_task", Exchange("import_task"), routing_key="task_a"),
    Queue("normal_task", Exchange("normal_task"), routing_key="task_b")
)

CELERY_ROUTES = {
    "tasks.taskA": {"queue": "import_task", "routing_key": "task_a"},
    "tasks.taskB": {"queue": "normal_task", "routing_key": "task_b"}
}

# 注意: 使用 redis 作為 broker 時, 隊列名稱,Exchenge名稱,queue名稱 必須保持一致

"""
這里只定義了兩個隊列
    import_task: 用來存放需要優先執行的重要任務, 如果隊列仍然存在堵塞的情況, 可以根據更小顆粒度划分出更多的隊列
    normal_task: 用來存放執行級別較低的任務, 該類型的任務可以允許存在較長時間的延遲
"""

三.指定隊列啟動worker

進入manage.py 文件所在目錄下, 執行以下命令, 開啟worker, 因為我使用了django-celery模塊, 所以可以使用manage.py 入口文件進行啟動:    -Q  queue_name   指定隊列名稱

python manage.py celery worker -B -l info -Q import_task
python manage.py celery worker -B -l info -Q normal_task
python manage.py celery worker -B -l info -Q celery

如果需要后台運行, 可在命令的最后加上 "&", 如果使用supervisor進行進程管理, 則不可以加上 "&", docker部署請自行參考docker 官方文檔對 dockerfile 使用方式的說明.

注意: 這里添加了一個使用 celery 隊列的worker, 因為在進行任務發送時, 如果沒有指明隊列, 將默認發送至隊列名稱為celery的隊列中.

四.調用異步任務

from .tasks import send_emailMes_task

send_emailMes_task.apply_async((params_1, params_2), {"params_3_key": params_3_value}, queue="import_task")

這里筆者導入了自定義的需要異步執行的任務: send_emailMes_task

需要注意的是:

使用異步任務對象下的apply_async(), 而不是delay(), 后者無法指定隊列名稱

參數:  (params_1, params_2),  使用這樣的方式傳遞實參, 需要使用*agrs接收

參數:  {"active_token":token},  使用這樣的方式傳遞命名參數, 需要使用**kwagrs接收

參數:  queue,  指定將任務發送至那個隊列

五.完成以上操作以后就可以進行程序的執行了.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM