Celery實際使用與內存泄漏問題(面試)


1.實際使用

​ 監控task的執行結果:任務id,結果,traceback,children,任務狀態

​ 配置 backend='redis://127.0.0.1:6379/5'給Celery的app對象,直接在redis中查看

​ 還可以

​ 健壯celerycelery -A proj worker -l info

☁  proj  tree
├── __init__.py 
├── celery.py | app=Clery('proj',include=['proj.tasks']) app.config_from_object('proj.config') if __name__==__main__: app.start() ├── config.py | CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6' BROKER_URL = 'redis://127.0.0.1:6379/5' └── tasks.py | @app.task # 注意這個文件名必須是tasks.py def add(x, y): return x + y 

​ tasks可以有多個在celery.py中添加一行代碼加載任務函數

​ app.autodiscover_tasks(['proj.sms', 'proj.email'])

​ Scheduler計划定時任務:celery -A proj worker -B -l info

#config.py CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區 from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', # 指定要執行的函數任務 'schedule': timedelta(seconds=30), # 指定計划時間間隔30s執行一次task 'args': (16, 16) }, }

​ celery.schedules import crontab定時周期任務:(比如每周一執行一次 )

​ 只需要修改 'schedule': crontab(hour=7, minute=30, day_of_week=1),

2.celery擴展使用

​ 指定隊列名:

​ 啟動加上-Q參數 celery -A proj worker --loglevel=info -Q 'testq'

​ 跑任務時 add.delay(3,4,queue='testq')

​ 指定開啟的worker進程數:單個Celery進程每分鍾就可以處理數百萬個任務

​ 底層是調用的Python的multiprocessing模塊中的Pool進程池思想來做

​ 啟動加上-c參數 celery -A proj worker --loglevel=info -c 2 2個worker進程來同時搶任務

​ 圖像化查看broker里面的數據,查看任務狀態,以及任務的詳細信息:flower的webUI

​ pip install flower 注意創建celery實例app時指定的broker設置的redis/5

​ 任意目錄執行 celery flower --port=5555 --broker=redis://localhost:6379/5

3.DJango-celery模式(嵌入到大型DJango項目中)

​ 應用: django調用celery跑異步任務,常見場景有注冊成功,發送郵件可以異步來防止網絡IO阻塞,以及耗時間的任務,可以在WEB應用中使用這種異步方式

  1. 安裝django-celery==3.1.17 與celery==3.1.17對應
  2. 創建celery必須的數據庫表結構 python manage.py migrate
  3. django項目的settings.py文件中追加如下內容:backend,任務執行結果超時時間,worker並發數也就是 -c 指定的數據,指定任務周期存儲在orm數據庫中
  4. 在django的app應用目錄下創建tasks.py任務文件@task def add(x,y):
  5. 開啟django服務和celery服務,雖然耦合了,還要開python manage.py celery worker --loglevel=info

4.內存泄漏問題

celery內存泄露分析

celery配置項如下

CELERYD_CONCURRENCY = 2      celery worker並發數 CELERYD_MAX_TASKS_PER_CHILD = 5   每個worker最大執行任務數

 
執行celery -A ansibleAPI.celery worker啟動celery,通過ps -ef | grep celery可以看到兩個celery worker進程(8226,8228)。

利用celery worker進行某個任務,當worker沒有執行到最大任務時(即銷毀重建),每執行一次任務占用內存必然有所增加,任務數為9,10時(celery均勻調度,並發數*最大任務數),分別有原8228 worker被銷毀,重新創建9386 worker及原8226 worker被銷毀,重新創建9564 worker,此時,運行第9次時,占用總內存有所下降,運行第10次時,總內存回到初如值,同樣任務執行第19、20次情況類似。

celery並發計算規則
celery任務並發只與celery配置項CELERYD_CONCURRENCY 有關,與CELERYD_MAX_TASKS_PER_CHILD沒有關系,即CELERYD_CONCURRENCY=2,只能並發2個worker,此時任務處理較大的文件時,執行兩次可以看到兩個task任務並行執行,而執行第三個任務時,開始排隊,直到兩個worker執行完畢。

結論
celery執行完任務不釋放內存與原worker一直沒有被銷毀有關,因此CELERYD_MAX_TASKS_PER_CHILD可以適當配置小點,而任務並發數與CELERYD_CONCURRENCY配置項有關,每增加一個worker必然增加內存消耗,同時也影響到一個worker何時被銷毀,因為celery是均勻調度任務至每個worker,因此也不宜配置過大,適當配置。

你可能感興趣的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM