目錄
前文列表
分布式任務隊列 Celery
分布式任務隊列 Celery —— 詳解工作流
分布式任務隊列 Celery —— 應用基礎
前言
緊接前文,繼續深入了解 Celery Tasks。示例代碼依舊在前文的基礎上進行修改。
Tasks 是 Celery 的基石,原型類為 celery.app.task:Task,它提供了兩個核心功能:
- 將任務消息發送到隊列
- 聲明 Worker 接收到消息后需要執行的具體函數
Task 的實例化
使用裝飾器 app.task 來裝飾一個普通函數,就可以輕松創建出一個任務函數。
from proj.celery import app
@app.task
def add(x, y):
return x + y
值得注意的是,任務函數本質上已經不再是一個普通函數,而是一個 celery.app.task:Task 實例對象。
>>> from proj.task.tasks import add
>>> dir(add)
['AsyncResult', 'MaxRetriesExceededError', ..., 'apply_async', 'delay', u'name', 'on_bound', 'on_failure', 'on_retry', 'on_success', 'request', 'retry', 'subtask', ...]
所以任務函數才可以調用 delay/apply_async 等屬於 Task 的實例屬性和方法。
>>> add.apply_async
<bound method add.apply_async of <@task: proj.task.tasks.add of proj at 0x7fedb363a790>>
>>> add.delay
<bound method add.delay of <@task: proj.task.tasks.add of proj at 0x7fedb363a790>>
這是一個非常重要的認識「app.task 的 “裝飾” 動作,其實是 Task 的實例化過程」,而裝飾器的參數就是 Task 初始化的參數。
官方文檔(http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options)提供了完整的 app.task 裝飾器參數列表。
除此之外,還需要注意「多裝飾器順序」的坑,app.task 應該始終放到最后使用,才能保證其穩定有效。
app.task
@decorator2
@decorator1
def add(x, y):
return x + y
任務的名字
每個任務函數都具有唯一的名字,這個名字被包含在任務消息中,Worker 通過該名字來找到具體執行的任務函數。默認的,Task 會啟用自動命名,將函數的全路徑名作為任務名。
>>> add.name
u'proj.task.tasks.add'
當然了,也可以通過指定裝飾器參數 name 來指定任務名。
@app.task(name='new_name')
def add(x, y):
return x + y
>>> from proj.task.tasks import add
>>> add.name
'new_name'
但為了避免命名沖突的問題,一般不建議這么做,除非你很清楚自己的做什么。
任務的綁定
既然任務函數本質是一個 Task 實例對象,那么當然也可以應用 self 綁定特性。
# 啟用綁定:
@app.task(bind=True)
def add(self, x, y):
print("self: ", self)
return x + y
>>> add.delay(1, 2)
<AsyncResult: 1982dc85-694b-4ceb-849b-5f69e40b4fe9>
綁定對象 self 十分重要,Task 的很多高級功能都是依靠它作為載體來調用的。例如:任務重試功能,請求上下文功能。
任務的重試
任務重試功能的實現為 Task.retry,它將任務消息重新發送到同一個的隊列中,以此來重啟任務。
@app.task(bind=True, max_retries=3)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
- max_retries 指定了最大的重試次數
- exc 指定將異常信息輸出到日志,需要開啟 result backend。
如果你僅希望觸發特定異常時才進行重試,可以應用 Task 的「Automatic retry for known exceptions」特性。
# 只有在觸發 FailWhaleError 異常時,才會重試任務,且最多重試 5 次。
@app.task(autoretry_for=(FailWhaleError,), retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
任務的請求上下文
在 Celery 請求 Worker 執行任務函數時,提供了請求的上下文,這是為了讓任務函數在執行過程中能夠訪問上下文所包含的任務狀態和信息。
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
>>> from proj.task.tasks import dump_context
>>> dump_context.delay(1, 2)
<AsyncResult: 00bc9f96-98df-4bca-a4a3-4774c535a44c>
捕獲請求上下文中的有用信息,有利於我們去分辨和調查一個任務的執行狀況。
完整的上下文屬性列表,可以查閱官方文檔(http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request)。
任務的繼承
使用 app.task 裝飾器默認會實例化原生的 Task 類,這雖然能滿足大部分的應用場景需求,但並非全部。所以 Celery 允許我們通過繼承 Task 類,來衍生出特異化的基類。這一特性在復雜的應用場景中將會十分有效。
- 重新指定適用於的所有任務的默認基類
def make_app(context):
app = Celery('proj')
app.config_from_object('proj.celeryconfig')
default_exchange = Exchange('default', type='direct')
web_exchange = Exchange('task', type='direct')
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('high_queue', web_exchange, routing_key='hign_task'),
Queue('low_queue', web_exchange, routing_key='low_task'),
)
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'periodic_task_add': {
'task': 'proj.task.tasks.add',
'schedule': crontab(minute='*/1'),
'args': (2, 2)
},
}
TaskBase = app.Task
class ContextTask(TaskBase):
abstract = True
context = ctx
def __call__(self, *args, **kwargs):
"""Will be execute when create the instance object of ContextTesk. """
LOG.info(_LI("Invoked celery task starting: %(name)s[%(id)s]"),
{'name': self.name, 'id': self.request.id})
return super(ContextTask, self).__call__(*args, **kwargs)
# 任務執行成功前做什么
def on_success(self, retval, task_id, args, kwargs):
"""Invoked after the task is successfully execute. """
LOG.info(_LI("Task %(id)s success: [%(ret)s]."),
{'id': task_id, 'ret': retval})
return super(ContextTask, self).on_success(retval, task_id,
args, kwargs)
# 任務執行失敗后做什么
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Invoked after the task failed to execute. """
msg = _LE("Task [%(id)s] failed:\n"
"args : %(args)s\n"
"kwargs : %(kw)s\n"
"detail :%(err)s") % {
'id': task_id, 'args': args,
'kw': kwargs, 'err': six.text_type(exc)}
LOG.exception(msg)
return super(ContextTask, self).on_failure(exc, task_id, args,
kwargs, einfo)
# 重新賦予默認基類
app.Task = ContextTask
return app
- 為不同類型的任務繼承出具有偏向屬性的基類
Import celery
class JSONTask(celery.Task):
serializer = 'json'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
class XMLTask(celery.Task):
serializer = 'xml'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 指定不同的基類
@task(base=JSONTask)
def add_json(x, y):
raise KeyError()
# 指定不同的基類
@task(base=XMLTask)
def add_xml(x, y):
raise KeyError()