異步消息隊列Celery


Celery是異步消息隊列, 可以在很多場景下進行靈活的應用.消息中包含了執行任務所需的的參數,用於啟動任務執行, suoy所以消息隊列也可以稱作

在web應用開發中, 用戶觸發的某些事件需要較長事件才能完成. 可以將任務交給celery去執行, 待任務完成后再將結果返回給用戶. 用戶同步請求觸發的其它任務, 如發送郵件,請求雲服務等也可以交由celery來完成.

celery的另一個重要應用場景則是各種計划任務.

celery由5個主要組件組成:

  • producer: 任務發布者, 通過調用API向celery發布任務的程序, 如web后端的控制器.

  • celery beat: 任務調度, 根據配置文件發布定時任務

  • worker: 實際執行任務的程序

  • broker: 消息代理, 接受任務消息,存入隊列再按順序分發給worker執行.

  • result backend: 存儲結果的服務器, 一般為各種數據庫服務

整體結構如圖所示:

broker是celery的關鍵組件, 目前的可靠選擇有RabbitMQ和Redis, 出於穩定性等原因我們選擇官方推薦的RabbitMQ作為broker.順便安裝librabbitmq作為RabbitMQ的python客戶端.

消息的發送接受過程需要對序列進行序列化和反序列化, 從celery3.2開始官方出於安全性原因拒絕使用python內置的pickle作為序列化方案, 目前celery支持的序列化方案包括:

  • json: 跨語言的序列化方案

  • yaml: 擁有更多數據類型, 但python客戶端性能不如json

  • msgpack: 二進制序列化方案, 比json更小更快

若對可讀性有要求可以采用json方案, 若追求更高的性能則可以選用msgpack.

result backend用於存儲異步任務的結果和狀態, 目前可用的有Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等.

可以使用boundless方式安裝依賴:

pip install "celery[librabbitmq,redis,msgpack]"

第一個異步任務

創建tasks.py文件, 並寫入:

from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1')


@app.task
def add(x, y):
    return x + y

這樣我們創建了celery實例, Celery()的第一個參數為當前module的名稱(py文件名或包名).

在終端執行命令以啟動服務器:

celery -A tasks worker -l info

-A tasks 參數指定app為模塊tasks,-l info參數指定log級別為info.

當看到這條log時說明celery已就緒:

[2016-09-11 18:04:43,758: WARNING/MainProcess] celery@finley-pc ready.

在python中導入任務並執行

>>> from tasks import add
>>> result = add.delay(1,2)
>>> result.result
3
>>> result.status
'SUCCESS'
>>> result.successful()
True

使用一個py文件作為module非常不便, 在更復雜的任務中可以采用python包作為module.

建立python包demo,建立下列文件:

app.py:

from celery import Celery

app = Celery('demo', include=['demo.tasks'])

app.config_from_object('demo.config')

app.start()

config.py

BROKER_URL = 'redis://127.0.0.1:6379/0'

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

CELERY_TASK_SERIALIZER = 'msgpack'

CELERY_RESULT_SERIALIZER = 'json'

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

tasks.py

from demo.app import app


@app.task
def add(x, y):
    return x + y

在終端中啟動:

 celery -A demo.app worker -l info

若將-A參數設為demo則會默認嘗試啟動demo.celery.因為該module與celery重名可能在導入時出現錯誤, 所以我們沒有采用這種做法.

celery還支持綁定,日志,重試等特性:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True)
def div(self, x, y):
    logger.info('doing div')
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return result

bind=true將app對象作為self參數傳給task函數.

前文的示例需要producer主動檢查任務的狀態,存在諸多不便. 我們可以在task函數中主動通知producer:

from celery import Celery
from demo.app import app
from urllib.request import urlopen

@app.task
def add(x, y):
    result = x + y
    url = 'http://producerhost/callback/add?result=%d' % result
    urlopen(url)
    return result

上述示例中我們使用GET請求將結果發送給了producer的回調API, 當然很多情況下可以直接調用回調函數.

Celery易於與Web框架集成, 作者常采用的交互邏輯是:

  • 提供提交任務, 查詢任務結果兩個API, 由客戶端決定何時查詢結果

  • 采用websocket等技術, 服務器主動向客戶端發送結果

當然也可以采用異步IO模式, 這需要一些擴展包的協助:

安裝tornado-celery: pip install torando-celery

編寫handler:

import tcelery
tcelery.setup_nonblocking_producer()

from demo.tasks import add

calss Users(RequestHandler):
    @asynchronous
    def get():
        add.apply_async(args=[1,2], callback=self.on_success)

    def on_success(self, response):
        users = response.result
        self.write(users)
        self.finish()

其它的Web框架也有自己的擴展包:

計划任務

celery的計划任務有schedule和crontab兩種方式.

在config.py中添加配置:

CELERYBEAT_SCHEDULE = {

    'add': {

		'task': 'demo.tasks.add',

		'schedule': timedelta(seconds=10),

		'args': (16, 16)

    }
}

啟動beat:

celery beat -A demo.app

然后啟動worker:

celery -A demo.app worker -l info

或者與celery app一同啟動:

celery -B -A demo.app worker -l info

'schedule'可以接受datetime, timedelta或crontab對象:

from celery.schedules import crontab

{
	'schedule': crontab(hour=7, minute=30, day_of_week=1),
    pass
}

webhook

上文中我們使用本地python函數作為worker, webhook機制允許使用遠程的Web服務作為worker.

在使用webhook作為worker時, broker將消息封裝為http請求發送給worker, 並按照協議解析返回值.

使用webhook需要在CELERY_IMPORTS參數中包含celery.task.http, 或者在啟動參數中指定-I celery.task.http.

broker使用GET或POST方法發送請求, 參數由調用時的關鍵字參數指定. worker返回json格式的響應:

{'status': 'success', 'retval': ...}

在失敗時返回響應:

{'status': 'failure', 'reason': ...}

我們用django作為worker:

from django.http import HttpResponse
import json


def add(request):
    x = int(request.GET['x'])
    y = int(request.GET['y'])
    result = x + y
    response = {'status': 'success', 'retval': result}
    return HttpResponse(json.dumps(response), mimetype='application/json')

配置django為http://cloudservice/webhook/add提供Web服務.

從本地添加任務:

>>>from celery.task.http import URL
>>>result = URL('http://cloudservice/webhook/add').get_async(x=10, y=10)
>>>result.get()
20

URL是HttpDispatchTask的快捷方法(shortcut):

>>> from celery.task.http import HttpDispatchTask
>>> res = HttpDispatchTask.delay(
...     url='http://cloudservice/webhook/add',
...     method='GET', x=10, y=10)
>>> res.get()
20

更多關於celery的內容請參閱:

Celery latest documentation


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM