Celery
1、什么是Celery
- Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注於實時處理的異步任務隊列,同時也支持任務調度。
- 用Python寫的執行 定時任務和異步任務的框架
執行異步任務:
- 創建任務:tasks.py
- 把任務添加到隊列中:add_task.py
- 開啟work,執行任務
- 用命令:celery -A tasks worker -l info
- 在 Windows下:celery -A tasks worker -l info -P eventlet
- 查看任務結果:task_resut.py
多任務結構:
- 重點:執行work的時候:celery -A tasks worker -l info -P eventlet
2、Celery架構

Celery的架構由三部分組成,消息中間件,任務執行單元和任務執行結果存儲(task result store )組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等
3、使用場景
異步任務:將耗時操作任務提交給Celery取異步執行 ,比如發送 短信、郵件、消息推送、音頻/視頻處理等等
定時任務:定時執行某件事情,比如每天數據統計
4、Celery的安裝配置
pip install celery
app=Celery('任務名', backend='xxx',broker='xxx')
5、Celery執行異步任務
基本使用
創建項目:celerytest
創建py文件:task.py
`from celery import Celery
import time
# broker:消息中間人用redis
broker = 'redis://127.0.0.1:6397/1'
# 結果存儲在redis中
backend = 'redis://127.0.0.1:6379/2'
# 第一個參數是別名,可以隨便寫
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(x, y):
time.sleep(3)
return x + y
創建py文件:add_task.py,添加任務
import task
if __name__ == '__main__':
# 之前這樣寫,直接就執行 函數
task.add()
# 現在把函數添加到執行隊列中,參數寫在delay中
# result不是函數的執行結果,他是個對象
result = task.add.delay(2, 3)
# 這個任務唯一的id
print(result.id)
創建py文件:result.py,查看任務執行結果
from celery.result import AsyncResult
from task import app
async = AsyncResult(id='ac2a7e52-ef66-4caa-bffd-81414d869f85', app=app)
if async.successful():
# 任務執行的結果,也就是返回值
result = async.get()
print(result)
# result.forget() #將結果刪除
elif async.failed():
print('執行失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常后正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
執行add_task.py,添加任務,並獲取任務ID
執行命令:celery worker -A celery_app_task -l info -P eventlet
執行result.py檢查任務狀態並獲取結果
6、Celery執行定時任務
設定時間讓celery執行一個任務
add_task.py
from celery_app_task import add
from datetime import datetime
# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
啟動一個beat: celery beat -A celery_task -l info
啟動work執行:celery worker -A celery_task -l info -P eventlet
7、Django中使用Celery
在項目目錄下創建celeryconfig.py
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設置並發worker數量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個worker最多執行100個任務被銷毀,可以防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30
在app01目錄下創建tasks.py
from celery import task
@task
def add(a,b):
with open('a.text', 'a',encoding='utf-8') as f:
f.write('a')
print(a+b)
視圖函數views.py
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')
settings.py
INSTALLED_APPS = [
...
'djcelery',
'app01'
]
...
from django_celery import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
