1. CELERY簡介
1.1. celery
clery是一個使用python語言編寫的,簡單,靈活且穩定的分布式系統。celery是一個任務隊列,着力於實時處理,同時還支持任務調度。
1.2. task queue
任務隊列被當做一種在線程或機器之間分配任務的機制。任務隊列的輸入是被稱為任務的工作單元。專用工作進程持續監視任務隊列,以及時執行新工作。
celery之間使用messages通信,通常使用一個broker來協調clients和workers。為了啟動一個任務,client先添加一個message到隊列,然后broker分發這個message給wokers。
一個celery系統可以由多個workers和多個brokers組成,通過這樣可以支持高可用和水平擴展。
1.3. message transport
celery需要一個message transport來發送和接受消息。RabbitMQ和Redis的代理傳輸功能齊全。同時也支持其他實驗性解決方案。本地開發支持使用SQLite。
1.4. broker
https://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
2. celery 使用
2.1 安裝
python和celery版本對照
celery | python |
---|---|
5.0 | >3.6 |
4.4 | 2.7-3.5 |
3.1 | 2.6 |
3.0 | 2.5 |
2.2 | 2.4 |
下載命令
pip install celery
2.2 簡單使用
2.2.1 創建一個tasks.py文件。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
# 'tasks'為當前模塊的名稱,是必須提供的參數,以便在__main__模塊中定義任務時可以自動生成名稱。
# broker指定要使用的消息代理的URL。這里使用的是RabbitMQ(默認也是RabbitMQ)。
@app.task
def add(x, y):
return x + y
2.2.2 運行celery worker 服務
celery -A tasks worker --loglevel=INFO
# celery worker --help 可以查看命令詳細列表
# celery --help 可以查看celery命令的詳細列表
2.2.3 調用任務
>>> from tasks import add
>>> add.delay(4, 4)
注:注意終端路徑。在tasks.py的目錄下進入python終端。
現在可以通過工作程序的控制台查看輸出內容。
2.2.4 保存結果
如果要跟蹤任務狀態,celery需要在某些地方存儲或發送狀態。這時需要使用result_backend。
有幾種內置的result_backends可供選擇:SQLAlchemy / Django ORM, MongoDB,Memcached,Redis,RPC(RabbitMQ / AMQP),也可以定義自己的后端。
本文使用的是:RPC。
通過celery中backend參數配置(或者通過在配置模塊中使用 result_backend 參數配置)。
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
通過ready()方法放回任務是否已完成處理:
>>> result = add.delay(4, 4)
>>> result.ready()
通過get()方法可以等待結果完成,並獲得結果。(很少使用,因為這樣就將異步轉換為同步調用了)
>>> result.get()
# get中的常用參數:timeout 等待時間,時間到了,任務未完成則拋出異常。
# propagate 默認為true。當設置為false時,如果get()引發異常,此參數可以覆蓋此異常。
如果任務引發異常,可以通過 result.traceback 原始回溯異常。
backend 使用資源來存儲和傳輸結果。為了確保釋放資源,在調用任務之后必須在每個AsyncResult 實例返回值后調用get()或forget()。
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
實際上,delay方法是apply_async方法的一個快捷方式。
使用apply_async方式可以指定執行選項。上例中,queue參數指定任務將被發送的隊列名稱,countdown參數指定任務將最早在多長時間之后執行。
2.2.5 signature
delay()和apply_async()方法是非常常用的,並且滿足大多數場景使用。但是有時我們並不想簡單的將任務發送到隊列中,想將一個任務函數(由參數和執行選項組成)作為一個參數傳遞給另外一個函數中,為了實現此目標,Celery使用一種叫做signatures的東西。
>>> add.signature((2, 2), countdown=10)
# shortcut using
>>> add.s(2, 2)
signature還可以調用delay()和apply_async()
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
# 還可以給定部分參數,創建一個不完整的signature。
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
# 調用delay()時候指定的參數會覆蓋signature創建時設置的參數
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)
# debug is now False.
# signature的另一種寫法
>>> s4 = signature(add, args(2, 2), countdown=1)
2.2.6 primitives
2.2.6.1 group
一個group並行調用一個任務列表,並返回一個特殊的結果實例,該實例使你可以把結果當做一個組,並且按順序檢索返回值。
>>> from celery import group
>>> group(add.s(i, i) for i in range(10))().get()
#Partial group
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
>>> from celery import group
>>> g1 = signature(add, args(1, 2), countdown=1)
>>> g2 = signature(add, args(2, 2), countdown=1)
>>> g3 = signature(add, args(3, 2), countdown=1)
>>> g = group(g1, g2, g3)
>>> ret = g()
>>> ret.get()
2.2.6.2 chain
任務一個一個執行,一個執行完將執行return結果傳遞給下一個任務函數。
>>> from celery import chain
>>> c = chain(add.s(4, 4) | add.s(8))
>>> c().get()
2.2.6.2 chord
chord是一個具有回調的組。
@app.task
def xsum(numbers):
return sum(numbers)
from celery import chord
chord((add.s(i, i) for i in range(10)), xsum.s())().get()
3. django中使用celery
文中的列子大部分來自官方文檔。
官方文檔:https://docs.celeryproject.org/en/stable/