celery详解


1. CELERY简介

1.1. celery

clery是一个使用python语言编写的,简单,灵活且稳定的分布式系统。celery是一个任务队列,着力于实时处理,同时还支持任务调度。

1.2. task queue

任务队列被当做一种在线程或机器之间分配任务的机制。任务队列的输入是被称为任务的工作单元。专用工作进程持续监视任务队列,以及时执行新工作。
celery之间使用messages通信,通常使用一个broker来协调clients和workers。为了启动一个任务,client先添加一个message到队列,然后broker分发这个message给wokers。
一个celery系统可以由多个workers和多个brokers组成,通过这样可以支持高可用和水平扩展。

1.3. message transport

celery需要一个message transport来发送和接受消息。RabbitMQ和Redis的代理传输功能齐全。同时也支持其他实验性解决方案。本地开发支持使用SQLite。

1.4. broker

https://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

2. celery 使用

2.1 安装

python和celery版本对照

celery python
5.0 >3.6
4.4 2.7-3.5
3.1 2.6
3.0 2.5
2.2 2.4

下载命令
pip install celery

2.2 简单使用

2.2.1 创建一个tasks.py文件。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')  
# 'tasks'为当前模块的名称,是必须提供的参数,以便在__main__模块中定义任务时可以自动生成名称。
# broker指定要使用的消息代理的URL。这里使用的是RabbitMQ(默认也是RabbitMQ)。

@app.task
def add(x, y):
    return x + y

2.2.2 运行celery worker 服务

celery -A tasks worker --loglevel=INFO
# celery worker --help  可以查看命令详细列表
# celery --help  可以查看celery命令的详细列表

2.2.3 调用任务

>>> from tasks import add
>>> add.delay(4, 4)

注:注意终端路径。在tasks.py的目录下进入python终端。

现在可以通过工作程序的控制台查看输出内容。

2.2.4 保存结果

如果要跟踪任务状态,celery需要在某些地方存储或发送状态。这时需要使用result_backend。
有几种内置的result_backends可供选择:SQLAlchemy / Django ORM, MongoDB,Memcached,Redis,RPC(RabbitMQ / AMQP),也可以定义自己的后端。
本文使用的是:RPC。
通过celery中backend参数配置(或者通过在配置模块中使用 result_backend 参数配置)。

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

通过ready()方法放回任务是否已完成处理:

>>> result = add.delay(4, 4)
>>> result.ready()

通过get()方法可以等待结果完成,并获得结果。(很少使用,因为这样就将异步转换为同步调用了)

>>> result.get()
# get中的常用参数:timeout 等待时间,时间到了,任务未完成则抛出异常。
#                 propagate 默认为true。当设置为false时,如果get()引发异常,此参数可以覆盖此异常。

如果任务引发异常,可以通过 result.traceback 原始回溯异常。
backend 使用资源来存储和传输结果。为了确保释放资源,在调用任务之后必须在每个AsyncResult 实例返回值后调用get()或forget()。

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

实际上,delay方法是apply_async方法的一个快捷方式。
使用apply_async方式可以指定执行选项。上例中,queue参数指定任务将被发送的队列名称,countdown参数指定任务将最早在多长时间之后执行。

2.2.5 signature

delay()和apply_async()方法是非常常用的,并且满足大多数场景使用。但是有时我们并不想简单的将任务发送到队列中,想将一个任务函数(由参数和执行选项组成)作为一个参数传递给另外一个函数中,为了实现此目标,Celery使用一种叫做signatures的东西。

>>> add.signature((2, 2), countdown=10)
# shortcut using
>>> add.s(2, 2)

signature还可以调用delay()和apply_async()

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
# 还可以给定部分参数,创建一个不完整的signature。
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
# 调用delay()时候指定的参数会覆盖signature创建时设置的参数
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   
# debug is now False.
# signature的另一种写法
>>> s4 = signature(add, args(2, 2), countdown=1)

2.2.6 primitives

2.2.6.1 group

一个group并行调用一个任务列表,并返回一个特殊的结果实例,该实例使你可以把结果当做一个组,并且按顺序检索返回值。

>>> from celery import group
>>> group(add.s(i, i) for i in range(10))().get()
#Partial group
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
>>> from celery import group
>>> g1 = signature(add, args(1, 2), countdown=1)
>>> g2 = signature(add, args(2, 2), countdown=1)
>>> g3 = signature(add, args(3, 2), countdown=1)
>>> g = group(g1, g2, g3)
>>> ret = g()
>>> ret.get()
2.2.6.2 chain

任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数。

>>> from celery import chain
>>> c = chain(add.s(4, 4) | add.s(8))
>>> c().get()
2.2.6.2 chord

chord是一个具有回调的组。

@app.task
def xsum(numbers):
    return sum(numbers)

from celery import chord
chord((add.s(i, i) for i in range(10)), xsum.s())().get()

3. django中使用celery

文中的列子大部分来自官方文档。
官方文档:https://docs.celeryproject.org/en/stable/


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM