1. CELERY简介
1.1. celery
clery是一个使用python语言编写的,简单,灵活且稳定的分布式系统。celery是一个任务队列,着力于实时处理,同时还支持任务调度。
1.2. task queue
任务队列被当做一种在线程或机器之间分配任务的机制。任务队列的输入是被称为任务的工作单元。专用工作进程持续监视任务队列,以及时执行新工作。
celery之间使用messages通信,通常使用一个broker来协调clients和workers。为了启动一个任务,client先添加一个message到队列,然后broker分发这个message给wokers。
一个celery系统可以由多个workers和多个brokers组成,通过这样可以支持高可用和水平扩展。
1.3. message transport
celery需要一个message transport来发送和接受消息。RabbitMQ和Redis的代理传输功能齐全。同时也支持其他实验性解决方案。本地开发支持使用SQLite。
1.4. broker
https://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
2. celery 使用
2.1 安装
python和celery版本对照
celery | python |
---|---|
5.0 | >3.6 |
4.4 | 2.7-3.5 |
3.1 | 2.6 |
3.0 | 2.5 |
2.2 | 2.4 |
下载命令
pip install celery
2.2 简单使用
2.2.1 创建一个tasks.py文件。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
# 'tasks'为当前模块的名称,是必须提供的参数,以便在__main__模块中定义任务时可以自动生成名称。
# broker指定要使用的消息代理的URL。这里使用的是RabbitMQ(默认也是RabbitMQ)。
@app.task
def add(x, y):
return x + y
2.2.2 运行celery worker 服务
celery -A tasks worker --loglevel=INFO
# celery worker --help 可以查看命令详细列表
# celery --help 可以查看celery命令的详细列表
2.2.3 调用任务
>>> from tasks import add
>>> add.delay(4, 4)
注:注意终端路径。在tasks.py的目录下进入python终端。
现在可以通过工作程序的控制台查看输出内容。
2.2.4 保存结果
如果要跟踪任务状态,celery需要在某些地方存储或发送状态。这时需要使用result_backend。
有几种内置的result_backends可供选择:SQLAlchemy / Django ORM, MongoDB,Memcached,Redis,RPC(RabbitMQ / AMQP),也可以定义自己的后端。
本文使用的是:RPC。
通过celery中backend参数配置(或者通过在配置模块中使用 result_backend 参数配置)。
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
通过ready()方法放回任务是否已完成处理:
>>> result = add.delay(4, 4)
>>> result.ready()
通过get()方法可以等待结果完成,并获得结果。(很少使用,因为这样就将异步转换为同步调用了)
>>> result.get()
# get中的常用参数:timeout 等待时间,时间到了,任务未完成则抛出异常。
# propagate 默认为true。当设置为false时,如果get()引发异常,此参数可以覆盖此异常。
如果任务引发异常,可以通过 result.traceback 原始回溯异常。
backend 使用资源来存储和传输结果。为了确保释放资源,在调用任务之后必须在每个AsyncResult 实例返回值后调用get()或forget()。
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
实际上,delay方法是apply_async方法的一个快捷方式。
使用apply_async方式可以指定执行选项。上例中,queue参数指定任务将被发送的队列名称,countdown参数指定任务将最早在多长时间之后执行。
2.2.5 signature
delay()和apply_async()方法是非常常用的,并且满足大多数场景使用。但是有时我们并不想简单的将任务发送到队列中,想将一个任务函数(由参数和执行选项组成)作为一个参数传递给另外一个函数中,为了实现此目标,Celery使用一种叫做signatures的东西。
>>> add.signature((2, 2), countdown=10)
# shortcut using
>>> add.s(2, 2)
signature还可以调用delay()和apply_async()
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
# 还可以给定部分参数,创建一个不完整的signature。
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
# 调用delay()时候指定的参数会覆盖signature创建时设置的参数
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)
# debug is now False.
# signature的另一种写法
>>> s4 = signature(add, args(2, 2), countdown=1)
2.2.6 primitives
2.2.6.1 group
一个group并行调用一个任务列表,并返回一个特殊的结果实例,该实例使你可以把结果当做一个组,并且按顺序检索返回值。
>>> from celery import group
>>> group(add.s(i, i) for i in range(10))().get()
#Partial group
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
>>> from celery import group
>>> g1 = signature(add, args(1, 2), countdown=1)
>>> g2 = signature(add, args(2, 2), countdown=1)
>>> g3 = signature(add, args(3, 2), countdown=1)
>>> g = group(g1, g2, g3)
>>> ret = g()
>>> ret.get()
2.2.6.2 chain
任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数。
>>> from celery import chain
>>> c = chain(add.s(4, 4) | add.s(8))
>>> c().get()
2.2.6.2 chord
chord是一个具有回调的组。
@app.task
def xsum(numbers):
return sum(numbers)
from celery import chord
chord((add.s(i, i) for i in range(10)), xsum.s())().get()
3. django中使用celery
文中的列子大部分来自官方文档。
官方文档:https://docs.celeryproject.org/en/stable/