基礎
本文檔描述 Celery 中任務實例和 Canvas 使用的統一 “Calling API”。
API 中定義了一個執行選項的標准集,以及三個方法:
- apply_async(args[, kwargs[, ...]])
發送任務消息
- delay(*args, **kwargs)
發送任務消息的簡寫,不支持執行選項
- calling(__call__)
直接調用任務對象,意味着任務不會被工作單元執行,而是在當前進程中執行(不會發送任務消息)
Quick Cheat Sheet
- T.delay(arg, kwarg=value)
.apply_async 方法的參數簡寫方式。(.delay(*args, **kwargs) 會調用 .apply_async(args, kwargs))
- T.apply_async((arg,), {'kwarg': value})
從現在開始10秒后執行任務。
- T.apply_async(countdown=10)
從現在開始10秒后執行任務,這里使用 eta 聲明
- T.apply_async(eta=now+timedelta(seconds=10))
從現在開始1分鍾后執行任務,任務過期時間為2分鍾
- T.apply_async(countdown=60, expires=120)
任務過期時間為2天,使用 datetime 設置
- T.apply_async(expires=now+timedelta(days=2))
示例
使用 delay 方法很方便,就像使用一個常規函數:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
使用 apply_async() 方法,你必須這樣寫:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
因此,delay要方便得多,但是如果你想設置額外的執行選項,你不得不使用 apply_async。
本文檔接下來將深入講解執行選項。所有的例子都使用一個名為 add 的任務,它返回兩個參數的和。
@app.task
def add(x, y):
return x + y
提示:
如果任務沒有在當前進程注冊,你可以使用 send_task() 方法依據名稱調用對應任務。
還有其他的方法…
當讀到 Canvas 這一節時,你將會學習到關於啟動任務的更多知識,signature 是用來傳遞函數調用簽名的對象,(例如在網絡上傳輸),並且他們還支持API調用:
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()
Linking (callbacks/errbacks)
Celery 支持鏈接任務,這使得執行一個任務之后接着執行另一個任務。回調任務會將父任務的結果作為本任務函數的部分參數。
add.apply_async((2, 2), link=add.s(16))
這里第一個任務 (4) 將會發送到另一個任務將 16 與前面結果相加,形成表達式 (2 + 2) + 16 = 20
如果任務拋出異常(errback),你也可以讓回調函數執行,但是與常規的回調不同的是它將會傳遞父任務的ID而不是結果值。這是由於並不總是可以序列化拋出的異常,並且這種情況下,錯誤回調需要啟用一個結果存儲后端,另外任務需要自己獲取父任務的結果。
下面是一個錯誤回調的例子:
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
exc = result.get(propagate=False)
print('Task {0} raised exception: {1!r}\n{2!r}'.format(uuid, exc, result.traceback))
它可以使用 link_error 執行選項添加進任務:
add.apply_async((2, 2), link_error=error_handler.s())
除此之外,link和 link_error 執行選項可以在一個列表中聲明:
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
callback/errbacks 將按順序執行,並且所有回調函數調用時將使用父任務的返回值作為部分參數。
What’s s?
這里使用的add.s 被稱為一個簽名。如果你不知道他們是什么,你可以看 canvas guide 這一節。從那里你還可以學習到 chain: 一個將任務串起來的簡單方法。
實際操作中,link 執行選項被當做一個內部原語,你可能並不直接使用它,而是使用 chain。
On message
Celery 通過設置 setting_on_message 回調支持捕獲所有狀態變更。
例如,對於長時間任務,你可以通過如下類似操作更新任務進度:
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)
def on_raw_message(body):
print(body)
r = hello.apply_async()
print(r.get(on_message=on_raw_message, propagate=False))
將會產生如下輸出:
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
ETA and Countdown
ETA(估計到達時間)使你可以聲明任務將被執行的最早時間。以后,countdown 是設置 ETA 的快捷方式。
>>> result = add.apply_async((2, 2), countdown=3) >>> result.get() # this takes at least 3 seconds to return 20
任務保證在聲明的日期和時間后執行,當時不一定是所聲明的准確時間。可能的原因是消息中間件的最后期限隊列中可能包含多個等待執行的任務,或者是嚴重的網絡延遲。為了保證你的任務能及時執行,你應該監控隊列的阻塞情況。使用Munin或者類似的工具來獲取報警,那么能采取恰當的措施來減輕負載。請看 Munin。
countdown 是一個整數, 但是eta 必須是一個 datetime 對象,用來聲明一個精確的日期和時間(包含毫秒精度,以及時區信息):
>>> from datetime import datetime, timedelta >>> tomorrow = datetime.utcnow() + timedelta(days=1) >>> add.apply_async((2, 2), eta=tomorrow)
Expiration
expires 參數定義了一個可選的過期時間,可以是任務發布后的秒數,或者使用 datetime 聲明一個日期和時間。
>>> # Task expires after one minute from now. >>> add.apply_async((10, 10), expires=60) >>> # Also supports datetime >>> from datetime import datetime, timedelta >>> add.apply_async((10, 10), kwargs, ... expires=datetime.now() + timedelta(days=1)
當工作單元接收到一個過期任務,它會將任務標記為 REVOKED(TaskRevokeError)。
Message Sending Retry
當鏈接失敗,celery 會重試發送任務消息,並且重試行為可以設置 - 比如重試的頻率,或者最大重試次數 - 或者禁用所有。
禁用消息發送重試,你可以設置重試的執行選項為 False:
add.apply_async((2, 2), retry=False)
相關設置:
task_publish_retry
task_publish_retry_policy
重試策略
重試策略是一個映射,用來控制重試怎樣進行,包含如下鍵:
-
max_retries
放棄重試前的最大重試次數,這種情況下導致重試的異常會被重新拋出。
None值意味着一直重試
默認重試3次 -
interval_start
定義首次重試間隔的秒數(浮點數或者整數)。默認是0(首次重試會立即進行) -
interval_step
每進行一次重試,這個值會加到重試延遲里(浮點數或者整數)。默認是0.2。 -
interval_max
重試之間間隔的最大秒數(浮點數或者整數)。默認是0.2。
例如,關聯到默認的策略:
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
用於重試的最長時間會是 0.4 秒。時間默認設置得相對短是由於如果消息中間件斷鏈接了會導致鏈接失敗重試堆積效果 - 例如,許多WEB服務器會由於處理等待重試而阻塞其他的請求。
Connection Error Handling
當你發送一個任務消息,而消息傳輸鏈接丟失了,或者鏈接不能被初始化了,一個 OperationError 錯誤將會被拋出:
>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
如果有設置了重試配置,這種錯誤只有在達到最大重試次數,或者立即關閉的情況下才會發生。
你也可以這樣處理這種錯誤:
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
Serializers
客戶端和工作單元之間的數據傳輸需要序列化,所以每個celery 的消息都有一個 content_type 請求頭用來描述編碼使用的序列化方法。
默認的序列化器是 json,但是你可以通過 task_serializer 設置修改序列化器,或者針對單個任務,甚至單個消息設置序列化器。
內建的序列化器有 JSON, pickle, YAML 以及 msgpack,你還可以將自定義的序列化器注冊到 Kombu 序列化器注冊表。
另見:
Kombu 用戶指南中的消息序列化。
每個選項都有優點和缺點。
- json - JSON 在許多語言中都有支持,現在是 python 標准的一部分(從2.6開始),而且通過使用現代化的 python 庫,例如 simplejson,可以非常快的編碼。
JSON 的主要缺點是它限制了你只能使用如下數據類型:
strings, Unicode, floats, Boolean, dictionaries, and lists. Decimals 與 dates 明顯都沒有。
二進制數據會使用 Base64 編碼傳輸,比原生支持二進制數據類型的序列化方法增加了 34% 的數據傳輸量。
但是,如果你的數據滿足以上限制,並且你需要跨語言支持,那么默認的 JSON 序列化可能是你最佳的選擇。
查看 http://json.org 獲取更多的信息。
- pickle
如果你不想支持除 python 外的其他語言,那么 pickle 編碼將使你獲得所有 python 數據類型的支持(除了類實例),發送二進制文件時消息更小,並且比 JSON 處理稍快。
-yaml
YAML 有許多與 json 相似的特性,但是它原生支持更多的數據類型(包括日期,遞歸引用,等等)。但是,YAML 的python庫比 JSON 庫要慢一些。如果你需要一個更富有表達性的數據類型的集合,並且需要保持跨語言兼容,那么 YAML 會是比上述其他序列化更好的選擇。
查看 http://yaml.org/獲取更多的消息。
- msgpack
msgpack 是一個特性上與 JSON 類似的一個二進制序列化格式。但是應用時間還比較短,在這個時間點對它的支持是實驗性的。
查看 http://msgpack.org/ 獲取更多的消息。
使用的編碼方式在消息頭中可查看到,所以工作單元知道怎么反序列化任何任務。如果你使用一個自定義的序列化器,那么它必須也在工作單元可用。
下列順序用來確定在發送消息時使用什么序列化器:
1. 執行選項 serializer
2. Task.serializer 屬性
3. task_serializer 設置
為單個任務調用設置自定義的序列化器的示例:
>>> add.apply_async((10, 10), serializer='json')
Compression
Celery 使用 gzip或者 bzip2 壓縮消息。你也可以創建自己的壓縮模式,並注冊到 Kombu 壓縮模式注冊表。
下列順序用來確定當發送消息時使用什么壓縮模式:
1. 執行選項 compression
2. Task.compression 屬性
3. task_compression 設置
當調用一個任務時聲明壓縮模式的示例:
>>> add.apply_async((2, 2), compression='zlib')
Connections
Automatic Pool Support
從 2.3 版本開始支持自動連接池,所以你沒有必要手動處理連接與發布者來重用這些連接。
從2.5版本開始,連接池默認被啟用。
可以查看 broker_pool_limit 設置獲取更多的信息。
你可以通過創建一個發布者來手動處理連接:
results = []
with add.app.pool.acquire(block=True) as connection:
with add.get_publisher(connection) as publisher:
try:
for args in numbers:
res = add.apply_async((2, 2), publisher=publisher)
results.append(res)
print([res.get() for res in results])
當然,這個特殊的例子用 group 更更好的表達:
>>> from celery import group >>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)] >>> res = group(add.s(i, j) for i, j in numbers).apply_async() >>> res.get() [4, 8, 16, 32]
Routing options
Celery 可以路由任務到不同的隊列。
簡單的路由(name <-> name)是通過 queue 選項來實現:
add.apply_async(queue='priority.high')
你可以使用 -Q 命令行參數將工作單元分配到 priority.high 隊列:
$ celery -A proj worker -l info -Q celery,priority.high
另見:
不建議在代碼中硬編碼隊列名稱,最佳到實踐是配置路由器(task_routes)
想了解更多關於路由的信息,請查看Routing Tasks這一節。
高級選項
想要完全利用 AMQP 的路由能力的高級用戶可以考慮這些選項。有興趣可以參考 routing guide。
- exchange
消息發送的目的 exchange 的名稱(或者 kombu.entity.Exchange)
- routing_key
用來決定路由的鍵
- priority
0~255之間的數字,其中255具有最高優先級。
有支持:RabbitMQ, Redis(優先級相反,0具有最高優先級)。
轉自:https://blog.csdn.net/libing_thinking/article/details/78563222
