1.實際使用
監控task的執行結果:任務id,結果,traceback,children,任務狀態
配置 backend='redis://127.0.0.1:6379/5'給Celery的app對象,直接在redis中查看
還可以
健壯celery:celery -A proj worker -l info
☁ proj tree
├── __init__.py
├── celery.py | app=Clery('proj',include=['proj.tasks']) app.config_from_object('proj.config') if __name__==__main__: app.start() ├── config.py | CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6' BROKER_URL = 'redis://127.0.0.1:6379/5' └── tasks.py | @app.task # 注意這個文件名必須是tasks.py def add(x, y): return x + y
tasks可以有多個在celery.py中添加一行代碼加載任務函數
app.autodiscover_tasks(['proj.sms', 'proj.email'])
Scheduler計划定時任務:celery -A proj worker -B -l info
#config.py CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區 from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', # 指定要執行的函數任務 'schedule': timedelta(seconds=30), # 指定計划時間間隔30s執行一次task 'args': (16, 16) }, }
celery.schedules import crontab定時周期任務:(比如每周一執行一次 )
只需要修改 'schedule': crontab(hour=7, minute=30, day_of_week=1),
2.celery擴展使用
指定隊列名:
啟動加上-Q參數 celery -A proj worker --loglevel=info -Q 'testq'
跑任務時 add.delay(3,4,queue='testq')
指定開啟的worker進程數:單個Celery進程每分鍾就可以處理數百萬個任務
底層是調用的Python的multiprocessing模塊中的Pool進程池思想來做
啟動加上-c參數 celery -A proj worker --loglevel=info -c 2 2個worker進程來同時搶任務
圖像化查看broker里面的數據,查看任務狀態,以及任務的詳細信息:flower的webUI
pip install flower 注意創建celery實例app時指定的broker設置的redis/5
任意目錄執行 celery flower --port=5555 --broker=redis://localhost:6379/5
3.DJango-celery模式(嵌入到大型DJango項目中)
應用: django調用celery跑異步任務,常見場景有注冊成功,發送郵件可以異步來防止網絡IO阻塞,以及耗時間的任務,可以在WEB應用中使用這種異步方式
- 安裝
django-celery==3.1.17與celery==3.1.17對應 - 創建celery必須的數據庫表結構
python manage.py migrate - django項目的settings.py文件中追加如下內容:backend,任務執行結果超時時間,worker並發數也就是 -c 指定的數據,指定任務周期存儲在orm數據庫中
- 在django的app應用目錄下創建tasks.py任務文件
@task def add(x,y): - 開啟django服務和celery服務,雖然耦合了,還要開
python manage.py celery worker --loglevel=info
4.內存泄漏問題
celery內存泄露分析
celery配置項如下
CELERYD_CONCURRENCY = 2 celery worker並發數 CELERYD_MAX_TASKS_PER_CHILD = 5 每個worker最大執行任務數
執行celery -A ansibleAPI.celery worker啟動celery,通過ps -ef | grep celery可以看到兩個celery worker進程(8226,8228)。
利用celery worker進行某個任務,當worker沒有執行到最大任務時(即銷毀重建),每執行一次任務占用內存必然有所增加,任務數為9,10時(celery均勻調度,並發數*最大任務數),分別有原8228 worker被銷毀,重新創建9386 worker及原8226 worker被銷毀,重新創建9564 worker,此時,運行第9次時,占用總內存有所下降,運行第10次時,總內存回到初如值,同樣任務執行第19、20次情況類似。
celery並發計算規則
celery任務並發只與celery配置項CELERYD_CONCURRENCY 有關,與CELERYD_MAX_TASKS_PER_CHILD沒有關系,即CELERYD_CONCURRENCY=2,只能並發2個worker,此時任務處理較大的文件時,執行兩次可以看到兩個task任務並行執行,而執行第三個任務時,開始排隊,直到兩個worker執行完畢。
結論
celery執行完任務不釋放內存與原worker一直沒有被銷毀有關,因此CELERYD_MAX_TASKS_PER_CHILD可以適當配置小點,而任務並發數與CELERYD_CONCURRENCY配置項有關,每增加一個worker必然增加內存消耗,同時也影響到一個worker何時被銷毀,因為celery是均勻調度任務至每個worker,因此也不宜配置過大,適當配置。

