flask - celery5.0使用教程


前言

1、環境配置

  • python3.10
  • celery5.0
  • redis

2、推薦幾篇遇到celery問題解決得好文章

代碼部分

目錄結構

applocation.py用於創建flask工廠模式得文件

from datetime import datetime, date

from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from flask_cors import CORS
from config.base_config import BaseConfig
from app.base_module.base import db


class JSONEncoder(_JSONEncoder):
    """重寫jsonify能序列化模型對象"""

    def default(self, o):
        if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
            return dict(o)
        if isinstance(o, datetime):
            return o.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(o, date):
            return o.strftime('%Y-%m-%d')
        return JSONEncoder.default(self, o)


class Flask(_Flask):
    json_encoder = JSONEncoder


flask_app = Flask(__name__)
# 跨域配置
CORS(flask_app, supports_credentials=True, origins="*")
flask_app.config.from_object(BaseConfig)
db.init_app(flask_app)

celery_app.py用於創建celery對象

from celery import Celery
from application import flask_app


def make_celery(app):
    my_celery = Celery(
        app.import_name,
    )
    # celery.conf.update(app.config)
    # 先加載配置文件,這里是config目錄下的celeryconfig.py文件,傳遞參數時,不用帶后綴名
    my_celery.config_from_object("config.celeryconfig")
    # 加載app的配置文件,這里有個坑,在flask配置文件里寫的celery配置得用小寫名
    my_celery.conf.update(app.config)
    # 將flask上下文對象加入celery,后續定義任務時,可以使用flask的orm模型
    class ContextTask(my_celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    my_celery.Task = ContextTask
    return my_celery


my_celery = make_celery(flask_app)

flask配置文件和celery配置文件

windows下啟動celery

先安裝:pip install eventlet
啟動命令1:celery -A celery_app.my_celery worker -l info -P eventlet
啟動命令2: celery -A celery_app.my_celery worker -l info -P solo

出現tasks為空的情況,可能為被加載任務,請在自己編寫的視圖當中導入自己的異步任務,celery加載時能掃描到,如果未掃描到的話,可以在celeryconfig中導入異步任務目錄

出現以下圖片即為啟動成功

啟動celery報循環導入錯誤解決

  • 出現上面那種循環導入的問題,因為博主使用了藍圖,會出現循環引用的情況,暫時沒想到更好的解決方案,所以單獨放了出來,可以看最開始的有個bule.py文件
  • 第二種解決方式,不在文件最開始的位置導入,在視圖函數中導入,可以避免循環導入,類似下圖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM