前言
1、環境配置
- python3.10
- celery5.0
- redis
2、推薦幾篇遇到celery問題解決得好文章
- celery5.0開始使用小寫得配置名
- Flask 使用 Celery 避免循環引用
- 解決報錯(consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out.)
代碼部分
目錄結構

applocation.py用於創建flask工廠模式得文件
from datetime import datetime, date
from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from flask_cors import CORS
from config.base_config import BaseConfig
from app.base_module.base import db
class JSONEncoder(_JSONEncoder):
"""重寫jsonify能序列化模型對象"""
def default(self, o):
if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
return dict(o)
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(o, date):
return o.strftime('%Y-%m-%d')
return JSONEncoder.default(self, o)
class Flask(_Flask):
json_encoder = JSONEncoder
flask_app = Flask(__name__)
# 跨域配置
CORS(flask_app, supports_credentials=True, origins="*")
flask_app.config.from_object(BaseConfig)
db.init_app(flask_app)
celery_app.py用於創建celery對象
from celery import Celery
from application import flask_app
def make_celery(app):
my_celery = Celery(
app.import_name,
)
# celery.conf.update(app.config)
# 先加載配置文件,這里是config目錄下的celeryconfig.py文件,傳遞參數時,不用帶后綴名
my_celery.config_from_object("config.celeryconfig")
# 加載app的配置文件,這里有個坑,在flask配置文件里寫的celery配置得用小寫名
my_celery.conf.update(app.config)
# 將flask上下文對象加入celery,后續定義任務時,可以使用flask的orm模型
class ContextTask(my_celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
my_celery.Task = ContextTask
return my_celery
my_celery = make_celery(flask_app)
flask配置文件和celery配置文件

windows下啟動celery
先安裝:pip install eventlet
啟動命令1:celery -A celery_app.my_celery worker -l info -P eventlet
啟動命令2: celery -A celery_app.my_celery worker -l info -P solo
出現tasks為空的情況,可能為被加載任務,請在自己編寫的視圖當中導入自己的異步任務,celery加載時能掃描到,如果未掃描到的話,可以在celeryconfig中導入異步任務目錄
出現以下圖片即為啟動成功

啟動celery報循環導入錯誤解決

- 出現上面那種循環導入的問題,因為博主使用了藍圖,會出現循環引用的情況,暫時沒想到更好的解決方案,所以單獨放了出來,可以看最開始的有個
bule.py文件 - 第二種解決方式,不在文件最開始的位置導入,在視圖函數中導入,可以避免循環導入,類似下圖

