django之celery的使用


一、Celery簡介

Celery 是一個 基於python開發的異步任務隊列/基於分布式消息傳遞的作業隊列,
通過它可以輕松的實現任務的異步處理。它側重於實時操作,但對調度支持也很
好。Celery用於生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,
但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery
建議的消息隊列是RabbitMQ,但提供支持Redis, Beanstalk, MongoDB,
CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易於集
成Django, Pylons 和 Flask,使用 django-celery, celery-pylons and Flask-Celery
附加包即可。它的特點:

  • 方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
  • 使用功能齊備的管理后台或命令行添加,更新,刪除任務.
  • 方便把任務和配置管理相關聯.
  • 可選多進程, Eventlet 和 Gevent 三種模型並發執行.
  • 提供錯誤處理機制.
  • 提供多種任務原語, 方便實現任務分組,拆分,和調用鏈.
  • 支持多種消息代理和存儲后端.
  • Celery 是語言無關的,它提供了python 等常⻅語言的接口支持

celery官方文檔 : http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps

二、Celery的相關概念

celery架構圖

  • task 就是任務,包括異步任務和定時任務
  • broker 中間⼈,接收生產者發來的消息即Task,將任務存⼊隊列。任務的消
    費者是Worker。Celery本身不提供隊列服務,推薦用Redis或RabbitMQ實現
    隊列服務。
  • worker 執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務並執
    行它。
  • backend 用於存儲任務的執行結果。Celery支持以不同方式存儲任務的結果,
    包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,
    Apache Cassandra, IronCache 等。
  • beat 定時任務調度器,根據配置定時將任務發送給Broler。

三、應用場景

  • 異步調用:那些用戶不關心的但是又存在在我們API里面的操作 我們就可以用
    異步調用的方式來優化(發送郵件 或者上傳頭像)
  • 定時任務:定期去統計日志,數據備份,或者其他的統計任務

四、Celery的安裝

  • 安裝

    pip install celery
    pip install celery-with-redis
    #django-celery-results庫基於 Django ORM實現了結果存儲后端
    pip install django-celery-results
    
  • 配置

    在settings.py文件里設置

    ALLOWED_HOSTS = ['*']
    INSTALLED_APPS = (
     
     ...
     
     'celery',
     
     'django_celery_results', #把 django_celery_results 加到
    INSTALLED_APPS 中
     '自⼰的APP'
     
    }
    BROKER_URL='redis://localhost:6379/5'
    CELERY_RESULT_BACKEND = 'django-db'
    CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用
    json
    CELERY_RESULT_SERIALIZER = 'json' # 結果序列化為json
    
  • 創建celery實例

    在settings.py的同級目錄下新建celery.py

    from __future__ import absolute_import #絕對路徑導⼊
    from celery import Celery
    from django.conf import settings
    import os
    #設置系統的環境配置用的是Django的
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "工程名字.settings")
    #實例化celery
    app = Celery('mycelery')
    app.conf.timezone = 'Asia/Shanghai'
    #指定celery的配置來源 用的是項目的配置文件settings.py
    app.config_from_object("django.conf:settings")
    #讓celery 自動去發現我們的任務(task)
    app.autodiscover_tasks() #你需要在app目錄下 新建一個叫tasks.py(一定不要
    寫錯)文件
    

    在settings.py同級目錄下的init.py加⼊

    from __future__ import absolute_import
    from .celery import app as celery_app
    

五、Celery的使用

1、創建任務

在需要使用異步任務的APP目錄下新建tasks.py

from celery import shared_task
import time
@shared_task
def hello_celery(loop): 
	for i in range(loop): 
		print('hello')
		time.sleep(2)

2、調用

在views.py內的調用

任務函數名.delay(參數,,,,)

3、生成數據庫表

python manage.py migrate django_celery_results

4、啟動worker

celery -A 你的工程名 worker -l info
  • 注意:修改tasks.py的內容后 要重啟celery的服務

5 、獲取任務執行結果

異步任務執行完畢后,會自動觸發信號:

  • before_task_publish
  • after_task_publish
  • task_prerun
  • task_postrun
  • task_success
  • task_failure
  • task_revoked
from celery.signals import task_success
@task_success.connect(sender=add)
def task_done_handler(sender=None, result=None):
 print(result)

六、定時任務和計划任務

  • 定時任務
    • 啟動: celery -A 你的工程名稱 beat -l info

在settings.py文件添加 CELERYBEAT_SCHEDULE = { 'schedule-test': { 'task':
'app的名字.tasks.hello_celery', 'schedule': timedelta(seconds=3), 'args': (2,) },
}

  • 計划任務時間
#setting.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
     "every-ten-second-run-my_task": {
     "task": "t07.tasks.my_task",
     "schedule": crontab(minute="01", hour="15"),
     "args": (2,)
 	}
}
  • 坑:

    • 我們啟動定時任務服務時 也要先開啟worker

      如果只開啟定時服務 沒有開啟worker服務 那么定時任務會被放⼊任務隊
      列,但是由於沒有⼲活⼉的worker 那么任務是不會被執行,當worker服
      務被啟動后 會⽴刻去任務隊列領任務並執行

  • 你的任務一定要確保是可以正常執行的

七、其它

1、查看異步任務情況

Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康
狀態進行監控並以可視化的方式展現,

  1. 安裝flower:

    pip install flower
    
  2. 啟動flower(默認會啟動一個webserver,端口為5555):

    celery flower --broker=redis://localhost:6379/5
    
  3. 即可查看

    http://localhost:5555

2、內存泄漏

  1. 說明
    ⻓時間運行Celery有可能發生內存泄露,可以像下面這樣設置

  2. 示例代碼

    CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每個worker執行了多少任務就會死
    掉
    

常用配置清單

  1. 說明

  2. 配置信息

    #from kombu import Queue, Exchange
    # 設置Broker和backend
    BROKER_URL = 'redis://127.0.0.1:6379/0' 
    # 將數據存放到redis1數據庫,redis默認有16個數據庫
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用json
    CELERY_RESULT_SERIALIZER = 'json' # 結果序列化為json
    CELERY_ACCEPT_CONTENT = ['json'] # 分布式接受數據的類型為json
    CELERY_TIMEZONE = 'Asia/Shanghai' #使用中國上海時區
    CELERY_ENABLE_UTC = True
     
     
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 后端存儲任務超過一天,
    則自動清理數據,單位為秒
    CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每個worker最多執行
    1000個任務就會被銷毀,可防⽌內存泄露
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM