Flask配置Celery異步任務


前言

​ 在使用flask開發的時候,接口的返回需要很少的時間,所以我們需要將一些耗時的任務,放到異步后台去處理,例如:發送郵件,耗時的CPU任務等。在python web框架中celery這個庫,可能是最合適的。

​ 由於我使用flask的時間比較多,但是當我想把celery很好的與flask進行集成的時候,卻發現並不是那么如意。花費了很久的時間去實踐最后卻是各種報錯。出現了循環導入、app上下文、tasks not found等問題,嘗試了種種卻總是不如人意。

​ 好在功夫不負有心人,在結合官方文檔並查閱了大量資料后,終於把celery很好得集成在了flask項目中。我在這里記錄一下,同時也希望對你們有所幫助。

配置

開發環境 Windows10
python 3.8.6
flask 2.0.x
celery 5.x
broker redis
pool eventlet

simple模式

由於celery 5.0后推薦小寫模式,與flask config大寫規范有沖突,所以我們當同目錄下創建一個celeryconfig.py文件

celeryconfig.py

broker_url='redis://127.0.0.1:6379/1'
result_backend='redis://127.0.0.1:6379/2'

flask simple模式。

simple.py

from flask import Flask
from celery import Celery
import celeryconfig

app = Flask(__name__)
celery_app = Celery(app.import_name,
                    broker=celeryconfig.broker_url,
                    backend=celeryconfig.result_backend)
celery_app.config_from_object(celeryconfig)


@celery_app.task(name='simple/add2')
def add2(x, y):
    return x + y


@app.route('/')
def index():
    results = add2.delay(3, 5)
    return str(results.wait())


if __name__ == '__main__':
    app.run(debug=True)

這些就是單文件模式的代碼,這其中我們添加了一個任務add2,然后啟動flask。

python simple.py

由於celery和flask是同級別的app,所以我們需要一個新的窗口啟動celery,加入-P參數指定異步workereventlet

celery -A simple.celery_app worker -l info -P eventlet

當我們啟動celery之后。看到最后一行的ready的時候,說明我們的celery已經啟動成功了。

然后再看有下面標識說明我們的任務已經被添加成功了。

[tasks]
  . simple/add2

訪問網址:http://127.0.0.1:5000/

image-20210211174559291

同時我們查看一下celery的窗口:

image-20210211174707759

simple模式就結束了

Factory模式

當然我們如果用flask寫一個稍微復雜的東西的話,其實工廠模式我們應該用的更多。下面我們一起來看看工廠模式中的配置。

目錄結構

首先我們先規划一個flask+celery的目錄結構。然后創建下面的文件:

.
├── app
│   ├── __init__.py		  ——app主體文件
│   ├── celeryconfig.py           ——celery配置文件
│   ├── config.py		  ——flask配置文件
│   ├── models.py		  ——模型文件
│   ├── tasks.py		  ——后台任務
│   └── views.py		  ——視圖文件
├── data.db
├── .flaskenv    		  ——flask環境變量
└── server.py			  ——運行文件

我們先創建一個注冊celery的函數,主要功能是使用flask應用上下文。

def register_celery(celery, app):
    class ContextTask(celery.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

然后我們創建create_app函數,將寫好的注冊celery函數加進去。

def create_app(**kwargs):
    app = Flask(__name__)
    app.config.from_pyfile('config.py')
    db.init_app(app)
    register_celery(celery=kwargs.get('celery'), app=app)  # >> 注冊celery
    register_blueprints(app) 
    register_commands(app)
    return app

上面這些都是我們在__init__文件中創建的,下面我們來創建celery的app

打開server.py

from celery import Celery
from app import create_app, celeryconfig


def make_celery(app_name):
    celery = Celery(app_name,
                    broker=celeryconfig.broker_url,
                    backend=celeryconfig.result_backend)
    celery.config_from_object(celeryconfig)
    return celery


my_celery = make_celery(__name__)

app = create_app(celery=my_celery)

我們把celery配置文件和flask工廠應用導入進來。然后創建make_celery函數生成celery應用。

生成celery應用后把celery傳入到flask應用函數中去。這樣把生成注冊分開寫,解決了循環導入的問題。

接着我們創建一個tasks.py文件。

from server import my_celery
from .models import db, Message


@my_celery.task()
def add2(msg):
    message = Message(details=msg)
    db.session.add(message)
    db.session.commit()
    return "success"

從server文件中導入celery應用,然后創建任務。

然后在視圖中引用任務。

from flask import Blueprint, jsonify
from .models import db, Message
from .tasks import add2

th = Blueprint('', __name__)


@th.route('/')
def index():
    res = add2.delay("hello word")
    return jsonify(res.wait())


@th.get('/msgs')
def msg_list():
    messages = Message.query.all()
    results = []
    for message in messages:
        results.append(message.to_json())
    return jsonify(results)

celery的任務可以通過delay, 方法調用,參數在delay中直接傳入。

詳細介紹:

celery文檔

這些 API 定義了標准的執行選項集,也就是下面這三個方法:

  • apply_async(args[, kwargs[, ...]])

    發送一個任務消息。

  • delay(*args, **kwargs)

    直接發送一個任務消息,但是不支持運行參數。

  • calling(__call__)

    應用一個支持調用接口(例如,add(2,2))的對象,意味着任務不會被一個 worker 執行,但是會在當前線程中執行(但是消息不會被發送)。

速查表

  • T.delay(arg, kwarg=value)

    調用 apply_async 的快捷方式(.delay(_args, *_kwargs)等價於調用 .apply_async(args, kwargs))。

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10)

    從現在起, 十秒內執行。

  • T.apply_async(eta=now + timedelta(seconds=10))

    從現在起十秒內執行,指明使用eta。

  • T.apply_async(countdown=60, expires=120)

    從現在起一分鍾執行,但在兩分鍾后過期。

  • T.apply_async(expires=now + timedelta(days=2))

    兩天內過期,使用datetime對象。

例子

delay() 方法就像一個很規則的函數,很方便去調用它:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

apply_async() 替代你寫的:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

盡管運行十分方便,但是如果像設置額外的行參數,你必須用 apply_async

運行一下

運行之前我們需要先創建一個.flaskenv文件,指定以下我們的FLASK_APP環境變量是server.py

FLASK_APP=server.py

好了之后,啟動flask

flask run

啟動celery

celery -A server.my_celery worker -l info -P eventlet

老規矩,看一下任務注冊成功沒

[tasks]
  . app.tasks.add2

我們打開瀏覽器查看

image

可以看到執行成功了。再看看命令行。

image-20210821235926547

任務已經成功的執行了。

就這樣我們弄好了 flask+celery項目的配置,並成功執行了任務。

可能相比django+celery的配置就麻煩了許多,所以flask的學習就是要更多更多的去參考社區的資料。所以Google常備身邊。

開源地址

為了方便學習交流,准備了開源地址:

https://gitee.com/wxhou/flask-celery-demo

參考文獻

medium.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM