現在繼續學習在集成的框架中如何使用celery.
在Flask中使用celery
在Flask中集成celery需要做到兩點:
-
創建celery的實例對象的名字必須是flask應用程序app的名字,否則celery啟動會失敗;
-
celery必須能順利加載初始化文件。
celery在flask中初始化
由於celery進程的運行和flask進程的運行是相互獨立的,但是在框架中我們希望只使用一份配置文件,這樣可以簡化配置的工作。
from celery import Celery
from flask import Flask
app = Flask(__name__)
def make_celery(app):
celery = Celery(app.import_name)
celery.conf.update(app.config)
return celery
celery = make_celery(app)
# celery的配置文件在app的setting中;
問題:上面的做法程序在初始化的時候可以完成celery的初始化,但是當使用工廠模式創建app的時候,celery的初始化變得困難;
from celery import Celery
from flask import Flask
celery = None
def make_celery(app):
celery = Celery(app.import_name)
celery.conf.update(app.config)
return celery
def create_app(config_name)
app = Flask(__name__)
config_class = config_map[config_name]
app.config.from_object(config_class)
# 初始化celery
global celery
celery = make_celery(app)
-
問題:由於程序初始化的時候並不能創建出app對象,所以celery啟動的時候必須先在tasks中導入app對象才能完成初始化,可能導致循環導入的錯誤;
-
解決:引入Flask-Celery-Helper,幫助我們初始化celery對象;
-
安裝Flask-Celery-Helper
pip install Flask-Celery-Helper
- 將所有額外的需要初始化的對象獨立出來在一個單獨的py模塊。
# extensions.py
from flask_celery import Celery
# 創建celery的實例
celery = Celery()
# __init__.py
from extensions import celery
def create_app(config_name)
app = Flask(__name__)
config_class = config_map[config_name]
app.config.from_object(config_class)
# 對celery進行初始化操作,可以將celery的配置寫在app的配置中
celery.init_app(app=app)
# tasks.py
from extensions import celery
@celery.task()
def add(x,y):
return x + y
注意
-
Flask-Celery-Helper官方目前只支持到python3.4,但樓主使用py3.6也沒有問題;Flask-Celery-Helper不支持celery4.X的版本,否則報錯,因此需要使用celery3.x的版本;
-
調用task方法
# app.py
from tasks import add
@app.route("/index")
def index():
"""一個測試的實例"""
print(add(3+6)) # add函數也能做普通的函數使用
add.apply_async(args=[5,7], queue='eegqueue') # 發送異步任務,指定隊列
return "ok!"
啟動celery
啟動celery之前需要加載flask的app的配置,因此需要創建一個app對象給celery使用。
# run_celery.py
import create_app
flask_app = create_app("develop") # 創建app的同時,對celery完成了加載配置的工作
from extensions import celery # 此時的celery對象已經在上下文中完成初始化
# 找到celery實例的位置,指定worker,指定接收某個隊列的消息,如果不指定則接收所有隊列的消息
celery -A run_celery.celery worker -Q eegqueue --loglevel=info