Celery是異步消息隊列, 可以在很多場景下進行靈活的應用.消息中包含了執行任務所需的的參數,用於啟動任務執行, suoy所以消息隊列也可以稱作
在web應用開發中, 用戶觸發的某些事件需要較長事件才能完成. 可以將任務交給celery去執行, 待任務完成后再將結果返回給用戶. 用戶同步請求觸發的其它任務, 如發送郵件,請求雲服務等也可以交由celery來完成.
celery的另一個重要應用場景則是各種計划任務.
celery由5個主要組件組成:
-
producer: 任務發布者, 通過調用API向celery發布任務的程序, 如web后端的控制器.
-
celery beat: 任務調度, 根據配置文件發布定時任務
-
worker: 實際執行任務的程序
-
broker: 消息代理, 接受任務消息,存入隊列再按順序分發給worker執行.
-
result backend: 存儲結果的服務器, 一般為各種數據庫服務
整體結構如圖所示:
broker是celery的關鍵組件, 目前的可靠選擇有RabbitMQ和Redis, 出於穩定性等原因我們選擇官方推薦的RabbitMQ作為broker.順便安裝librabbitmq作為RabbitMQ的python客戶端.
消息的發送接受過程需要對序列進行序列化和反序列化, 從celery3.2開始官方出於安全性原因拒絕使用python內置的pickle作為序列化方案, 目前celery支持的序列化方案包括:
-
json: 跨語言的序列化方案
-
yaml: 擁有更多數據類型, 但python客戶端性能不如json
-
msgpack: 二進制序列化方案, 比json更小更快
若對可讀性有要求可以采用json方案, 若追求更高的性能則可以選用msgpack.
result backend用於存儲異步任務的結果和狀態, 目前可用的有Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等.
可以使用boundless方式安裝依賴:
pip install "celery[librabbitmq,redis,msgpack]"
第一個異步任務
創建tasks.py
文件, 並寫入:
from celery import Celery
app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1')
@app.task
def add(x, y):
return x + y
這樣我們創建了celery實例, Celery()的第一個參數為當前module的名稱(py文件名或包名).
在終端執行命令以啟動服務器:
celery -A tasks worker -l info
-A tasks
參數指定app為模塊tasks,-l info
參數指定log級別為info.
當看到這條log時說明celery已就緒:
[2016-09-11 18:04:43,758: WARNING/MainProcess] celery@finley-pc ready.
在python中導入任務並執行
>>> from tasks import add
>>> result = add.delay(1,2)
>>> result.result
3
>>> result.status
'SUCCESS'
>>> result.successful()
True
使用一個py文件作為module非常不便, 在更復雜的任務中可以采用python包作為module.
建立python包demo,建立下列文件:
app.py:
from celery import Celery
app = Celery('demo', include=['demo.tasks'])
app.config_from_object('demo.config')
app.start()
config.py
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
tasks.py
from demo.app import app
@app.task
def add(x, y):
return x + y
在終端中啟動:
celery -A demo.app worker -l info
若將-A
參數設為demo
則會默認嘗試啟動demo.celery
.因為該module與celery重名可能在導入時出現錯誤, 所以我們沒有采用這種做法.
celery還支持綁定,日志,重試等特性:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def div(self, x, y):
logger.info('doing div')
try:
result = x / y
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=5, max_retries=3)
return result
bind=true
將app對象作為self參數傳給task函數.
前文的示例需要producer主動檢查任務的狀態,存在諸多不便. 我們可以在task函數中主動通知producer:
from celery import Celery
from demo.app import app
from urllib.request import urlopen
@app.task
def add(x, y):
result = x + y
url = 'http://producerhost/callback/add?result=%d' % result
urlopen(url)
return result
上述示例中我們使用GET請求將結果發送給了producer的回調API, 當然很多情況下可以直接調用回調函數.
Celery易於與Web框架集成, 作者常采用的交互邏輯是:
-
提供提交任務, 查詢任務結果兩個API, 由客戶端決定何時查詢結果
-
采用websocket等技術, 服務器主動向客戶端發送結果
當然也可以采用異步IO模式, 這需要一些擴展包的協助:
安裝tornado-celery: pip install torando-celery
編寫handler:
import tcelery
tcelery.setup_nonblocking_producer()
from demo.tasks import add
calss Users(RequestHandler):
@asynchronous
def get():
add.apply_async(args=[1,2], callback=self.on_success)
def on_success(self, response):
users = response.result
self.write(users)
self.finish()
其它的Web框架也有自己的擴展包:
-
Django: django-celery
-
Tornado: tornado-celery
-
web2py: web2py-celery
-
Pylons: celery-pylons
-
Pyramid: pyramid_celery
計划任務
celery的計划任務有schedule和crontab兩種方式.
在config.py中添加配置:
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'demo.tasks.add',
'schedule': timedelta(seconds=10),
'args': (16, 16)
}
}
啟動beat:
celery beat -A demo.app
然后啟動worker:
celery -A demo.app worker -l info
或者與celery app一同啟動:
celery -B -A demo.app worker -l info
'schedule'可以接受datetime, timedelta或crontab對象:
from celery.schedules import crontab
{
'schedule': crontab(hour=7, minute=30, day_of_week=1),
pass
}
webhook
上文中我們使用本地python函數作為worker, webhook機制允許使用遠程的Web服務作為worker.
在使用webhook作為worker時, broker將消息封裝為http請求發送給worker, 並按照協議解析返回值.
使用webhook需要在CELERY_IMPORTS
參數中包含celery.task.http
, 或者在啟動參數中指定-I celery.task.http
.
broker使用GET或POST方法發送請求, 參數由調用時的關鍵字參數指定. worker返回json格式的響應:
{'status': 'success', 'retval': ...}
在失敗時返回響應:
{'status': 'failure', 'reason': ...}
我們用django作為worker:
from django.http import HttpResponse
import json
def add(request):
x = int(request.GET['x'])
y = int(request.GET['y'])
result = x + y
response = {'status': 'success', 'retval': result}
return HttpResponse(json.dumps(response), mimetype='application/json')
配置django為http://cloudservice/webhook/add
提供Web服務.
從本地添加任務:
>>>from celery.task.http import URL
>>>result = URL('http://cloudservice/webhook/add').get_async(x=10, y=10)
>>>result.get()
20
URL是HttpDispatchTask的快捷方法(shortcut):
>>> from celery.task.http import HttpDispatchTask
>>> res = HttpDispatchTask.delay(
... url='http://cloudservice/webhook/add',
... method='GET', x=10, y=10)
>>> res.get()
20
更多關於celery的內容請參閱: