任務是構建 celery 應用的基礎塊。
任務是可以在任何除可調用對象外的地方創建的一個類。它扮演着雙重角色,它定義了一個任務被調用時會發生什么(發送一個消息),以及一個工作單元獲取到消息之后將會做什么。
每個任務都有不同的名稱,發給 celery 的任務消息中會引用這個名稱,工作單元就是根據這個名稱找到正確的執行函數。
任務消息只有在被工作單元確認后才會從隊列中刪除。工作單元會預先保存許多任務消息,如果工作單元被殺死-由於斷電或者其他原因-任務消息將會重新傳遞給其他工作單元。
理想的任務函數應該是具有冪等性的:這意味着即使一個任務函數以同樣的參數被調用多次也不會導致不可預料的效果。因為工作單元無法探測任務是否是冪等的,所以默認的行為是在即將執行之前預先確認任務消息,這使得已經開始的任務不會再被執行。
如果你的任務函數是冪等的,你可以設置 acks_late
選項讓工作單元在任務執行返回之后再確認任務消息。另見:FAQ 中所述應該重試還是延遲確認?
注意:如果執行任務的子進程被終止(通過調用 sys.exit() 或者通過信號),即使 acks_late
選項被激活,工作單元也會確認當前處理的任務消息。這么做的理由是:
1. 對於會迫使內核發送 SIGSEGV
(段錯誤)或者類似信號給進程的任務,我們不想再重新執行
2. 我們假設刻意終止任務的系統管理員不會想這個任務重新執行
3. 消耗過多內存的任務由觸發內核內存溢出的危險,如果重復執行,同樣的事情還會發生
4. 一直失敗的任務再重新遞送消息時會導致高頻的消息循環影響到整個系統。
如果你真的想在這些情況下重新遞送任務消息,你應該考慮使能 task_reject_on_worker_lost
設置。
告警:
一個無限期阻塞的任務會使得工作單元無法再做其他事情。
如果你的任務里有 I/O 操作,請確保給這些操作加上超時時間,例如使用 requests
庫時給網絡請求添加一個超時時間:
connect_timeout, read_timeout = 5.0, 30.0 response = requests.get(URL, timeout=(connect_timeout, read_timeout))
時間限制對確保所有任務在規定的時間內返回很方便,但是一個超市事件將會強制終止進程,所以應該只有在沒有手動設置超時時間的地方使用。
默認的 prefork
池調度器對長時間任務不是很友好,所以如果你的任務需要運行很長時間,確保在啟動工作單元時使能了 -ofair
選項。更多的信息請查看 Prefork pool prefetch settings
這一節,以及路由長時間任務和短時間任務到指定工作單元的最佳實踐(Automatic routing)。
如果你的工作單元被掛起了,請先看看它運行的是什么任務,而不是先提交問題,因為大部分情況下掛起是由於一個或多個任務阻塞在網絡操作上。
本章將學習定義任務的所有知識,以下是目錄:
- Basic
- Names
- Task Requet
- Logging
- Retrying
- List of Options
- States
- Semipredicates
- Custom task classes
- how it works
- Tips and Best Proctices
- Performance and Strategies
- Example
基礎
通過使用 task()
裝飾器,你可以很容易創建一個任務:
from .models import User @app.task def create_user(username, password): User.objects.create(username=username, password=password)
任務上可以設置很多選項,這些選項作為參數傳遞給裝飾器:
@app.task(serializer='json') def create_user(username, password): User.objects.create(username=username, password=password)
多個裝飾器:
當使用多個裝飾器裝飾任務函數時,確保 task
裝飾器最后應用(在python中,這意味它必須在第一個位置):
@app.task @decorator2 @decorator1 def add(x, y): return x + y
應該如何導入任務裝飾器?什么是 app
?
任務裝飾器可以從 Celery 應用實例上獲取,如果不理解,請先看 First Steps with Celery。
如果你使用 Django
(請看 First steps with Django),或者你是一個庫的作者,那么可能想使用 shared_task()
裝飾器:
from celery import shared_task @shared_task def add(x, y): return x + y
綁定任務
一個綁定任務意味着任務函數的第一個參數總是任務實例本身(self
),就像 Python 綁定方法類似:
logger = get_task_logger(__name__) @task(bind=True) def add(self, x, y): logger.info(self.request.id)
綁定任務在這些情況下是必須的:任務重試(使用 app.Task.retry()
),訪問當前任務請求的信息,以及你添加到自定義任務基類的附加功能。
任務繼承
任務裝飾器的 base
參數可以聲明任務的基類:
import celery class MyTask(celery.Task): def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) @task(base=MyTask) def add(x, y): raise KeyError()
任務名稱
每個任務必須有不同的名稱。
如果沒有顯示提供名稱,任務裝飾器將會自動產生一個,產生的名稱會基於這些信息: 1)任務定義所在的模塊, 2)任務函數的名稱
顯示設置任務名稱的例子:
>>> @app.task(name='sum-of-two-numbers') >>> def add(x, y): ... return x + y >>> add.name 'sum-of-two-numbers'
最佳實踐是使用模塊名稱作為命名空間,這樣的話如果有一個同名任務函數定義在其他模塊也不會產生沖突。
>>> @app.task(name='tasks.add') >>> def add(x, y): ... return x + y
你可以通過任務函數的 .name
屬性獲取任務的名稱:
>>> add.name 'tasks.add'
tasks.py
模塊中定義任務,其自動產生的名稱就是上述這種形式。
@app.task def add(x, y): return x + y
>>> from tasks import add >>> add.name 'tasks.add'
自動命名與相對導入
絕對導入:
對於 python2,開發者的最佳實踐是在每個模塊前添加下面這句代碼:
from __future__ import absolute_import
這會總是強制使用絕對導入,所以使用相對導入的任務將不會出現相對導入相關的問題。
在 python3 中,默認就是絕對導入的,所以不需要再額外添加其他代碼。
相對導入和任務名稱自動生成混合使用時會有些問題,所以如果你使用相對導入,你應該顯示設置任務名稱。
例如,客戶端導入模塊 myapp.tasks
時使用 .tasks
,而工作單元導入模塊使用 myapp.tasks
, 他們產生的名稱會不匹配,任務調用時工作單元會報 NotRegistered
錯誤。
在使用 Django 和 include_apps 中應用 project.myapp-
形式的命名時也會出現同樣的問題:
INSTALLED_APPS = ['project.myapp']
如果你在命名空間 project.myapp
下安裝應用時,任務模塊將會被導入為 project.myapp.tasks
, 所以你必須確保總是使用相同的名稱導入任務:
>>> from project.myapp.tasks import mytask # << GOOD >>> from myapp.tasks import mytask # << BAD!!!
第二個例子里任務的名稱會不一樣,因為工作單元與客戶端在不同的名稱空間下導入模塊:
>>> from project.myapp.tasks import mytask >>> mytask.name 'project.myapp.tasks.mytask' >>> from myapp.tasks import mytask >>> mytask.name 'myapp.tasks.mytask'
基於這一點,你必須在導入模塊時保持一致,這也是 python 的最佳實踐。
同樣的,你不應該使用老式的相對導入:
from module import foo # BAD! from proj.module import foo # GOOD!
新式的相對導入能夠被正常使用:
from .module import foo # GOOD!
如果你使用 celery 的項目里已經重度使用了這些模式,而且你沒時間再去重構現有代碼,那么你可以考慮現實聲明任務名稱而不是依賴於自動名稱生成。
@task(name='proj.tasks.add') def add(x, y): return x + y
改變自動名稱生成形式
4.0 版本新特性。
在有些情況,默認的自動名稱生成規則並不合適。例如你在多個不同模塊定義了多個任務:
project/ /__init__.py /celery.py /moduleA/ /__init__.py /tasks.py /moduleB/ /__init__.py /tasks.py
使用默認的自動名稱生成行為,每個任務都會有一個自動產生的名稱如 moduleA.tasks.taskA, moduleA.tasks.taskB, moduleB.tasks.test
等等。你可能想去掉所有任務名稱中的 tasks 字段。如上面已經指出的,你可以顯示的給每個任務指定名稱,或者你還可以通過覆蓋 app.gen_task_name()
方法修改自動名稱生成行為。繼續以上這個例子,celery.py
可能包含:
from celery import Celery class MyCelery(Celery): def gen_task_name(self, name, module): if module.endswith('.tasks'): module = module[:-6] return super(MyCelery, self).gen_task_name(name, module)
此時,每個任務都一個這種形式的名稱 moduleA.taskA, moduleA.taskB, moduleB.test
。
告警:
確保你的 app.gen_task_name()
函數是一個純函數:意味着對於同樣的輸入它總是會返回相同的輸出。
任務請求
app.Task.request
包含於當前執行任務相關的信息與狀態。
任務請求定義了以下屬性:
id: 執行任務的唯一 id
group: 如果任務屬於一個組,這個屬性表示組 id
chord: 任務所屬 chord 的 id(如果任務是header的一部分)
correlation_id: 自定義 ID,用來處理類似重復刪除操作
args: 位置參數
kwargs: 關鍵字參數
origin: 發送任務消息的主機名
retries: 當前任務已經重試的次數。它是一個從0開始的整數
is_eager: 如果任務是在客戶端本地執行而不是通過工作單元執行,那么這個屬性設置為 True
eta: 任務的原始 ETA(如果存在)。用 UTC 時間表示(依賴於 enable_utc
設置)
expires: 原始的過期時間(如果存在)。用 UTC 時間(依賴於 enable_utc
設置)
hostname: 執行任務的工作單元的節點名稱
delivery_info: 附加的消息傳遞消息。它是一個映射,包含用來遞送任務消息的路由規則以及路由鍵。例如, app.Task.retry()
函數可以根據它來重新發送消息到相同的目標隊列。該映射中鍵的可用性取決於使用的消息中間件
reply-to: 回復發送的目的隊列的名稱(例如在RPC 存儲后端中使用)
called_directly: 如果任務不是由執行單元執行,這個屬性設置為 True
timelimit: 它是一個元組,表示任務上當前激活的(軟性,硬性)時間限制(如果存在)
callbacks: 如果任務執行成功,將被調用的函數簽名的列表
errback: 如果任務還行失敗,將被調用的函數簽名的列表
utc: 如果調用者使能了 UTC(enable_utc
),這個屬性為True
3.1版本新特性
headers: 與任務消息一起發送的消息頭的映射(可以為 None
)
reply_to: 回復發送的目的隊列的名稱
correlation_id: 通常與任務id相同,一般在amqp中用來跟蹤回復是發送到哪里
4.0新特性
root_id: 任務所屬的工作流的第一個任務的唯一 id(如果存在)
parent_id: 調用任務的任務的唯一 id
chain: 組成一個任務鏈的預留任務的列表(如果存在)。列表中最后一項將是當前任務的下一個任務
示例:
下面是一個訪問任務上下文信息的任務函數示例
@app.task(bind=True) def dump_context(self, x, y): print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(self.request))
綁定參數說明這個函數是一個”綁定方法”,所以可以訪問任務實例的屬性和方法。
日志
任務工作單元會自動給你設置日志環境,當然你也可以手動配置日志。
celery 提供了一個特殊的日志句柄 “celery.task”,你可以通過繼承這個句柄自動獲取任務名稱和唯一id作為日志的一部分。
最佳實踐是在模塊的開頭創建一個所有任務公用的日志句柄:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task def add(x, y): logger.info('Adding {0} + {1}'.format(x, y)) return x + y
celery 使用 python 標准日志庫,可以在 python 官方文檔中找到。
你可以使用 print()
函數,因為任意寫入到標准輸出/標准錯誤輸出的東西都會被重定向到日志系統(你可以禁用這個特性,請查看 worker_redirect_stdouts
這一節)。
注意:
如果你在任務函數或者模塊中創建一個日志句柄,任務工作單元不會更新這個重定向行為。
如果你想重定向 sys.stdout
和 sys.stderr
到一個自定義日志句柄,你必須手動使能它。例如:
import sys logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): old_outs = sys.stdout, sys.stderr rlevel = self.app.conf.worker_redirect_stdouts_level try: self.app.log.redirect_stdouts_to_logger(logger, rlevel) print('Adding {0} + {1}'.format(x, y)) return x + y finally: sys.stdout, sys.stderr = old_outs
參數檢查
4.0版本新特性
當你調用任務函數時,Celery 會驗證傳遞的參數,就像調用一個普通函數時 Python 所做的檢查。
>>> @app.task ... def add(x, y): ... return x + y # Calling the task with two arguments works: >>> add.delay(8, 8) <AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c> # Calling the task with only one argument fails: >>> add.delay(8) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "celery/app/task.py", line 376, in delay return self.apply_async(args, kwargs) File "celery/app/task.py", line 485, in apply_async check_arguments(*(args or ()), **(kwargs or {})) TypeError: add() takes exactly 2 arguments (1 given)
你可以通過設置任務的 typing
屬性為 False
來禁用參數檢查。
>>> @app.task(typing=False) ... def add(x, y): ... return x + y # Works locally, but the worker reciving the task will raise an error. >>> add.delay(8) <AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
隱藏參數中的敏感信息
4.0版本新特性。
當使用 task_protocal 2
或者更高版本(默認從 4.0 版本開始),你可以通過使用 argsrepr
和 kwargsrepr
參數來覆蓋日志和監控事件中位置參數和關鍵字參數的顯示:
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)') >>> charge.s(account, card='1234 5678 1234 5678').set( ... kwargsrepr=repr({'card': '**** **** **** 5678'}) ... ).delay()
告警:
對於可以從任務中間件中讀取任務消息或者可以截取到消息的人來說,敏感信息仍然是可以訪問的。
基於這個原因,如果你的消息中含有敏感信息,你應該加密信息,或者如上示例中帶有信用卡號之類的信息可以將其加密存儲到一個安全的存儲,然后任務中從存儲中獲取並解密。
重試
app.Task.retry()
函數可以用來重新執行任務,例如在可恢復錯誤的事件中。
當你調用 retry
函數,它將發送一個新的消息,使用相同的任務 id,而且它會小心確保該消息投遞到原始任務相同的隊列。
一個任務被重試將記錄為一個任務狀態,因此你可以使用結果實例跟蹤任務的進度(查看狀態這一節)。
以下是一個使用 retry
函數的例子:
@app.task(bind=True) def send_twitter_status(self, oauth, tweet): try: twitter = Twitter(oauth) twitter.update_status(tweet) except (Twitter.FailWhaleError, Twitter.LoginError) as exc: raise self.retry(exc=exc)
注意:
app.Task.retry()
調用將會拋出一個異常使得任意 retry
后面的代碼都不會被執行。這個異常就是 Retry
異常,它不是作為一個錯誤來處理而是作為一個semi-predicate 來告訴任務工作單元這個任務將被重試,從而當后端存儲已經使能的情況下工作單元能將正確的狀態存儲到后端存儲。
這是一個常規的操作,而且除非 retry
函數的 throw
參數設置為 False
,這個異常將總是會拋出。
exc
參數是用來傳遞在日志中使用或者在后端結果中存儲的異常信息。異常和堆棧回溯信息都可以在任務狀態中看到(如果后端存儲被使能)。
當任務帶有 max_retries
值,如果已經達到最大嘗試次數,當前異常會被重新被拋出,但是下列情況除外:
- 沒有給定 exc 參數
這種情況下,MaxRetriesExceededError
異常將會被拋出。
- 當前沒有異常
如果沒有原始異常被重新拋出,exc
參數將會被使用,因此:
self.retry(exc=Twitter.LoginError())
將會拋出 exc
給定的異常。
使用自定義重試延遲
當一個任務被重試,它在重試前會等待給定的時間,並且默認的延遲是由 default_retry_delay
屬性定義。默認設置為 3 分鍾。注意延遲設置的單位是秒(int 或者 float)。
你可以通過提供 countdown
參數覆蓋這個默認值。
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes. def add(self, x, y): try: something_raising() except Exception as exc: # overrides the default delay to retry after 1 minute raise self.retry(exc=exc, countdown=60)
對已知異常的自動嘗試
4.0 版本新特性。
有時候,你只想在特定異常拋出時重試任務。
幸運的是,你可以通過使用任務裝飾器中的 autoretry_for
參數讓 Celery 自動嘗試一個任務:
from twitter.exceptions import FailWhaleError @app.task(autoretry_for=(FailWhaleError,)) def refresh_timeline(user): return twitter.refresh_timeline(user)
如果你想給內部調用的 Task.retry
函數傳遞自定義的參數,你可以傳遞 retry_kwargs
參數給任務裝飾器:
@app.task(autoretry_for=(FailWhaleError,), retry_kwargs={'max_retries': 5}) def refresh_timeline(user): return twitter.refresh_timeline(user)
這給手動處理異常提供了另一種方案,以上示例與在任務函數中使用 try...except
語句來重試任務有同樣的效果:
@app.task def refresh_timeline(user): try: twitter.refresh_timeline(user) except FailWhaleError as exc: raise div.retry(exc=exc, max_retries=5)
如果你想在發生任意錯誤時重試,可以這樣:
@app.task(autoretry_for=(Exception,)) def x(): ...
4.1 版本新特性。
如果你的任務依賴於其他服務,例如給 API 發送請求,那么比較好的一個方式是使用指數退避來規避對服務造成沖擊。幸運的是,Celery 的自動重試機制能非常簡單實現這個。只要對聲明 retry_backoff
參數,如下:
from requests.exceptions import RequestException @app.task(autoretry_for=(RequestException,), retry_backoff=True) def x(): ...
默認情況下,指數退避還會引入一個隨機 jitter
來避免所有任務統一時刻運行。指數退避的最大退避延遲默認是 10 分鍾。所有的設置都可以通過選項自定義,如下選項:
Task.autoretry_for
一個異常類的列表或者元組。如果在任務執行期間任何一個其中異常拋出,任務將會被自動重試。默認情況下,沒有異常會被重試。
Task.retry_kwargs
一個映射。使用這個屬性可以自定義重試怎么執行。注意如果你使用下面所述的指數退避選項,那么任務的 countdown
選項將會由Celery的自動重試系統決定,而這個映射中的 countdown
將會被忽略。
Task.retry_backoff
布爾值,或者數字。如果這個選項設置為 True,重試將會按照指數退避規則延遲。第一次重試將延遲1秒,第二次重試將延遲2秒,第三次重試將延遲4秒,第四次重試將延遲8秒,以此類推。(但是,如果 retry_jitter
選項被啟用,延遲值將依據它更新)。如果這個選項設置成數字,它將作為一個延遲因子。例如,如果設置成3,第一次重試延遲3秒,第二次重試延遲6秒,第三次重試延遲12秒,第四次重試延遲24秒,以此類推。默認情況下,這個選項設置成 False
,此時重試不會有延遲。
Task.retry_backoff_max
數字。如果 retry_backoff
被啟用,這個選項將設置兩次重試之間的最大延遲。默認情況下,這個選項設置為 600 秒,即 10 分鍾。
Task.retry_jitter
布爾值。jitter
用來在指數退避中引入隨機性,從而避免隊列中所有任務同時執行。如果這個選項被設置為 True
,那么 retry_backoff
j計算出的延遲值將認為是最大值,實際延遲值是0到最大值之間的一個隨機值。默認情況下,這個選項設置為 True
。
選項列表
任務裝飾器有一系列選項可以用來修改任務的行為,例如可以通過 rate_limit
選項來設置任務的速率限制。
傳遞給任務裝飾器的關鍵字參數將會設置為結果任務類的一個屬性,下面是內建屬性的列表。
General
Task.name
任務注冊的名稱
你可以手動設置任務名稱,或者任務名稱將依據模塊名與類名自動生成。
另見任務名稱這一節。
Task.request
如果任務將被執行,這個屬性會包含當前請求的信息。會使用Thread local 存儲。
另見任務請求這一節。
Task.max_retries
只有在任務函數中調用了 self.retry
函數或者給任務裝飾器傳遞了 autoretry_for
參數時才有用。
放棄任務前的最大嘗試次數。如果嘗試次數超過這個值,MaxRetriesExceededError
異常將會被拋出。
注意:
你必須手動調用 retry
函數,否則發生異常時不會自動重試。
默認值是3。如果設置為 None
,將禁用重試限制,任務將一直重試直到成功。
Task.throws
一個可選的預知錯誤類元組,不認為是真正的錯誤。
這個列表中的錯誤將作為一個失敗記錄到后端存儲中,但是任務工作單元不會將這個時間記錄成一個錯誤,並且不包含異常回溯信息。
示例:
@task(throws=(KeyError, HttpNotFound)): def get_foo(): something()
錯誤類型:
- 預知錯誤(在 Task.throws
中)
日志級別為 Info,堆棧回溯信息不包含在內
- 非預知錯誤
日志級別為 Error,包含堆棧回溯信息
Task.default_retry_delay
任務重試前延遲的默認時間值,以秒為單位。可以是 int 或者 float。默認值是 3 分鍾。
Task.rate_limit
設置任務類型的速率限制(限制給定時間內可以運行的任務數量)。當設置了速率限制后,已經開始的任務仍然會繼續完成,但是任務開始前將等待一些時間。
如果這個屬性設置成 None
,速率限制將不生效。如果設置為整數或者浮點數,它將解釋成”沒秒任務數”。
速率限制可以以秒、分鍾或者小時聲明,只要值后面附加 “/s”, “/m”, “/h”。任務將在給定時間內平均分布。
實例:”100/m”(沒分鍾100個任務)。這將強制同一個任務工作單元啟動兩個任務的時間間隔為最小 600 毫秒。
默認值如 task_default_rate_limit
設置:如果沒有聲明說明默認情況下任務速率限制被禁用。
注意這里指的是每個任務工作單元的限制,不是全局速率限制。要實現全局速率限制(例如,對一個 API 的每秒請求數限制),你必須限定指定的任務隊列。
注意:
如果任務請求帶有 ETA,這個屬性將被忽略。
Task.time_limit
任務的硬性時間限制,以秒為單位。如果沒有設置,那么將使用任務工作單元的默認值。
Task.soft_time_limit
任務的軟性時間限制。如果沒有設置,那么將使用任務工作單元的默認值。
Task.ignore_result
不存儲任務狀態。注意這意味着你不能使用 AsyncResult
來檢測任務是否完成,或者獲取任務返回值。
Task.store_errors_even_if_ignored
如果設置為 True
,即使任務被設置成忽略結果,錯誤也會也存儲記錄
Task.serializer
表示默認使用的序列化方法的一個字符串。默認值是 task_serializer
設置。可以是 pickle,json,yaml
或者任意通過 kombu.serialization.registry
注冊過的自定義序列化方法。
請查看序列化器這一節獲取更多的信息。
Task.compression
表示默認壓縮模式的一個字符串。
默認值如 task_compression
的設置值。可以是 gzip, bzip2
,或者任意通過 kombu.compression
注冊過的自定義壓縮模式。
請查看壓縮這一節獲取更多的信息。
Task.backend
任務的結果后端存儲。celery.backends
中的一個后端存儲類的實例。默認是 app.backend
,由 result_backend
定義。
Task.acks_late
如果設置為 True
,任務消息將會在任務執行完成后確認,而不是剛開始時(默認行為)。
注意:由於任務工作單元可能在任務執行期間崩潰,所以任務可能會被執行多次。因此,需要確保任務是冪等的。
全局默認設置值可以被 task_acks_late
設置覆蓋。
Task.track_started
當設置為 True
,如果任務開始被一個工作單元執行,任務將報告它的狀態為 “started”。默認值是 False
,因為通常的行為是不報告到那種粒度級別。任務狀態可以是 pending, finished, waiting to be retried
。對於長時間運行的任務,如果需要知道任務的運行狀態,”started” 狀態會很有用。
工作執行單元的主機名和進程 id 會記錄在任務狀態的元信息中(即:result.info['pid']
)
全局的默認值會被 task_track_started
設置值覆蓋。
另見:任務的 API 引用文檔。
狀態
Celery 可以追蹤任務的當前狀態。狀態信息包含成功執行的任務的結果值以及執行失敗的任務的異常和堆棧回溯信息。
有幾個可選的存儲后端,他們各有優缺點。(見存儲后端這一節)
在一個任務的生命周期中可以經歷幾個可能的狀態,每個狀態附加有一些元數據。當任務轉變到一個新的狀態,它以前的狀態將被忘記,但是有一些轉變是可以被推演到的,(即如果一個任務當前的狀態是 FAILED
, 那么它可定在某個時刻已經先轉變成了 STARTED
狀態)。
還有一些狀態的集合,如 FAILURE_STATES
的集合,以及 READY_STATE
的集合。
客戶端通過這些狀態集的關系來確定異常是否需要重新拋出(PROPAGATE_STATES
),或者狀態是否能被緩存(如果任務已經 ready
, 那么能被緩存)。
你還可以自定義任務狀態。
結果存儲后端
如果你需要跟蹤任務狀態或者需要任務返回值,那么 Celery 必須存儲或者發送狀態到他可以重新獲取到的地方。有一些內建的存儲后端可供選擇: SQLAlchemy/Django ORM, memcached, RabbitMQ/QPid(rpc), 以及 Redis
- 或者你也可以定義自己的存儲后端。
沒有一個存儲后端使用所有的情況。你應該了解么個存儲后端的優缺點,並選擇最適合你需求的存儲后端。
另見:任務結構存儲后端設置
RPC 存儲后端(RabbitMQ/QPid)
RPC 結果存儲后端(rpc://)比較特殊,他並不真正存儲狀態,而是將狀態作為消息發送。這點區別很重要,因為它意味着結果只能被獲取一次,並且只能被初始化該任務的客戶端獲取。兩個不同的進程不能等待同一個結果。
即使有這個限制,如果你需要實時獲取任務狀態,它仍然是非常棒的一個選擇。使用消息意味着客戶端不需要主動去拉去狀態。
消息默認是短暫的(非持久化),所以,如果消息中間件重新啟動,結果值將不復存在。你可以通過 result_persistent
配置結果后端發送持久消息。
數據庫存儲后端
將狀態存儲到數據庫在很多情況下會很方便,特別是對於已經含有數據庫的網絡應用,但是它也有一些不足。
- 向數據庫詢問新的狀態的耗費很大,因此你應該增加調用
result.get()
這類操作的時間間隔 - 一些使用默認事務隔離級別的數據庫不合適輪詢表的變動
在 MySQL 中,默認的事務隔離級別是 REPEATABLE-READ
:意味着一個事務除非已經提交,否則不能看到其他事務做的更改。
建議更改為 READ-COMMITED
事務隔離級別。
內建狀態
-
PENGDING
任務等待被執行或者狀態未知。任何不知道的任務 id 都被認為在 pending 狀態。 -
STARTED
任務已經開始。默認不記錄此狀態,如果要啟用請查看app.Task.trace_started
。
meta-data: 執行當前任務的工作單元的進程 ID 和主機名。
- SUCCESS
任務已經被成功執行。
meta-data: 結果包含任務的返回值
propagates: 是
ready: 是
-FAILURE
任務執行失敗
meta-data: 結果包含拋出的異常,以及異常拋出時的堆棧回溯信息。
propagates: 是
-RETRY
任務被重試
meta-data: 結果包含導致重試的異常,以及異常拋出時的堆棧回溯信息。
propagates: 是
-REVOKED
任務被取消
propagates: 是
自定義狀態
你可以自定義自己的狀態,只需要提供一個唯一的狀態名稱。狀態名稱通常是大寫的字符串。作為示例你可以查看下 abortable_tasks
,它定義了一個 ABORTED
狀態。
使用 update_state()
可以更新任務的狀態:
@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)})
這里創建的了一個 PROGRESS
狀態,它告訴應用這個任務正在進行中,並且它將 current
和 total
作為它的狀態元信息。這可以用來創建任務進度條。
創建可被 pickle 序列化的異常
一個很少有人注意的python事實是異常定義必須符合一些簡單規則使得它能被 pickle
模塊序列化。
當使用 pickle
作為序列化器時,如果任務拋出的異常不能被 pickle
序列化就不能正常工作。
為了確保異常是可以被 pickle
模塊序列化的,你必須將異常初始化時的原始參數賦值給它的 .args
屬性。確保這點最簡單的方式是讓異常調用 Exception.__init__
。
下面我們看一些示例,有些是正確,有些是錯誤:
# OK: class HttpError(Exception): pass # BAD: class HttpError(Exception): def __init__(self, status_code): self.status_code = status_code # OK: class HttpError(Exception): def __init__(self, status_code): self.status_code = status_code Exception.__init__(self, status_code) # <-- REQUIRED
所以規則就是: 對於支持自定義參數 *args
的異常, Exception.__init__(self, *args)
必須被調用。
對於關鍵字參數沒有特殊的支持,所以如果你想在異常反序列化時保留關鍵字參數,你必須將他們作為常規的參數傳遞。
class HttpError(Exception): def __init__(self, status_code, headers=None, body=None): self.status_code = status_code self.headers = headers self.body = body super(HttpError, self).__init__(status_code, headers, body)
Semipredicates
任務工作單元將任務包裝在一個跟蹤函數中以記錄任務的最終狀態。有一些異常可以用來發送信號給這個跟蹤函數以改變它處理任務返回值的方式。
Ignore
任務可以拋出 Ignore
異常強制使任務工作單元忽略這個任務。這意味着這個任務的任何狀態都不會被保存,但是任務消息仍然被確認(從任務隊列中刪除)。
這可以被用來實現自定義的類似任務取消的功能,或者手動存儲任務結果。
在 redis 中保存取消的任務的示例:
from celery.exceptions import Ignore @app.task(bind=True) def some_task(self): if redis.ismember('tasks.revoked', self.request.id): raise Ignore()
手動存儲結果的示例:
from celery import states from celery.exceptions import Ignore @app.task(bind=True) def get_tweets(self, user): timeline = twitter.get_timeline(user) if not self.request.called_directly: self.update_state(state=states.SUCCESS, meta=timeline) raise Ignore()
Reject
任務可以使用 AMQP
的basic_reject
方法拋出 Reject
異常來拒絕任務消息。如果 Task.acks_late
啟用的話,拒絕消息將不生效。
拒絕一個消息與確認一個消息有同樣的效果,但是一個消息中間件可能實現一些附加的功能。例如 RabbitMQ 支持 Dead Letter Exchanges
的概念,被拒絕的消息會遞送到配置的死信隊列。
拒絕可以用來重新入隊任務消息,但是使用的時間要小心因為這很容易導致無限消息循環。
當任務導致內存溢出時使用拒絕的示例:
import errno from celery.exceptions import Reject @app.task(bind=True, acks_late=True) def render_scene(self, path): file = get_file(path) try: renderer.render_scene(file) # if the file is too big to fit in memory # we reject it so that it's redelivered to the dead letter exchange # and we can manually inspect the situation. except MemoryError as exc: raise Reject(exc, requeue=False) except OSError as exc: if exc.errno == errno.ENOMEM: raise Reject(exc, requeue=False) # For any other error we retry after 10 seconds. except Exception as exc: raise self.retry(exc, countdown=10)
任務消息重新入隊的示例:
from celery.exceptions import Reject @app.task(bind=True, acks_late=True) def requeues(self): if not self.request.delivery_info['redelivered']: raise Reject('no reason', requeue=True) print('received two times')
如果想了解更多的細節請查看消息中間件的 basic_reject
方法。
Retry
Retry
異常是被 Task.retry
方法拋出用來告訴工作單元當前任務將被重試。
自定義任務類
所有的任務都繼承自 app.Task
類。該類的 run()
方法就是任務的函數體。
如以下示例:
@app.task def add(x, y): return x + y
在 celery 內部將做如下包裝:
class _AddTask(app.Task): def run(self, x, y): return x + y add = app.tasks[_AddTask.name]
初始化
任務實例不是對每個請求都初始化一個,而是在任務注冊表中作為一個全局實例。
這意味着 __init__
構造函數只會在每個進程調用一次,並且任務類語義上類似於 Actor
。
假設你如下一個任務:
from celery import Task class NaiveAuthenticateServer(Task): def __init__(self): self.users = {'george': 'password'} def run(self, username, password): try: return self.users[username] == password except KeyError: return False
並且你將每個請求路由到同一個進程,此時它將在請求之間維護狀態。
這也可以用來緩存資源,例如,下面這個任務基類緩存了一個數據庫鏈接:
from celery import Task class DatabaseTask(Task): _db = None @property def db(self): if self._db is None: self._db = Database.connect() return self._db
它可以這樣加入到任務中:
@app.task(base=DatabaseTask) def process_rows(): for row in process_rows.db.table.all(): process_row(row)
process_rows
任務的 db
屬性在同一個進程中將總是保持相同。
Handler
after_return(self, status, retval, task_id, args, kwargs, einfo)
任務返回后調用的處理函數。
參數:
- status - 當前任務狀態
- retval - 任務返回值/異常
- task_id - 任務的唯一ID
- args - 任務的初始參數
- kwargs - 任務的初始關鍵字參數
關鍵字參數:
- einfo - ExceptionInfo
實例,包含異常堆棧回溯信息(如果存在)
這個處理函數的返回值被忽略。
on_failure(self, exc, task_id, args, kwargs, einfo)
當任務失敗時調用。
參數:
- exc - 任務拋出的異常
- task_id - 失敗任務的唯一ID
- args - 失敗任務的原始參數
- kwargs - 失敗任務的原始關鍵字參數
關鍵字參數:
- einfo - ExceptionInfo
實例,包含異常堆棧回溯信息
這個處理函數的返回值被忽略
on_retry(self, exc, task_id, args, kwargs, einfo)
任務重試時由工作單元調用
參數:
- exc - 發送給 retry()
函數的異常
- task_id - 被重試任務的唯一 ID
- args - 被重試任務的原始參數
- kwargs - 被重試任務的原始關鍵字參數
關鍵字參數:
- einfo - ExceptionInfo
實例,包含異常堆棧回溯信息
這個處理函數的返回值被忽略
on_success(self, retval, task_id, args, kwargs)
任務成功執行完由工作單元調用的處理函數。
參數:
- retval - 任務的返回值
- task_id - 任務的唯一 ID
- args - 任務的原始參數
- kwargs - 任務的原始關鍵字參數
這個處理函數的返回值被忽略。
How it works
這里是一些技術細節。這部分內容不是必須知道的,但是你可能會感興趣。
所有定義的任務都可以從任務注冊表中列出來。任務注冊表包含所有任務名稱到對應任務類的映射。你可以通過任務注冊表查看:
>>> from proj.celery import app >>> app.tasks {'celery.chord_unlock': <@task: celery.chord_unlock>, 'celery.backend_cleanup': <@task: celery.backend_cleanup>, 'celery.chord': <@task: celery.chord>}
這是 celery 內建的一些任務。注意任務只有在定義他們的模塊被導入時才會被注冊。
默認的加載器會導入 imports
設置中的所有模塊。
app.task()
裝飾器在你的應用任務注冊表中注冊你的任務。
但任務被發送,實際的任務代碼並不隨之發送,只攜帶當前任務的名稱。當工作單元接收到任務,它會根據任務名稱從任務注冊表中找到實際要執行的代碼。
這意味着你應該總是保持工作單元和客戶端代碼的一致性。這是個缺陷,但是另外的解決方案是一個需要被解決的技術挑戰。
提示與最佳實踐
忽略不想要的結果
如果你不在意任務的返回結果,可以設置 ignore_result
選項,因為存儲結果耗費時間和資源。
@app.task(ignore_result=True) def mytask(): something()
可以通過 task_ignore_result
設置全局忽略任務結果。
更多優化提示
可以在優化指南一節找到附加的優化建議。
避免啟動同步子任務
讓一個任務等待另外一個任務的返回結果是很低效的,並且如果工作單元池被耗盡的話這將會導致死鎖。
盡量讓你的任務異步,例如使用回調函數:
Bad:
@app.task def update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info) @app.task def fetch_page(url): return myhttplib.get(url) @app.task def parse_page(url, page): return myparser.parse_document(page) @app.task def store_page_info(url, info): return PageInfo.objects.create(url, info)
Good:
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)
這里,我們將不同的任務簽名鏈接起來創建一個任務鏈。你可以通過 Canvas: Designing Work-flows
這一節了解任務鏈和其他有用的方案。
默認情況下,celery 不會讓你在一個任務里同步執行其他任務,很少或者極端情況下,你可能不得不這么做。告警:不建議同步執行子任務!
@app.task def update_page_info(url): page = fetch_page.delay(url).get(disable_sync_subtasks=False) info = parse_page.delay(url, page).get(disable_sync_subtasks=False) store_page_info.delay(url, info) @app.task def fetch_page(url): return myhttplib.get(url) @app.task def parse_page(url, page): return myparser.parse_document(page) @app.task def store_page_info(url, info): return PageInfo.objects.create(url, info)
性能與策略
- 粒度
任務粒度是每個子任務需要的總計算量。通常情況下,將任務分解成多個小任務比保持少量長時間任務要更好。
任務越小你可以並行處理的任務就越多,而且不會由於任務長時間運行妨礙工作單元處理其他等待的任務。
但是,執行任何任務都是由花銷的。需要發送任務消息,數據可能不是本地的,等等。所以如果任務的粒度太細,增加的耗費可能會大於這樣做帶來的好處。
另見:Art of Concurrency
這本書有一節專門講述任務粒度這個主題。
[AOC1] Breshears, Clay. Section 2.2.1, “The Art of Concurrency”. O’Reilly Media, Inc. May 15, 2009. ISBN-13 978-0-596-52153-0.
Data locality
處理任務的工作單元應該離待處理的數據越近越好。最好是在內存中有一份拷貝,最壞的情況是從其他地方全量傳輸。
如果數據距離很遠,你可以在特定區域運行另外一個任務,或者如果不可能 - 緩存常用的數據,又或者預先加載將要使用的數據。
最簡單的在工作單元之間共享數據的方法是使用一個分布式緩存系統,例如 memcached。
另見:Jim Gray 寫的 Distributed Computing Economics
這篇論文對數據本地化有詳細的介紹。
狀態
因為 celery 是一個分布式系統,你不知道任務在哪個進程或者哪台服務器上運行。你甚至不知道任務是否會在有限的時間里完成。
古代諺語告訴我們”斷言世界是任務的責任”。它的意思是發起任務后世界觀可以已經改變,所以任務負責確保世界是它應該保持的樣子;如果你有一個任務是對一個搜索引擎重新編索引,並且搜索引擎應該最多每隔5分鍾重新編索引,那么應該是任務的職責來斷言這一點,而不是調用者。
另外一個例子是 Django 的模型對象。他們不應該作為參數傳遞給任務。幾乎總是在任務運行時從數據庫獲取對象是最好的,因為老的數據會導致競態條件。
假象有這樣一個場景,你有一篇文章,以及自動展開文章中縮寫的任務:
class Article(models.Model): title = models.CharField() body = models.TextField() @app.task def expand_abbreviations(article): article.body.replace('MyCorp', 'My Corporation') article.save()
首先,作者創建一篇文章並保存,這時作者點擊一個按鈕初始化一個縮寫展開任務:
>>> article = Article.objects.get(id=102) >>> expand_abbreviations.delay(article)
現在,隊列非常忙,所以任務在2分鍾內都不會運行。與此同時,另一個作者修改了這篇文章,當這個任務最終運行,因為老版本的文章作為參數傳遞給了這個任務,所以這篇文章會回滾到老的版本。
修復這個競態條件很簡單,只要參數傳遞文章的 id 即可,此時可以在任務中重新獲取這篇文章:
@app.task def expand_abbreviations(article_id): article = Article.objects.get(id=article_id) article.body.replace('MyCorp', 'My Corporation') article.save()
>>> expand_abbreviations.delay(article_id)
因為發送打消息可能耗費資源,所以這樣修改甚至可能帶來性能的提升。
數據庫事物
我們看另外一個例子:
from django.db import transaction @transaction.commit_on_success def create_article(request): article = Article.objects.create() expand_abbreviations.delay(article.pk)
這是在數據庫中創建一個文章對象的 Django 視圖,此時傳遞主鍵給任務。它使用 commit_on_success
裝飾器,當視圖返回時該事務會被提交,當視圖拋出異常時會進行回滾。
如果在事務提交之前任務已經開始執行會產生一個競態條件;數據庫對象還不存在。
解決方案是使用 on_commit
回調函數來在所有事務提交成功后啟動任務。
from django.db.transaction import on_commit def create_article(request): article = Article.objects.create() on_commit(lambda: expand_abbreviations.delay(article.pk))
注意:
on_commit
函數只在 Django 1.9 以上版本才可用,如果你使用以前的版本,那么可以使用 django-transaction-hooks
庫添加相關支持。
例子
讓我們看一個真實的例子:博客中用戶提交的評論要做垃圾過濾。當評論創建后,垃圾過濾任務在后台運行,所以用戶無需等待它完成。
我有一個Django博客應用,允許訪客對發表的博客進行評論。這里描述下這個應用的部分模型/視圖以及任務。
blog/models.py
評論模型如下:
from django.db import models from django.utils.translation import ugettext_lazy as _ class Comment(models.Model): name = models.CharField(_('name'), max_length=64) email_address = models.EmailField(_('email address')) homepage = models.URLField(_('home page'), blank=True, verify_exists=False) comment = models.TextField(_('comment')) pub_date = models.DateTimeField(_('Published date'), editable=False, auto_add_now=True) is_spam = models.BooleanField(_('spam?'), default=False, editable=False) class Meta: verbose_name = _('comment') verbose_name_plural = _('comments')
在評論發表的視圖中,我首先將評論寫到數據庫,然后在后台發起垃圾過濾任務。
blog/views.py
from django import forms from django.http import HttpResponseRedirect from django.template.context import RequestContext from django.shortcuts import get_object_or_404, render_to_response from blog import tasks from blog.models import Comment class CommentForm(forms.ModelForm): class Meta: model = Comment def add_comment(request, slug, template_name='comments/create.html'): post = get_object_or_404(Entry, slug=slug) remote_addr = request.META.get('REMOTE_ADDR') if request.method == 'post': form = CommentForm(request.POST, request.FILES) if form.is_valid(): comment = form.save() # Check spam asynchronously. tasks.spam_filter.delay(comment_id=comment.id, remote_addr=remote_addr) return HttpResponseRedirect(post.get_absolute_url()) else: form = CommentForm() context = RequestContext(request, {'form': form}) return render_to_response(template_name, context_instance=context)
過濾垃圾評論我使用 Akismet
,這個服務在開源博客平台 Wordpress 中用來過濾垃圾評論。Akismet
對個人使用是免費的,但是對於商業用途需要收費。你需要先注冊並獲取到 API 秘鑰。
發送 API 請求給 Akismet
我使用 akismet.py
這個庫,它是由 Michael Foord 寫的。
blog/tasks.py
from celery import Celery from akismet import Akismet from django.core.exceptions import ImproperlyConfigured from django.contrib.sites.models import Site from blog.models import Comment app = Celery(broker='amqp://') @app.task def spam_filter(comment_id, remote_addr=None): logger = spam_filter.get_logger() logger.info('Running spam filter for comment %s', comment_id) comment = Comment.objects.get(pk=comment_id) current_domain = Site.objects.get_current().domain akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain)) if not akismet.verify_key(): raise ImproperlyConfigured('Invalid AKISMET_KEY') is_spam = akismet.comment_check(user_ip=remote_addr, comment_content=comment.comment, comment_author=comment.name, comment_author_email=comment.email_address) if is_spam: comment.is_spam = True comment.save() return is_spam
轉自:https://blog.csdn.net/libing_thinking/article/details/78547816