前言
在使用flask開發的時候,接口的返回需要很少的時間,所以我們需要將一些耗時的任務,放到異步后台去處理,例如:發送郵件,耗時的CPU任務等。在python web框架中celery這個庫,可能是最合適的。
由於我使用flask的時間比較多,但是當我想把celery很好的與flask進行集成的時候,卻發現並不是那么如意。花費了很久的時間去實踐最后卻是各種報錯。出現了循環導入、app上下文、tasks not found等問題,嘗試了種種卻總是不如人意。
好在功夫不負有心人,在結合官方文檔並查閱了大量資料后,終於把celery很好得集成在了flask項目中。我在這里記錄一下,同時也希望對你們有所幫助。
配置
開發環境 | Windows10 |
---|---|
python | 3.8.6 |
flask | 2.0.x |
celery | 5.x |
broker | redis |
pool | eventlet |
simple模式
由於celery 5.0后推薦小寫模式,與flask config大寫規范有沖突,所以我們當同目錄下創建一個celeryconfig.py
文件
celeryconfig.py
broker_url='redis://127.0.0.1:6379/1'
result_backend='redis://127.0.0.1:6379/2'
flask simple模式。
simple.py
from flask import Flask
from celery import Celery
import celeryconfig
app = Flask(__name__)
celery_app = Celery(app.import_name,
broker=celeryconfig.broker_url,
backend=celeryconfig.result_backend)
celery_app.config_from_object(celeryconfig)
@celery_app.task(name='simple/add2')
def add2(x, y):
return x + y
@app.route('/')
def index():
results = add2.delay(3, 5)
return str(results.wait())
if __name__ == '__main__':
app.run(debug=True)
這些就是單文件模式的代碼,這其中我們添加了一個任務add2
,然后啟動flask。
python simple.py
由於celery和flask是同級別的app,所以我們需要一個新的窗口啟動celery,加入-P參數指定異步workereventlet
celery -A simple.celery_app worker -l info -P eventlet
當我們啟動celery之后。看到最后一行的ready的時候,說明我們的celery已經啟動成功了。
然后再看有下面標識說明我們的任務已經被添加成功了。
[tasks]
. simple/add2
同時我們查看一下celery的窗口:
simple模式就結束了
Factory模式
當然我們如果用flask寫一個稍微復雜的東西的話,其實工廠模式我們應該用的更多。下面我們一起來看看工廠模式中的配置。
目錄結構
首先我們先規划一個flask+celery的目錄結構。然后創建下面的文件:
.
├── app
│ ├── __init__.py ——app主體文件
│ ├── celeryconfig.py ——celery配置文件
│ ├── config.py ——flask配置文件
│ ├── models.py ——模型文件
│ ├── tasks.py ——后台任務
│ └── views.py ——視圖文件
├── data.db
├── .flaskenv ——flask環境變量
└── server.py ——運行文件
我們先創建一個注冊celery的函數,主要功能是使用flask應用上下文。
def register_celery(celery, app):
class ContextTask(celery.Task):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
然后我們創建create_app函數,將寫好的注冊celery函數加進去。
def create_app(**kwargs):
app = Flask(__name__)
app.config.from_pyfile('config.py')
db.init_app(app)
register_celery(celery=kwargs.get('celery'), app=app) # >> 注冊celery
register_blueprints(app)
register_commands(app)
return app
上面這些都是我們在__init__
文件中創建的,下面我們來創建celery的app
打開server.py
from celery import Celery
from app import create_app, celeryconfig
def make_celery(app_name):
celery = Celery(app_name,
broker=celeryconfig.broker_url,
backend=celeryconfig.result_backend)
celery.config_from_object(celeryconfig)
return celery
my_celery = make_celery(__name__)
app = create_app(celery=my_celery)
我們把celery配置文件和flask工廠應用導入進來。然后創建make_celery函數生成celery應用。
生成celery應用后把celery傳入到flask應用函數中去。這樣把生成和注冊分開寫,解決了循環導入的問題。
接着我們創建一個tasks.py文件。
from server import my_celery
from .models import db, Message
@my_celery.task()
def add2(msg):
message = Message(details=msg)
db.session.add(message)
db.session.commit()
return "success"
從server文件中導入celery應用,然后創建任務。
然后在視圖中引用任務。
from flask import Blueprint, jsonify
from .models import db, Message
from .tasks import add2
th = Blueprint('', __name__)
@th.route('/')
def index():
res = add2.delay("hello word")
return jsonify(res.wait())
@th.get('/msgs')
def msg_list():
messages = Message.query.all()
results = []
for message in messages:
results.append(message.to_json())
return jsonify(results)
celery的任務可以通過delay, 方法調用,參數在delay中直接傳入。
詳細介紹:
celery文檔
這些 API 定義了標准的執行選項集,也就是下面這三個方法:
-
apply_async(args[, kwargs[, ...]])
發送一個任務消息。
-
delay(*args, **kwargs)
直接發送一個任務消息,但是不支持運行參數。
-
calling(
__call__
)應用一個支持調用接口(例如,add(2,2))的對象,意味着任務不會被一個 worker 執行,但是會在當前線程中執行(但是消息不會被發送)。
速查表
-
T.delay(arg, kwarg=value)
調用 apply_async 的快捷方式(.delay(_args, *_kwargs)等價於調用 .apply_async(args, kwargs))。
-
T.apply_async((arg,), {'kwarg': value})
-
T.apply_async(countdown=10)
從現在起, 十秒內執行。
-
T.apply_async(eta=now + timedelta(seconds=10))
從現在起十秒內執行,指明使用eta。
-
T.apply_async(countdown=60, expires=120)
從現在起一分鍾執行,但在兩分鍾后過期。
-
T.apply_async(expires=now + timedelta(days=2))
兩天內過期,使用datetime對象。
例子
delay()
方法就像一個很規則的函數,很方便去調用它:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
用 apply_async()
替代你寫的:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
盡管運行十分方便,但是如果像設置額外的行參數,你必須用 apply_async
運行一下
運行之前我們需要先創建一個.flaskenv
文件,指定以下我們的FLASK_APP環境變量是server.py
FLASK_APP=server.py
好了之后,啟動flask
flask run
啟動celery
celery -A server.my_celery worker -l info -P eventlet
老規矩,看一下任務注冊成功沒
[tasks]
. app.tasks.add2
我們打開瀏覽器查看
可以看到執行成功了。再看看命令行。
任務已經成功的執行了。
就這樣我們弄好了 flask+celery項目的配置,並成功執行了任務。
可能相比django+celery的配置就麻煩了許多,所以flask的學習就是要更多更多的去參考社區的資料。所以Google常備身邊。
開源地址
為了方便學習交流,准備了開源地址:
https://gitee.com/wxhou/flask-celery-demo