一般使用celery來做Django的異步消息隊列
先安裝必要的包
pip3 install celery
我的項目目錄結構:
celeryApp.py
1 import celery 2 import os 3 4 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "videoApp.settings") 5 6 import django 7 django.setup() 8 9 app = celery.Celery(main="celery_app", broker='redis://localhost:6379/2') 10 11 app.autodiscover_tasks(['apps.video_manager'])
app.autodiscover_tasks()
接收參數為tasks.py文件所在的包
看一下 autodiscover_tasks的源碼:
看注釋,說的很清楚,如果你的tasks.py文件在 foo.bar.tasks.py, 那么參數傳遞 foo.bar,該方法接受列表,會將列表里的所有tasks.py的任務都注冊
任務函數:
apps/video_manager/tasks.py
1 from videoApp.celeryApp import app 2 import time 3 4 @app.task(name="fir_task") 5 def my_task(): 6 with open('yeah.txt', 'w') as f: 7 f.write(str(time.time())) 8 return "success"
現在在項目根目錄執行命令,開啟celery
celery -A videoApp.celeryApp worker -l info
控制台會顯示info信息,注意看:
如果tasks里的函數出現在了這里,說明任務注冊成功了,否則注冊失敗,如果注冊失敗,在進行調用的時候,會報錯,任務未注冊
任務里的return,return出去的數據僅僅是作為日志顯示用的,目前未發現有別的用處。
需要補充的是,任務一旦注冊,代碼就會寫到內存里,這時候即便更新了任務代碼再調用,也是執行的舊代碼,需要重啟celery才會再次更新新代碼。
任務調用方式:
1 from apps.video_manager.tasks import my_task 2 3 my_task.delay(*args, **kwargs)
delay的參數就是任務接受的參數,如果注冊的任務接受參數,那么調用時將參數傳遞給delay即可。