一、Celery簡介
Celery 是一個 基於python開發的異步任務隊列/基於分布式消息傳遞的作業隊列,
通過它可以輕松的實現任務的異步處理。它側重於實時操作,但對調度支持也很
好。Celery用於生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,
但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery
建議的消息隊列是RabbitMQ,但提供支持Redis, Beanstalk, MongoDB,
CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易於集
成Django, Pylons 和 Flask,使用 django-celery, celery-pylons and Flask-Celery
附加包即可。它的特點:
- 方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
- 使用功能齊備的管理后台或命令行添加,更新,刪除任務.
- 方便把任務和配置管理相關聯.
- 可選多進程, Eventlet 和 Gevent 三種模型並發執行.
- 提供錯誤處理機制.
- 提供多種任務原語, 方便實現任務分組,拆分,和調用鏈.
- 支持多種消息代理和存儲后端.
- Celery 是語言無關的,它提供了python 等常⻅語言的接口支持
celery官方文檔 : http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
二、Celery的相關概念
celery架構圖
- task 就是任務,包括異步任務和定時任務
- broker 中間⼈,接收生產者發來的消息即Task,將任務存⼊隊列。任務的消
費者是Worker。Celery本身不提供隊列服務,推薦用Redis或RabbitMQ實現
隊列服務。 - worker 執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務並執
行它。 - backend 用於存儲任務的執行結果。Celery支持以不同方式存儲任務的結果,
包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,
Apache Cassandra, IronCache 等。 - beat 定時任務調度器,根據配置定時將任務發送給Broler。
三、應用場景
- 異步調用:那些用戶不關心的但是又存在在我們API里面的操作 我們就可以用
異步調用的方式來優化(發送郵件 或者上傳頭像) - 定時任務:定期去統計日志,數據備份,或者其他的統計任務
四、Celery的安裝
-
安裝
pip install celery pip install celery-with-redis #django-celery-results庫基於 Django ORM實現了結果存儲后端 pip install django-celery-results
-
配置
在settings.py文件里設置
ALLOWED_HOSTS = ['*'] INSTALLED_APPS = ( ... 'celery', 'django_celery_results', #把 django_celery_results 加到 INSTALLED_APPS 中 '自⼰的APP' } BROKER_URL='redis://localhost:6379/5' CELERY_RESULT_BACKEND = 'django-db' CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用 json CELERY_RESULT_SERIALIZER = 'json' # 結果序列化為json
-
創建celery實例
在settings.py的同級目錄下新建celery.py
from __future__ import absolute_import #絕對路徑導⼊ from celery import Celery from django.conf import settings import os #設置系統的環境配置用的是Django的 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "工程名字.settings") #實例化celery app = Celery('mycelery') app.conf.timezone = 'Asia/Shanghai' #指定celery的配置來源 用的是項目的配置文件settings.py app.config_from_object("django.conf:settings") #讓celery 自動去發現我們的任務(task) app.autodiscover_tasks() #你需要在app目錄下 新建一個叫tasks.py(一定不要 寫錯)文件
在settings.py同級目錄下的init.py加⼊
from __future__ import absolute_import from .celery import app as celery_app
五、Celery的使用
1、創建任務
在需要使用異步任務的APP目錄下新建tasks.py
from celery import shared_task
import time
@shared_task
def hello_celery(loop):
for i in range(loop):
print('hello')
time.sleep(2)
2、調用
在views.py內的調用
任務函數名.delay(參數,,,,)
3、生成數據庫表
python manage.py migrate django_celery_results
4、啟動worker
celery -A 你的工程名 worker -l info
- 注意:修改tasks.py的內容后 要重啟celery的服務
5 、獲取任務執行結果
異步任務執行完畢后,會自動觸發信號:
- before_task_publish
- after_task_publish
- task_prerun
- task_postrun
- task_success
- task_failure
- task_revoked
from celery.signals import task_success
@task_success.connect(sender=add)
def task_done_handler(sender=None, result=None):
print(result)
六、定時任務和計划任務
- 定時任務
- 啟動: celery -A 你的工程名稱 beat -l info
在settings.py文件添加 CELERYBEAT_SCHEDULE = { 'schedule-test': { 'task':
'app的名字.tasks.hello_celery', 'schedule': timedelta(seconds=3), 'args': (2,) },
}
- 計划任務時間
#setting.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
"every-ten-second-run-my_task": {
"task": "t07.tasks.my_task",
"schedule": crontab(minute="01", hour="15"),
"args": (2,)
}
}
-
坑:
-
我們啟動定時任務服務時 也要先開啟worker
如果只開啟定時服務 沒有開啟worker服務 那么定時任務會被放⼊任務隊
列,但是由於沒有⼲活⼉的worker 那么任務是不會被執行,當worker服
務被啟動后 會⽴刻去任務隊列領任務並執行
-
-
你的任務一定要確保是可以正常執行的
七、其它
1、查看異步任務情況
Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康
狀態進行監控並以可視化的方式展現,
-
安裝flower:
pip install flower
-
啟動flower(默認會啟動一個webserver,端口為5555):
celery flower --broker=redis://localhost:6379/5
-
即可查看
2、內存泄漏
-
說明
⻓時間運行Celery有可能發生內存泄露,可以像下面這樣設置 -
示例代碼
CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每個worker執行了多少任務就會死 掉
常用配置清單
-
說明
-
配置信息
#from kombu import Queue, Exchange # 設置Broker和backend BROKER_URL = 'redis://127.0.0.1:6379/0' # 將數據存放到redis1數據庫,redis默認有16個數據庫 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用json CELERY_RESULT_SERIALIZER = 'json' # 結果序列化為json CELERY_ACCEPT_CONTENT = ['json'] # 分布式接受數據的類型為json CELERY_TIMEZONE = 'Asia/Shanghai' #使用中國上海時區 CELERY_ENABLE_UTC = True CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 后端存儲任務超過一天, 則自動清理數據,單位為秒 CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每個worker最多執行 1000個任務就會被銷毀,可防⽌內存泄露