celery介紹
Celery
是由Python
開發、簡單、靈活、可靠的分布式任務隊列,是一個處理異步任務的框架,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery
側重於實時操作,但對調度支持也很好,其每天可以處理數以百萬計的任務。特點:
- 簡單:熟悉
celery
的工作流程后,配置使用簡單 - 高可用:當任務執行失敗或執行過程中發生連接中斷,
celery
會自動嘗試重新執行任務 - 快速:一個單進程的
celery
每分鍾可處理上百萬個任務 - 靈活:幾乎
celery
的各個組件都可以被擴展及自定制
Celery由三部分構成:
- 消息中間件(Broker):官方提供了很多備選方案,支持
RabbitMQ
、Redis
、Amazon SQS
、MongoDB
、Memcached
等,官方推薦RabbitMQ
- 任務執行單元(Worker):任務執行單元,負責從消息隊列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分布式的核心
- 結果存儲(Backend):官方提供了諸多的存儲方式支持:
RabbitMQ
、Redis
、Memcached
,SQLAlchemy
,Django ORM
、Apache Cassandra
、Elasticsearch
等
工作原理:
- 任務模塊
Task
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往消息隊列,而定時任務由Celery Beat
進程周期性地將任務發往消息隊列; - 任務執行單元
Worker
實時監視消息隊列獲取隊列中的任務執行; Woker
執行完任務后將結果保存在Backend
中;
django應用Celery
django
框架請求/響應的過程是同步的,框架本身無法實現異步響應。但是我們在項目過程中會經常會遇到一些耗時的任務, 比如:發送郵件、發送短信、大數據統計等等,這些操作耗時長,同步執行對用戶體驗非常不友好,那么在這種情況下就需要實現異步執行。異步執行前端一般使用ajax
,后端使用Celery
。
項目應用
django
項目應用celery
,主要有兩種任務方式,一是異步任務(發布者任務),一般是web請求,二是定時任務
異步任務redis
1.安裝celery
pip3 install celery
2.celery.py
在主項目目錄下,新建 celery.py
文件:
import os
import django
from celery import Celery
from django.conf import settings
# 設置系統環境變量,安裝django,必須設置,否則在啟動celery時會報錯
# celery_study 是當前項目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings.py')
django.setup()
app = Celery('celery_demo')
app.config_from_object('django.conf.settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
注意:是和settings.py
文件同目錄,一定不能建立在項目根目錄,不然會引起celery
這個模塊名的命名沖突
同時,在主項目的init.py
中,添加如下代碼:
from .celery import celery_app
__all__ = ['celery_app']
3.settings.py
在配置文件中配置對應的redis配置:
# Broker配置,使用Redis作為消息中間件
BROKER_URL = 'redis://127.0.0.1:6379/0'
# BACKEND配置,這里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 結果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任務結果過期時間,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 時區配置
CELERY_TIMEZONE='Asia/Shanghai'
更加詳細的配置可查看官方文檔:http://docs.celeryproject.org/en/latest/userguide/configuration.html
4.tasks.py
在子應用下建立各自對應的任務文件tasks.py
(必須是tasks.py這個名字,不允許修改)
from celery import shared_task
@shared_task
def add(x, y):
return x + y
5.調用任務
在 views.py
中,通過 delay
方法調用任務,並且返回任務對應的 task_id
,這個id用於后續查詢任務狀態
from celery_app.tasks import add
def index(request):
ar = add.delay(10, 6)
return HttpResponse(f'已經執行celery的add任務調用,task_id:{ar.id}')
6.啟動celery
在命令窗口中,切換到項目根目錄下,執行以下命令:
celery worker -A celery_demo -l info
- -A celery_demo:指定項目app
- worker: 表明這是一個任務執行單元
- -l info:指定日志輸出級別
輸出以下結果,代表啟動celery
成功
更多celery
命令的參數,可以輸入celery --help
7.獲取任務結果
在 views.py
中,通過AsyncResult.get()
獲取結果
def get_result(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.ready():
return JsonResponse({"status": ar.state, "result": ar.get()})
else:
return JsonResponse({"status": ar.state, "result": ""})
AsyncResult
類的常用的屬性和方法:
- state: 返回任務狀態,等同
status
; - task_id: 返回
任務id
; - result: 返回任務結果,同
get()
方法; - ready(): 判斷任務是否執行以及有結果,有結果為
True
,否則False
; - info(): 獲取任務信息,默認為結果;
- wait(t): 等待t秒后獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則
wait
期間一直阻塞,直到超時報錯; - successful(): 判斷任務是否成功,成功為
True
,否則為False
;
代碼的准備工作都做完了,我們開始訪問瀏覽器127.0.0.1/celery_app/
,得到以下結果
已經執行celery的add任務調用,task_id:b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb
這一步的作用:啟動add
任務,然后放在消息中間件中,這里我們用的是redis
,就可以通過redis工具查看,如下
然后我們之前啟動的celery
的worker
進程會獲取任務列表,逐個執行任務,執行結束后會保存到backend中,最后通過前端ajax
輪詢一個接口,根據task_id
提取任務的結果
接下來我們訪問http://127.0.0.1:8000/celery_app/get_result/?task_id=b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb
,就能從頁面上查看到結果,如下
{
"status": "SUCCESS",
"result": 16
}
說明定時任務執行成功,返回結果為16
定時任務
在第一步的異步任務的基礎上,進行部分修改即可在
1.settings.py
在settings
文件,配置如下代碼即可
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'mul_every_10_seconds': {
# 任務路徑
'task': 'celery_app.tasks.mul',
# 每10秒執行一次
'schedule': 10,
'args': (10, 5)
},
'xsum_week1_20_20_00': {
# 任務路徑
'task': 'celery_app.tasks.xsum',
# 每周一20點20分執行
'schedule': crontab(hour=20, minute=20, day_of_week=1),
'args': ([1,2,3,4],),
},
}
參數說明如下:
- task:任務函數
- schedule:執行頻率,可以是整型(秒數),也可以是
timedelta
對象,也可以是crontab
對象,也可以是自定義類(繼承celery.schedules.schedule
) - args:位置參數,列表或元組
- kwargs:關鍵字參數,字典
- options:可選參數,字典,任何
apply_async()
支持的參數 - relative:默認是
False
,取相對於beat
的開始時間;設置為True
,則取設置的timedelta
時間
更加詳細的說明參考官方文檔:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
2.啟動celery
分別啟動worker
和beat
celery worker -A celery_demo -l debug
celery beat -A celery_demo -l debug
我們可以看到定時任務會每隔10s就運行任務
運行完的結果會保存在redis
中
任務綁定
Celery
可通過task
綁定到實例獲取到task
的上下文,這樣我們可以在task
運行時候獲取到task
的狀態,記錄相關日志等
我們可以想象這樣一個場景,當任務遇到問題,執行失敗時,我們需要進行重試,實現代碼如下
@shared_task(bind=True)
def add(self, x, y):
try:
logger.info('-add' * 10)
logger.info(f'{self.name}, id:{self.request.id}')
raise Exception
except Exception as e:
# 出錯每4秒嘗試一次,總共嘗試4次
self.retry(exc=e, countdown=4, max_retries=4)
return x + y
說明如下:
- 在裝飾器中加入參數
bind=True
- 在
task
函數中的第一個參數設置為self
self
對象是celery.app.task.Task
的實例,可以用於實現重試等多種功能
接着我們在views.py
文件中,寫入如下視圖函數
def get_result(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.successful():
return JsonResponse({"status": ar.state, "result": ar.get()})
else:
return JsonResponse({"status": ar.state, "result": ""})
接着我們訪問http://127.0.0.1:8000/celery_app/
,創建一個任務id,返回如下結果
已經執行celery的add任務調用,task_id:f55dcfb7-e184-4a29-abe9-3e1e55a2ffad
然后啟動celery命令:
celery worker -A celery_demo -l info
我們會發現celery
中的任務會拋出一個異常,並且重試了4次,這是因為我們在tasks
任務中主動拋出了一個異常
[2021-06-02 11:27:55,487: INFO/MainProcess] Received task: celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad] ETA:[2021-06-02 11:27:59.420668+08:00]
[2021-06-02 11:27:55,488: INFO/ForkPoolWorker-11] Task celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad] retry: Retry in 4s: Exception()
最后我們訪問http://127.0.0.1:8000/celery_app/get_result/?task_id=f55dcfb7-e184-4a29-abe9-3e1e55a2ffad
,查詢任務的結果
{
"status": "FAILURE",
"result": ""
}
由於我們主動拋出異常(為了模擬執行過程中的錯誤),這就導致了我們的狀態為FAILURE
任務鈎子
Celery
在執行任務時,提供了鈎子方法用於在任務執行完成時候進行對應的操作,在Task
源碼中提供了很多狀態鈎子函數如:on_success
(成功后執行)、on_failure
(失敗時候執行)、on_retry
(任務重試時候執行)、after_return
(任務返回時候執行)
-
通過繼承
Task
類,重寫對應方法即可,示例:class MyHookTask(Task): def on_success(self, retval, task_id, args, kwargs): logger.info(f'task id:{task_id} , arg:{args} , successful !') def on_failure(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}') def on_retry(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
-
在對應的
task
函數的裝飾器中,通過base=MyHookTask
指定@shared_task(base=MyHookTask, bind=True) def mul(self, x, y): ......
任務編排
在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery
也能實現這樣的任務,完成這類型的任務通過以下模塊完成:
- group: 並行調度任務
- chain: 鏈式任務調度
- chord: 類似
group
,但分header
和body
2個部分,header
可以是一個group
任務,執行完成后調用body
的任務 - map: 映射調度,通過輸入多個入參來多次調度同一個任務
- starmap: 類似map,入參類似
*args
- chunks: 將任務按照一定數量進行分組
1.group
首先在urls.py
中寫入如下代碼:
path('primitive/', views.test_primitive),
接着在views.py
中寫入視圖函數
from celery import result, group
def test_primitive(request):
lazy_group = group(mul.s(i, i) for i in range(10)) # 生成10個任務
promise = lazy_group()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
在tasks.py
文件中寫入如下代碼
@shared_task
def mul(x, y):
return x * y
說明:
通過task
函數的 s
方法傳入參數,啟動任務,我們訪問http://127.0.0.1:8000/celery_app/primitive/
,會得到以下結果
{
"function": "test_primitive",
"result": [
0,
1,
4,
9,
16,
25,
36,
49,
64,
81
]
}
上面這種方法需要進行等待,如果依然想實現異步的方式,那么就必須在tasks.py
中新建一個task
方法,調用group
,示例如下:
tasks.py
from celery.result import allow_join_result
@shared_task
def first_group():
with allow_join_result():
return group(mul.s(i, i) for i in range(10))().get()
urls.py
path('group_task/', views.group_task),
views.py
def group_task(request):
ar = first_group.delay()
return HttpResponse(f'已經執行celery的group_task任務調用,task_id:{ar.id}')
2.chain
默認上一個任務的結果作為下一個任務的第一個參數
def test_primitive(request):
promise = chain(mul.s(2, 2), mul.s(5), mul.s(8))() # 160
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
任務分割,分為header
和body
兩部分,hearder
任務執行完在執行body
,其中hearder
返回結果作為參數傳遞給body
def test_primitive(request):
# header: [3, 12]
# body: xsum([3, 12])
promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
celery管理和監控
celery
通過flower
組件實現管理和監控功能 ,flower
組件不僅僅提供監控功能,還提供HTTP API
可實現對woker
和task
的管理
官網:https://pypi.org/project/flower/
文檔:https://flower.readthedocs.io/en/latest
1.安裝flower
pip3 install flower
2.啟動flower
flower -A celery_demo--port=5555
- -A:項目名
- --port: 端口號
3.在瀏覽器輸入:http://127.0.0.1:5555
,能夠看到如下頁面
4.通過api操作
curl http://127.0.0.1:5555/api/workers