前言
Celery是一個簡單,靈活,可靠的分布式系統,用於處理大量消息,同時為操作提供維護此類系統所需的工具。它是一個任務隊列,專注於實時處理,同時還支持任務調度。
可以使用的場景如:
- 異步發郵件,這個時候 只需要提交任務給celery 就可以了.之后 由worker 進行發郵件的操作 .
- 跑批接口的任務,需要耗時比較長,這個時候 也可以做成異步任務 .
- 定時調度任務等
Celery 簡介
Celery 扮演生產者和消費者的角色,先了解一下什么是生產者消費者模式。
該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據,如下圖所示:
接下來需要弄清楚幾個問題,誰生產數據(Task),誰是中間件(Broker),誰來消費數據(Worker),消費完之后運行結果(backend)在哪?
看下圖就很清楚了
celery 的5個角色
- Task 就是任務,有異步任務(Async Task)和定時任務(Celery Beat)
- Broker 中間人,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker。Celery 本身不提供隊列服務,推薦用Redis或RabbitMQ實現隊列服務。
- Worker 執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務並執行它。
- Beat 定時任務調度器,根據配置定時將任務發送給Broker。
- Backend 用於存儲任務的執行結果。
使用環境
Celery 本身不提供隊列服務,推薦用Redis或RabbitMQ實現隊列服務。那么需要先安裝Redis之類的中間件
docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis
上面是沒有設置密碼的,設置密碼用下面這句
docker run -itd --name myredis -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes
pip 安裝相關依賴包
pip install celery==3.1.26.post2
pip install redis==2.10.6
Task 任務
先寫個最簡單的demo,新建一個tasks.py文件,task任務需使用@shared_task
裝飾器
from celery import Celery
from celery import shared_task
# 實例化,添加broker地址
app = Celery('tasks', broker='redis://ip:6379')
@shared_task
def add(x, y):
return x + y
打開 tasks.py 所在的目錄,啟動 worker,-A 參數表示的是 Celery APP 的名,這里指的是 tasks.py。
worker 是一個執行任務角色,后面的 loglevel=info 記錄日志類型默認是 info。
celery -A tasks worker --loglevel=info
運行結果
D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
[2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x2148e024c50
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
[2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
[2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
從運行日志看到有tasks任務
[tasks]
. tasks.add
看到Connected to redis
說明已經連接成功了
觸發任務(delay)
任務已經創建了,那么什么時候去觸發這個任務呢,我們需要在代碼里面去觸發這個任務,接着上面代碼繼續寫
@shared_task
def add(x, y):
return x + y
if __name__ == '__main__':
# 觸發任務
res = add.delay(10, 15)
print(res)
print(type(res)) # AsyncResult
運行結果
7492f49b-6735-46fb-a16d-9ec24bd31e56
<class 'celery.result.AsyncResult'>
通過add任務,調用 .delay()
方法來觸發一次任務,返回 AsyncResult 類,那么執行的任務結果都在 AsyncResult 類里
運行 的時候查看 worker 運行日志,可以看到已經接收到任務Received task
,每個任務會生成一個uuid的task_id,不會重復
[2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
[2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25
workder 會自動監聽到推過來的任務,然后執行,可以看到執行結果'succeeded '
backend 任務結果
調用 .delay()
方法觸發任務后,返回 AsyncResult 類,可以查看任務的狀態,任務id和任務結果
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10, 15)
>>> res.task_id
'6a7c8e10-7192-4865-9108-3e98596b9d37'
>>>
>>> res.status
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
return self._get_task_meta()['status']
File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
>>>
任務執行后會生成一個task_id,查看任務運行狀態,會發現出現異常AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
這是因為任務運行的結果,需要保存到一個地方backend,但是前面實例化的時候只配置一個broker地址,並沒有配置backend地址來接收運行結果
from celery import Celery
from celery import shared_task
# backend接收執行結果
app = Celery('tasks',
broker='redis://ip:6379',
backend='redis://ip:6379')
@shared_task
def add(x, y):
return x + y
重新配置后一定要重啟worker監聽
celery -A tasks worker --loglevel=info
在啟動日志[config]里面會看到results這一項已經配置成功
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10,15)
>>> res.task_id
'5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
>>>
>>> res.status
'SUCCESS'
>>>
>>> res.get()
25
常用的幾個屬性
- res.task_id 任務id唯一的,可以根據id拿到結果
- res.status 任務狀態:PENDING、STARTED、RETRY、FAILURE、SUCCESS
- res.get() 任務運行結果,必須要任務狀態是'SUCCESS',才會有運行結果
- r.successful() 返回布爾值,執行成功返回True
- r.ready() # 返回布爾值, 任務執行完成, 返回 True, 否則返回 False.
- r.wait() # 等待任務完成, 返回任務執行結果.
- r.result # 任務執行結果.
- r.state # 和res.status一樣,任務狀態:PENDING, START, SUCCESS
AsyncResult 獲取結果
當觸發一個任務后,會得到一個task_id,但是我們不會一直去查詢status狀態去獲取結果,可能會過一段時間再去看看運行結果。
那么在已經知道task_id 的情況下,如何去查詢狀態和結果?可以用到AsyncResult 類
from celery.result import AsyncResult
res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa')
print(res.state) # 'SUCCESS'
print(res.get()) # 25
結合django使用,參考前面這篇https://www.cnblogs.com/yoyoketang/p/15422804.html
更多參考教程https://blog.csdn.net/u010339879/article/details/97691231
更多參考教程https://www.pianshen.com/article/2176289575/