簡介
Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統
專注於實時處理的異步任務隊列,同時也支持任務調度
架構圖
......
Celery使用場景
異步任務:將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等
定時任務:定時執行某件事情,比如每天數據統計
安裝
pip install eventlet pip install celery 消息中間件:RabbitMQ/Redis app=Celery('任務名',backend='xxx',broker='xxx')
Celery單任務異步提交使用

創建一個任意項目
在項目里創建一個 celery_app_task.py 文件 (相當於雇工人干活)
import celery import time # 消息隊列存儲在redis中,連接redis # broker='redis://127.0.0.1:6379/2' 不加密碼 # backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式 backend = 'redis://27.0.0.1:6379/1' # 消息存儲redis文件1 broker = 'redis://127.0.0.1:6379/2' # 結果存儲redis文件2 # 第一個參數,給celery命個名,可以任意命名 cel = celery.Celery('test', backend=backend, broker=broker) # 往下寫任務(發郵件等一系列任務) @cel.task # 生成任務,以后需要提交的任務寫這里 def add(x, y): return x + y
創建一個 add_task.py 文件
from Django項目.day32.celerytest.celery_app_task import add result = add.delay(4, 5) # celery異步提交任務到消息中間件里 print(result.id)
創建一個 result.py 文件 (查看結果)
from celery.result import AsyncResult from Django項目.day32.celerytest.celery_app_task import cel async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel) # 注意 id 參數的值為add_task.py執行的結果 if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
celery提交單任務異步執行順序
1. 先執行 add.py 文件 2. 在執行 linux系統下:celery worker -A celery_app_task -l info windows系統下:celery worker -A celery_app_task -l info -P eventlet 3. 最后執行 result.py # 查看結果
Celery多任務異步提交使用
多任務結構圖

pro_cel ├── celery_task# celery相關文件夾 │ ├── celery.py # celery連接和配置相關文件,必須叫這個名字 │ └── tasks1.py # 所有任務函數 │ └── tasks2.py # 所有任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務
按照結構圖創建文件夾celery_task
在celery_task里面創建 celery.py(必須取名為celery) 文件,(雇多個工人)子路徑
import celery import time # 消息隊列存儲在redis中,連接redis # broker='redis://127.0.0.1:6379/2' 不加密碼 # backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式 # 消息存儲redis文件1 backend = 'redis://127.0.0.1:6379/1' # 結果存儲redis文件2 broker = 'redis://127.0.0.1:6379/2' # 第一個參數,給celery命個名,可以任意命名 cel = celery.Celery('test', backend=backend, broker=broker, include=['celery_task.task1', 'celery_task.task2']) # 異步執行多個任務(雇了多個工人) # 處理時區問題 # 時區 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
雇佣工作工人,linux系統下:celery worker -A celery_app_task -l info
雇佣工作工人,windows系統下:celery worker -A celery_app_task -l info -P eventlet
創建task1.py文件,子路徑
from celery_task.celery import cel @cel.task # 生成task1的任務 def add(x, y): return x + y
創建task2.py文件,子路經
from celery_task.celery import cel @cel.task # 生成task2的任務 def add(x, y): return x - y
創建send_task.py文件(Celery的根路徑下),添加到消息中間件里
from celery_task import task1,task2 result1 = task1.add.delay(4, 5) # celery異步提普通交任務1到消息中間件里 print(result1.id) result2 = task2.add.delay(4, 5) # celery異步提交普通任務2到消息中間件里 print(result2.id)
創建check_result.py文件(Celery的根路徑下),拿到任務執行結果
from celery.result import AsyncResult from celery_task.celery import cel async = AsyncResult(id="cbe2cb11-259a-4877-850d-8e736dd76eaf", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
celery提交多任務異步執行順序
1.先執行 send_task.py 文件 # 一次性提交多個任務 2.在執行 linux系統下:celery worker -A celery_app_task -l info windows系統下:celery worker -A celery_app_task -l info -P eventlet 3.最后執行 check_result.py # 查看結果
Celery執行定時任務
Celery架構圖

創建send_task.py文件,跟路徑,提交定時任務
from celery_task import task1, task2 from datetime import datetime # 方法一 # 提交定時任務,設置定時任務執行時間 # v1 = datetime(2019, 4, 20, 16, 50, 50) # 參數為年月日時分秒 # print(v1)# 轉成utc時間 # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = task1.add.apply_async(args=[1, 3], eta=v2) # 提交定時任務 # print(result.id) # 方法二(推薦) ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) # seconds往后延遲多少秒的時間片段
task_time = utc_ctime + time_delay # 使用apply_async並設定時間,
result = task1.add.apply_async(args=[4, 3], eta=task_time) # 提交定時任務,eta指定執行時間
print(result.id)
創建celery_task文件夾
在celery_task文件夾里,創建celery.py文件,子路徑,雇佣處理定時任務的工人
from celery import Celery import time from datetime import timedelta # from celery.schedules import crontab # 消息隊列存儲在redis中,連接redis # broker='redis://127.0.0.1:6379/2' 不加密碼 # backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式 backend = 'redis://127.0.0.1:6379/1' # 消息存儲redis文件1 broker = 'redis://127.0.0.1:6379/2' # 結果存儲redis文件2 # 第一個參數,給celery命個名,可以任意命名 cel = Celery('test', backend=backend, broker=broker,
# 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類
include=['celery_task.tasks1','celery_task.tasks2'])
# 處理時區問題 # 時區 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
# 雇佣工人幫我提交任務到消息隊列里 cel.conf.beat_schedule = { # 名字隨意命名 'add-every-10-seconds': { # 執行tasks1下的test_celery函數 'task': 'celery_task.task1.add', # 每隔2秒執行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), # 每分鍾執行一次,也可以每小時等 'schedule': timedelta(seconds=2), 'args': ('test',) # 給執行函數傳遞參數 }
# 每年每月等提交任務到消息隊列里 # 'add-every-12-seconds': { # 'task': 'celery_task.tasks1.test_celery', # # 每年4月11號,8點42分執行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 每年4月11日8點42分提交任務到消息隊列里 # 'schedule': crontab(minute=42, hour=8), # 每天8點42分提交任務到消息隊列里 # 'args': (16, 16) # }, }
雇佣提交定時任務到消息隊列里命令:celery beat -A celery_task -l info
雇佣工作工人,linux系統下:celery worker -A celery_app_task -l info
雇佣工作工人windows系統下:celery worker -A celery_app_task -l info -P eventlet
創建task2.py文件,子路經,生成任務2
from celery_task.celery import cel @cel.task # 生成task2的任務 def add(x, y): return x + y
創建task1.py文件,子路經,生成任務1
from celery_task.celery import cel @cel.task # 生成task1的任務 def add(x, y): return x - y
創建check_result.py文件,跟路徑,查看定時任務執行結果
from celery.result import AsyncResult from celery_task.celery import cel async = AsyncResult(id="e6742e1f-a6a4-43b8-915a-a221e33db756", app=cel) # id為提交任務時給我的id,send_task.py里的result if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
